代码示例来自github上android源码的master分支,随android版本演进实现可能会变化.
总结如上图,一个looper监听了三部分,最上面的是java的message队列,按时间排序,添加新的message可以通过nativeWake唤醒epoll,到时间后通过target.dispatchMessage(msg)处理message;
中间的fd列表,当有新内容时会被唤醒,新内容通过callback->handleEvent(fd, events, data)处理;
下面的是native的message队列,按时间排序,添加新的message可以通过wake唤醒epoll,到时间后通过handler->handleMessage(message)处理message.
Looper.h、Looper.cpp 源码位于libutils目录,可见该逻辑是独立于具体业务的.
//Looper.h
// Payload passed by const reference to MessageHandler::handleMessage().
// Only the `what` field is shown in this excerpt.
struct Message {
int what; // presumably a caller-defined message code — TODO confirm against the full header
};
// User data attached to an epoll registration. This is a union, so the members
// share storage; the Looper code in this file only uses `fd`.
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
// One entry passed to epoll_ctl() or filled in by epoll_wait():
// an event bitmask plus the user data registered with it.
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
// Bookkeeping for one file descriptor registered with the Looper via addFd().
struct Request {
    int fd;                       // descriptor being watched; also the key in mRequests
    int ident;                    // identifier supplied by the caller of addFd()
    int events;                   // bitmask of EVENT_* flags requested by the caller
    int seq;                      // sequence number assigned by addFd()
    sp<LooperCallback> callback;  // invoked by pollInner() when the fd fires
    void* data;                   // opaque pointer handed back to the callback

