android native looper逻辑分析


代码示例来自github上android源码的master分支,随android版本演进实现可能会变化.

looper的源码

总结如上图,一个looper监听了三部分: 最上面的是java的message队列,按时间排序,添加新的message时可以通过nativeWake唤醒epoll,到时间后通过target.dispatchMessage(msg)处理message; 中间的是fd列表,当fd上有新内容时epoll会被唤醒,新内容通过callback->handleEvent(fd, events, data)处理; 最下面的是native的message队列,按时间排序,添加新的message时可以通过wake唤醒epoll,到时间后通过handler->handleMessage(message)处理message.

Looper.h Looper.cpp 源码位于libutils目录,可见该逻辑是独立于具体业务的.

//Looper.h
// Value passed to MessageHandler::handleMessage; this excerpt keeps only the
// integer field `what` (presumably the message code — confirm against Looper.h).
struct Message {
    int what;
};
// User-data slot of an epoll_event. It is a union, so the members share
// storage and only one of them is meaningful at a time; this file uses `fd`.
typedef union epoll_data {
    void    *ptr;
    int      fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;
// Record exchanged with epoll_ctl/epoll_wait: an event bit mask plus the
// user-data union that is handed back when the event fires.
struct epoll_event {
    uint32_t     events;    /* Epoll events */
    epoll_data_t data;      /* User data variable */
};
// Bookkeeping for one file descriptor registered through Looper::addFd.
struct Request {
    int fd;                       // descriptor being watched
    int ident;                    // identifier stored by addFd
    int events;                   // requested EVENT_* mask
    int seq;                      // sequence number assigned by addFd
    sp<LooperCallback> callback;  // callback stored by addFd (may be null)
    void* data;                   // opaque pointer handed back to the callback

    // Fills *eventItem with the epoll equivalent of this request: the
    // EVENT_INPUT / EVENT_OUTPUT bits are translated to EPOLLIN / EPOLLOUT and
    // the descriptor is stored in the user-data union.
    // The member is defined inside the struct, so it must not repeat the
    // "Request::" qualifier (an extra qualification is ill-formed C++).
    void initEventItem(struct epoll_event* eventItem) const {
        int epollEvents = 0;
        if (events & EVENT_INPUT) epollEvents |= EPOLLIN;
        if (events & EVENT_OUTPUT) epollEvents |= EPOLLOUT;
        memset(eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
        eventItem->events = epollEvents;
        eventItem->data.fd = fd;
    }
};
// One fired event waiting to be dispatched: the translated EVENT_* mask plus a
// copy of the Request it belongs to (filled in by Looper::pushResponse).
struct Response {
    int events;
    Request request;
};
// Entry of the native message queue: the time at which the message becomes
// due, the handler that will receive it, and the message itself.
// NOTE(review): sendMessageAtTime constructs this with three arguments; the
// matching constructor is not part of this excerpt.
struct MessageEnvelope {
    nsecs_t uptime;
    sp<MessageHandler> handler;
    Message message;
};
// State used by the excerpted Looper code.
// NOTE(review): the m-prefixed containers are presumably Looper members in the
// real header — confirm against Looper.h.
static pthread_once_t gTLSOnce = PTHREAD_ONCE_INIT;
static pthread_key_t gTLSKey = 0;
static const int EPOLL_SIZE_HINT = 8;
// Registered requests keyed by fd (addFd inserts with the fd as key).
KeyedVector<int, Request> mRequests;  // guarded by mLock
// Responses queued by pushResponse and dispatched by pollInner.
Vector<Response> mResponses;
// Pending native messages; sendMessageAtTime keeps them ordered by uptime.
Vector<MessageEnvelope> mMessageEnvelopes; // guarded by mLock
// Creates the wake-up eventfd (non-blocking, close-on-exec) and then, holding
// mLock, calls rebuildEpollLocked().
// NOTE(review): allowNonCallbacks is not used in this excerpt, and the result
// of eventfd() is not checked here.
Looper::Looper(bool allowNonCallbacks) {
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    AutoMutex _l(mLock);
    rebuildEpollLocked();
}
// Plain-function form of a looper callback; SimpleLooperCallback wraps it so it
// can be used wherever a LooperCallback object is expected.
typedef int (*Looper_callbackFunc)(int fd, int events, void* data);

// Interface for receivers of native messages; pollInner calls handleMessage()
// once a queued message becomes due.
class MessageHandler : public virtual RefBase {
public:
    virtual void handleMessage(const Message& message) = 0;
};
class WeakMessageHandler : public MessageHandler {
public:
    virtual void handleMessage(const Message& message){
        sp<MessageHandler> handler = mHandler.promote();
        if (handler != NULL) {
            handler->handleMessage(message);
        }
    }
private:
    wp<MessageHandler> mHandler;
};
// Interface for fd event receivers. pollInner calls handleEvent() for each
// fired fd; a return value of 0 makes pollInner remove the fd.
class LooperCallback : public virtual RefBase {
public:
    virtual int handleEvent(int fd, int events, void* data) = 0;
};

class SimpleLooperCallback : public LooperCallback {
public:
int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
    return mCallback(fd, events, data);
}

private:
    Looper_callbackFunc mCallback;
};
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
    return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : NULL, data);
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
    { // acquire lock
        AutoMutex _l(mLock);
        Request request;
        request.fd = fd;
        request.ident = ident;
        request.events = events;
        request.seq = mNextRequestSeq++;
        request.callback = callback;
        request.data = data;
        if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1

        struct epoll_event eventItem;
        request.initEventItem(&eventItem);

        ssize_t requestIndex = mRequests.indexOfKey(fd);
        if (requestIndex < 0) {
            <big><b>int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, fd, & eventItem);</b></big>
            if (epollResult < 0) {
                ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
                return -1;
            }
            <big><b>mRequests.add(fd, request);</b></big>
        } else {
            int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_MOD, fd, & eventItem);
            mRequests.replaceValueAt(requestIndex, request);
        }
    } // release lock
    return 1;
}
// Queues `message` for `handler` so that it becomes due at `uptime`.
// The queue is kept sorted by uptime; a new message is placed after every
// queued message whose uptime is less than or equal to its own.
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
        const Message& message) {
    size_t i = 0;
    { // acquire lock
        AutoMutex _l(mLock);
        size_t messageCount = mMessageEnvelopes.size();
        // Find the insertion index: skip everything that is due no later than us.
        while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
            i += 1;
        }
        MessageEnvelope messageEnvelope(uptime, handler, message);
        <big><b>mMessageEnvelopes.insertAt(messageEnvelope, i, 1);</b></big>
        // pollInner sets mSendingMessage while it is running a handler; in
        // that case return without waking the loop.
        if (mSendingMessage) {
            return;
        }
    } // release lock

    // Wake the poll loop only when we enqueue a new message at the head.
    if (i == 0) {
        wake();
    }
}
// Convenience overload: polls once without requesting the fd/events/data outputs.
inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL);
}
// Calls pollInner() repeatedly until it yields a non-zero result and returns
// that result. The out-parameters are not written in this excerpt.
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int outcome;
    do {
        outcome = pollInner(timeoutMillis);
    } while (outcome == 0);
    return outcome;
}

