Ensure that the TRTP retry buffer has contiguous sequence numbers
Previously, sequence numbers for audio packets were assigned by the TX player before packets were queued to the sender. This caused a race between assignment of sequence numbers on audio packets and sequence numbers on heartbeat packets. A heartbeat could get queued and added to the retry buffer before an audio packet with an earlier sequence number got queued. This CL centralizes packet sequence number assignment and insertion into the retry buffer inside AAH_TXSender::doSendPacket_l. It also makes explicit what operations can be done on a TRTPPacket before and after packing. Change-Id: I6d02eae81061983e4def4f1b3dd7c1625467b151
This commit is contained in:
committed by
Mike Lockwood
parent
d070dca89f
commit
e066b8fce4
@@ -13,12 +13,15 @@
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
#define LOG_TAG "LibAAH_RTP"
|
||||
#include <utils/Log.h>
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <media/stagefright/MediaDebug.h>
|
||||
|
||||
#include "aah_tx_packet.h"
|
||||
|
||||
namespace android {
|
||||
@@ -30,6 +33,130 @@ TRTPPacket::~TRTPPacket() {
|
||||
delete mPacket;
|
||||
}
|
||||
|
||||
/*** TRTP packet properties ***/
|
||||
|
||||
void TRTPPacket::setSeqNumber(uint16_t val) {
|
||||
mSeqNumber = val;
|
||||
|
||||
if (mIsPacked) {
|
||||
const int kTRTPSeqNumberOffset = 2;
|
||||
uint16_t* buf = reinterpret_cast<uint16_t*>(
|
||||
mPacket + kTRTPSeqNumberOffset);
|
||||
*buf = htons(mSeqNumber);
|
||||
}
|
||||
}
|
||||
|
||||
uint16_t TRTPPacket::getSeqNumber() const {
|
||||
return mSeqNumber;
|
||||
}
|
||||
|
||||
void TRTPPacket::setPTS(int64_t val) {
|
||||
CHECK(!mIsPacked);
|
||||
mPTS = val;
|
||||
mPTSValid = true;
|
||||
}
|
||||
|
||||
int64_t TRTPPacket::getPTS() const {
|
||||
return mPTS;
|
||||
}
|
||||
|
||||
void TRTPPacket::setEpoch(uint32_t val) {
|
||||
mEpoch = val;
|
||||
|
||||
if (mIsPacked) {
|
||||
const int kTRTPEpochOffset = 8;
|
||||
uint32_t* buf = reinterpret_cast<uint32_t*>(
|
||||
mPacket + kTRTPEpochOffset);
|
||||
uint32_t val = ntohl(*buf);
|
||||
val &= ~(kTRTPEpochMask << kTRTPEpochShift);
|
||||
val |= (mEpoch & kTRTPEpochMask) << kTRTPEpochShift;
|
||||
*buf = htonl(val);
|
||||
}
|
||||
}
|
||||
|
||||
void TRTPPacket::setProgramID(uint16_t val) {
|
||||
CHECK(!mIsPacked);
|
||||
mProgramID = val;
|
||||
}
|
||||
|
||||
void TRTPPacket::setSubstreamID(uint16_t val) {
|
||||
CHECK(!mIsPacked);
|
||||
mSubstreamID = val;
|
||||
}
|
||||
|
||||
|
||||
void TRTPPacket::setClockTransform(const LinearTransform& trans) {
|
||||
CHECK(!mIsPacked);
|
||||
mClockTranform = trans;
|
||||
mClockTranformValid = true;
|
||||
}
|
||||
|
||||
uint8_t* TRTPPacket::getPacket() const {
|
||||
CHECK(mIsPacked);
|
||||
return mPacket;
|
||||
}
|
||||
|
||||
int TRTPPacket::getPacketLen() const {
|
||||
CHECK(mIsPacked);
|
||||
return mPacketLen;
|
||||
}
|
||||
|
||||
void TRTPPacket::setExpireTime(nsecs_t val) {
|
||||
CHECK(!mIsPacked);
|
||||
mExpireTime = val;
|
||||
}
|
||||
|
||||
nsecs_t TRTPPacket::getExpireTime() const {
|
||||
return mExpireTime;
|
||||
}
|
||||
|
||||
/*** TRTP audio packet properties ***/
|
||||
|
||||
void TRTPAudioPacket::setCodecType(TRTPAudioCodecType val) {
|
||||
CHECK(!mIsPacked);
|
||||
mCodecType = val;
|
||||
}
|
||||
|
||||
void TRTPAudioPacket::setRandomAccessPoint(bool val) {
|
||||
CHECK(!mIsPacked);
|
||||
mRandomAccessPoint = val;
|
||||
}
|
||||
|
||||
void TRTPAudioPacket::setDropable(bool val) {
|
||||
CHECK(!mIsPacked);
|
||||
mDropable = val;
|
||||
}
|
||||
|
||||
void TRTPAudioPacket::setDiscontinuity(bool val) {
|
||||
CHECK(!mIsPacked);
|
||||
mDiscontinuity = val;
|
||||
}
|
||||
|
||||
void TRTPAudioPacket::setEndOfStream(bool val) {
|
||||
CHECK(!mIsPacked);
|
||||
mEndOfStream = val;
|
||||
}
|
||||
|
||||
void TRTPAudioPacket::setVolume(uint8_t val) {
|
||||
CHECK(!mIsPacked);
|
||||
mVolume = val;
|
||||
}
|
||||
|
||||
void TRTPAudioPacket::setAccessUnitData(void* data, int len) {
|
||||
CHECK(!mIsPacked);
|
||||
mAccessUnitData = data;
|
||||
mAccessUnitLen = len;
|
||||
}
|
||||
|
||||
/*** TRTP control packet properties ***/
|
||||
|
||||
void TRTPControlPacket::setCommandID(TRTPCommandID val) {
|
||||
CHECK(!mIsPacked);
|
||||
mCommandID = val;
|
||||
}
|
||||
|
||||
/*** TRTP packet serializers ***/
|
||||
|
||||
void TRTPPacket::writeU8(uint8_t*& buf, uint8_t val) {
|
||||
*buf = val;
|
||||
buf++;
|
||||
@@ -76,7 +203,7 @@ void TRTPPacket::writeTRTPHeader(uint8_t*& buf,
|
||||
writeU32(buf, 0);
|
||||
}
|
||||
writeU32(buf,
|
||||
((mEpoch & kTRTPEpochMask) << 10) |
|
||||
((mEpoch & kTRTPEpochMask) << kTRTPEpochShift) |
|
||||
((mProgramID & 0x1F) << 5) |
|
||||
(mSubstreamID & 0x1F));
|
||||
|
||||
@@ -100,6 +227,10 @@ void TRTPPacket::writeTRTPHeader(uint8_t*& buf,
|
||||
}
|
||||
|
||||
bool TRTPAudioPacket::pack() {
|
||||
if (mIsPacked) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int packetLen = kRTPHeaderLen +
|
||||
mAccessUnitLen +
|
||||
TRTPHeaderLen();
|
||||
@@ -130,6 +261,7 @@ bool TRTPAudioPacket::pack() {
|
||||
|
||||
memcpy(cur, mAccessUnitData, mAccessUnitLen);
|
||||
|
||||
mIsPacked = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
@@ -171,6 +303,10 @@ int TRTPAudioPacket::TRTPHeaderLen() const {
|
||||
}
|
||||
|
||||
bool TRTPControlPacket::pack() {
|
||||
if (mIsPacked) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// command packets contain a 2-byte command ID
|
||||
int packetLen = kRTPHeaderLen +
|
||||
TRTPHeaderLen() +
|
||||
@@ -188,6 +324,7 @@ bool TRTPControlPacket::pack() {
|
||||
writeTRTPHeader(cur, true, packetLen);
|
||||
writeU16(cur, mCommandID);
|
||||
|
||||
mIsPacked = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -33,7 +33,8 @@ class TRTPPacket : public RefBase {
|
||||
};
|
||||
|
||||
TRTPPacket(TRTPHeaderType headerType)
|
||||
: mVersion(2)
|
||||
: mIsPacked(false)
|
||||
, mVersion(2)
|
||||
, mPadding(false)
|
||||
, mExtension(false)
|
||||
, mCsrcCount(0)
|
||||
@@ -54,31 +55,28 @@ class TRTPPacket : public RefBase {
|
||||
public:
|
||||
virtual ~TRTPPacket();
|
||||
|
||||
void setSeqNumber(uint16_t val) { mSeqNumber = val; }
|
||||
uint16_t getSeqNumber() const { return mSeqNumber; }
|
||||
void setPTS(int64_t val) {
|
||||
mPTS = val;
|
||||
mPTSValid = true;
|
||||
}
|
||||
int64_t getPTS() const { return mPTS; }
|
||||
void setEpoch(uint32_t val) { mEpoch = val; }
|
||||
void setProgramID(uint16_t val) { mProgramID = val; }
|
||||
void setSubstreamID(uint16_t val) { mSubstreamID = val; }
|
||||
void setClockTransform(const LinearTransform& trans) {
|
||||
mClockTranform = trans;
|
||||
mClockTranformValid = true;
|
||||
}
|
||||
void setSeqNumber(uint16_t val);
|
||||
uint16_t getSeqNumber() const;
|
||||
|
||||
uint8_t* getPacket() const { return mPacket; }
|
||||
int getPacketLen() const { return mPacketLen; }
|
||||
void setPTS(int64_t val);
|
||||
int64_t getPTS() const;
|
||||
|
||||
void setExpireTime(nsecs_t val) { mExpireTime = val; }
|
||||
nsecs_t getExpireTime() const { return mExpireTime; }
|
||||
void setEpoch(uint32_t val);
|
||||
void setProgramID(uint16_t val);
|
||||
void setSubstreamID(uint16_t val);
|
||||
void setClockTransform(const LinearTransform& trans);
|
||||
|
||||
uint8_t* getPacket() const;
|
||||
int getPacketLen() const;
|
||||
|
||||
void setExpireTime(nsecs_t val);
|
||||
nsecs_t getExpireTime() const;
|
||||
|
||||
virtual bool pack() = 0;
|
||||
|
||||
// mask for the number of bits in a TRTP epoch
|
||||
static const uint32_t kTRTPEpochMask = (1 << 22) - 1;
|
||||
static const int kTRTPEpochShift = 10;
|
||||
|
||||
protected:
|
||||
static const int kRTPHeaderLen = 12;
|
||||
@@ -93,6 +91,8 @@ class TRTPPacket : public RefBase {
|
||||
void writeU32(uint8_t*& buf, uint32_t val);
|
||||
void writeU64(uint8_t*& buf, uint64_t val);
|
||||
|
||||
bool mIsPacked;
|
||||
|
||||
uint8_t mVersion;
|
||||
bool mPadding;
|
||||
bool mExtension;
|
||||
@@ -135,16 +135,13 @@ class TRTPAudioPacket : public TRTPPacket {
|
||||
kCodecMPEG1Audio = 3,
|
||||
};
|
||||
|
||||
void setCodecType(TRTPAudioCodecType val) { mCodecType = val; }
|
||||
void setRandomAccessPoint(bool val) { mRandomAccessPoint = val; }
|
||||
void setDropable(bool val) { mDropable = val; }
|
||||
void setDiscontinuity(bool val) { mDiscontinuity = val; }
|
||||
void setEndOfStream(bool val) { mEndOfStream = val; }
|
||||
void setVolume(uint8_t val) { mVolume = val; }
|
||||
void setAccessUnitData(void* data, int len) {
|
||||
mAccessUnitData = data;
|
||||
mAccessUnitLen = len;
|
||||
}
|
||||
void setCodecType(TRTPAudioCodecType val);
|
||||
void setRandomAccessPoint(bool val);
|
||||
void setDropable(bool val);
|
||||
void setDiscontinuity(bool val);
|
||||
void setEndOfStream(bool val);
|
||||
void setVolume(uint8_t val);
|
||||
void setAccessUnitData(void* data, int len);
|
||||
|
||||
virtual bool pack();
|
||||
|
||||
@@ -174,7 +171,7 @@ class TRTPControlPacket : public TRTPPacket {
|
||||
kCommandEOS = 3,
|
||||
};
|
||||
|
||||
void setCommandID(TRTPCommandID val) { mCommandID = val; }
|
||||
void setCommandID(TRTPCommandID val);
|
||||
|
||||
virtual bool pack();
|
||||
|
||||
|
||||
@@ -1075,7 +1075,6 @@ void AAH_TXPlayer::queuePacketToSender_l(const sp<TRTPPacket>& packet) {
|
||||
return;
|
||||
}
|
||||
|
||||
mAAH_Sender->assignSeqNumber(mEndpoint, packet);
|
||||
message->setInt32(AAH_TXSender::kSendPacketIPAddr, mEndpoint.addr);
|
||||
message->setInt32(AAH_TXSender::kSendPacketPort, mEndpoint.port);
|
||||
}
|
||||
|
||||
@@ -180,24 +180,6 @@ void AAH_TXSender::unregisterEndpoint(const Endpoint& endpoint) {
|
||||
}
|
||||
}
|
||||
|
||||
void AAH_TXSender::assignSeqNumber(const Endpoint& endpoint,
|
||||
const sp<TRTPPacket>& packet) {
|
||||
Mutex::Autolock lock(mEndpointLock);
|
||||
assignSeqNumber_l(endpoint, packet);
|
||||
}
|
||||
|
||||
void AAH_TXSender::assignSeqNumber_l(const Endpoint& endpoint,
|
||||
const sp<TRTPPacket>& packet) {
|
||||
EndpointState* eps = mEndpointMap.valueFor(endpoint);
|
||||
if (!eps) {
|
||||
// the endpoint state has disappeared, so the player that sent this
|
||||
// packet must be dead.
|
||||
return;
|
||||
}
|
||||
packet->setEpoch(eps->epoch);
|
||||
packet->setSeqNumber(eps->trtpSeqNumber++);
|
||||
}
|
||||
|
||||
void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
|
||||
switch (msg->what()) {
|
||||
case kWhatSendPacket:
|
||||
@@ -219,8 +201,6 @@ void AAH_TXSender::onMessageReceived(const sp<AMessage>& msg) {
|
||||
}
|
||||
|
||||
void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
|
||||
LOGV("*** %s", __PRETTY_FUNCTION__);
|
||||
|
||||
sp<RefBase> obj;
|
||||
CHECK(msg->findObject(kSendPacketTRTPPacket, &obj));
|
||||
sp<TRTPPacket> packet = static_cast<TRTPPacket*>(obj.get());
|
||||
@@ -233,19 +213,33 @@ void AAH_TXSender::onSendPacket(const sp<AMessage>& msg) {
|
||||
CHECK(msg->findInt32(kSendPacketPort, &port32));
|
||||
uint16_t port = port32;
|
||||
|
||||
doSendPacket(packet, ipAddr, port);
|
||||
|
||||
addToRetryBuffer(Endpoint(ipAddr, port), packet);
|
||||
Mutex::Autolock lock(mEndpointLock);
|
||||
doSendPacket_l(packet, Endpoint(ipAddr, port));
|
||||
}
|
||||
|
||||
void AAH_TXSender::doSendPacket(sp<TRTPPacket> packet,
|
||||
uint32_t ipAddr,
|
||||
uint16_t port) {
|
||||
void AAH_TXSender::doSendPacket_l(const sp<TRTPPacket>& packet,
|
||||
const Endpoint& endpoint) {
|
||||
EndpointState* eps = mEndpointMap.valueFor(endpoint);
|
||||
if (!eps) {
|
||||
// the endpoint state has disappeared, so the player that sent this
|
||||
// packet must be dead.
|
||||
return;
|
||||
}
|
||||
|
||||
// assign the packet's sequence number
|
||||
packet->setEpoch(eps->epoch);
|
||||
packet->setSeqNumber(eps->trtpSeqNumber++);
|
||||
|
||||
// add the packet to the retry buffer
|
||||
RetryBuffer& retry = eps->retry;
|
||||
retry.push_back(packet);
|
||||
|
||||
// send the packet
|
||||
struct sockaddr_in addr;
|
||||
memset(&addr, 0, sizeof(addr));
|
||||
addr.sin_family = AF_INET;
|
||||
addr.sin_addr.s_addr = ipAddr;
|
||||
addr.sin_port = htons(port);
|
||||
addr.sin_addr.s_addr = endpoint.addr;
|
||||
addr.sin_port = htons(endpoint.port);
|
||||
|
||||
ssize_t result = sendto(mSocket,
|
||||
packet->getPacket(),
|
||||
@@ -258,32 +252,7 @@ void AAH_TXSender::doSendPacket(sp<TRTPPacket> packet,
|
||||
}
|
||||
}
|
||||
|
||||
void AAH_TXSender::addToRetryBuffer(const Endpoint& endpoint,
|
||||
const sp<TRTPPacket>& packet) {
|
||||
Mutex::Autolock lock(mEndpointLock);
|
||||
|
||||
EndpointState* eps = mEndpointMap.valueFor(endpoint);
|
||||
if (!eps) {
|
||||
return;
|
||||
}
|
||||
|
||||
addToRetryBuffer_l(eps, packet);
|
||||
}
|
||||
|
||||
void AAH_TXSender::addToRetryBuffer_l(EndpointState* eps,
|
||||
const sp<TRTPPacket>& packet) {
|
||||
RetryBuffer& retry = eps->retry;
|
||||
retry.push_back(packet);
|
||||
|
||||
LOGV("*** %s seq=%hu size=%d",
|
||||
__PRETTY_FUNCTION__,
|
||||
packet->getSeqNumber(),
|
||||
retry.size());
|
||||
}
|
||||
|
||||
void AAH_TXSender::trimRetryBuffers() {
|
||||
LOGV("*** %s", __PRETTY_FUNCTION__);
|
||||
|
||||
Mutex::Autolock lock(mEndpointLock);
|
||||
|
||||
nsecs_t localTimeNow = systemTime();
|
||||
@@ -302,8 +271,6 @@ void AAH_TXSender::trimRetryBuffers() {
|
||||
}
|
||||
}
|
||||
|
||||
LOGV("*** %s size=%d", __PRETTY_FUNCTION__, retry.size());
|
||||
|
||||
if (retry.isEmpty() && eps->playerRefCount == 0) {
|
||||
endpointsToRemove.add(mEndpointMap.keyAt(i));
|
||||
}
|
||||
@@ -336,13 +303,11 @@ void AAH_TXSender::sendHeartbeats() {
|
||||
sp<TRTPControlPacket> packet = new TRTPControlPacket();
|
||||
packet->setCommandID(TRTPControlPacket::kCommandNop);
|
||||
|
||||
assignSeqNumber_l(ep, packet);
|
||||
packet->setExpireTime(systemTime() +
|
||||
AAH_TXPlayer::kAAHRetryKeepAroundTimeNs);
|
||||
packet->pack();
|
||||
|
||||
doSendPacket(packet, ep.addr, ep.port);
|
||||
addToRetryBuffer_l(eps, packet);
|
||||
doSendPacket_l(packet, ep);
|
||||
}
|
||||
|
||||
// schedule the next heartbeat
|
||||
|
||||
@@ -67,8 +67,6 @@ class AAH_TXSender : public virtual RefBase {
|
||||
|
||||
uint16_t registerEndpoint(const Endpoint& endpoint);
|
||||
void unregisterEndpoint(const Endpoint& endpoint);
|
||||
void assignSeqNumber(const Endpoint& endpoint,
|
||||
const sp<TRTPPacket>& packet);
|
||||
|
||||
enum {
|
||||
kWhatSendPacket,
|
||||
@@ -106,15 +104,11 @@ class AAH_TXSender : public virtual RefBase {
|
||||
friend class AHandlerReflector<AAH_TXSender>;
|
||||
void onMessageReceived(const sp<AMessage>& msg);
|
||||
void onSendPacket(const sp<AMessage>& msg);
|
||||
void doSendPacket(sp<TRTPPacket> packet, uint32_t ipAddr, uint16_t port);
|
||||
void addToRetryBuffer(const Endpoint& endpoint,
|
||||
const sp<TRTPPacket>& packet);
|
||||
void addToRetryBuffer_l(EndpointState* eps,
|
||||
const sp<TRTPPacket>& packet);
|
||||
void doSendPacket_l(const sp<TRTPPacket>& packet,
|
||||
const Endpoint& endpoint);
|
||||
void trimRetryBuffers();
|
||||
void sendHeartbeats();
|
||||
void assignSeqNumber_l(const Endpoint& endpoint,
|
||||
const sp<TRTPPacket>& packet);
|
||||
|
||||
sp<ALooper> mLooper;
|
||||
sp<AHandlerReflector<AAH_TXSender> > mReflector;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user