    // Fills `eventItem` with the epoll equivalent of this request.
    // Defined in-class, so it must not carry a `Request::` qualifier.
    void initEventItem(struct epoll_event* eventItem) const {
        int epollEvents = 0;
        if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
        if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;

        memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem->events = epollEvents;
        eventItem->data.fd = fd;
    }
};
// An fd event captured by pollInner(): the translated EVENT_* mask together
// with a copy of the Request that was registered for that fd.
struct Response {
int events;
Request request;
};
struct MessageEnvelope {
nsecs_t uptime;
sp<MessageHandler> handler;
Message message;
};
// Thread-local-storage bookkeeping (a once-guard and a key); presumably used to
// keep one Looper per thread — TODO confirm, the users are not part of this excerpt.
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;
// Size hint for the epoll instance; not referenced elsewhere in this excerpt.
static const int EPOLL_SIZE_HINT = 8;
// The containers below are Looper state quoted out of the class body.
// Registered fds, keyed by fd (see addFd()).
KeyedVector<int, Request> mRequests; // guarded by mLock
// Events collected by pollInner() and dispatched to callbacks after the lock is released.
Vector<Response> mResponses;
// Pending native messages, kept in ascending uptime order by sendMessageAtTime().
Vector<MessageEnvelope> mMessageEnvelopes; // guarded by mLock
// Creates the wake eventfd (non-blocking, close-on-exec) and builds the epoll
// instance while holding mLock. pollInner() treats events on mWakeEventFd as a
// wake-up rather than as a registered request.
// NOTE(review): `allowNonCallbacks` is not used in this excerpt.
Looper::Looper(bool allowNonCallbacks) {
mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
AutoMutex _l(mLock);
rebuildEpollLocked();
}
// Plain-function form of an fd callback; it has the same parameters and return
// type as LooperCallback::handleEvent() and is wrapped by SimpleLooperCallback.
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);
// Interface for receivers of native Looper messages. pollInner() calls
// handleMessage() once the message's uptime has been reached.
class MessageHandler : public virtual RefBase {
public:
virtual void handleMessage(const Message& message) = 0;
};
class WeakMessageHandler : public MessageHandler {
public:
virtual void handleMessage(const Message& message){
sp<MessageHandler> handler = mHandler.promote();
if (handler != NULL) {
handler->handleMessage(message);
}
}
private:
wp<MessageHandler> mHandler;
};
// Interface for fd event receivers. pollInner() calls handleEvent() with the fd,
// the EVENT_* mask that fired, and the data pointer given to addFd().
// A return value of 0 makes pollInner() remove the fd from the Looper.
class LooperCallback : public virtual RefBase {
public:
virtual int handleEvent(int fd, int events, void* data) = 0;
};
class SimpleLooperCallback : public LooperCallback {
public:
int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
return mCallback(fd, events, data);
}
private:
Looper_callbackFunc mCallback;
};
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
<big><b>int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);</b></big>
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
<big><b>mRequests.add(fd, request);</b></big>
} else {
int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
// Queues `message` for `handler` to be delivered at `uptime`.
// The queue stays sorted by uptime; a message with the same uptime as existing
// entries is placed after them (the scan uses >=).
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
// Find the first entry that is due strictly later than the new message.
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
<big><b>mMessageEnvelopes.insertAt(messageEnvelope, i, 1);</b></big>
// pollInner() sets mSendingMessage while it runs a handler; in that case
// return without waking the poll loop.
if (mSendingMessage) {
return;
}
} // release lock
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
// Convenience overload for callers that do not need the fd/events/data outputs.
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
// Calls pollInner() repeatedly until it reports a non-zero result, then returns
// that result. The out-parameters are not written in this excerpt.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int pollResult = 0;
    while (pollResult == 0) {
        pollResult = pollInner(timeoutMillis);
    }
    return pollResult;
}
// Performs one poll cycle: waits on the epoll set, records which registered fds
// fired, delivers every native message that is due, and finally invokes the fd
// callbacks. Returns POLL_WAKE, POLL_TIMEOUT or POLL_CALLBACK.
int Looper::pollInner(int timeoutMillis) {
// Poll.
int result = POLL_WAKE;
// We are about to idle.
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
<big><b>int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);</b></big>
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
// NOTE(review): a negative eventCount (epoll_wait failure) is not handled in
// this excerpt; the loop below simply does not run in that case.
// Check for poll timeout.
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd) {
// The wake eventfd fired; awoken() handles it.
awoken();
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
// Translate the epoll bits back into the Looper's EVENT_* mask.
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// Callbacks are not run here (mLock is held); the event is queued for later.
<big><b>pushResponse(events, mRequests.valueAt(requestIndex));</b></big>
}
}
}
Done:
// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
// Copy handler and message out before removing the envelope, because
// `messageEnvelope` refers to the queue entry being removed.
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
// The lock is released while the handler runs.
mLock.unlock();
<big><b>handler->handleMessage(message);</b></big>
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
// Invoke all response callbacks.
// NOTE(review): mResponses is never cleared in this excerpt, and each handled
// response has its callback cleared below; presumably the reset lives in code
// omitted from this excerpt — confirm against the full source.
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
<big><b>int callbackResult = response.request.callback->handleEvent(fd, events, data);</b></big>
// A callback returning 0 asks for its fd to be unregistered.
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// Drop this response's reference to the callback.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
void Looper::pushResponse(int events, const Request& request) {
Response response;
response.events = events;
response.request = request;
mResponses.push(response);
}
looper主要监听两种事件,一.通过epoll监听文件句柄(文件、socket、pipe),当有内容到文件句柄中时程序就会醒过来;二.looper handler的消息处理,通过epoll的超时机制处理.
InputDispatcherThread与应用程序之间传递事件时使用socket通信,触摸、按键事件就是通过socket接入looper的.下面主要分析key、touch事件是如何通过IPC传到当前view的,不涉及WindowManager,也不涉及activity内部的分发.
// Creates the receiver for this window's input channel, bound to the current thread's Looper.
<big><b>WindowInputEventReceiver mInputEventReceiver = new WindowInputEventReceiver(mInputChannel,
Looper.myLooper());</b></big>
/**
 * InputEventReceiver for a window: every received event is pushed into the
 * enclosing object's pending-input queue via enqueueInputEvent().
 */