int Looper::pollInner(int timeoutMillis) {
    // Poll.
    int result = POLL_WAKE;
    // We are about to idle.
    mPolling = true;
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    <big><b>int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);</b></big>
    // No longer idling.
    mPolling = false;
    
    // Acquire lock.
    mLock.lock();

    // Check for poll timeout.
    if (eventCount == 0) {
        result = POLL_TIMEOUT;
        goto Done;
    }
    for (int i = 0; i < eventCount; i++) {
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeEventFd) {
            awoken();
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
                <big><b>pushResponse(events, mRequests.valueAt(requestIndex));</b></big>
            }
        }
    }
Done: 
    // Invoke pending message callbacks.
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            { // obtain handler
                sp&lt;MessageHandler&gt; handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
                <big><b>handler->handleMessage(message);</b></big>
            } // release handler

            mLock.lock();
            mSendingMessage = false;
            result = POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }
    // Release lock.
    mLock.unlock();

    // Invoke all response callbacks.
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
            <big><b>int callbackResult = response.request.callback->handleEvent(fd, events, data);</b></big>
            if (callbackResult == 0) {
                removeFd(fd, response.request.seq);
            }
            response.request.callback.clear();
            result = POLL_CALLBACK;
        }
    }
    return result;
}
void Looper::pushResponse(int events, const Request& request) {
    Response response;
    response.events = events;
    response.request = request;
    mResponses.push(response);
}

