Dispatch multiple touch events in parallel.

This change enables the input dispatcher to send multiple touch
events to an application without waiting for them to be acknowledged.
Essentially this is a variation on the old streaming optimization
but it is much more comprehensive.  Event dispatch will stall as
soon as 0.5sec of unacknowledged events are accumulated or a
focused event (such as a key event) needs to be delivered.

Streaming input events makes a tremendous difference in application
performance.  The next step will be to enable batching of input
events on the client side once again.

This is part of a series of changes to improve input system pipelining.

Bug: 5963420
Change-Id: I025df90c06165d719fcca7f63eed322a5cce4a78
This commit is contained in:
Jeff Brown
2012-02-06 19:12:47 -08:00
parent 8b4be56030
commit d1c48a0525
3 changed files with 199 additions and 158 deletions

View File

@@ -28,6 +28,13 @@
namespace android {
// Socket buffer size. The default is typically about 128KB, which is much larger than
// we really need. So we make it smaller. It just needs to be big enough to hold
// a few dozen large multi-finger motion events in the case where an application gets
// behind processing touches.
static const size_t SOCKET_BUFFER_SIZE = 32 * 1024;
// --- InputMessage ---
bool InputMessage::isValid(size_t actualSize) const {
@@ -93,6 +100,12 @@ status_t InputChannel::openInputChannelPair(const String8& name,
return result;
}
int bufferSize = SOCKET_BUFFER_SIZE;
setsockopt(sockets[0], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
setsockopt(sockets[0], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));
setsockopt(sockets[1], SOL_SOCKET, SO_SNDBUF, &bufferSize, sizeof(bufferSize));
setsockopt(sockets[1], SOL_SOCKET, SO_RCVBUF, &bufferSize, sizeof(bufferSize));
String8 serverChannelName = name;
serverChannelName.append(" (server)");
outServerChannel = new InputChannel(serverChannelName, sockets[0]);

View File

@@ -70,6 +70,12 @@ const nsecs_t APP_SWITCH_TIMEOUT = 500 * 1000000LL; // 0.5sec
// before considering it stale and dropping it.
const nsecs_t STALE_EVENT_TIMEOUT = 10000 * 1000000LL; // 10sec
// Amount of time to allow touch events to be streamed out to a connection before requiring
// that the first event be finished. This value extends the ANR timeout by the specified
// amount. For example, if streaming is allowed to get ahead by one second relative to the
// queue of waiting unfinished events, then ANRs will similarly be delayed by one second.
const nsecs_t STREAM_AHEAD_EVENT_TIMEOUT = 500 * 1000000LL; // 0.5sec
static inline nsecs_t now() {
return systemTime(SYSTEM_TIME_MONOTONIC);
@@ -1035,7 +1041,8 @@ int32_t InputDispatcher::findFocusedWindowTargetsLocked(nsecs_t currentTime,
}
// If the currently focused window is still working on previous events then keep waiting.
if (! isWindowFinishedWithPreviousInputLocked(mFocusedWindowHandle)) {
if (!isWindowReadyForMoreInputLocked(currentTime,
mFocusedWindowHandle, true /*focusedEvent*/)) {
#if DEBUG_FOCUS
ALOGD("Waiting because focused window still processing previous input.");
#endif
@@ -1398,7 +1405,8 @@ int32_t InputDispatcher::findTouchedWindowTargetsLocked(nsecs_t currentTime,
}
// If the touched window is still working on previous events then keep waiting.
if (! isWindowFinishedWithPreviousInputLocked(touchedWindow.windowHandle)) {
if (!isWindowReadyForMoreInputLocked(currentTime,
touchedWindow.windowHandle, false /*focusedEvent*/)) {
#if DEBUG_FOCUS
ALOGD("Waiting because touched window still processing previous input.");
#endif
@@ -1609,15 +1617,33 @@ bool InputDispatcher::isWindowObscuredAtPointLocked(
return false;
}
bool InputDispatcher::isWindowFinishedWithPreviousInputLocked(
const sp<InputWindowHandle>& windowHandle) {
bool InputDispatcher::isWindowReadyForMoreInputLocked(nsecs_t currentTime,
const sp<InputWindowHandle>& windowHandle, bool focusedEvent) {
ssize_t connectionIndex = getConnectionIndexLocked(windowHandle->getInputChannel());
if (connectionIndex >= 0) {
sp<Connection> connection = mConnectionsByFd.valueAt(connectionIndex);
return connection->outboundQueue.isEmpty();
} else {
return true;
if (connection->inputPublisherBlocked) {
return false;
}
if (focusedEvent) {
// If the event relies on input focus (such as a key event), then we must
// wait for all previous events to complete before delivering it because they
// may move focus elsewhere.
return connection->outboundQueue.isEmpty()
&& connection->waitQueue.isEmpty();
}
// Touch events can always be sent to a window because the user intended to touch
// whatever was visible immediately. Even if focus changes or a new window appears,
// the touch event was meant for whatever happened to be on screen at the time.
// However, if the wait queue is piling up with lots of events, then hold up
// new events for awhile. This condition ensures that ANRs still work.
if (!connection->waitQueue.isEmpty()
&& currentTime >= connection->waitQueue.head->eventEntry->eventTime
+ STREAM_AHEAD_EVENT_TIMEOUT) {
return false;
}
}
return true;
}
String8 InputDispatcher::getApplicationWindowLabelLocked(
@@ -1834,100 +1860,110 @@ void InputDispatcher::startDispatchCycleLocked(nsecs_t currentTime,
connection->getInputChannelName());
#endif
ALOG_ASSERT(connection->status == Connection::STATUS_NORMAL);
ALOG_ASSERT(! connection->outboundQueue.isEmpty());
while (connection->status == Connection::STATUS_NORMAL
&& !connection->outboundQueue.isEmpty()) {
DispatchEntry* dispatchEntry = connection->outboundQueue.head;
DispatchEntry* dispatchEntry = connection->outboundQueue.head;
ALOG_ASSERT(! dispatchEntry->inProgress);
// Publish the event.
status_t status;
EventEntry* eventEntry = dispatchEntry->eventEntry;
switch (eventEntry->type) {
case EventEntry::TYPE_KEY: {
KeyEntry* keyEntry = static_cast<KeyEntry*>(eventEntry);
// Mark the dispatch entry as in progress.
dispatchEntry->inProgress = true;
// Publish the key event.
status = connection->inputPublisher.publishKeyEvent(
keyEntry->deviceId, keyEntry->source,
dispatchEntry->resolvedAction, dispatchEntry->resolvedFlags,
keyEntry->keyCode, keyEntry->scanCode,
keyEntry->metaState, keyEntry->repeatCount, keyEntry->downTime,
keyEntry->eventTime);
break;
}
// Publish the event.
status_t status;
EventEntry* eventEntry = dispatchEntry->eventEntry;
switch (eventEntry->type) {
case EventEntry::TYPE_KEY: {
KeyEntry* keyEntry = static_cast<KeyEntry*>(eventEntry);
case EventEntry::TYPE_MOTION: {
MotionEntry* motionEntry = static_cast<MotionEntry*>(eventEntry);
// Publish the key event.
status = connection->inputPublisher.publishKeyEvent(
keyEntry->deviceId, keyEntry->source,
dispatchEntry->resolvedAction, dispatchEntry->resolvedFlags,
keyEntry->keyCode, keyEntry->scanCode,
keyEntry->metaState, keyEntry->repeatCount, keyEntry->downTime,
keyEntry->eventTime);
PointerCoords scaledCoords[MAX_POINTERS];
const PointerCoords* usingCoords = motionEntry->pointerCoords;
if (status) {
ALOGE("channel '%s' ~ Could not publish key event, "
"status=%d", connection->getInputChannelName(), status);
abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
// Set the X and Y offset depending on the input source.
float xOffset, yOffset, scaleFactor;
if ((motionEntry->source & AINPUT_SOURCE_CLASS_POINTER)
&& !(dispatchEntry->targetFlags & InputTarget::FLAG_ZERO_COORDS)) {
scaleFactor = dispatchEntry->scaleFactor;
xOffset = dispatchEntry->xOffset * scaleFactor;
yOffset = dispatchEntry->yOffset * scaleFactor;
if (scaleFactor != 1.0f) {
for (size_t i = 0; i < motionEntry->pointerCount; i++) {
scaledCoords[i] = motionEntry->pointerCoords[i];
scaledCoords[i].scale(scaleFactor);
}
usingCoords = scaledCoords;
}
} else {
xOffset = 0.0f;
yOffset = 0.0f;
scaleFactor = 1.0f;
// We don't want the dispatch target to know.
if (dispatchEntry->targetFlags & InputTarget::FLAG_ZERO_COORDS) {
for (size_t i = 0; i < motionEntry->pointerCount; i++) {
scaledCoords[i].clear();
}
usingCoords = scaledCoords;
}
}
// Publish the motion event.
status = connection->inputPublisher.publishMotionEvent(
motionEntry->deviceId, motionEntry->source,
dispatchEntry->resolvedAction, dispatchEntry->resolvedFlags,
motionEntry->edgeFlags, motionEntry->metaState, motionEntry->buttonState,
xOffset, yOffset,
motionEntry->xPrecision, motionEntry->yPrecision,
motionEntry->downTime, motionEntry->eventTime,
motionEntry->pointerCount, motionEntry->pointerProperties,
usingCoords);
break;
}
default:
ALOG_ASSERT(false);
return;
}
break;
}
case EventEntry::TYPE_MOTION: {
MotionEntry* motionEntry = static_cast<MotionEntry*>(eventEntry);
PointerCoords scaledCoords[MAX_POINTERS];
const PointerCoords* usingCoords = motionEntry->pointerCoords;
// Set the X and Y offset depending on the input source.
float xOffset, yOffset, scaleFactor;
if (motionEntry->source & AINPUT_SOURCE_CLASS_POINTER
&& !(dispatchEntry->targetFlags & InputTarget::FLAG_ZERO_COORDS)) {
scaleFactor = dispatchEntry->scaleFactor;
xOffset = dispatchEntry->xOffset * scaleFactor;
yOffset = dispatchEntry->yOffset * scaleFactor;
if (scaleFactor != 1.0f) {
for (size_t i = 0; i < motionEntry->pointerCount; i++) {
scaledCoords[i] = motionEntry->pointerCoords[i];
scaledCoords[i].scale(scaleFactor);
}
usingCoords = scaledCoords;
}
} else {
xOffset = 0.0f;
yOffset = 0.0f;
scaleFactor = 1.0f;
// We don't want the dispatch target to know.
if (dispatchEntry->targetFlags & InputTarget::FLAG_ZERO_COORDS) {
for (size_t i = 0; i < motionEntry->pointerCount; i++) {
scaledCoords[i].clear();
}
usingCoords = scaledCoords;
}
}
// Publish the motion event.
status = connection->inputPublisher.publishMotionEvent(
motionEntry->deviceId, motionEntry->source,
dispatchEntry->resolvedAction, dispatchEntry->resolvedFlags,
motionEntry->edgeFlags, motionEntry->metaState, motionEntry->buttonState,
xOffset, yOffset,
motionEntry->xPrecision, motionEntry->yPrecision,
motionEntry->downTime, motionEntry->eventTime,
motionEntry->pointerCount, motionEntry->pointerProperties,
usingCoords);
// Check the result.
if (status) {
ALOGE("channel '%s' ~ Could not publish motion event, "
"status=%d", connection->getInputChannelName(), status);
abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
if (status == WOULD_BLOCK) {
if (connection->waitQueue.isEmpty()) {
ALOGE("channel '%s' ~ Could not publish event because the pipe is full. "
"This is unexpected because the wait queue is empty, so the pipe "
"should be empty and we shouldn't have any problems writing an "
"event to it, status=%d", connection->getInputChannelName(), status);
abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
} else {
// Pipe is full and we are waiting for the app to finish process some events
// before sending more events to it.
#if DEBUG_DISPATCH_CYCLE
ALOGD("channel '%s' ~ Could not publish event because the pipe is full, "
"waiting for the application to catch up",
connection->getInputChannelName());
#endif
connection->inputPublisherBlocked = true;
}
} else {
ALOGE("channel '%s' ~ Could not publish event due to an unexpected error, "
"status=%d", connection->getInputChannelName(), status);
abortBrokenDispatchCycleLocked(currentTime, connection, true /*notify*/);
}
return;
}
break;
}
default: {
ALOG_ASSERT(false);
// Re-enqueue the event on the wait queue.
connection->outboundQueue.dequeue(dispatchEntry);
connection->waitQueue.enqueueAtTail(dispatchEntry);
}
}
// Notify other system components.
onDispatchCycleStartedLocked(currentTime, connection);
}
void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
@@ -1937,6 +1973,8 @@ void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
connection->getInputChannelName(), toString(handled));
#endif
connection->inputPublisherBlocked = false;
if (connection->status == Connection::STATUS_BROKEN
|| connection->status == Connection::STATUS_ZOMBIE) {
return;
@@ -1946,28 +1984,6 @@ void InputDispatcher::finishDispatchCycleLocked(nsecs_t currentTime,
onDispatchCycleFinishedLocked(currentTime, connection, handled);
}
void InputDispatcher::startNextDispatchCycleLocked(nsecs_t currentTime,
const sp<Connection>& connection) {
// Start the next dispatch cycle for this connection.
while (! connection->outboundQueue.isEmpty()) {
DispatchEntry* dispatchEntry = connection->outboundQueue.head;
if (dispatchEntry->inProgress) {
// Finished.
connection->outboundQueue.dequeueAtHead();
if (dispatchEntry->hasForegroundTarget()) {
decrementPendingForegroundDispatchesLocked(dispatchEntry->eventEntry);
}
delete dispatchEntry;
} else {
// If the head is not in progress, then we must have already dequeued the in
// progress event, which means we actually aborted it.
// So just start the next event for this connection.
startDispatchCycleLocked(currentTime, connection);
return;
}
}
}
void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
const sp<Connection>& connection, bool notify) {
#if DEBUG_DISPATCH_CYCLE
@@ -1975,8 +1991,9 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
connection->getInputChannelName(), toString(notify));
#endif
// Clear the outbound queue.
drainOutboundQueueLocked(connection.get());
// Clear the dispatch queues.
drainDispatchQueueLocked(&connection->outboundQueue);
drainDispatchQueueLocked(&connection->waitQueue);
// The connection appears to be unrecoverably broken.
// Ignore already broken or zombie connections.
@@ -1990,16 +2007,20 @@ void InputDispatcher::abortBrokenDispatchCycleLocked(nsecs_t currentTime,
}
}
void InputDispatcher::drainOutboundQueueLocked(Connection* connection) {
while (! connection->outboundQueue.isEmpty()) {
DispatchEntry* dispatchEntry = connection->outboundQueue.dequeueAtHead();
if (dispatchEntry->hasForegroundTarget()) {
decrementPendingForegroundDispatchesLocked(dispatchEntry->eventEntry);
}
delete dispatchEntry;
void InputDispatcher::drainDispatchQueueLocked(Queue<DispatchEntry>* queue) {
while (!queue->isEmpty()) {
DispatchEntry* dispatchEntry = queue->dequeueAtHead();
releaseDispatchEntryLocked(dispatchEntry);
}
}
void InputDispatcher::releaseDispatchEntryLocked(DispatchEntry* dispatchEntry) {
if (dispatchEntry->hasForegroundTarget()) {
decrementPendingForegroundDispatchesLocked(dispatchEntry->eventEntry);
}
delete dispatchEntry;
}
int InputDispatcher::handleReceiveCallback(int fd, int events, void* data) {
InputDispatcher* d = static_cast<InputDispatcher*>(data);
@@ -2121,9 +2142,7 @@ void InputDispatcher::synthesizeCancelationEventsForConnectionLocked(
cancelationEventEntry->release();
}
if (!connection->outboundQueue.head->inProgress) {
startDispatchCycleLocked(currentTime, connection);
}
startDispatchCycleLocked(currentTime, connection);
}
}
@@ -3139,10 +3158,6 @@ ssize_t InputDispatcher::getConnectionIndexLocked(const sp<InputChannel>& inputC
return -1;
}
void InputDispatcher::onDispatchCycleStartedLocked(
nsecs_t currentTime, const sp<Connection>& connection) {
}
void InputDispatcher::onDispatchCycleFinishedLocked(
nsecs_t currentTime, const sp<Connection>& connection, bool handled) {
CommandEntry* commandEntry = postCommandLocked(
@@ -3243,24 +3258,37 @@ void InputDispatcher::doDispatchCycleFinishedLockedInterruptible(
sp<Connection> connection = commandEntry->connection;
bool handled = commandEntry->handled;
bool skipNext = false;
if (!connection->outboundQueue.isEmpty()) {
DispatchEntry* dispatchEntry = connection->outboundQueue.head;
if (dispatchEntry->inProgress) {
if (dispatchEntry->eventEntry->type == EventEntry::TYPE_KEY) {
KeyEntry* keyEntry = static_cast<KeyEntry*>(dispatchEntry->eventEntry);
skipNext = afterKeyEventLockedInterruptible(connection,
dispatchEntry, keyEntry, handled);
} else if (dispatchEntry->eventEntry->type == EventEntry::TYPE_MOTION) {
MotionEntry* motionEntry = static_cast<MotionEntry*>(dispatchEntry->eventEntry);
skipNext = afterMotionEventLockedInterruptible(connection,
dispatchEntry, motionEntry, handled);
if (!connection->waitQueue.isEmpty()) {
// Handle post-event policy actions.
bool restartEvent;
DispatchEntry* dispatchEntry = connection->waitQueue.head;
if (dispatchEntry->eventEntry->type == EventEntry::TYPE_KEY) {
KeyEntry* keyEntry = static_cast<KeyEntry*>(dispatchEntry->eventEntry);
restartEvent = afterKeyEventLockedInterruptible(connection,
dispatchEntry, keyEntry, handled);
} else if (dispatchEntry->eventEntry->type == EventEntry::TYPE_MOTION) {
MotionEntry* motionEntry = static_cast<MotionEntry*>(dispatchEntry->eventEntry);
restartEvent = afterMotionEventLockedInterruptible(connection,
dispatchEntry, motionEntry, handled);
} else {
restartEvent = false;
}
// Dequeue the event and start the next cycle.
// Note that because the lock might have been released, it is possible that the
// contents of the wait queue to have been drained, so we need to double-check
// a few things.
if (connection->waitQueue.head == dispatchEntry) {
connection->waitQueue.dequeueAtHead();
if (restartEvent && connection->status == Connection::STATUS_NORMAL) {
connection->outboundQueue.enqueueAtHead(dispatchEntry);
} else {
releaseDispatchEntryLocked(dispatchEntry);
}
}
}
if (!skipNext) {
startNextDispatchCycleLocked(now(), connection);
// Start the next dispatch cycle for this connection.
startDispatchCycleLocked(now(), connection);
}
}
@@ -3324,11 +3352,9 @@ bool InputDispatcher::afterKeyEventLockedInterruptible(const sp<Connection>& con
if (connection->status != Connection::STATUS_NORMAL) {
connection->inputState.removeFallbackKey(originalKeyCode);
return true; // skip next cycle
return false;
}
ALOG_ASSERT(connection->outboundQueue.head == dispatchEntry);
// Latch the fallback keycode for this key on an initial down.
// The fallback keycode cannot change at any other point in the lifecycle.
if (initialDown) {
@@ -3406,10 +3432,7 @@ bool InputDispatcher::afterKeyEventLockedInterruptible(const sp<Connection>& con
"originalKeyCode=%d, fallbackKeyCode=%d, fallbackMetaState=%08x",
originalKeyCode, fallbackKeyCode, keyEntry->metaState);
#endif
dispatchEntry->inProgress = false;
startDispatchCycleLocked(now(), connection);
return true; // already started next cycle
return true; // restart the event
} else {
#if DEBUG_OUTBOUND_EVENT_DETAILS
ALOGD("Unhandled key event: No fallback key.");
@@ -3604,7 +3627,6 @@ InputDispatcher::DispatchEntry::DispatchEntry(EventEntry* eventEntry,
int32_t targetFlags, float xOffset, float yOffset, float scaleFactor) :
eventEntry(eventEntry), targetFlags(targetFlags),
xOffset(xOffset), yOffset(yOffset), scaleFactor(scaleFactor),
inProgress(false),
resolvedAction(0), resolvedFlags(0) {
eventEntry->refCount += 1;
}
@@ -3943,7 +3965,7 @@ InputDispatcher::Connection::Connection(const sp<InputChannel>& inputChannel,
const sp<InputWindowHandle>& inputWindowHandle, bool monitor) :
status(STATUS_NORMAL), inputChannel(inputChannel), inputWindowHandle(inputWindowHandle),
monitor(monitor),
inputPublisher(inputChannel) {
inputPublisher(inputChannel), inputPublisherBlocked(false) {
}
InputDispatcher::Connection::~Connection() {

View File

@@ -528,9 +528,6 @@ private:
float yOffset;
float scaleFactor;
// True if dispatch has started.
bool inProgress;
// Set to the resolved action and flags when the event is enqueued.
int32_t resolvedAction;
int32_t resolvedFlags;
@@ -782,8 +779,18 @@ private:
bool monitor;
InputPublisher inputPublisher;
InputState inputState;
// True if the socket is full and no further events can be published until
// the application consumes some of the input.
bool inputPublisherBlocked;
// Queue of events that need to be published to the connection.
Queue<DispatchEntry> outboundQueue;
// Queue of events that have been published to the connection but that have not
// yet received a "finished" response from the application.
Queue<DispatchEntry> waitQueue;
explicit Connection(const sp<InputChannel>& inputChannel,
const sp<InputWindowHandle>& inputWindowHandle, bool monitor);
@@ -976,7 +983,8 @@ private:
const InjectionState* injectionState);
bool isWindowObscuredAtPointLocked(const sp<InputWindowHandle>& windowHandle,
int32_t x, int32_t y) const;
bool isWindowFinishedWithPreviousInputLocked(const sp<InputWindowHandle>& windowHandle);
bool isWindowReadyForMoreInputLocked(nsecs_t currentTime,
const sp<InputWindowHandle>& windowHandle, bool focusedEvent);
String8 getApplicationWindowLabelLocked(const sp<InputApplicationHandle>& applicationHandle,
const sp<InputWindowHandle>& windowHandle);
@@ -993,10 +1001,10 @@ private:
void startDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection);
void finishDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
bool handled);
void startNextDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection);
void abortBrokenDispatchCycleLocked(nsecs_t currentTime, const sp<Connection>& connection,
bool notify);
void drainOutboundQueueLocked(Connection* connection);
void drainDispatchQueueLocked(Queue<DispatchEntry>* queue);
void releaseDispatchEntryLocked(DispatchEntry* dispatchEntry);
static int handleReceiveCallback(int fd, int events, void* data);
void synthesizeCancelationEventsForAllConnectionsLocked(
@@ -1025,8 +1033,6 @@ private:
void deactivateConnectionLocked(Connection* connection);
// Interesting events that we might like to log or tell the framework about.
void onDispatchCycleStartedLocked(
nsecs_t currentTime, const sp<Connection>& connection);
void onDispatchCycleFinishedLocked(
nsecs_t currentTime, const sp<Connection>& connection, bool handled);
void onDispatchCycleBrokenLocked(