final class WindowInputEventReceiver extends InputEventReceiver {
public WindowInputEventReceiver(InputChannel inputChannel, Looper looper) {
super(inputChannel, looper);
}
@Override
public void onInputEvent(InputEvent event) {
// flags = 0, processImmediately = true: the event is processed in this call.
<big><b>enqueueInputEvent(event, this, 0, true);</b></big>
}
@Override
public void onBatchedInputEventPending() {
// With unbuffered dispatch, defer to the base class; otherwise schedule
// consumption of the batch.
if (mUnbufferedInputDispatch) {
super.onBatchedInputEventPending();
} else {
scheduleConsumeBatchedInput();
}
}
@Override
public void dispose() {
// Cancel any scheduled batch consumption before disposing the receiver.
unscheduleConsumeBatchedInput();
super.dispose();
}
}
/**
 * Appends an input event to the tail of the pending-event linked list, then
 * either processes the queue right away or schedules processing.
 *
 * @param event the input event to queue
 * @param receiver the receiver the event came from
 * @param flags flags stored with the queued event
 * @param processImmediately true to run doProcessInputEvents() in this call,
 *        false to call scheduleProcessInputEvents() instead
 */
void enqueueInputEvent(InputEvent event,
InputEventReceiver receiver, int flags, boolean processImmediately) {
<big><b>QueuedInputEvent q = obtainQueuedInputEvent(event, receiver, flags);</b></big>
QueuedInputEvent last = mPendingInputEventTail;
if (last == null) {
// Empty queue: the new node is both head and tail.
<big><b>mPendingInputEventHead = q;</b></big>
mPendingInputEventTail = q;
} else {
// Non-empty queue: link the new node after the current tail.
last.mNext = q;
mPendingInputEventTail = q;
}
if (processImmediately) {
<big><b>doProcessInputEvents();</b></big>
} else {
scheduleProcessInputEvents();
}
}
/**
 * Drains the pending-event queue, delivering each event in FIFO order.
 */
void doProcessInputEvents() {
    // Deliver all pending input events in the queue.
    while (mPendingInputEventHead != null) {
        QueuedInputEvent q = mPendingInputEventHead;
        mPendingInputEventHead = q.mNext;
        if (mPendingInputEventHead == null) {
            // Queue drained: reset the tail as well. Otherwise enqueueInputEvent()
            // would link the next event behind this stale node while the head
            // stays null, and that event would never be delivered.
            mPendingInputEventTail = null;
        }
        // Detach the node from the list before handing it on.
        q.mNext = null;
        <big><b>deliverInputEvent(q);</b></big>
    }
}
/**
 * Chooses the first input stage for a queued event and hands the event to it.
 * If no stage is available, the event is finished immediately.
 */
private void deliverInputEvent(QueuedInputEvent q) {
InputStage stage;
if (q.shouldSendToSynthesizer()) {
stage = mSyntheticInputStage;
} else {
// Events that skip the IME start at the first post-IME stage.
stage = q.shouldSkipIme() ? mFirstPostImeInputStage : mFirstInputStage;
}
if (stage != null) {
<big><b>stage.deliver(q);</b></big>
} else {
finishInputEvent(q);
}
}
/**
 * Base class for one step of the input-processing chain. Each stage holds a
 * reference to the next stage; subclasses supply onProcess().
 */
abstract class InputStage {
private final InputStage mNext;
// Result codes returned by onProcess() and passed to apply().
protected static final int FORWARD = 0;
protected static final int FINISH_HANDLED = 1;
protected static final int FINISH_NOT_HANDLED = 2;
/**
* Delivers an event to be processed.
*/
public final void deliver(QueuedInputEvent q) {
if ((q.mFlags & QueuedInputEvent.FLAG_FINISHED) != 0) {
// Already finished: pass the event along without processing it.
forward(q);
} else if (shouldDropInputEvent(q)) {
finish(q, false);
} else {
<big><b>apply(q, onProcess(q));</b></big>
}
}
}
/**
 * Input stage that hands events to the view hierarchy. Key events go to
 * processKeyEvent(); other events are routed by their input source class.
 */