looper主要监听两种事件,一.通过epoll监听文件句柄(文件、socket、pipe),当有内容到文件句柄中时程序就会醒过来;二.looper handler的消息处理,通过epoll的超时机制处理.

looper使用fd的示例,按键事件的分发

InputDispatcherThread与应用程序之间通过socket通信传递事件,触摸、按键事件就是通过socket接入looper的. 下面主要分析key、touch事件是如何通过IPC传到当前view的,WindowManager以及activity内部的分发不涉及.

ViewRootImpl.java

// The receiver is created with the window's input channel and the looper of
// the current thread (Looper.myLooper()).
<big><b>WindowInputEventReceiver mInputEventReceiver = new WindowInputEventReceiver(mInputChannel,
        Looper.myLooper());</b></big>
/**
 * InputEventReceiver subclass used by ViewRootImpl: every received input event
 * is handed to enqueueInputEvent() with processImmediately set to true.
 */
final class WindowInputEventReceiver extends InputEventReceiver {
    public WindowInputEventReceiver(InputChannel inputChannel, Looper looper) {
        super(inputChannel, looper);
    }

    // Queues the event with flags 0 and asks for immediate processing.
    @Override
    public void onInputEvent(InputEvent event) {
        <big><b>enqueueInputEvent(event, this, 0, true);</b></big>
    }

    // With unbuffered dispatch the default handling is used; otherwise the
    // batch consumption is scheduled through scheduleConsumeBatchedInput().
    @Override
    public void onBatchedInputEventPending() {
        if (mUnbufferedInputDispatch) {
            super.onBatchedInputEventPending();
        } else {
            scheduleConsumeBatchedInput();
        }
    }

    // Cancels any scheduled batch consumption before disposing the receiver.
    @Override
    public void dispose() {
        unscheduleConsumeBatchedInput();
        super.dispose();
    }
}
/**
 * Wraps the event in a QueuedInputEvent, appends it to the pending list
 * (mPendingInputEventHead / mPendingInputEventTail) and then either processes
 * the list right away or schedules the processing.
 *
 * @param processImmediately true to call doProcessInputEvents() directly,
 *        false to call scheduleProcessInputEvents()
 */
void enqueueInputEvent(InputEvent event,
        InputEventReceiver receiver, int flags, boolean processImmediately) {
        <big><b>QueuedInputEvent q = obtainQueuedInputEvent(event, receiver, flags);</b></big>
            QueuedInputEvent last = mPendingInputEventTail;
    // Empty list: q becomes both head and tail; otherwise link it after the tail.
    if (last == null) {
        <big><b>mPendingInputEventHead = q;</b></big>
        mPendingInputEventTail = q;
    } else {
        last.mNext = q;
        mPendingInputEventTail = q;
    }
    if (processImmediately) {
        <big><b>doProcessInputEvents();</b></big>
    } else {
        scheduleProcessInputEvents();
    }
}
/**
 * Removes every event from the pending list, in order, and delivers it.
 */
void doProcessInputEvents() {
    // Deliver all pending input events in the queue.
    while (mPendingInputEventHead != null) {
        QueuedInputEvent q = mPendingInputEventHead;
        mPendingInputEventHead = q.mNext;
        // Once the list is drained the tail must be reset as well; otherwise
        // enqueueInputEvent would link the next event behind the stale tail
        // while the head stays null, and that event would never be delivered.
        if (mPendingInputEventHead == null) {
            mPendingInputEventTail = null;
        }
        // Detach the event from the list before handing it on.
        q.mNext = null;
        <big><b>deliverInputEvent(q);</b></big>
    }
}
/**
 * Picks the first input stage for the event and delivers the event to it:
 * the synthetic stage when q.shouldSendToSynthesizer(), the first post-IME
 * stage when q.shouldSkipIme(), otherwise the first stage. When the chosen
 * stage is null the event is finished immediately.
 */
private void deliverInputEvent(QueuedInputEvent q) {
    InputStage stage;
    if (q.shouldSendToSynthesizer()) {
        stage = mSyntheticInputStage;
    } else {
        stage = q.shouldSkipIme() ? mFirstPostImeInputStage : mFirstInputStage;
    }
    if (stage != null) {
        <big><b>stage.deliver(q);</b></big>
    } else {
        finishInputEvent(q);
    }
}

