Merge "Support for RTP packets arriving interleaved with RTSP responses." into gingerbread

This commit is contained in:
Andreas Huber
2010-08-26 13:47:16 -07:00
committed by Android (Google) Code Review
6 changed files with 217 additions and 22 deletions

View File

@@ -57,6 +57,8 @@ struct ARTPConnection::StreamInfo {
int32_t mNumRTCPPacketsReceived; int32_t mNumRTCPPacketsReceived;
struct sockaddr_in mRemoteRTCPAddr; struct sockaddr_in mRemoteRTCPAddr;
bool mIsInjected;
}; };
ARTPConnection::ARTPConnection(uint32_t flags) ARTPConnection::ARTPConnection(uint32_t flags)
@@ -72,13 +74,15 @@ void ARTPConnection::addStream(
int rtpSocket, int rtcpSocket, int rtpSocket, int rtcpSocket,
const sp<ASessionDescription> &sessionDesc, const sp<ASessionDescription> &sessionDesc,
size_t index, size_t index,
const sp<AMessage> &notify) { const sp<AMessage> &notify,
bool injected) {
sp<AMessage> msg = new AMessage(kWhatAddStream, id()); sp<AMessage> msg = new AMessage(kWhatAddStream, id());
msg->setInt32("rtp-socket", rtpSocket); msg->setInt32("rtp-socket", rtpSocket);
msg->setInt32("rtcp-socket", rtcpSocket); msg->setInt32("rtcp-socket", rtcpSocket);
msg->setObject("session-desc", sessionDesc); msg->setObject("session-desc", sessionDesc);
msg->setSize("index", index); msg->setSize("index", index);
msg->setMessage("notify", notify); msg->setMessage("notify", notify);
msg->setInt32("injected", injected);
msg->post(); msg->post();
} }
@@ -154,6 +158,12 @@ void ARTPConnection::onMessageReceived(const sp<AMessage> &msg) {
break; break;
} }
case kWhatInjectPacket:
{
onInjectPacket(msg);
break;
}
default: default:
{ {
TRESPASS(); TRESPASS();
@@ -172,6 +182,11 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
CHECK(msg->findInt32("rtcp-socket", &s)); CHECK(msg->findInt32("rtcp-socket", &s));
info->mRTCPSocket = s; info->mRTCPSocket = s;
int32_t injected;
CHECK(msg->findInt32("injected", &injected));
info->mIsInjected = injected;
sp<RefBase> obj; sp<RefBase> obj;
CHECK(msg->findObject("session-desc", &obj)); CHECK(msg->findObject("session-desc", &obj));
info->mSessionDesc = static_cast<ASessionDescription *>(obj.get()); info->mSessionDesc = static_cast<ASessionDescription *>(obj.get());
@@ -182,7 +197,9 @@ void ARTPConnection::onAddStream(const sp<AMessage> &msg) {
info->mNumRTCPPacketsReceived = 0; info->mNumRTCPPacketsReceived = 0;
memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr)); memset(&info->mRemoteRTCPAddr, 0, sizeof(info->mRemoteRTCPAddr));
postPollEvent(); if (!injected) {
postPollEvent();
}
} }
void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) { void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
@@ -231,6 +248,10 @@ void ARTPConnection::onPollStreams() {
int maxSocket = -1; int maxSocket = -1;
for (List<StreamInfo>::iterator it = mStreams.begin(); for (List<StreamInfo>::iterator it = mStreams.begin();
it != mStreams.end(); ++it) { it != mStreams.end(); ++it) {
if ((*it).mIsInjected) {
continue;
}
FD_SET(it->mRTPSocket, &rs); FD_SET(it->mRTPSocket, &rs);
FD_SET(it->mRTCPSocket, &rs); FD_SET(it->mRTCPSocket, &rs);
@@ -248,6 +269,10 @@ void ARTPConnection::onPollStreams() {
if (res > 0) { if (res > 0) {
for (List<StreamInfo>::iterator it = mStreams.begin(); for (List<StreamInfo>::iterator it = mStreams.begin();
it != mStreams.end(); ++it) { it != mStreams.end(); ++it) {
if ((*it).mIsInjected) {
continue;
}
if (FD_ISSET(it->mRTPSocket, &rs)) { if (FD_ISSET(it->mRTPSocket, &rs)) {
receive(&*it, true); receive(&*it, true);
} }
@@ -301,6 +326,8 @@ void ARTPConnection::onPollStreams() {
} }
status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) { status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
CHECK(!s->mIsInjected);
sp<ABuffer> buffer = new ABuffer(65536); sp<ABuffer> buffer = new ABuffer(65536);
socklen_t remoteAddrLen = socklen_t remoteAddrLen =
@@ -559,5 +586,42 @@ sp<ARTPSource> ARTPConnection::findSource(StreamInfo *info, uint32_t srcId) {
return source; return source;
} }
void ARTPConnection::injectPacket(int index, const sp<ABuffer> &buffer) {
sp<AMessage> msg = new AMessage(kWhatInjectPacket, id());
msg->setInt32("index", index);
msg->setObject("buffer", buffer);
msg->post();
}
void ARTPConnection::onInjectPacket(const sp<AMessage> &msg) {
int32_t index;
CHECK(msg->findInt32("index", &index));
sp<RefBase> obj;
CHECK(msg->findObject("buffer", &obj));
sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()
&& it->mRTPSocket != index && it->mRTCPSocket != index) {
++it;
}
if (it == mStreams.end()) {
TRESPASS();
}
StreamInfo *s = &*it;
status_t err;
if (it->mRTPSocket == index) {
err = parseRTP(s, buffer);
} else {
++s->mNumRTCPPacketsReceived;
err = parseRTCP(s, buffer);
}
}
} // namespace android } // namespace android

View File

@@ -38,10 +38,13 @@ struct ARTPConnection : public AHandler {
void addStream( void addStream(
int rtpSocket, int rtcpSocket, int rtpSocket, int rtcpSocket,
const sp<ASessionDescription> &sessionDesc, size_t index, const sp<ASessionDescription> &sessionDesc, size_t index,
const sp<AMessage> &notify); const sp<AMessage> &notify,
bool injected);
void removeStream(int rtpSocket, int rtcpSocket); void removeStream(int rtpSocket, int rtcpSocket);
void injectPacket(int index, const sp<ABuffer> &buffer);
// Creates a pair of UDP datagram sockets bound to adjacent ports // Creates a pair of UDP datagram sockets bound to adjacent ports
// (the rtpSocket is bound to an even port, the rtcpSocket to the // (the rtpSocket is bound to an even port, the rtcpSocket to the
// next higher port). // next higher port).
@@ -57,6 +60,7 @@ private:
kWhatAddStream, kWhatAddStream,
kWhatRemoveStream, kWhatRemoveStream,
kWhatPollStreams, kWhatPollStreams,
kWhatInjectPacket,
}; };
static const int64_t kSelectTimeoutUs; static const int64_t kSelectTimeoutUs;
@@ -72,6 +76,7 @@ private:
void onAddStream(const sp<AMessage> &msg); void onAddStream(const sp<AMessage> &msg);
void onRemoveStream(const sp<AMessage> &msg); void onRemoveStream(const sp<AMessage> &msg);
void onPollStreams(); void onPollStreams();
void onInjectPacket(const sp<AMessage> &msg);
void onSendReceiverReports(); void onSendReceiverReports();
status_t receive(StreamInfo *info, bool receiveRTP); status_t receive(StreamInfo *info, bool receiveRTP);

View File

@@ -83,7 +83,8 @@ status_t ARTPSession::setup(const sp<ASessionDescription> &desc) {
sp<AMessage> notify = new AMessage(kWhatAccessUnitComplete, id()); sp<AMessage> notify = new AMessage(kWhatAccessUnitComplete, id());
notify->setSize("track-index", mTracks.size() - 1); notify->setSize("track-index", mTracks.size() - 1);
mRTPConn->addStream(rtpSocket, rtcpSocket, mDesc, i, notify); mRTPConn->addStream(
rtpSocket, rtcpSocket, mDesc, i, notify, false /* injected */);
info->mPacketSource = source; info->mPacketSource = source;
} }

View File

@@ -19,6 +19,7 @@
#include <media/stagefright/foundation/ABuffer.h> #include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h> #include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/foundation/AMessage.h> #include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <fcntl.h> #include <fcntl.h>
@@ -67,6 +68,12 @@ void ARTSPConnection::sendRequest(
msg->post(); msg->post();
} }
void ARTSPConnection::observeBinaryData(const sp<AMessage> &reply) {
sp<AMessage> msg = new AMessage(kWhatObserveBinaryData, id());
msg->setMessage("reply", reply);
msg->post();
}
void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) { void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
switch (msg->what()) { switch (msg->what()) {
case kWhatConnect: case kWhatConnect:
@@ -89,6 +96,12 @@ void ARTSPConnection::onMessageReceived(const sp<AMessage> &msg) {
onReceiveResponse(); onReceiveResponse();
break; break;
case kWhatObserveBinaryData:
{
CHECK(msg->findMessage("reply", &mObserveBinaryMessage));
break;
}
default: default:
TRESPASS(); TRESPASS();
break; break;
@@ -396,16 +409,13 @@ void ARTSPConnection::postReceiveReponseEvent() {
mReceiveResponseEventPending = true; mReceiveResponseEventPending = true;
} }
bool ARTSPConnection::receiveLine(AString *line) { status_t ARTSPConnection::receive(void *data, size_t size) {
line->clear(); size_t offset = 0;
while (offset < size) {
bool sawCR = false; ssize_t n = recv(mSocket, (uint8_t *)data + offset, size - offset, 0);
for (;;) {
char c;
ssize_t n = recv(mSocket, &c, 1, 0);
if (n == 0) { if (n == 0) {
// Server closed the connection. // Server closed the connection.
return false; return ERROR_IO;
} else if (n < 0) { } else if (n < 0) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
@@ -414,6 +424,22 @@ bool ARTSPConnection::receiveLine(AString *line) {
TRESPASS(); TRESPASS();
} }
offset += (size_t)n;
}
return OK;
}
bool ARTSPConnection::receiveLine(AString *line) {
line->clear();
bool sawCR = false;
for (;;) {
char c;
if (receive(&c, 1) != OK) {
return false;
}
if (sawCR && c == '\n') { if (sawCR && c == '\n') {
line->erase(line->size() - 1, 1); line->erase(line->size() - 1, 1);
return true; return true;
@@ -421,17 +447,59 @@ bool ARTSPConnection::receiveLine(AString *line) {
line->append(&c, 1); line->append(&c, 1);
if (c == '$' && line->size() == 1) {
// Special-case for interleaved binary data.
return true;
}
sawCR = (c == '\r'); sawCR = (c == '\r');
} }
} }
bool ARTSPConnection::receiveRTSPReponse() { sp<ABuffer> ARTSPConnection::receiveBinaryData() {
sp<ARTSPResponse> response = new ARTSPResponse; uint8_t x[3];
if (receive(x, 3) != OK) {
return NULL;
}
if (!receiveLine(&response->mStatusLine)) { sp<ABuffer> buffer = new ABuffer((x[1] << 8) | x[2]);
if (receive(buffer->data(), buffer->size()) != OK) {
return NULL;
}
buffer->meta()->setInt32("index", (int32_t)x[0]);
return buffer;
}
bool ARTSPConnection::receiveRTSPReponse() {
AString statusLine;
if (!receiveLine(&statusLine)) {
return false; return false;
} }
if (statusLine == "$") {
sp<ABuffer> buffer = receiveBinaryData();
if (buffer == NULL) {
return false;
}
if (mObserveBinaryMessage != NULL) {
sp<AMessage> notify = mObserveBinaryMessage->dup();
notify->setObject("buffer", buffer);
notify->post();
} else {
LOG(WARNING) << "received binary data, but no one cares.";
}
return true;
}
sp<ARTSPResponse> response = new ARTSPResponse;
response->mStatusLine = statusLine;
LOG(INFO) << "status: " << response->mStatusLine; LOG(INFO) << "status: " << response->mStatusLine;
ssize_t space1 = response->mStatusLine.find(" "); ssize_t space1 = response->mStatusLine.find(" ");

View File

@@ -40,6 +40,8 @@ struct ARTSPConnection : public AHandler {
void sendRequest(const char *request, const sp<AMessage> &reply); void sendRequest(const char *request, const sp<AMessage> &reply);
void observeBinaryData(const sp<AMessage> &reply);
protected: protected:
virtual ~ARTSPConnection(); virtual ~ARTSPConnection();
virtual void onMessageReceived(const sp<AMessage> &msg); virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -57,6 +59,7 @@ private:
kWhatCompleteConnection = 'comc', kWhatCompleteConnection = 'comc',
kWhatSendRequest = 'sreq', kWhatSendRequest = 'sreq',
kWhatReceiveResponse = 'rres', kWhatReceiveResponse = 'rres',
kWhatObserveBinaryData = 'obin',
}; };
static const int64_t kSelectTimeoutUs; static const int64_t kSelectTimeoutUs;
@@ -69,6 +72,8 @@ private:
KeyedVector<int32_t, sp<AMessage> > mPendingRequests; KeyedVector<int32_t, sp<AMessage> > mPendingRequests;
sp<AMessage> mObserveBinaryMessage;
void onConnect(const sp<AMessage> &msg); void onConnect(const sp<AMessage> &msg);
void onDisconnect(const sp<AMessage> &msg); void onDisconnect(const sp<AMessage> &msg);
void onCompleteConnection(const sp<AMessage> &msg); void onCompleteConnection(const sp<AMessage> &msg);
@@ -80,7 +85,9 @@ private:
// Return false iff something went unrecoverably wrong. // Return false iff something went unrecoverably wrong.
bool receiveRTSPReponse(); bool receiveRTSPReponse();
status_t receive(void *data, size_t size);
bool receiveLine(AString *line); bool receiveLine(AString *line);
sp<ABuffer> receiveBinaryData();
bool notifyResponseListener(const sp<ARTSPResponse> &response); bool notifyResponseListener(const sp<ARTSPResponse> &response);
static bool ParseURL( static bool ParseURL(

View File

@@ -29,6 +29,8 @@
#include <media/stagefright/foundation/AMessage.h> #include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaErrors.h> #include <media/stagefright/MediaErrors.h>
#define USE_TCP_INTERLEAVED 0
namespace android { namespace android {
struct MyHandler : public AHandler { struct MyHandler : public AHandler {
@@ -55,6 +57,9 @@ struct MyHandler : public AHandler {
mLooper->registerHandler(mConn); mLooper->registerHandler(mConn);
(1 ? mNetLooper : mLooper)->registerHandler(mRTPConn); (1 ? mNetLooper : mLooper)->registerHandler(mRTPConn);
sp<AMessage> notify = new AMessage('biny', id());
mConn->observeBinaryData(notify);
sp<AMessage> reply = new AMessage('conn', id()); sp<AMessage> reply = new AMessage('conn', id());
mConn->connect(mSessionURL.c_str(), reply); mConn->connect(mSessionURL.c_str(), reply);
} }
@@ -91,6 +96,8 @@ struct MyHandler : public AHandler {
sp<AMessage> reply = new AMessage('desc', id()); sp<AMessage> reply = new AMessage('desc', id());
mConn->sendRequest(request.c_str(), reply); mConn->sendRequest(request.c_str(), reply);
} else {
(new AMessage('disc', id()))->post();
} }
break; break;
} }
@@ -183,8 +190,10 @@ struct MyHandler : public AHandler {
if (result != OK) { if (result != OK) {
if (track) { if (track) {
close(track->mRTPSocket); if (!track->mUsingInterleavedTCP) {
close(track->mRTCPSocket); close(track->mRTPSocket);
close(track->mRTCPSocket);
}
mTracks.removeItemsAt(trackIndex); mTracks.removeItemsAt(trackIndex);
} }
@@ -216,7 +225,7 @@ struct MyHandler : public AHandler {
mRTPConn->addStream( mRTPConn->addStream(
track->mRTPSocket, track->mRTCPSocket, track->mRTPSocket, track->mRTCPSocket,
mSessionDesc, index, mSessionDesc, index,
notify); notify, track->mUsingInterleavedTCP);
mSetupTracksSuccessful = true; mSetupTracksSuccessful = true;
} }
@@ -263,6 +272,9 @@ struct MyHandler : public AHandler {
mDoneMsg->setInt32("result", OK); mDoneMsg->setInt32("result", OK);
mDoneMsg->post(); mDoneMsg->post();
mDoneMsg = NULL; mDoneMsg = NULL;
sp<AMessage> timeout = new AMessage('tiou', id());
timeout->post(10000000ll);
} else { } else {
sp<AMessage> reply = new AMessage('disc', id()); sp<AMessage> reply = new AMessage('disc', id());
mConn->disconnect(reply); mConn->disconnect(reply);
@@ -451,6 +463,29 @@ struct MyHandler : public AHandler {
break; break;
} }
case 'biny':
{
sp<RefBase> obj;
CHECK(msg->findObject("buffer", &obj));
sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());
int32_t index;
CHECK(buffer->meta()->findInt32("index", &index));
mRTPConn->injectPacket(index, buffer);
break;
}
case 'tiou':
{
if (mFirstAccessUnit) {
LOG(WARNING) << "Never received any data, disconnecting.";
}
(new AMessage('abor', id()))->post();
break;
}
default: default:
TRESPASS(); TRESPASS();
break; break;
@@ -485,6 +520,7 @@ private:
struct TrackInfo { struct TrackInfo {
int mRTPSocket; int mRTPSocket;
int mRTCPSocket; int mRTCPSocket;
bool mUsingInterleavedTCP;
sp<APacketSource> mPacketSource; sp<APacketSource> mPacketSource;
}; };
@@ -515,19 +551,33 @@ private:
mTracks.push(TrackInfo()); mTracks.push(TrackInfo());
TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1); TrackInfo *info = &mTracks.editItemAt(mTracks.size() - 1);
info->mPacketSource = source; info->mPacketSource = source;
info->mUsingInterleavedTCP = false;
unsigned rtpPort;
ARTPConnection::MakePortPair(
&info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
AString request = "SETUP "; AString request = "SETUP ";
request.append(trackURL); request.append(trackURL);
request.append(" RTSP/1.0\r\n"); request.append(" RTSP/1.0\r\n");
#if USE_TCP_INTERLEAVED
size_t interleaveIndex = 2 * (mTracks.size() - 1);
info->mUsingInterleavedTCP = true;
info->mRTPSocket = interleaveIndex;
info->mRTCPSocket = interleaveIndex + 1;
request.append("Transport: RTP/AVP/TCP;interleaved=");
request.append(interleaveIndex);
request.append("-");
request.append(interleaveIndex + 1);
#else
unsigned rtpPort;
ARTPConnection::MakePortPair(
&info->mRTPSocket, &info->mRTCPSocket, &rtpPort);
request.append("Transport: RTP/AVP/UDP;unicast;client_port="); request.append("Transport: RTP/AVP/UDP;unicast;client_port=");
request.append(rtpPort); request.append(rtpPort);
request.append("-"); request.append("-");
request.append(rtpPort + 1); request.append(rtpPort + 1);
#endif
request.append("\r\n"); request.append("\r\n");
if (index > 1) { if (index > 1) {