Sketch of Native input for MessageQueue / Looper / ViewRoot

MessageQueue now uses a socket for internal signalling, and is prepared
to also handle any number of event input pipes, once the plumbing is
set up with ViewRoot / Looper to tell it about them as appropriate.

Change-Id: If9eda174a6c26887dc51b12b14b390e724e73ab3
This commit is contained in:
Christopher Tate
2010-05-06 12:07:10 -07:00
parent ab4f3c60da
commit fa9e7c05c7
7 changed files with 441 additions and 20 deletions

View File

@@ -36,8 +36,9 @@ public class MessageQueue {
Message mMessages;
private final ArrayList mIdleHandlers = new ArrayList();
private boolean mQuiting = false;
private int mObject = 0; // used by native code
boolean mQuitAllowed = true;
/**
* Callback interface for discovering when a thread is going to block
* waiting for more messages.
@@ -85,16 +86,49 @@ public class MessageQueue {
}
}
MessageQueue() {
// Add an input pipe to the set being selected over. If token is
// negative, remove 'handler's entry from the current set and forget
// about it.
void setInputToken(int token, int region, Handler handler) {
if (token >= 0) nativeRegisterInputStream(token, region, handler);
else nativeUnregisterInputStream(token);
}
MessageQueue() {
nativeInit();
}
private native void nativeInit();
/**
* @param token fd of the readable end of the input stream
* @param region fd of the ashmem region used for data transport alongside the 'token' fd
* @param handler Handler from which to make input messages based on data read from the fd
*/
private native void nativeRegisterInputStream(int token, int region, Handler handler);
private native void nativeUnregisterInputStream(int token);
private native void nativeSignal();
/**
* Wait until the designated time for new messages to arrive.
*
* @param when Timestamp in SystemClock.uptimeMillis() base of the next message in the queue.
* If 'when' is zero, the method will check for incoming messages without blocking. If
* 'when' is negative, the method will block forever waiting for the next message.
* @return
*/
private native int nativeWaitForNext(long when);
final Message next() {
boolean tryIdle = true;
// when we start out, we'll just touch the input pipes and then go from there
long timeToNextEventMillis = 0;
while (true) {
long now;
Object[] idlers = null;
nativeWaitForNext(timeToNextEventMillis);
// Try to retrieve the next message, returning if found.
synchronized (this) {
now = SystemClock.uptimeMillis();
@@ -135,20 +169,17 @@ public class MessageQueue {
synchronized (this) {
// No messages, nobody to tell about it... time to wait!
try {
if (mMessages != null) {
if (mMessages.when-now > 0) {
Binder.flushPendingCommands();
this.wait(mMessages.when-now);
}
} else {
if (mMessages != null) {
if (mMessages.when - now > 0) {
Binder.flushPendingCommands();
this.wait();
timeToNextEventMillis = mMessages.when - now;
}
}
catch (InterruptedException e) {
} else {
Binder.flushPendingCommands();
timeToNextEventMillis = -1;
}
}
// loop to the while(true) and do the appropriate nativeWait(when)
}
}
@@ -190,7 +221,6 @@ public class MessageQueue {
if (p == null || when == 0 || when < p.when) {
msg.next = p;
mMessages = msg;
this.notify();
} else {
Message prev = null;
while (p != null && p.when <= when) {
@@ -199,8 +229,8 @@ public class MessageQueue {
}
msg.next = prev.next;
prev.next = msg;
this.notify();
}
nativeSignal();
}
return true;
}
@@ -321,7 +351,7 @@ public class MessageQueue {
void poke()
{
synchronized (this) {
this.notify();
nativeSignal();
}
}
}

View File

@@ -179,7 +179,7 @@ public class ParcelFileDescriptor implements Parcelable {
/**
* An InputStream you can create on a ParcelFileDescriptor, which will
* take care of calling {@link ParcelFileDescriptor#close
* ParcelFileDescritor.close()} for you when the stream is closed.
* ParcelFileDescriptor.close()} for you when the stream is closed.
*/
public static class AutoCloseInputStream extends FileInputStream {
private final ParcelFileDescriptor mFd;
@@ -198,7 +198,7 @@ public class ParcelFileDescriptor implements Parcelable {
/**
* An OutputStream you can create on a ParcelFileDescriptor, which will
* take care of calling {@link ParcelFileDescriptor#close
* ParcelFileDescritor.close()} for you when the stream is closed.
* ParcelFileDescriptor.close()} for you when the stream is closed.
*/
public static class AutoCloseOutputStream extends FileOutputStream {
private final ParcelFileDescriptor mFd;

View File

@@ -26,12 +26,12 @@ import android.graphics.Rect;
import android.graphics.Region;
import android.os.*;
import android.os.Process;
import android.os.SystemProperties;
import android.util.AndroidRuntimeException;
import android.util.Config;
import android.util.DisplayMetrics;
import android.util.Log;
import android.util.EventLog;
import android.util.Slog;
import android.util.SparseArray;
import android.view.View.MeasureSpec;
import android.view.accessibility.AccessibilityEvent;
@@ -50,6 +50,7 @@ import android.Manifest;
import android.media.AudioManager;
import java.lang.ref.WeakReference;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -76,6 +77,7 @@ public final class ViewRoot extends Handler implements ViewParent,
/** @noinspection PointlessBooleanExpression*/
private static final boolean DEBUG_DRAW = false || LOCAL_LOGV;
private static final boolean DEBUG_LAYOUT = false || LOCAL_LOGV;
private static final boolean DEBUG_INPUT = true || LOCAL_LOGV;
private static final boolean DEBUG_INPUT_RESIZE = false || LOCAL_LOGV;
private static final boolean DEBUG_ORIENTATION = false || LOCAL_LOGV;
private static final boolean DEBUG_TRACKBALL = false || LOCAL_LOGV;
@@ -425,6 +427,9 @@ public final class ViewRoot extends Handler implements ViewParent,
}
}
// fd [0] is the receiver, [1] is the sender
private native int[] makeInputChannel();
/**
* We have one child
*/
@@ -469,6 +474,14 @@ public final class ViewRoot extends Handler implements ViewParent,
mAdded = true;
int res; /* = WindowManagerImpl.ADD_OKAY; */
// Set up the input event channel
if (false) {
int[] fds = makeInputChannel();
if (DEBUG_INPUT) {
Log.v(TAG, "makeInputChannel() returned " + fds);
}
}
// Schedule the first layout -before- adding to the window
// manager, to make sure we do the relayout before receiving
// any other events from the system.

View File

@@ -51,6 +51,7 @@ LOCAL_SRC_FILES:= \
android_os_Debug.cpp \
android_os_FileUtils.cpp \
android_os_MemoryFile.cpp \
android_os_MessageQueue.cpp \
android_os_ParcelFileDescriptor.cpp \
android_os_Power.cpp \
android_os_StatFs.cpp \

View File

@@ -128,6 +128,7 @@ extern int register_android_nio_utils(JNIEnv* env);
extern int register_android_pim_EventRecurrence(JNIEnv* env);
extern int register_android_text_format_Time(JNIEnv* env);
extern int register_android_os_Debug(JNIEnv* env);
extern int register_android_os_MessageQueue(JNIEnv* env);
extern int register_android_os_ParcelFileDescriptor(JNIEnv *env);
extern int register_android_os_Power(JNIEnv *env);
extern int register_android_os_StatFs(JNIEnv *env);
@@ -1249,6 +1250,7 @@ static const RegJNIRec gRegJNI[] = {
REG_JNI(register_android_os_Debug),
REG_JNI(register_android_os_FileObserver),
REG_JNI(register_android_os_FileUtils),
REG_JNI(register_android_os_MessageQueue),
REG_JNI(register_android_os_ParcelFileDescriptor),
REG_JNI(register_android_os_Power),
REG_JNI(register_android_os_StatFs),

View File

@@ -0,0 +1,338 @@
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "MQNative"
#include "JNIHelp.h"
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/time.h>
#include <fcntl.h>
#include <android_runtime/AndroidRuntime.h>
#include <utils/SystemClock.h>
#include <utils/Vector.h>
#include <utils/Log.h>
using namespace android;
// ----------------------------------------------------------------------------
static struct {
jclass mClass;
jfieldID mObject; // native object attached to the DVM MessageQueue
} gMessageQueueOffsets;
static struct {
jclass mClass;
jmethodID mConstructor;
} gKeyEventOffsets;
// TODO: also MotionEvent offsets etc. a la gKeyEventOffsets
static struct {
jclass mClass;
jmethodID mObtain; // obtain(Handler h, int what, Object obj)
} gMessageOffsets;
// ----------------------------------------------------------------------------
static void doThrow(JNIEnv* env, const char* exc, const char* msg = NULL)
{
if (jniThrowException(env, exc, msg) != 0)
assert(false);
}
// ----------------------------------------------------------------------------
class MessageQueueNative {
public:
MessageQueueNative(int readSocket, int writeSocket);
~MessageQueueNative();
// select on all FDs until the designated time; forever if wakeupTime is < 0
int waitForSignal(jobject mqueue, jlong wakeupTime);
// signal the queue-ready pipe
void signalQueuePipe();
// Specify a new input pipe, passing in responsibility for the socket fd and
// ashmem region
int registerInputPipe(JNIEnv* env, int socketFd, int memRegionFd, jobject handler);
// Forget about this input pipe, closing the socket and ashmem region as well
int unregisterInputPipe(JNIEnv* env, int socketFd);
size_t numRegisteredPipes() const { return mInputPipes.size(); }
private:
struct InputPipe {
int fd;
int region;
jobject handler;
InputPipe() {}
InputPipe(int _fd, int _r, jobject _h) : fd(_fd), region(_r), handler(_h) {}
};
// consume an event from a socket, put it on the DVM MessageQueue indicated,
// and notify the other end of the pipe that we've consumed it.
void queueEventFromPipe(const InputPipe& pipe, jobject mqueue);
int mQueueReadFd, mQueueWriteFd;
Vector<InputPipe> mInputPipes;
};
MessageQueueNative::MessageQueueNative(int readSocket, int writeSocket)
: mQueueReadFd(readSocket), mQueueWriteFd(writeSocket) {
}
MessageQueueNative::~MessageQueueNative() {
}
int MessageQueueNative::waitForSignal(jobject mqueue, jlong timeoutMillis) {
struct timeval tv, *timeout;
fd_set fdset;
if (timeoutMillis < 0) {
timeout = NULL;
} else {
if (timeoutMillis == 0) {
tv.tv_sec = 0;
tv.tv_usec = 0;
} else {
tv.tv_sec = (timeoutMillis / 1000);
tv.tv_usec = (timeoutMillis - (1000 * tv.tv_sec)) * 1000;
}
timeout = &tv;
}
// always rebuild the fd set from scratch
FD_ZERO(&fdset);
// the queue signalling pipe
FD_SET(mQueueReadFd, &fdset);
int maxFd = mQueueReadFd;
// and the input sockets themselves
for (size_t i = 0; i < mInputPipes.size(); i++) {
FD_SET(mInputPipes[i].fd, &fdset);
if (maxFd < mInputPipes[i].fd) {
maxFd = mInputPipes[i].fd;
}
}
// now wait
int res = select(maxFd + 1, &fdset, NULL, NULL, timeout);
// Error? Just return it and bail
if (res < 0) return res;
// What happened -- timeout or actual data arrived?
if (res == 0) {
// select() returned zero, which means we timed out, which means that it's time
// to deliver the head element that was already on the queue. Just fall through
// without doing anything else.
} else {
// Data (or a queue signal) arrived!
//
// If it's data, pull the data off the pipe, build a new Message with it, put it on
// the DVM-side MessageQueue (pointed to by the 'mqueue' parameter), then proceed
// into the queue-signal case.
//
// If a queue signal arrived, just consume any data pending in that pipe and
// fall out.
bool queue_signalled = (FD_ISSET(mQueueReadFd, &fdset) != 0);
for (size_t i = 0; i < mInputPipes.size(); i++) {
if (FD_ISSET(mInputPipes[i].fd, &fdset)) {
queueEventFromPipe(mInputPipes[i], mqueue);
queue_signalled = true; // we know a priori that queueing the event does this
}
}
// Okay, stuff went on the queue. Consume the contents of the signal pipe
// now that we're awake and about to start dispatching messages again.
if (queue_signalled) {
uint8_t buf[16];
ssize_t nRead;
do {
nRead = read(mQueueReadFd, buf, sizeof(buf));
} while (nRead > 0); // in nonblocking mode we'll get -1 when it's drained
}
}
return 0;
}
// signals to the queue pipe are one undefined byte. it's just a "data has arrived" token
// and the pipe is drained on receipt of at least one signal
void MessageQueueNative::signalQueuePipe() {
int dummy[1];
write(mQueueWriteFd, dummy, 1);
}
void MessageQueueNative::queueEventFromPipe(const InputPipe& inPipe, jobject mqueue) {
// !!! TODO: read the event data from the InputPipe's ashmem region, convert it to a DVM
// event object of the proper type [MotionEvent or KeyEvent], create a Message holding
// it as appropriate, point the Message to the Handler associated with this InputPipe,
// and call up to the DVM MessageQueue implementation to enqueue it for delivery.
}
// the number of registered pipes on success; < 0 on error
int MessageQueueNative::registerInputPipe(JNIEnv* env,
int socketFd, int memRegionFd, jobject handler) {
// make sure this fd is not already known to us
for (size_t i = 0; i < mInputPipes.size(); i++) {
if (mInputPipes[i].fd == socketFd) {
LOGE("Attempt to re-register input fd %d", socketFd);
return -1;
}
}
mInputPipes.push( InputPipe(socketFd, memRegionFd, env->NewGlobalRef(handler)) );
return mInputPipes.size();
}
// Remove an input pipe from our bookkeeping. Also closes the socket and ashmem
// region file descriptor!
//
// returns the number of remaining input pipes on success; < 0 on error
int MessageQueueNative::unregisterInputPipe(JNIEnv* env, int socketFd) {
for (size_t i = 0; i < mInputPipes.size(); i++) {
if (mInputPipes[i].fd == socketFd) {
close(mInputPipes[i].fd);
close(mInputPipes[i].region);
env->DeleteGlobalRef(mInputPipes[i].handler);
mInputPipes.removeAt(i);
return mInputPipes.size();
}
}
LOGW("Tried to unregister input pipe %d but not found!", socketFd);
return -1;
}
// ----------------------------------------------------------------------------
namespace android {
static void android_os_MessageQueue_init(JNIEnv* env, jobject obj) {
// Create the pipe
int fds[2];
int err = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
if (err != 0) {
doThrow(env, "java/lang/RuntimeException", "Unable to create socket pair");
}
MessageQueueNative *mqn = new MessageQueueNative(fds[0], fds[1]);
if (mqn == NULL) {
close(fds[0]);
close(fds[1]);
doThrow(env, "java/lang/RuntimeException", "Unable to allocate native queue");
}
int flags = fcntl(fds[0], F_GETFL);
fcntl(fds[0], F_SETFL, flags | O_NONBLOCK);
flags = fcntl(fds[1], F_GETFL);
fcntl(fds[1], F_SETFL, flags | O_NONBLOCK);
env->SetIntField(obj, gMessageQueueOffsets.mObject, (jint)mqn);
}
static void android_os_MessageQueue_signal(JNIEnv* env, jobject obj) {
MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
if (mqn != NULL) {
mqn->signalQueuePipe();
} else {
doThrow(env, "java/lang/IllegalStateException", "Queue not initialized");
}
}
static int android_os_MessageQueue_waitForNext(JNIEnv* env, jobject obj, jlong when) {
MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
if (mqn != NULL) {
int res = mqn->waitForSignal(obj, when);
return res; // the DVM event, if any, has been constructed and queued now
}
return -1;
}
static void android_os_MessageQueue_registerInputStream(JNIEnv* env, jobject obj,
jint socketFd, jint regionFd, jobject handler) {
MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
if (mqn != NULL) {
mqn->registerInputPipe(env, socketFd, regionFd, handler);
} else {
doThrow(env, "java/lang/IllegalStateException", "Queue not initialized");
}
}
static void android_os_MessageQueue_unregisterInputStream(JNIEnv* env, jobject obj,
jint socketFd) {
MessageQueueNative *mqn = (MessageQueueNative*) env->GetIntField(obj, gMessageQueueOffsets.mObject);
if (mqn != NULL) {
mqn->unregisterInputPipe(env, socketFd);
} else {
doThrow(env, "java/lang/IllegalStateException", "Queue not initialized");
}
}
// ----------------------------------------------------------------------------
const char* const kKeyEventPathName = "android/view/KeyEvent";
const char* const kMessagePathName = "android/os/Message";
const char* const kMessageQueuePathName = "android/os/MessageQueue";
static JNINativeMethod gMessageQueueMethods[] = {
/* name, signature, funcPtr */
{ "nativeInit", "()V", (void*)android_os_MessageQueue_init },
{ "nativeSignal", "()V", (void*)android_os_MessageQueue_signal },
{ "nativeWaitForNext", "(J)I", (void*)android_os_MessageQueue_waitForNext },
{ "nativeRegisterInputStream", "(IILandroid/os/Handler;)V", (void*)android_os_MessageQueue_registerInputStream },
{ "nativeUnregisterInputStream", "(I)V", (void*)android_os_MessageQueue_unregisterInputStream },
};
int register_android_os_MessageQueue(JNIEnv* env) {
jclass clazz;
clazz = env->FindClass(kMessageQueuePathName);
LOG_FATAL_IF(clazz == NULL, "Unable to find class android.os.MessageQueue");
gMessageQueueOffsets.mClass = (jclass) env->NewGlobalRef(clazz);
gMessageQueueOffsets.mObject = env->GetFieldID(clazz, "mObject", "I");
assert(gMessageQueueOffsets.mObject);
clazz = env->FindClass(kMessagePathName);
LOG_FATAL_IF(clazz == NULL, "Unable to find class android.os.Message");
gMessageOffsets.mClass = (jclass) env->NewGlobalRef(clazz);
gMessageOffsets.mObtain = env->GetStaticMethodID(clazz, "obtain",
"(Landroid/os/Handler;ILjava/lang/Object;)Landroid/os/Message;");
assert(gMessageOffsets.mObtain);
clazz = env->FindClass(kKeyEventPathName);
LOG_FATAL_IF(clazz == NULL, "Unable to find class android.view.KeyEvent");
gKeyEventOffsets.mClass = (jclass) env->NewGlobalRef(clazz);
gKeyEventOffsets.mConstructor = env->GetMethodID(clazz, "<init>", "(JJIIIIIII)V");
assert(gKeyEventOffsets.mConstructor);
return AndroidRuntime::registerNativeMethods(env, kMessageQueuePathName,
gMessageQueueMethods, NELEM(gMessageQueueMethods));
}
}; // end of namespace android

View File

@@ -16,6 +16,7 @@
#include <stdio.h>
#include <assert.h>
#include <sys/socket.h>
#include <core/SkCanvas.h>
#include <core/SkDevice.h>
@@ -24,6 +25,7 @@
#include "GraphicsJNI.h"
#include "jni.h"
#include <nativehelper/JNIHelp.h>
#include <android_runtime/AndroidRuntime.h>
#include <utils/misc.h>
@@ -78,6 +80,39 @@ static void android_view_ViewRoot_abandonGlCaches(JNIEnv* env, jobject) {
SkGLCanvas::AbandonAllTextures();
}
static jintArray android_view_ViewRoot_makeInputChannel(JNIEnv* env, jobject) {
int fd[2];
jint* arrayData = NULL;
// Create the pipe
int err = socketpair(AF_LOCAL, SOCK_STREAM, 0, fd);
if (err != 0) {
fprintf(stderr, "socketpair() failed: %d\n", errno);
doThrow(env, "java/lang/RuntimeException", "Unable to create pipe");
return NULL;
}
// Set up the return array
jintArray array = env->NewIntArray(2);
if (env->ExceptionCheck()) {
fprintf(stderr, "Exception allocating fd array");
goto bail;
}
arrayData = env->GetIntArrayElements(array, 0);
arrayData[0] = fd[0];
arrayData[1] = fd[1];
env->ReleaseIntArrayElements(array, arrayData, 0);
return array;
bail:
env->DeleteLocalRef(array);
close(fd[0]);
close(fd[1]);
return NULL;
}
// ----------------------------------------------------------------------------
const char* const kClassPathName = "android/view/ViewRoot";
@@ -86,7 +121,9 @@ static JNINativeMethod gMethods[] = {
{ "nativeShowFPS", "(Landroid/graphics/Canvas;I)V",
(void*)android_view_ViewRoot_showFPS },
{ "nativeAbandonGlCaches", "()V",
(void*)android_view_ViewRoot_abandonGlCaches }
(void*)android_view_ViewRoot_abandonGlCaches },
{ "makeInputChannel", "()[I",
(void*)android_view_ViewRoot_makeInputChannel }
};
int register_android_view_ViewRoot(JNIEnv* env) {