/**
 * Base class of the input-processing stages.
 * NOTE(review): the constructor and the forward/finish/apply/onProcess/
 * shouldDropInputEvent members used below are not part of this excerpt.
 */
abstract class InputStage {
    private final InputStage mNext;

    // Result codes returned by onProcess() in subclasses.
    protected static final int FORWARD = 0;
    protected static final int FINISH_HANDLED = 1;
    protected static final int FINISH_NOT_HANDLED = 2;
    /**
     * Delivers an event to be processed.
     * Events already flagged FLAG_FINISHED are forwarded, events for which
     * shouldDropInputEvent() is true are finished with false, and all other
     * events are processed by onProcess() with the result passed to apply().
     */
    public final void deliver(QueuedInputEvent q) {
        if ((q.mFlags & QueuedInputEvent.FLAG_FINISHED) != 0) {
            forward(q);
        } else if (shouldDropInputEvent(q)) {
            finish(q, false);
        } else {
            <big><b>apply(q, onProcess(q));</b></big>
        }
    }

}
/**
 * Input stage that hands events to the view hierarchy.
 */
final class ViewPostImeInputStage extends InputStage {
    public ViewPostImeInputStage(InputStage next) {
        super(next);
    }

    // Routes by event kind: key events first, then by source class
    // (pointer, trackball, everything else as generic motion).
    @Override
    protected int onProcess(QueuedInputEvent q) {
        if (q.mEvent instanceof KeyEvent) {
            <big><b>return processKeyEvent(q);</b></big>
        } else {
            final int source = q.mEvent.getSource();
            if ((source & InputDevice.SOURCE_CLASS_POINTER) != 0) {
                <big><b>return processPointerEvent(q);</b></big>
            } else if ((source & InputDevice.SOURCE_CLASS_TRACKBALL) != 0) {
                return processTrackballEvent(q);
            } else {
                return processGenericMotionEvent(q);
            }
        }
    }
    // Returns FINISH_HANDLED when mView.dispatchKeyEvent() reports the key as
    // handled, otherwise FORWARD.
    private int processKeyEvent(QueuedInputEvent q) {
        final KeyEvent event = (KeyEvent)q.mEvent;

        // Deliver the key to the view hierarchy.
        <big><b>if (mView.dispatchKeyEvent(event)) {</b></big>
            return FINISH_HANDLED;
        }
        return FORWARD;
    }
}

mView.dispatchKeyEvent(event)就把事件发给根view了,后续就是view内部的事件分发了. InputEventReceiver.java

/**
 * Stores the channel and the looper's message queue, then creates the native
 * peer through nativeInit(), passing a weak reference to this object; the
 * returned value is kept in mReceiverPtr.
 */
public InputEventReceiver(InputChannel inputChannel, Looper looper) {
    mInputChannel = inputChannel;
    mMessageQueue = looper.getQueue();
    mReceiverPtr = nativeInit(new WeakReference<InputEventReceiver>(this),
            inputChannel, mMessageQueue);

    mCloseGuard.open("dispose");
}
// Called from native code.
// Records the native sequence number under the event's own sequence number in
// mSeqMap, then hands the event to onInputEvent().
@SuppressWarnings("unused")
private void dispatchInputEvent(int seq, InputEvent event) {
    mSeqMap.put(event.getSequenceNumber(), seq);
    onInputEvent(event);
}

对应的c++代码为 android_view_InputEventReceiver.cpp

// JNI entry point behind InputEventReceiver.nativeInit(): resolves the native
// InputChannel and MessageQueue from their Java objects, creates a
// NativeInputEventReceiver for them, initializes it and returns its address
// as a jlong.
// NOTE(review): `status` is not checked in this excerpt.
static jlong nativeInit(JNIEnv* env, jclass clazz, jobject receiverWeak,
        jobject inputChannelObj, jobject messageQueueObj) {
    sp<InputChannel> inputChannel = android_view_InputChannel_getInputChannel(env,
            inputChannelObj);
    sp<MessageQueue> messageQueue = android_os_MessageQueue_getMessageQueue(env, messageQueueObj);
    sp<NativeInputEventReceiver> receiver = new NativeInputEventReceiver(env,
            receiverWeak, inputChannel, messageQueue);
    <big><b>status_t status = receiver->initialize();</b></big>
    return reinterpret_cast<jlong>(receiver.get());
}

