Switch stagefright's approach to prefetching to the new model. The java MediaPlayer is now notified about rebuffering start/end via info messages.

Change-Id: If8185ba329ce8b6663b1ad39a4efb0ad3be81df2
This commit is contained in:
Andreas Huber
2010-06-10 11:17:50 -07:00
parent a6ef19a8f4
commit 4d61f602bf
13 changed files with 124 additions and 1429 deletions

View File

@@ -88756,6 +88756,28 @@
visibility="public"
>
</field>
<field name="MEDIA_INFO_BUFFERING_END"
type="int"
transient="false"
volatile="false"
value="702"
static="true"
final="true"
deprecated="not deprecated"
visibility="public"
>
</field>
<field name="MEDIA_INFO_BUFFERING_START"
type="int"
transient="false"
volatile="false"
value="701"
static="true"
final="true"
deprecated="not deprecated"
visibility="public"
>
</field>
<field name="MEDIA_INFO_METADATA_UPDATE"
type="int"
transient="false"

View File

@@ -93,6 +93,11 @@ enum media_info_type {
// The video is too complex for the decoder: it can't decode frames fast
// enough. Possibly only the audio plays fine at this stage.
MEDIA_INFO_VIDEO_TRACK_LAGGING = 700,
// MediaPlayer is temporarily pausing playback internally in order to
// buffer more data.
MEDIA_INFO_BUFFERING_START = 701,
// MediaPlayer is resuming playback after filling buffers.
MEDIA_INFO_BUFFERING_END = 702,
// 8xx
// Bad interleaving means that a media has been improperly interleaved or not
// interleaved at all, e.g has all the video samples first then all the audio

View File

@@ -1,66 +0,0 @@
/*
* Copyright (C) 2009 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef CACHING_DATASOURCE_H_
#define CACHING_DATASOURCE_H_
#include <media/stagefright/DataSource.h>
#include <media/stagefright/MediaErrors.h>
#include <utils/threads.h>
namespace android {
class CachingDataSource : public DataSource {
public:
CachingDataSource(
const sp<DataSource> &source, size_t pageSize, int numPages);
virtual status_t initCheck() const;
virtual ssize_t readAt(off_t offset, void *data, size_t size);
virtual status_t getSize(off_t *size);
virtual uint32_t flags();
protected:
virtual ~CachingDataSource();
private:
struct Page {
Page *mPrev, *mNext;
off_t mOffset;
size_t mLength;
void *mData;
};
sp<DataSource> mSource;
void *mData;
size_t mPageSize;
Page *mFirst, *mLast;
Page *allocate_page();
Mutex mLock;
CachingDataSource(const CachingDataSource &);
CachingDataSource &operator=(const CachingDataSource &);
};
} // namespace android
#endif // CACHING_DATASOURCE_H_

View File

@@ -1,103 +0,0 @@
/*
* Copyright (C) 2009 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef HTTP_DATASOURCE_H_
#define HTTP_DATASOURCE_H_
#include <media/stagefright/DataSource.h>
#include <utils/String8.h>
#include <utils/threads.h>
namespace android {
class HTTPStream;
class HTTPDataSource : public DataSource {
public:
HTTPDataSource(
const char *host, int port, const char *path,
const KeyedVector<String8, String8> *headers = NULL);
HTTPDataSource(
const char *uri,
const KeyedVector<String8, String8> *headers = NULL);
status_t connect();
void disconnect();
virtual status_t initCheck() const;
virtual ssize_t readAt(off_t offset, void *data, size_t size);
virtual status_t getSize(off_t *size);
virtual uint32_t flags();
protected:
virtual ~HTTPDataSource();
private:
enum {
kBufferSize = 64 * 1024,
// If we encounter a socket-read error we'll try reconnecting
// and restarting the read for at most this many times.
kMaxNumRetries = 3,
};
enum State {
DISCONNECTED,
CONNECTING,
CONNECTED
};
State mState;
mutable Mutex mStateLock;
String8 mHeaders;
String8 mStartingHost;
String8 mStartingPath;
int mStartingPort;
HTTPStream *mHttp;
void *mBuffer;
size_t mBufferLength;
off_t mBufferOffset;
bool mContentLengthValid;
unsigned long long mContentLength;
int32_t mNumRetriesLeft;
void init(const KeyedVector<String8, String8> *headers);
ssize_t sendRangeRequest(size_t offset);
void initHeaders(const KeyedVector<String8, String8> *overrides);
status_t connectWithRedirectsAndRange(off_t rangeStart);
void applyTimeoutResponse();
HTTPDataSource(const HTTPDataSource &);
HTTPDataSource &operator=(const HTTPDataSource &);
};
} // namespace android
#endif // HTTP_DATASOURCE_H_

View File

@@ -1523,6 +1523,15 @@ public class MediaPlayer
*/
public static final int MEDIA_INFO_VIDEO_TRACK_LAGGING = 700;
/** MediaPlayer is temporarily pausing playback internally in order to
* buffer more data.
*/
public static final int MEDIA_INFO_BUFFERING_START = 701;
/** MediaPlayer is resuming playback after filling buffers.
*/
public static final int MEDIA_INFO_BUFFERING_END = 702;
/** Bad interleaving means that a media has been improperly interleaved or
* not interleaved at all, e.g has all the video samples first then all the
* audio ones. Video is playing but a lot of disk seeks may be happening.

View File

@@ -22,19 +22,18 @@ LOCAL_SRC_FILES += \
AudioPlayer.cpp \
AudioSource.cpp \
AwesomePlayer.cpp \
CachingDataSource.cpp \
CameraSource.cpp \
DataSource.cpp \
FileSource.cpp \
HTTPDataSource.cpp \
HTTPStream.cpp \
JPEGSource.cpp \
MP3Extractor.cpp \
MPEG4Extractor.cpp \
MPEG4Writer.cpp \
MediaExtractor.cpp \
NuCachedSource2.cpp \
NuHTTPDataSource.cpp \
OggExtractor.cpp \
Prefetcher.cpp \
SampleIterator.cpp \
SampleTable.cpp \
ShoutcastSource.cpp \

View File

@@ -23,12 +23,12 @@
#include "include/ARTSPController.h"
#include "include/AwesomePlayer.h"
#include "include/LiveSource.h"
#include "include/Prefetcher.h"
#include "include/SoftwareRenderer.h"
#include "include/NuCachedSource2.h"
#include "include/ThrottledSource.h"
#include <binder/IPCThreadState.h>
#include <media/stagefright/AudioPlayer.h>
#include <media/stagefright/CachingDataSource.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/MediaBuffer.h>
@@ -354,11 +354,7 @@ void AwesomePlayer::reset_l() {
cancelPlayerEvents();
if (mPrefetcher != NULL) {
CHECK_EQ(mPrefetcher->getStrongCount(), 1);
}
mPrefetcher.clear();
mCachedSource.clear();
mAudioTrack.clear();
mVideoTrack.clear();
@@ -448,30 +444,45 @@ void AwesomePlayer::onBufferingUpdate() {
}
mBufferingEventPending = false;
int64_t durationUs;
{
Mutex::Autolock autoLock(mMiscStateLock);
durationUs = mDurationUs;
if (mCachedSource == NULL) {
return;
}
int64_t cachedDurationUs = mPrefetcher->getCachedDurationUs();
size_t lowWatermark = 400000;
size_t highWatermark = 1000000;
LOGI("cache holds %.2f secs worth of data.", cachedDurationUs / 1E6);
off_t size;
if (mDurationUs >= 0 && mCachedSource->getSize(&size) == OK) {
int64_t bitrate = size * 8000000ll / mDurationUs; // in bits/sec
if (durationUs >= 0) {
int64_t positionUs;
getPosition(&positionUs);
size_t cachedSize = mCachedSource->cachedSize();
int64_t cachedDurationUs = cachedSize * 8000000ll / bitrate;
cachedDurationUs += positionUs;
double percentage = (double)cachedDurationUs / mDurationUs;
double percentage = (double)cachedDurationUs / durationUs;
notifyListener_l(MEDIA_BUFFERING_UPDATE, percentage * 100.0);
postBufferingEvent_l();
} else {
// LOGE("Not sending buffering status because duration is unknown.");
postBufferingEvent_l();
lowWatermark = 2 * bitrate / 8; // 2 secs
highWatermark = 10 * bitrate / 8; // 10 secs
}
bool eos;
size_t cachedDataRemaining = mCachedSource->approxDataRemaining(&eos);
if ((mFlags & PLAYING) && !eos && (cachedDataRemaining < lowWatermark)) {
LOGI("cache is running low (< %d) , pausing.", lowWatermark);
mFlags |= CACHE_UNDERRUN;
pause_l();
notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_START);
} else if ((mFlags & CACHE_UNDERRUN)
&& (eos || cachedDataRemaining > highWatermark)) {
LOGI("cache has filled up (> %d), resuming.", highWatermark);
mFlags &= ~CACHE_UNDERRUN;
play_l();
notifyListener_l(MEDIA_INFO, MEDIA_INFO_BUFFERING_END);
}
postBufferingEvent_l();
}
void AwesomePlayer::onStreamDone() {
@@ -508,6 +519,9 @@ void AwesomePlayer::onStreamDone() {
status_t AwesomePlayer::play() {
Mutex::Autolock autoLock(mLock);
mFlags &= ~CACHE_UNDERRUN;
return play_l();
}
@@ -632,6 +646,9 @@ void AwesomePlayer::initRenderer_l() {
status_t AwesomePlayer::pause() {
Mutex::Autolock autoLock(mLock);
mFlags &= ~CACHE_UNDERRUN;
return pause_l();
}
@@ -652,7 +669,7 @@ status_t AwesomePlayer::pause_l() {
}
bool AwesomePlayer::isPlaying() const {
return mFlags & PLAYING;
return (mFlags & PLAYING) || (mFlags & CACHE_UNDERRUN);
}
void AwesomePlayer::setISurface(const sp<ISurface> &isurface) {
@@ -719,6 +736,11 @@ status_t AwesomePlayer::seekTo(int64_t timeUs) {
}
status_t AwesomePlayer::seekTo_l(int64_t timeUs) {
if (mFlags & CACHE_UNDERRUN) {
mFlags &= ~CACHE_UNDERRUN;
play_l();
}
mSeeking = true;
mSeekNotificationSent = false;
mSeekTimeUs = timeUs;
@@ -764,10 +786,6 @@ status_t AwesomePlayer::getVideoDimensions(
void AwesomePlayer::setAudioSource(sp<MediaSource> source) {
CHECK(source != NULL);
if (mPrefetcher != NULL) {
source = mPrefetcher->addSource(source);
}
mAudioTrack = source;
}
@@ -814,10 +832,6 @@ status_t AwesomePlayer::initAudioDecoder() {
void AwesomePlayer::setVideoSource(sp<MediaSource> source) {
CHECK(source != NULL);
if (mPrefetcher != NULL) {
source = mPrefetcher->addSource(source);
}
mVideoTrack = source;
}
@@ -869,6 +883,21 @@ void AwesomePlayer::onVideoEvent() {
mVideoBuffer->release();
mVideoBuffer = NULL;
}
if (mCachedSource != NULL && mAudioSource != NULL) {
// We're going to seek the video source first, followed by
// the audio source.
// In order to avoid jumps in the DataSource offset caused by
// the audio codec prefetching data from the old locations
// while the video codec is already reading data from the new
// locations, we'll "pause" the audio source, causing it to
// stop reading input data until a subsequent seek.
if (mAudioPlayer != NULL) {
mAudioPlayer->pause();
}
mAudioSource->pause();
}
}
if (!mVideoBuffer) {
@@ -925,6 +954,7 @@ void AwesomePlayer::onVideoEvent() {
LOGV("seeking audio to %lld us (%.2f secs).", timeUs, timeUs / 1E6);
mAudioPlayer->seekTo(timeUs);
mAudioPlayer->resume();
mWatchForAudioSeekComplete = true;
mWatchForAudioEOS = true;
} else if (!mSeekNotificationSent) {
@@ -1012,10 +1042,6 @@ void AwesomePlayer::postStreamDoneEvent_l(status_t status) {
}
void AwesomePlayer::postBufferingEvent_l() {
if (mPrefetcher == NULL) {
return;
}
if (mBufferingEventPending) {
return;
}
@@ -1123,10 +1149,10 @@ status_t AwesomePlayer::finishSetDataSource_l() {
sp<DataSource> dataSource;
if (!strncasecmp("http://", mUri.string(), 7)) {
mConnectingDataSource = new HTTPDataSource(mUri, &mUriHeaders);
mConnectingDataSource = new NuHTTPDataSource;
mLock.unlock();
status_t err = mConnectingDataSource->connect();
status_t err = mConnectingDataSource->connect(mUri/*, mUriHeaders */);
mLock.lock();
if (err != OK) {
@@ -1136,22 +1162,29 @@ status_t AwesomePlayer::finishSetDataSource_l() {
return err;
}
dataSource = new CachingDataSource(
mConnectingDataSource, 64 * 1024, 10);
#if 0
mCachedSource = new NuCachedSource2(
new ThrottledSource(
mConnectingDataSource, 50 * 1024 /* bytes/sec */));
#else
mCachedSource = new NuCachedSource2(mConnectingDataSource);
#endif
mConnectingDataSource.clear();
dataSource = mCachedSource;
} else if (!strncasecmp(mUri.string(), "httplive://", 11)) {
String8 uri("http://");
uri.append(mUri.string() + 11);
dataSource = new LiveSource(uri.string());
if (dataSource->flags() & DataSource::kWantsPrefetching) {
mPrefetcher = new Prefetcher;
}
mCachedSource = new NuCachedSource2(dataSource);
dataSource = mCachedSource;
sp<MediaExtractor> extractor =
MediaExtractor::Create(dataSource, MEDIA_MIMETYPE_CONTAINER_MPEG2TS);
return setDataSource_l(extractor);
} else if (!strncasecmp("rtsp://", mUri.string(), 7)) {
if (mLooper == NULL) {
mLooper = new ALooper;
@@ -1183,10 +1216,6 @@ status_t AwesomePlayer::finishSetDataSource_l() {
return UNKNOWN_ERROR;
}
if (dataSource->flags() & DataSource::kWantsPrefetching) {
mPrefetcher = new Prefetcher;
}
return setDataSource_l(extractor);
}
@@ -1211,8 +1240,6 @@ bool AwesomePlayer::ContinuePreparation(void *cookie) {
}
void AwesomePlayer::onPrepareAsyncEvent() {
sp<Prefetcher> prefetcher;
{
Mutex::Autolock autoLock(mLock);
@@ -1248,39 +1275,6 @@ void AwesomePlayer::onPrepareAsyncEvent() {
return;
}
}
prefetcher = mPrefetcher;
}
if (prefetcher != NULL) {
{
Mutex::Autolock autoLock(mLock);
if (mFlags & PREPARE_CANCELLED) {
LOGI("prepare was cancelled before preparing the prefetcher");
prefetcher.clear();
abortPrepare(UNKNOWN_ERROR);
return;
}
}
LOGI("calling prefetcher->prepare()");
status_t result =
prefetcher->prepare(&AwesomePlayer::ContinuePreparation, this);
prefetcher.clear();
if (result == OK) {
LOGI("prefetcher is done preparing");
} else {
Mutex::Autolock autoLock(mLock);
CHECK_EQ(result, -EINTR);
LOGI("prefetcher->prepare() was cancelled early.");
abortPrepare(UNKNOWN_ERROR);
return;
}
}
Mutex::Autolock autoLock(mLock);

View File

@@ -1,165 +0,0 @@
/*
* Copyright (C) 2009 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdlib.h>
#include <string.h>
#include <media/stagefright/CachingDataSource.h>
#include <media/stagefright/MediaDebug.h>
namespace android {
CachingDataSource::CachingDataSource(
const sp<DataSource> &source, size_t pageSize, int numPages)
: mSource(source),
mData(malloc(pageSize * numPages)),
mPageSize(pageSize),
mFirst(NULL),
mLast(NULL) {
for (int i = 0; i < numPages; ++i) {
Page *page = new Page;
page->mPrev = mLast;
page->mNext = NULL;
if (mLast == NULL) {
mFirst = page;
} else {
mLast->mNext = page;
}
mLast = page;
page->mOffset = -1;
page->mLength = 0;
page->mData = (char *)mData + mPageSize * i;
}
}
CachingDataSource::~CachingDataSource() {
Page *page = mFirst;
while (page != NULL) {
Page *next = page->mNext;
delete page;
page = next;
}
mFirst = mLast = NULL;
free(mData);
mData = NULL;
}
status_t CachingDataSource::initCheck() const {
return mSource->initCheck();
}
status_t CachingDataSource::getSize(off_t *size) {
return mSource->getSize(size);
}
uint32_t CachingDataSource::flags() {
return mSource->flags();
}
ssize_t CachingDataSource::readAt(off_t offset, void *data, size_t size) {
Mutex::Autolock autoLock(mLock);
size_t total = 0;
while (size > 0) {
Page *page = mFirst;
while (page != NULL) {
if (page->mOffset >= 0 && offset >= page->mOffset
&& offset < page->mOffset + (off_t)page->mLength) {
break;
}
page = page->mNext;
}
if (page == NULL) {
page = allocate_page();
page->mOffset = offset - offset % mPageSize;
ssize_t n = mSource->readAt(page->mOffset, page->mData, mPageSize);
if (n < 0) {
page->mLength = 0;
} else {
page->mLength = (size_t)n;
}
mFirst->mPrev = page;
page->mNext = mFirst;
page->mPrev = NULL;
mFirst = page;
if (n < 0) {
return n;
}
if (offset >= page->mOffset + (off_t)page->mLength) {
break;
}
} else {
// Move "page" to the front in LRU order.
if (page->mNext != NULL) {
page->mNext->mPrev = page->mPrev;
} else {
mLast = page->mPrev;
}
if (page->mPrev != NULL) {
page->mPrev->mNext = page->mNext;
} else {
mFirst = page->mNext;
}
mFirst->mPrev = page;
page->mNext = mFirst;
page->mPrev = NULL;
mFirst = page;
}
size_t copy = page->mLength - (offset - page->mOffset);
if (copy > size) {
copy = size;
}
memcpy(data,(const char *)page->mData + (offset - page->mOffset),
copy);
total += copy;
if (page->mLength < mPageSize) {
// This was the final page. There is no more data beyond it.
break;
}
offset += copy;
size -= copy;
data = (char *)data + copy;
}
return total;
}
CachingDataSource::Page *CachingDataSource::allocate_page() {
// The last page is the least recently used, i.e. oldest.
Page *page = mLast;
page->mPrev->mNext = NULL;
mLast = page->mPrev;
page->mPrev = NULL;
return page;
}
} // namespace android

View File

@@ -20,13 +20,13 @@
#include "include/WAVExtractor.h"
#include "include/OggExtractor.h"
#include "include/MPEG2TSExtractor.h"
#include "include/NuCachedSource2.h"
#include "include/NuHTTPDataSource.h"
#include "matroska/MatroskaExtractor.h"
#include <media/stagefright/CachingDataSource.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/FileSource.h>
#include <media/stagefright/HTTPDataSource.h>
#include <media/stagefright/MediaErrors.h>
#include <utils/String8.h>
@@ -108,11 +108,11 @@ sp<DataSource> DataSource::CreateFromURI(
if (!strncasecmp("file://", uri, 7)) {
source = new FileSource(uri + 7);
} else if (!strncasecmp("http://", uri, 7)) {
sp<HTTPDataSource> httpSource = new HTTPDataSource(uri, headers);
if (httpSource->connect() != OK) {
sp<NuHTTPDataSource> httpSource = new NuHTTPDataSource;
if (httpSource->connect(uri /* , headers */) != OK) {
return NULL;
}
source = new CachingDataSource(httpSource, 64 * 1024, 10);
source = new NuCachedSource2(httpSource);
} else {
// Assume it's a filename.
source = new FileSource(uri);

View File

@@ -1,457 +0,0 @@
/*
* Copyright (C) 2009 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#define LOG_NDEBUG 0
#define LOG_TAG "HTTPDataSource"
#include <utils/Log.h>
#include "include/stagefright_string.h"
#include "include/HTTPStream.h"
#include <stdlib.h>
#include <cutils/properties.h>
#include <media/stagefright/HTTPDataSource.h>
#include <media/stagefright/MediaDebug.h>
namespace android {
status_t HTTPDataSource::connectWithRedirectsAndRange(off_t rangeStart) {
string host = mStartingHost.string();
string path = mStartingPath.string();
int port = mStartingPort;
LOGV("Connecting to host '%s', port %d, path '%s'",
host.c_str(), port, path.c_str());
int numRedirectsRemaining = 5;
while (numRedirectsRemaining-- > 0) {
{
Mutex::Autolock autoLock(mStateLock);
if (mState == DISCONNECTED) {
return UNKNOWN_ERROR;
}
}
status_t err = mHttp->connect(host.c_str(), port);
if (err != OK) {
return err;
}
String8 request;
request.append("GET ");
request.append(path.c_str());
request.append(" HTTP/1.1\r\n");
request.append(mHeaders);
request.append("Host: ");
request.append(host.c_str());
request.append("\r\n");
if (rangeStart > 0) {
char range[128];
sprintf(range, "Range: bytes=%ld-\r\n", rangeStart);
request.append(range);
}
request.append("\r\n");
err = mHttp->send(request.string());
if (err != OK) {
return err;
}
int httpStatus;
err = mHttp->receive_header(&httpStatus);
if (err != OK) {
return err;
}
if (httpStatus >= 200 && httpStatus < 300) {
applyTimeoutResponse();
return OK;
}
if (httpStatus != 301 && httpStatus != 302) {
LOGE("HTTP request failed w/ http status %d", httpStatus);
return ERROR_IO;
}
string location;
CHECK(mHttp->find_header_value("Location", &location));
CHECK(string(location, 0, 7) == "http://");
location.erase(0, 7);
string::size_type slashPos = location.find('/');
if (slashPos == string::npos) {
slashPos = location.size();
location += '/';
}
mHttp->disconnect();
LOGV("Redirecting to %s\n", location.c_str());
host = string(location, 0, slashPos);
string::size_type colonPos = host.find(':');
if (colonPos != string::npos) {
const char *start = host.c_str() + colonPos + 1;
char *end;
long tmp = strtol(start, &end, 10);
CHECK(end > start && (*end == '\0'));
port = (tmp >= 0 && tmp < 65536) ? (int)tmp : 80;
host.erase(colonPos, host.size() - colonPos);
} else {
port = 80;
}
path = string(location, slashPos);
mStartingHost = host.c_str();
mStartingPath = path.c_str();
mStartingPort = port;
}
return ERROR_IO;
}
void HTTPDataSource::applyTimeoutResponse() {
string timeout;
if (mHttp->find_header_value("X-SocketTimeout", &timeout)) {
const char *s = timeout.c_str();
char *end;
long tmp = strtol(s, &end, 10);
if (end == s || *end != '\0') {
LOGW("Illegal X-SocketTimeout value given.");
return;
}
LOGI("overriding default timeout, new timeout is %ld seconds", tmp);
mHttp->setReceiveTimeout(tmp);
}
}
HTTPDataSource::HTTPDataSource(
const char *uri, const KeyedVector<String8, String8> *headers) {
CHECK(!strncasecmp("http://", uri, 7));
string host;
string path;
int port;
char *slash = strchr(uri + 7, '/');
if (slash == NULL) {
host = uri + 7;
path = "/";
} else {
host = string(uri + 7, slash - (uri + 7));
path = slash;
}
char *colon = strchr(host.c_str(), ':');
if (colon == NULL) {
port = 80;
} else {
char *end;
long tmp = strtol(colon + 1, &end, 10);
CHECK(end > colon + 1);
CHECK(tmp > 0 && tmp < 65536);
port = tmp;
host = string(host, 0, colon - host.c_str());
}
mStartingHost = host.c_str();
mStartingPath = path.c_str();
mStartingPort = port;
init(headers);
}
HTTPDataSource::HTTPDataSource(
const char *_host, int port, const char *_path,
const KeyedVector<String8, String8> *headers) {
mStartingHost = _host;
mStartingPath = _path;
mStartingPort = port;
init(headers);
}
void HTTPDataSource::init(const KeyedVector<String8, String8> *headers) {
mState = DISCONNECTED;
mHttp = new HTTPStream;
initHeaders(headers);
mBuffer = malloc(kBufferSize);
mNumRetriesLeft = kMaxNumRetries;
}
status_t HTTPDataSource::connect() {
{
Mutex::Autolock autoLock(mStateLock);
if (mState != DISCONNECTED) {
return ERROR_ALREADY_CONNECTED;
}
mState = CONNECTING;
}
mBufferLength = 0;
mBufferOffset = 0;
mContentLengthValid = false;
status_t err = connectWithRedirectsAndRange(0);
if (err != OK) {
Mutex::Autolock autoLock(mStateLock);
if (mState != CONNECTING) {
LOGV("connect() cancelled");
}
mState = DISCONNECTED;
return err;
}
string value;
if (mHttp->find_header_value("Content-Length", &value)) {
char *end;
mContentLength = strtoull(value.c_str(), &end, 10);
mContentLengthValid = true;
}
Mutex::Autolock autoLock(mStateLock);
if (mState != CONNECTING) {
// disconnect was called when we had just successfully connected.
LOGV("connect() cancelled (we had just succeeded connecting)");
mHttp->disconnect();
return UNKNOWN_ERROR;
}
mState = CONNECTED;
return OK;
}
void HTTPDataSource::disconnect() {
Mutex::Autolock autoLock(mStateLock);
if (mState == CONNECTING || mState == CONNECTED) {
mHttp->disconnect();
mState = DISCONNECTED;
}
}
status_t HTTPDataSource::initCheck() const {
Mutex::Autolock autoLock(mStateLock);
return (mState == CONNECTED) ? (status_t)OK : ERROR_NOT_CONNECTED;
}
status_t HTTPDataSource::getSize(off_t *size) {
*size = 0;
{
Mutex::Autolock autoLock(mStateLock);
if (mState != CONNECTED) {
return ERROR_NOT_CONNECTED;
}
}
if (!mContentLengthValid) {
return ERROR_UNSUPPORTED;
}
*size = mContentLength;
return OK;
}
HTTPDataSource::~HTTPDataSource() {
disconnect();
delete mHttp;
mHttp = NULL;
free(mBuffer);
mBuffer = NULL;
}
ssize_t HTTPDataSource::sendRangeRequest(size_t offset) {
status_t err = connectWithRedirectsAndRange(offset);
if (err != OK) {
return err;
}
string value;
if (!mHttp->find_header_value("Content-Length", &value)) {
return kBufferSize;
}
char *end;
unsigned long contentLength = strtoul(value.c_str(), &end, 10);
return contentLength;
}
ssize_t HTTPDataSource::readAt(off_t offset, void *data, size_t size) {
LOGV("readAt %ld, size %d", offset, size);
rinse_repeat:
{
Mutex::Autolock autoLock(mStateLock);
if (mState != CONNECTED) {
return ERROR_NOT_CONNECTED;
}
}
if (offset >= mBufferOffset
&& offset < (off_t)(mBufferOffset + mBufferLength)) {
size_t num_bytes_available = mBufferLength - (offset - mBufferOffset);
size_t copy = num_bytes_available;
if (copy > size) {
copy = size;
}
memcpy(data, (const char *)mBuffer + (offset - mBufferOffset), copy);
if (copy < size) {
LOGV("short read (1), returning %d vs. %d requested", copy, size);
}
return copy;
}
ssize_t contentLength = 0;
if (offset != (off_t)(mBufferOffset + mBufferLength)) {
LOGV("new range offset=%ld (old=%ld)",
offset, mBufferOffset + mBufferLength);
mHttp->disconnect();
contentLength = sendRangeRequest(offset);
if (contentLength > kBufferSize) {
contentLength = kBufferSize;
}
} else {
contentLength = kBufferSize;
}
mBufferOffset = offset;
if (mContentLengthValid
&& mBufferOffset + contentLength >= (off_t)mContentLength) {
// If we never triggered a range request but know the content length,
// make sure to not read more data than there could be, otherwise
// we'd block indefinitely if the server doesn't close the connection.
contentLength = mContentLength - mBufferOffset;
}
if (contentLength <= 0) {
return contentLength;
}
ssize_t num_bytes_received = mHttp->receive(mBuffer, contentLength);
if (num_bytes_received < 0
|| (mContentLengthValid && num_bytes_received < contentLength)) {
if (mNumRetriesLeft-- > 0) {
mHttp->disconnect();
mBufferLength = 0;
num_bytes_received = connectWithRedirectsAndRange(mBufferOffset);
if (num_bytes_received == OK) {
LOGI("retrying connection succeeded.");
goto rinse_repeat;
}
LOGE("retrying connection failed");
}
mBufferLength = 0;
return num_bytes_received;
}
mBufferLength = (size_t)num_bytes_received;
size_t copy = mBufferLength;
if (copy > size) {
copy = size;
}
memcpy(data, mBuffer, copy);
return copy;
}
void HTTPDataSource::initHeaders(
const KeyedVector<String8, String8> *overrides) {
mHeaders = String8();
mHeaders.append("User-Agent: stagefright/1.0 (Linux;Android ");
#if (PROPERTY_VALUE_MAX < 8)
#error "PROPERTY_VALUE_MAX must be at least 8"
#endif
char value[PROPERTY_VALUE_MAX];
property_get("ro.build.version.release", value, "Unknown");
mHeaders.append(value);
mHeaders.append(")\r\n");
if (overrides == NULL) {
return;
}
for (size_t i = 0; i < overrides->size(); ++i) {
String8 line;
line.append(overrides->keyAt(i));
line.append(": ");
line.append(overrides->valueAt(i));
line.append("\r\n");
mHeaders.append(line);
}
}
uint32_t HTTPDataSource::flags() {
uint32_t f = kWantsPrefetching;
if (!strcasecmp(mStartingHost.string(), "localhost")
|| !strcmp(mStartingHost.string(), "127.0.0.1")) {
f |= kStreamedFromLocalHost;
}
return f;
}
} // namespace android

View File

@@ -1,474 +0,0 @@
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define LOG_TAG "Prefetcher"
//#define LOG_NDEBUG 0
#include <utils/Log.h>
#include "include/Prefetcher.h"
#include <media/stagefright/MediaBuffer.h>
#include <media/stagefright/MediaDebug.h>
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/MediaSource.h>
#include <media/stagefright/MetaData.h>
#include <utils/List.h>
namespace android {
struct PrefetchedSource : public MediaSource {
PrefetchedSource(
size_t index,
const sp<MediaSource> &source);
virtual status_t start(MetaData *params);
virtual status_t stop();
virtual status_t read(
MediaBuffer **buffer, const ReadOptions *options);
virtual sp<MetaData> getFormat();
protected:
virtual ~PrefetchedSource();
private:
friend struct Prefetcher;
Mutex mLock;
Condition mCondition;
sp<MediaSource> mSource;
size_t mIndex;
bool mStarted;
bool mReachedEOS;
status_t mFinalStatus;
int64_t mSeekTimeUs;
int64_t mCacheDurationUs;
size_t mCacheSizeBytes;
bool mPrefetcherStopped;
bool mCurrentlyPrefetching;
List<MediaBuffer *> mCachedBuffers;
// Returns true iff source is currently caching.
bool getCacheDurationUs(int64_t *durationUs, size_t *totalSize = NULL);
void updateCacheDuration_l();
void clearCache_l();
void cacheMore();
void onPrefetcherStopped();
PrefetchedSource(const PrefetchedSource &);
PrefetchedSource &operator=(const PrefetchedSource &);
};
Prefetcher::Prefetcher()
: mDone(false),
mThreadExited(false) {
startThread();
}
Prefetcher::~Prefetcher() {
stopThread();
}
sp<MediaSource> Prefetcher::addSource(const sp<MediaSource> &source) {
Mutex::Autolock autoLock(mLock);
sp<PrefetchedSource> psource =
new PrefetchedSource(mSources.size(), source);
mSources.add(psource);
return psource;
}
void Prefetcher::startThread() {
mThreadExited = false;
mDone = false;
int res = androidCreateThreadEtc(
ThreadWrapper, this, "Prefetcher",
ANDROID_PRIORITY_DEFAULT, 0, &mThread);
CHECK_EQ(res, 1);
}
void Prefetcher::stopThread() {
Mutex::Autolock autoLock(mLock);
while (!mThreadExited) {
mDone = true;
mCondition.signal();
mCondition.wait(mLock);
}
}
// static
int Prefetcher::ThreadWrapper(void *me) {
static_cast<Prefetcher *>(me)->threadFunc();
return 0;
}
// Cache at most 1 min for each source.
static int64_t kMaxCacheDurationUs = 60 * 1000000ll;
// At the same time cache at most 5MB per source.
static size_t kMaxCacheSizeBytes = 5 * 1024 * 1024;
// If the amount of cached data drops below this,
// fill the cache up to the max duration again.
static int64_t kLowWaterDurationUs = 5000000ll;
void Prefetcher::threadFunc() {
bool fillingCache = false;
for (;;) {
sp<PrefetchedSource> minSource;
int64_t minCacheDurationUs = -1;
{
Mutex::Autolock autoLock(mLock);
if (mDone) {
break;
}
mCondition.waitRelative(
mLock, fillingCache ? 10000000ll : 1000000000ll);
ssize_t minIndex = -1;
for (size_t i = 0; i < mSources.size(); ++i) {
sp<PrefetchedSource> source = mSources[i].promote();
if (source == NULL) {
continue;
}
int64_t cacheDurationUs;
size_t cacheSizeBytes;
if (!source->getCacheDurationUs(&cacheDurationUs, &cacheSizeBytes)) {
continue;
}
if (cacheSizeBytes > kMaxCacheSizeBytes) {
LOGI("max cache size reached");
continue;
}
if (mSources.size() > 1 && cacheDurationUs >= kMaxCacheDurationUs) {
LOGI("max duration reached, size = %d bytes", cacheSizeBytes);
continue;
}
if (minIndex < 0 || cacheDurationUs < minCacheDurationUs) {
minCacheDurationUs = cacheDurationUs;
minIndex = i;
minSource = source;
}
}
if (minIndex < 0) {
if (fillingCache) {
LOGV("[%p] done filling the cache, above high water mark.",
this);
fillingCache = false;
}
continue;
}
}
if (!fillingCache && minCacheDurationUs < kLowWaterDurationUs) {
LOGI("[%p] cache below low water mark, filling cache.", this);
fillingCache = true;
}
if (fillingCache) {
// Make sure not to hold the lock while calling into the source.
// The lock guards the list of sources, not the individual sources
// themselves.
minSource->cacheMore();
}
}
Mutex::Autolock autoLock(mLock);
for (size_t i = 0; i < mSources.size(); ++i) {
sp<PrefetchedSource> source = mSources[i].promote();
if (source == NULL) {
continue;
}
source->onPrefetcherStopped();
}
mThreadExited = true;
mCondition.signal();
}
int64_t Prefetcher::getCachedDurationUs(bool *noMoreData) {
Mutex::Autolock autoLock(mLock);
int64_t minCacheDurationUs = -1;
ssize_t minIndex = -1;
bool anySourceActive = false;
for (size_t i = 0; i < mSources.size(); ++i) {
int64_t cacheDurationUs;
sp<PrefetchedSource> source = mSources[i].promote();
if (source == NULL) {
continue;
}
if (source->getCacheDurationUs(&cacheDurationUs)) {
anySourceActive = true;
}
if (minIndex < 0 || cacheDurationUs < minCacheDurationUs) {
minCacheDurationUs = cacheDurationUs;
minIndex = i;
}
}
if (noMoreData) {
*noMoreData = !anySourceActive;
}
return minCacheDurationUs < 0 ? 0 : minCacheDurationUs;
}
status_t Prefetcher::prepare(
bool (*continueFunc)(void *cookie), void *cookie) {
// Fill the cache.
int64_t duration;
bool noMoreData;
do {
usleep(100000);
if (continueFunc && !(*continueFunc)(cookie)) {
return -EINTR;
}
duration = getCachedDurationUs(&noMoreData);
} while (!noMoreData && duration < 2000000ll);
return OK;
}
////////////////////////////////////////////////////////////////////////////////
PrefetchedSource::PrefetchedSource(
size_t index,
const sp<MediaSource> &source)
: mSource(source),
mIndex(index),
mStarted(false),
mReachedEOS(false),
mSeekTimeUs(0),
mCacheDurationUs(0),
mCacheSizeBytes(0),
mPrefetcherStopped(false),
mCurrentlyPrefetching(false) {
}
PrefetchedSource::~PrefetchedSource() {
if (mStarted) {
stop();
}
}
status_t PrefetchedSource::start(MetaData *params) {
CHECK(!mStarted);
Mutex::Autolock autoLock(mLock);
status_t err = mSource->start(params);
if (err != OK) {
return err;
}
mStarted = true;
return OK;
}
status_t PrefetchedSource::stop() {
CHECK(mStarted);
Mutex::Autolock autoLock(mLock);
while (mCurrentlyPrefetching) {
mCondition.wait(mLock);
}
clearCache_l();
status_t err = mSource->stop();
mStarted = false;
return err;
}
status_t PrefetchedSource::read(
MediaBuffer **out, const ReadOptions *options) {
*out = NULL;
Mutex::Autolock autoLock(mLock);
CHECK(mStarted);
int64_t seekTimeUs;
if (options && options->getSeekTo(&seekTimeUs)) {
CHECK(seekTimeUs >= 0);
clearCache_l();
mReachedEOS = false;
mSeekTimeUs = seekTimeUs;
}
while (!mPrefetcherStopped && !mReachedEOS && mCachedBuffers.empty()) {
mCondition.wait(mLock);
}
if (mCachedBuffers.empty()) {
return mReachedEOS ? mFinalStatus : ERROR_END_OF_STREAM;
}
*out = *mCachedBuffers.begin();
mCachedBuffers.erase(mCachedBuffers.begin());
updateCacheDuration_l();
mCacheSizeBytes -= (*out)->size();
return OK;
}
sp<MetaData> PrefetchedSource::getFormat() {
return mSource->getFormat();
}
bool PrefetchedSource::getCacheDurationUs(
int64_t *durationUs, size_t *totalSize) {
Mutex::Autolock autoLock(mLock);
*durationUs = mCacheDurationUs;
if (totalSize != NULL) {
*totalSize = mCacheSizeBytes;
}
if (!mStarted || mReachedEOS) {
return false;
}
return true;
}
void PrefetchedSource::cacheMore() {
MediaSource::ReadOptions options;
Mutex::Autolock autoLock(mLock);
if (!mStarted) {
return;
}
mCurrentlyPrefetching = true;
if (mSeekTimeUs >= 0) {
options.setSeekTo(mSeekTimeUs);
mSeekTimeUs = -1;
}
// Ensure our object does not go away while we're not holding
// the lock.
sp<PrefetchedSource> me = this;
mLock.unlock();
MediaBuffer *buffer;
status_t err = mSource->read(&buffer, &options);
mLock.lock();
if (err != OK) {
mCurrentlyPrefetching = false;
mReachedEOS = true;
mFinalStatus = err;
mCondition.signal();
return;
}
CHECK(buffer != NULL);
MediaBuffer *copy = new MediaBuffer(buffer->range_length());
memcpy(copy->data(),
(const uint8_t *)buffer->data() + buffer->range_offset(),
buffer->range_length());
sp<MetaData> from = buffer->meta_data();
sp<MetaData> to = copy->meta_data();
int64_t timeUs;
if (from->findInt64(kKeyTime, &timeUs)) {
to->setInt64(kKeyTime, timeUs);
}
buffer->release();
buffer = NULL;
mCachedBuffers.push_back(copy);
updateCacheDuration_l();
mCacheSizeBytes += copy->size();
mCurrentlyPrefetching = false;
mCondition.signal();
}
void PrefetchedSource::updateCacheDuration_l() {
if (mCachedBuffers.size() < 2) {
mCacheDurationUs = 0;
} else {
int64_t firstTimeUs, lastTimeUs;
CHECK((*mCachedBuffers.begin())->meta_data()->findInt64(
kKeyTime, &firstTimeUs));
CHECK((*--mCachedBuffers.end())->meta_data()->findInt64(
kKeyTime, &lastTimeUs));
mCacheDurationUs = lastTimeUs - firstTimeUs;
}
}
void PrefetchedSource::clearCache_l() {
List<MediaBuffer *>::iterator it = mCachedBuffers.begin();
while (it != mCachedBuffers.end()) {
(*it)->release();
it = mCachedBuffers.erase(it);
}
updateCacheDuration_l();
mCacheSizeBytes = 0;
}
void PrefetchedSource::onPrefetcherStopped() {
Mutex::Autolock autoLock(mLock);
mPrefetcherStopped = true;
mCondition.signal();
}
} // namespace android

View File

@@ -18,11 +18,11 @@
#define AWESOME_PLAYER_H_
#include "NuHTTPDataSource.h"
#include "TimedEventQueue.h"
#include <media/MediaPlayerInterface.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/HTTPDataSource.h>
#include <media/stagefright/OMXClient.h>
#include <utils/threads.h>
@@ -33,8 +33,8 @@ struct DataSource;
struct MediaBuffer;
struct MediaExtractor;
struct MediaSource;
struct Prefetcher;
struct TimeSource;
struct NuCachedSource2;
struct ALooper;
struct ARTSPController;
@@ -101,6 +101,7 @@ private:
PREPARED = 16,
AT_EOS = 32,
PREPARE_CANCELLED = 64,
CACHE_UNDERRUN = 128,
};
mutable Mutex mLock;
@@ -169,8 +170,8 @@ private:
MediaBuffer *mLastVideoBuffer;
MediaBuffer *mVideoBuffer;
sp<Prefetcher> mPrefetcher;
sp<HTTPDataSource> mConnectingDataSource;
sp<NuHTTPDataSource> mConnectingDataSource;
sp<NuCachedSource2> mCachedSource;
sp<ALooper> mLooper;
sp<ARTSPController> mRTSPController;

View File

@@ -1,70 +0,0 @@
/*
* Copyright (C) 2010 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PREFETCHER_H_
#define PREFETCHER_H_
#include <utils/RefBase.h>
#include <utils/Vector.h>
#include <utils/threads.h>
namespace android {
struct MediaSource;
struct PrefetchedSource;
struct Prefetcher : public RefBase {
Prefetcher();
// Given an existing MediaSource returns a new MediaSource
// that will benefit from prefetching/caching the original one.
sp<MediaSource> addSource(const sp<MediaSource> &source);
int64_t getCachedDurationUs(bool *noMoreData = NULL);
// If provided (non-NULL), "continueFunc" will be called repeatedly
// while preparing and preparation will finish early if it returns
// false. In this case "-EINTR" is returned as a result.
status_t prepare(
bool (*continueFunc)(void *cookie) = NULL,
void *cookie = NULL);
protected:
virtual ~Prefetcher();
private:
Mutex mLock;
Condition mCondition;
Vector<wp<PrefetchedSource> > mSources;
android_thread_id_t mThread;
bool mDone;
bool mThreadExited;
void startThread();
void stopThread();
static int ThreadWrapper(void *me);
void threadFunc();
Prefetcher(const Prefetcher &);
Prefetcher &operator=(const Prefetcher &);
};
} // namespace android
#endif // PREFETCHER_H_