Merge "Instead of asserting, remove active streams if their sockets return failure"

This commit is contained in:
Andreas Huber
2011-11-09 14:47:51 -08:00
committed by Android (Google) Code Review

View File

@@ -220,7 +220,7 @@ void ARTPConnection::onRemoveStream(const sp<AMessage> &msg) {
}
if (it == mStreams.end()) {
TRESPASS();
return;
}
mStreams.erase(it);
@@ -274,41 +274,52 @@ void ARTPConnection::onPollStreams() {
}
int res = select(maxSocket + 1, &rs, NULL, NULL, &tv);
CHECK_GE(res, 0);
if (res > 0) {
for (List<StreamInfo>::iterator it = mStreams.begin();
it != mStreams.end(); ++it) {
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()) {
if ((*it).mIsInjected) {
++it;
continue;
}
status_t err = OK;
if (FD_ISSET(it->mRTPSocket, &rs)) {
receive(&*it, true);
err = receive(&*it, true);
}
if (FD_ISSET(it->mRTCPSocket, &rs)) {
receive(&*it, false);
if (err == OK && FD_ISSET(it->mRTCPSocket, &rs)) {
err = receive(&*it, false);
}
if (err == -ECONNRESET) {
// socket failure, this stream is dead, Jim.
LOGW("failed to receive RTP/RTCP datagram.");
it = mStreams.erase(it);
continue;
}
++it;
}
}
postPollEvent();
int64_t nowUs = ALooper::GetNowUs();
if (mLastReceiverReportTimeUs <= 0
|| mLastReceiverReportTimeUs + 5000000ll <= nowUs) {
sp<ABuffer> buffer = new ABuffer(kMaxUDPSize);
for (List<StreamInfo>::iterator it = mStreams.begin();
it != mStreams.end(); ++it) {
List<StreamInfo>::iterator it = mStreams.begin();
while (it != mStreams.end()) {
StreamInfo *s = &*it;
if (s->mIsInjected) {
++it;
continue;
}
if (s->mNumRTCPPacketsReceived == 0) {
// We have never received any RTCP packets on this stream,
// we don't even know where to send a report.
++it;
continue;
}
@@ -327,16 +338,34 @@ void ARTPConnection::onPollStreams() {
if (buffer->size() > 0) {
ALOGV("Sending RR...");
ssize_t n = sendto(
ssize_t n;
do {
n = sendto(
s->mRTCPSocket, buffer->data(), buffer->size(), 0,
(const struct sockaddr *)&s->mRemoteRTCPAddr,
sizeof(s->mRemoteRTCPAddr));
} while (n < 0 && errno == EINTR);
if (n <= 0) {
LOGW("failed to send RTCP receiver report (%s).",
n == 0 ? "connection gone" : strerror(errno));
it = mStreams.erase(it);
continue;
}
CHECK_EQ(n, (ssize_t)buffer->size());
mLastReceiverReportTimeUs = nowUs;
}
++it;
}
}
if (!mStreams.empty()) {
postPollEvent();
}
}
status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
@@ -350,16 +379,19 @@ status_t ARTPConnection::receive(StreamInfo *s, bool receiveRTP) {
(!receiveRTP && s->mNumRTCPPacketsReceived == 0)
? sizeof(s->mRemoteRTCPAddr) : 0;
ssize_t nbytes = recvfrom(
ssize_t nbytes;
do {
nbytes = recvfrom(
receiveRTP ? s->mRTPSocket : s->mRTCPSocket,
buffer->data(),
buffer->capacity(),
0,
remoteAddrLen > 0 ? (struct sockaddr *)&s->mRemoteRTCPAddr : NULL,
remoteAddrLen > 0 ? &remoteAddrLen : NULL);
} while (nbytes < 0 && errno == EINTR);
if (nbytes < 0) {
return -1;
if (nbytes <= 0) {
return -ECONNRESET;
}
buffer->setRange(0, nbytes);