// Starts listening for input on the channel's fd by requesting
// ALOOPER_EVENT_INPUT through setFdEvents(). Always returns OK.
status_t NativeInputEventReceiver::initialize() {
    <big><b>setFdEvents(ALOOPER_EVENT_INPUT);</b></big>
    return OK;
}
// Changes the event mask this receiver listens for on the input channel's fd.
// Does nothing when the mask is unchanged. A non-zero mask (re)registers the
// fd with the looper using this object as the callback; a zero mask removes it.
void NativeInputEventReceiver::setFdEvents(int events) {
    if (mFdEvents != events) {
        mFdEvents = events;
        int fd = mInputConsumer.getChannel()->getFd();
        if (events) {
            <big><b>mMessageQueue->getLooper()->addFd(fd, 0, events, this, NULL);</b></big>
        } else {
            mMessageQueue->getLooper()->removeFd(fd);
        }
    }
}
// NOTE(review): this repeats the definition of initialize() that appears just
// before setFdEvents(); one of the two copies is redundant.
status_t NativeInputEventReceiver::initialize() {
    setFdEvents(ALOOPER_EVENT_INPUT);
    return OK;
}
// Looper callback for the input channel's fd. On ALOOPER_EVENT_INPUT the
// pending events are consumed; the return value is 1 when consumeEvents()
// reported OK or NO_MEMORY and 0 for any other status. For other event masks
// it returns 1. Looper::pollInner removes the fd when a callback returns 0.
int NativeInputEventReceiver::handleEvent(int receiveFd, int events, void* data) {
    if (events & ALOOPER_EVENT_INPUT) {
        JNIEnv* env = AndroidRuntime::getJNIEnv();
        <big><b>status_t status = consumeEvents(env, false /*consumeBatches*/, -1, NULL);</b></big>
        return status == OK || status == NO_MEMORY ? 1 : 0;
    }
    return 1;
}
// Pulls events from the input consumer, converts each one to its Java
// counterpart (KeyEvent or MotionEvent) and calls the Java receiver's
// dispatchInputEvent(seq, event) method for it.
// NOTE(review): this excerpt omits the loop exit — `status` is never
// inspected, `receiverObj` is never given a referent, and nothing returns from
// the for(;;) loop; confirm against the full source before relying on it.
status_t NativeInputEventReceiver::consumeEvents(JNIEnv* env,
        bool consumeBatches, nsecs_t frameTime, bool* outConsumedBatch) {
    ScopedLocalRef<jobject> receiverObj(env, NULL);
    // Set once an event could not be converted; later events are then consumed
    // without calling into Java.
    bool skipCallbacks = false;
    for (;;) {
        uint32_t seq;
        InputEvent* inputEvent;
        <big><b>status_t status = mInputConsumer.consume(&mInputEventFactory,
                consumeBatches, frameTime, &seq, &inputEvent);</b></big>

        if (!skipCallbacks) {
            jobject inputEventObj;
            // Build the Java object matching the native event type.
            switch (inputEvent->getType()) {
            case AINPUT_EVENT_TYPE_KEY:
                inputEventObj = android_view_KeyEvent_fromNative(env,
                        static_cast<KeyEvent*>(inputEvent));
                break;
            case AINPUT_EVENT_TYPE_MOTION: {
                MotionEvent* motionEvent = static_cast<MotionEvent*>(inputEvent);
                inputEventObj = android_view_MotionEvent_obtainAsCopy(env, motionEvent);
                break;
            }
            default:
                assert(false); // InputConsumer should prevent this from ever happening
                inputEventObj = NULL;
            }
            if (inputEventObj) {
                if (kDebugDispatchCycle) {
                    ALOGD("channel '%s' ~ Dispatching input event.", getInputChannelName());
                }
                <big><b>env->CallVoidMethod(receiverObj.get(),
                        gInputEventReceiverClassInfo.dispatchInputEvent, seq, inputEventObj);</b></big>
                // The local reference is no longer needed once the call returned.
                env->DeleteLocalRef(inputEventObj);
            } else {
                ALOGW("channel '%s' ~ Failed to obtain event object.", getInputChannelName());
                skipCallbacks = true;
            }
        }
    }
}