final class ViewPostImeInputStage extends InputStage {
public ViewPostImeInputStage(InputStage next) {
super(next);
}
@Override
protected int onProcess(QueuedInputEvent q) {
if (q.mEvent instanceof KeyEvent) {
<big><b>return processKeyEvent(q);</b></big>
} else {
// Not a key event: dispatch on the source class bits.
final int source = q.mEvent.getSource();
if ((source & InputDevice.SOURCE_CLASS_POINTER) != 0) {
<big><b>return processPointerEvent(q);</b></big>
} else if ((source & InputDevice.SOURCE_CLASS_TRACKBALL) != 0) {
return processTrackballEvent(q);
} else {
return processGenericMotionEvent(q);
}
}
}
// Returns FINISH_HANDLED when the view hierarchy consumed the key,
// otherwise FORWARD.
private int processKeyEvent(QueuedInputEvent q) {
final KeyEvent event = (KeyEvent)q.mEvent;
// Deliver the key to the view hierarchy.
<big><b>if (mView.dispatchKeyEvent(event)) {</b></big>
return FINISH_HANDLED;
}
return FORWARD;
}
}
mView.dispatchKeyEvent(event)就把事件发给根view了,后续就是view内部的事件分发了. 下面是InputEventReceiver.java:
/**
 * Creates a receiver for the given input channel on the given looper's message
 * queue and initializes the native peer.
 *
 * @param inputChannel the channel to receive events from
 * @param looper the looper whose message queue is passed to the native side
 */