// Caches the method ID of the Java method dispatchInputEvent. The JNI
// signature "(ILandroid/view/InputEvent;)V" means (int, InputEvent) -> void,
// matching InputEventReceiver.dispatchInputEvent(int seq, InputEvent event).
gInputEventReceiverClassInfo.dispatchInputEvent = GetMethodIDOrDie(env,
        gInputEventReceiverClassInfo.clazz,
        "dispatchInputEvent", "(ILandroid/view/InputEvent;)V");

InputTransport.cpp

// Receives messages from the channel until one has been converted into an
// event object created by `factory`.
//
//   outSeq    receives the sequence number carried by the message
//   outEvent  receives the created KeyEvent or MotionEvent
//
// Returns OK once an event was produced, NO_MEMORY when the factory could not
// create an event, UNKNOWN_ERROR for an unrecognised message type, or the
// status reported by receiveMessage() when receiving failed.
status_t InputConsumer::consume(InputEventFactoryInterface* factory,
        bool consumeBatches, nsecs_t frameTime, uint32_t* outSeq, InputEvent** outEvent) {
    // Callers pass uninitialized locals, so give the outputs defined values
    // before the loop condition reads *outEvent.
    *outSeq = 0;
    *outEvent = NULL;

    while (!*outEvent) {
        // Receive a fresh message.
        <big><b>status_t result = mChannel->receiveMessage(&mMsg);</b></big>
        if (result) {
            // Nothing was received; do not decode the stale contents of mMsg.
            return result;
        }
        switch (mMsg.header.type) {
        case InputMessage::TYPE_KEY: {
            KeyEvent* keyEvent = factory->createKeyEvent();
            if (!keyEvent) return NO_MEMORY;
            initializeKeyEvent(keyEvent, &mMsg);
            *outSeq = mMsg.body.key.seq;
            *outEvent = keyEvent;
            break;
        }
        // The switch is on the message header type, so the label uses the
        // InputMessage constant like the key case above.
        case InputMessage::TYPE_MOTION: {
            ssize_t batchIndex = findBatch(mMsg.body.motion.deviceId, mMsg.body.motion.source);
            MotionEvent* motionEvent = factory->createMotionEvent();
            if (! motionEvent) return NO_MEMORY;
            updateTouchState(&mMsg);
            initializeMotionEvent(motionEvent, &mMsg);
            *outSeq = mMsg.body.motion.seq;
            *outEvent = motionEvent;
            break;
        }
        default:
            // Without this label the return below was unreachable and an
            // unknown message type made the loop spin.
            return UNKNOWN_ERROR;
        }
    }
    return OK;
}
// Reads one InputMessage from the channel's socket without blocking
// (MSG_DONTWAIT), retrying when the call is interrupted by a signal.
// Returns OK when recv() did not fail, otherwise the negated errno value.
status_t InputChannel::receiveMessage(InputMessage* msg) {
    ssize_t nRead;
    do {
        <big><b>nRead = ::recv(mFd, msg, sizeof(InputMessage), MSG_DONTWAIT);</b></big>
    } while (nRead == -1 && errno == EINTR);
    if (nRead < 0) {
        // Report the failure instead of claiming success; this uses the same
        // negated-errno convention as openInputChannelPair.
        return -errno;
    }
    // NOTE(review): a zero-length read and a short read are still reported as
    // OK here — confirm the intended status codes against the full source.
    return OK;
}
// Creates the socket pair behind an input channel.
// Opens a connected AF_UNIX / SOCK_SEQPACKET pair, sets the send and receive
// buffer size of both ends to SOCKET_BUFFER_SIZE and wraps the ends in two
// InputChannel objects named "<name> (server)" and "<name> (client)".
// Returns OK, or the negated errno when socketpair() fails.
status_t InputChannel::openInputChannelPair(const String8& name,
        sp<InputChannel>& outServerChannel, sp<InputChannel>& outClientChannel) {
    int sockets[2];
    if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sockets)) {
        return -errno;
    }

    // Same buffer size for both directions on both ends.
    int bufferSize = SOCKET_BUFFER_SIZE;
    for (int i = 0; i < 2; i++) {
        setsockopt(sockets[i], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
        setsockopt(sockets[i], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));
    }

    String8 serverName(name);
    serverName.append(" (server)");
    outServerChannel = new InputChannel(serverName, sockets[0]);

    String8 clientName(name);
    clientName.append(" (client)");
    outClientChannel = new InputChannel(clientName, sockets[1]);
    return OK;
}

Copyright © FengGuangtu 2017