public InputEventReceiver(InputChannel inputChannel, Looper looper) {
mInputChannel = inputChannel;
mMessageQueue = looper.getQueue();
// The native side only gets a weak reference to this object; the returned
// value is stored as the handle to the native receiver.
mReceiverPtr = nativeInit(new WeakReference<InputEventReceiver>(this),
inputChannel, mMessageQueue);
mCloseGuard.open("dispose");
}
// Called from native code.
// Records the mapping from the event's own sequence number to the native `seq`,
// then passes the event to onInputEvent().
@SuppressWarnings("unused")
private void dispatchInputEvent(int seq, InputEvent event) {
mSeqMap.put(event.getSequenceNumber(), seq);
onInputEvent(event);
}
对应的C++代码为 android_view_InputEventReceiver.cpp
// JNI entry point behind InputEventReceiver.nativeInit(): resolves the native
// InputChannel and MessageQueue from their Java objects, creates a
// NativeInputEventReceiver, initializes it, and returns its address as a jlong.
// NOTE(review): `status` is not checked in this excerpt.
static jlong nativeInit(JNIEnv* env, jclass clazz, jobject receiverWeak,
jobject inputChannelObj, jobject messageQueueObj) {
sp<InputChannel> inputChannel = android_view_InputChannel_getInputChannel(env,
inputChannelObj);
sp<MessageQueue> messageQueue = android_os_MessageQueue_getMessageQueue(env, messageQueueObj);
sp<NativeInputEventReceiver> receiver = new NativeInputEventReceiver(env,
receiverWeak, inputChannel, messageQueue);
<big><b>status_t status = receiver->initialize();</b></big>
return reinterpret_cast<jlong>(receiver.get());
}
// Starts listening for input on the channel's fd by requesting
// ALOOPER_EVENT_INPUT. Always returns OK.
status_t NativeInputEventReceiver::initialize() {
<big><b>setFdEvents(ALOOPER_EVENT_INPUT);</b></big>
return OK;
}
// Changes which events the Looper watches on the input channel's fd.
// Does nothing when the mask is unchanged. A non-zero mask (re)registers the fd
// with this object as the callback; a zero mask unregisters the fd.
void NativeInputEventReceiver::setFdEvents(int events) {
if (mFdEvents != events) {
mFdEvents = events;
int fd = mInputConsumer.getChannel()->getFd();
if (events) {
<big><b>mMessageQueue->getLooper()->addFd(fd, 0, events, this, NULL);</b></big>
} else {
mMessageQueue->getLooper()->removeFd(fd);
}
}
}
// NOTE(review): this repeats the definition of
// NativeInputEventReceiver::initialize() that already appears just before
// setFdEvents(); in a real translation unit only one definition may exist.
// Requests ALOOPER_EVENT_INPUT on the channel's fd and returns OK.
status_t NativeInputEventReceiver::initialize() {
setFdEvents(ALOOPER_EVENT_INPUT);
return OK;
}
// Looper callback for the input channel's fd. On ALOOPER_EVENT_INPUT it
// consumes the pending events. Returns 1 to stay registered, or 0 when
// consumeEvents() reports a status other than OK or NO_MEMORY.
int NativeInputEventReceiver::handleEvent(int receiveFd, int events, void* data) {
if (events & ALOOPER_EVENT_INPUT) {
JNIEnv* env = AndroidRuntime::getJNIEnv();
<big><b>status_t status = consumeEvents(env, false /*consumeBatches*/, -1, NULL);</b></big>
return status == OK || status == NO_MEMORY ? 1 : 0;
}
// Other event types are ignored in this excerpt; the callback stays registered.
return 1;
}
// Repeatedly pulls input events from the InputConsumer, converts each one to
// its Java object (KeyEvent or MotionEvent) and calls the Java method
// dispatchInputEvent(seq, event) on the receiver object.
// NOTE(review): as quoted here, `status` is never checked, the loop has no exit,
// and `receiverObj` is constructed with NULL and never assigned; presumably that
// code was omitted from this excerpt — confirm against the full source.
status_t NativeInputEventReceiver::consumeEvents(JNIEnv* env,
bool consumeBatches, nsecs_t frameTime, bool* outConsumedBatch) {
ScopedLocalRef<jobject> receiverObj(env, NULL);
bool skipCallbacks = false;
for (;;) {
uint32_t seq;
InputEvent* inputEvent;
<big><b>status_t status = mInputConsumer.consume(&mInputEventFactory,
consumeBatches, frameTime, &seq, &inputEvent);</b></big>
if (!skipCallbacks) {
jobject inputEventObj;
// Build the Java-side event object that matches the native event type.
switch (inputEvent->getType()) {
case AINPUT_EVENT_TYPE_KEY:
inputEventObj = android_view_KeyEvent_fromNative(env,
static_cast<KeyEvent*>(inputEvent));
break;
case AINPUT_EVENT_TYPE_MOTION: {
MotionEvent* motionEvent = static_cast<MotionEvent*>(inputEvent);
inputEventObj = android_view_MotionEvent_obtainAsCopy(env, motionEvent);
break;
}
default:
assert(false); // InputConsumer should prevent this from ever happening
inputEventObj = NULL;
}
if (inputEventObj) {
if (kDebugDispatchCycle) {
ALOGD("channel '%s' ~ Dispatching input event.", getInputChannelName());
}
// Cross into Java: InputEventReceiver.dispatchInputEvent(seq, event).
<big><b>env->CallVoidMethod(receiverObj.get(),
gInputEventReceiverClassInfo.dispatchInputEvent, seq, inputEventObj);</b></big>
env->DeleteLocalRef(inputEventObj);
} else {
// Conversion failed: stop invoking callbacks for subsequent events.
ALOGW("channel '%s' ~ Failed to obtain event object.", getInputChannelName());
skipCallbacks = true;
}
}
}
}
// Looks up the method ID of the Java method dispatchInputEvent with signature
// (int, android.view.InputEvent) -> void and caches it for use by consumeEvents().
gInputEventReceiverClassInfo.dispatchInputEvent = GetMethodIDOrDie(env,
gInputEventReceiverClassInfo.clazz,
"dispatchInputEvent", "(ILandroid/view/InputEvent;)V");
// Reads messages from the channel until one of them yields an input event.
//
// On success returns OK with *outSeq set to the message's sequence number and
// *outEvent pointing at an event created by `factory`.
// Returns NO_MEMORY when the factory cannot create an event, UNKNOWN_ERROR for
// an unrecognized message type, or the status reported by receiveMessage().
// `consumeBatches` and `frameTime` are not used in this excerpt.
status_t InputConsumer::consume(InputEventFactoryInterface* factory,
        bool consumeBatches, nsecs_t frameTime, uint32_t* outSeq, InputEvent** outEvent) {
    // Start from a known state: the loop condition reads *outEvent, and callers
    // pass in uninitialized storage.
    *outSeq = 0;
    *outEvent = NULL;

    while (!*outEvent) {
        // Receive a fresh message.
        <big><b>status_t result = mChannel->receiveMessage(&mMsg);</b></big>
        if (result != OK) {
            // Do not interpret mMsg when nothing was received.
            return result;
        }

        switch (mMsg.header.type) {
        case InputMessage::TYPE_KEY: {
            KeyEvent* keyEvent = factory->createKeyEvent();
            if (!keyEvent) return NO_MEMORY;

            initializeKeyEvent(keyEvent, &mMsg);
            *outSeq = mMsg.body.key.seq;
            *outEvent = keyEvent;
            break;
        }

        // NOTE(review): this label comes from the AINPUT_EVENT_TYPE_* constants while
        // the key case uses InputMessage::TYPE_*; presumably the values coincide — confirm.
        case AINPUT_EVENT_TYPE_MOTION: {
            ssize_t batchIndex = findBatch(mMsg.body.motion.deviceId, mMsg.body.motion.source);
            MotionEvent* motionEvent = factory->createMotionEvent();
            if (! motionEvent) return NO_MEMORY;

            updateTouchState(&mMsg);
            initializeMotionEvent(motionEvent, &mMsg);
            *outSeq = mMsg.body.motion.seq;
            *outEvent = motionEvent;
            break;
        }

        default:
            // Unknown message type: report it instead of looping forever.
            return UNKNOWN_ERROR;
        }
    }
    return OK;
}
// Performs one non-blocking read of an InputMessage from the channel's socket,
// retrying when the call is interrupted by a signal.
//
// Returns OK when data was received, -errno when recv() fails (including the
// "nothing to read right now" case caused by MSG_DONTWAIT), and -EPIPE when
// recv() returns 0, which signals that the peer closed the connection.
status_t InputChannel::receiveMessage(InputMessage* msg) {
    ssize_t nRead;
    do {
        <big><b>nRead = ::recv(mFd, msg, sizeof(InputMessage), MSG_DONTWAIT);</b></big>
    } while (nRead == -1 && errno == EINTR);

    if (nRead < 0) {
        // Same error convention as openInputChannelPair(): negated errno.
        return -errno;
    }
    if (nRead == 0) {
        // Peer closed the connection; *msg was not filled in.
        return -EPIPE;
    }
    return OK;
}
// Create the sockets.
// Builds a connected pair of AF_UNIX SOCK_SEQPACKET sockets and wraps each end
// in an InputChannel named "<name> (server)" / "<name> (client)".
// Returns OK on success, or -errno when socketpair() fails.
status_t InputChannel::openInputChannelPair(const String8& name,
sp<InputChannel>& outServerChannel, sp<InputChannel>& outClientChannel) {
int sockets[2];
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sockets)) {
status_t result = -errno;
return result;
}
// Apply the same send/receive buffer size to both ends.
// The setsockopt() results are not checked.
int bufferSize = SOCKET_BUFFER_SIZE;
setsockopt(sockets[0], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
setsockopt(sockets[0], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));
setsockopt(sockets[1], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
setsockopt(sockets[1], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));
// sockets[0] becomes the server end, sockets[1] the client end.
String8 serverChannelName = name;
serverChannelName.append(" (server)");
outServerChannel = new InputChannel(serverChannelName, sockets[0]);
String8 clientChannelName = name;
clientChannelName.append(" (client)");
outClientChannel = new InputChannel(clientChannelName, sockets[1]);
return OK;
}