Merge "Fix StatsCompanionService pull on bucket ends" into pi-dev

This commit is contained in:
TreeHugger Robot
2018-03-29 17:00:25 +00:00
committed by Android (Google) Code Review
16 changed files with 235 additions and 167 deletions

View File

@@ -79,8 +79,6 @@ StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
mSendBroadcast(sendBroadcast),
mTimeBaseSec(timeBaseSec),
mLastLogTimestamp(0) {
StatsPullerManager statsPullerManager;
statsPullerManager.SetTimeBaseSec(mTimeBaseSec);
}
StatsLogProcessor::~StatsLogProcessor() {
@@ -177,7 +175,7 @@ void StatsLogProcessor::OnLogEvent(LogEvent* event) {
uint64_t curTimeSec = getElapsedRealtimeSec();
if (curTimeSec - mLastPullerCacheClearTimeSec > StatsdStats::kPullerCacheClearIntervalSec) {
mStatsPullerManager.ClearPullerCacheIfNecessary(curTimeSec);
mStatsPullerManager.ClearPullerCacheIfNecessary(curTimeSec * NS_PER_SEC);
mLastPullerCacheClearTimeSec = curTimeSec;
}

View File

@@ -595,7 +595,7 @@ status_t StatsService::cmd_log_app_breadcrumb(FILE* out, const Vector<String8>&
status_t StatsService::cmd_print_pulled_metrics(FILE* out, const Vector<String8>& args) {
int s = atoi(args[1].c_str());
vector<shared_ptr<LogEvent> > stats;
if (mStatsPullerManager.Pull(s, &stats)) {
if (mStatsPullerManager.Pull(s, getElapsedRealtimeNs(), &stats)) {
for (const auto& it : stats) {
fprintf(out, "Pull from %d: %s\n", s, it->ToString().c_str());
}

View File

@@ -35,26 +35,31 @@ void StatsPuller::SetUidMap(const sp<UidMap>& uidMap) { mUidMap = uidMap; }
// ValueMetric has a minimum bucket size of 10min so that we don't pull too frequently
StatsPuller::StatsPuller(const int tagId)
: mTagId(tagId) {
mCoolDownSec = StatsPullerManagerImpl::kAllPullAtomInfo.find(tagId)->second.coolDownSec;
VLOG("Puller for tag %d created. Cooldown set to %ld", mTagId, mCoolDownSec);
mCoolDownNs = StatsPullerManagerImpl::kAllPullAtomInfo.find(tagId)->second.coolDownNs;
VLOG("Puller for tag %d created. Cooldown set to %lld", mTagId, (long long)mCoolDownNs);
}
bool StatsPuller::Pull(std::vector<std::shared_ptr<LogEvent>>* data) {
bool StatsPuller::Pull(const int64_t elapsedTimeNs, std::vector<std::shared_ptr<LogEvent>>* data) {
lock_guard<std::mutex> lock(mLock);
int64_t wallClockTimeNs = getWallClockNs();
StatsdStats::getInstance().notePull(mTagId);
long curTime = getElapsedRealtimeSec();
if (curTime - mLastPullTimeSec < mCoolDownSec) {
if (elapsedTimeNs - mLastPullTimeNs < mCoolDownNs) {
(*data) = mCachedData;
StatsdStats::getInstance().notePullFromCache(mTagId);
return true;
}
if (mMinPullIntervalSec > curTime - mLastPullTimeSec) {
mMinPullIntervalSec = curTime - mLastPullTimeSec;
StatsdStats::getInstance().updateMinPullIntervalSec(mTagId, mMinPullIntervalSec);
if (mMinPullIntervalNs > elapsedTimeNs - mLastPullTimeNs) {
mMinPullIntervalNs = elapsedTimeNs - mLastPullTimeNs;
StatsdStats::getInstance().updateMinPullIntervalSec(mTagId,
mMinPullIntervalNs / NS_PER_SEC);
}
mCachedData.clear();
mLastPullTimeSec = curTime;
mLastPullTimeNs = elapsedTimeNs;
bool ret = PullInternal(&mCachedData);
for (const shared_ptr<LogEvent>& data : mCachedData) {
data->setElapsedTimestampNs(elapsedTimeNs);
data->setLogdWallClockTimestampNs(wallClockTimeNs);
}
if (ret) {
mergeIsolatedUidsToHostUid(mCachedData, mUidMap, mTagId);
(*data) = mCachedData;
@@ -70,12 +75,12 @@ int StatsPuller::clearCache() {
lock_guard<std::mutex> lock(mLock);
int ret = mCachedData.size();
mCachedData.clear();
mLastPullTimeSec = 0;
mLastPullTimeNs = 0;
return ret;
}
int StatsPuller::ClearCacheIfNecessary(long timestampSec) {
if (timestampSec - mLastPullTimeSec > mCoolDownSec) {
int StatsPuller::ClearCacheIfNecessary(int64_t timestampNs) {
if (timestampNs - mLastPullTimeNs > mCoolDownNs) {
return clearCache();
} else {
return 0;

View File

@@ -37,13 +37,13 @@ public:
virtual ~StatsPuller() {}
bool Pull(std::vector<std::shared_ptr<LogEvent>>* data);
bool Pull(const int64_t timeNs, std::vector<std::shared_ptr<LogEvent>>* data);
// Clear cache immediately
int ForceClearCache();
// Clear cache if elapsed time is more than cooldown time
int ClearCacheIfNecessary(long timestampSec);
int ClearCacheIfNecessary(int64_t timestampNs);
static void SetUidMap(const sp<UidMap>& uidMap);
@@ -59,9 +59,9 @@ private:
// If a pull request comes before cooldown, a cached version from purevious pull
// will be returned.
// The actual value should be determined by individual pullers.
long mCoolDownSec;
int64_t mCoolDownNs;
// For puller stats
long mMinPullIntervalSec = LONG_MAX;
int64_t mMinPullIntervalNs = LONG_MAX;
virtual bool PullInternal(std::vector<std::shared_ptr<LogEvent>>* data) = 0;
@@ -69,7 +69,7 @@ private:
// cached data will be returned.
std::vector<std::shared_ptr<LogEvent>> mCachedData;
long mLastPullTimeSec;
int64_t mLastPullTimeNs;
int clearCache();

View File

@@ -26,10 +26,9 @@ class StatsPullerManager {
public:
virtual ~StatsPullerManager() {}
virtual void RegisterReceiver(int tagId,
wp <PullDataReceiver> receiver,
long intervalMs) {
mPullerManager.RegisterReceiver(tagId, receiver, intervalMs);
virtual void RegisterReceiver(int tagId, wp<PullDataReceiver> receiver, int64_t nextPullTimeNs,
int64_t intervalNs) {
mPullerManager.RegisterReceiver(tagId, receiver, nextPullTimeNs, intervalNs);
};
virtual void UnRegisterReceiver(int tagId, wp <PullDataReceiver> receiver) {
@@ -45,13 +44,9 @@ class StatsPullerManager {
mPullerManager.OnAlarmFired();
}
virtual bool
Pull(const int tagId, vector<std::shared_ptr<LogEvent>>* data) {
return mPullerManager.Pull(tagId, data);
}
void SetTimeBaseSec(const long timeBaseSec) {
mPullerManager.SetTimeBaseSec(timeBaseSec);
virtual bool Pull(const int tagId, const int64_t timesNs,
vector<std::shared_ptr<LogEvent>>* data) {
return mPullerManager.Pull(tagId, timesNs, data);
}
int ForceClearPullerCache() {
@@ -62,8 +57,8 @@ class StatsPullerManager {
mPullerManager.SetStatsCompanionService(statsCompanionService);
}
int ClearPullerCacheIfNecessary(long timestampSec) {
return mPullerManager.ClearPullerCacheIfNecessary(timestampSec);
int ClearPullerCacheIfNecessary(int64_t timestampNs) {
return mPullerManager.ClearPullerCacheIfNecessary(timestampNs);
}
private:

View File

@@ -19,15 +19,17 @@
#include <android/os/IStatsCompanionService.h>
#include <cutils/log.h>
#include <math.h>
#include <algorithm>
#include <climits>
#include "../StatsService.h"
#include "../logd/LogEvent.h"
#include "../stats_log_util.h"
#include "../statscompanion_util.h"
#include "ResourceHealthManagerPuller.h"
#include "ResourceThermalManagerPuller.h"
#include "StatsCompanionServicePuller.h"
#include "StatsService.h"
#include "StatsPullerManagerImpl.h"
#include "SubsystemSleepStatePuller.h"
#include "statslog.h"
@@ -47,89 +49,136 @@ namespace statsd {
const std::map<int, PullAtomInfo> StatsPullerManagerImpl::kAllPullAtomInfo = {
// wifi_bytes_transfer
{android::util::WIFI_BYTES_TRANSFER,
{{2, 3, 4, 5}, {}, 1,
{{2, 3, 4, 5},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER)}},
// wifi_bytes_transfer_by_fg_bg
{android::util::WIFI_BYTES_TRANSFER_BY_FG_BG,
{{3, 4, 5, 6}, {2}, 1,
{{3, 4, 5, 6},
{2},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::WIFI_BYTES_TRANSFER_BY_FG_BG)}},
// mobile_bytes_transfer
{android::util::MOBILE_BYTES_TRANSFER,
{{2, 3, 4, 5}, {}, 1,
{{2, 3, 4, 5},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER)}},
// mobile_bytes_transfer_by_fg_bg
{android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG,
{{3, 4, 5, 6}, {2}, 1,
{{3, 4, 5, 6},
{2},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::MOBILE_BYTES_TRANSFER_BY_FG_BG)}},
// bluetooth_bytes_transfer
{android::util::BLUETOOTH_BYTES_TRANSFER,
{{2, 3}, {}, 1, new StatsCompanionServicePuller(android::util::BLUETOOTH_BYTES_TRANSFER)}},
{{2, 3},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::BLUETOOTH_BYTES_TRANSFER)}},
// kernel_wakelock
{android::util::KERNEL_WAKELOCK,
{{}, {}, 1, new StatsCompanionServicePuller(android::util::KERNEL_WAKELOCK)}},
{{}, {}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::KERNEL_WAKELOCK)}},
// subsystem_sleep_state
{android::util::SUBSYSTEM_SLEEP_STATE,
{{}, {}, 1, new SubsystemSleepStatePuller()}},
{{}, {}, 1 * NS_PER_SEC, new SubsystemSleepStatePuller()}},
// cpu_time_per_freq
{android::util::CPU_TIME_PER_FREQ,
{{3}, {2}, 1, new StatsCompanionServicePuller(android::util::CPU_TIME_PER_FREQ)}},
{{3},
{2},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::CPU_TIME_PER_FREQ)}},
// cpu_time_per_uid
{android::util::CPU_TIME_PER_UID,
{{2, 3}, {}, 1, new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID)}},
{{2, 3},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID)}},
// cpu_time_per_uid_freq
// the throttling is 3sec, handled in frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_TIME_PER_UID_FREQ,
{{4}, {2,3}, 0, new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID_FREQ)}},
{{4},
{2, 3},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::CPU_TIME_PER_UID_FREQ)}},
// cpu_active_time
// the throttling is 3sec, handled in frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_ACTIVE_TIME,
{{2}, {}, 0, new StatsCompanionServicePuller(android::util::CPU_ACTIVE_TIME)}},
{{2},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::CPU_ACTIVE_TIME)}},
// cpu_cluster_time
// the throttling is 3sec, handled in frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
// the throttling is 3sec, handled in
// frameworks/base/core/java/com/android/internal/os/KernelCpuProcReader
{android::util::CPU_CLUSTER_TIME,
{{3}, {2}, 0, new StatsCompanionServicePuller(android::util::CPU_CLUSTER_TIME)}},
{{3},
{2},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::CPU_CLUSTER_TIME)}},
// wifi_activity_energy_info
{android::util::WIFI_ACTIVITY_ENERGY_INFO,
{{}, {}, 1, new StatsCompanionServicePuller(android::util::WIFI_ACTIVITY_ENERGY_INFO)}},
{{},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::WIFI_ACTIVITY_ENERGY_INFO)}},
// modem_activity_info
{android::util::MODEM_ACTIVITY_INFO,
{{}, {}, 1, new StatsCompanionServicePuller(android::util::MODEM_ACTIVITY_INFO)}},
{{},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::MODEM_ACTIVITY_INFO)}},
// bluetooth_activity_info
{android::util::BLUETOOTH_ACTIVITY_INFO,
{{}, {}, 1, new StatsCompanionServicePuller(android::util::BLUETOOTH_ACTIVITY_INFO)}},
{{},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::BLUETOOTH_ACTIVITY_INFO)}},
// system_elapsed_realtime
{android::util::SYSTEM_ELAPSED_REALTIME,
{{}, {}, 1, new StatsCompanionServicePuller(android::util::SYSTEM_ELAPSED_REALTIME)}},
{{},
{},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::SYSTEM_ELAPSED_REALTIME)}},
// system_uptime
{android::util::SYSTEM_UPTIME,
{{}, {}, 1, new StatsCompanionServicePuller(android::util::SYSTEM_UPTIME)}},
{{}, {}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::SYSTEM_UPTIME)}},
// disk_space
{android::util::DISK_SPACE,
{{}, {}, 1, new StatsCompanionServicePuller(android::util::DISK_SPACE)}},
{{}, {}, 1 * NS_PER_SEC, new StatsCompanionServicePuller(android::util::DISK_SPACE)}},
// remaining_battery_capacity
{android::util::REMAINING_BATTERY_CAPACITY,
{{}, {}, 1, new ResourceHealthManagerPuller(android::util::REMAINING_BATTERY_CAPACITY)}},
{{},
{},
1 * NS_PER_SEC,
new ResourceHealthManagerPuller(android::util::REMAINING_BATTERY_CAPACITY)}},
// full_battery_capacity
{android::util::FULL_BATTERY_CAPACITY,
{{}, {}, 1, new ResourceHealthManagerPuller(android::util::FULL_BATTERY_CAPACITY)}},
{{},
{},
1 * NS_PER_SEC,
new ResourceHealthManagerPuller(android::util::FULL_BATTERY_CAPACITY)}},
// process_memory_state
{android::util::PROCESS_MEMORY_STATE,
{{4,5,6,7,8},
{2,3},
0,
{{4, 5, 6, 7, 8},
{2, 3},
1 * NS_PER_SEC,
new StatsCompanionServicePuller(android::util::PROCESS_MEMORY_STATE)}},
// temperature
{android::util::TEMPERATURE, {{}, {}, 1, new ResourceThermalManagerPuller()}}};
StatsPullerManagerImpl::StatsPullerManagerImpl()
: mCurrentPullingInterval(LONG_MAX) {
StatsPullerManagerImpl::StatsPullerManagerImpl() : mNextPullTimeNs(LONG_MAX) {
}
bool StatsPullerManagerImpl::Pull(int tagId, vector<shared_ptr<LogEvent>>* data) {
bool StatsPullerManagerImpl::Pull(const int tagId, const int64_t timeNs,
vector<shared_ptr<LogEvent>>* data) {
VLOG("Initiating pulling %d", tagId);
if (kAllPullAtomInfo.find(tagId) != kAllPullAtomInfo.end()) {
bool ret = kAllPullAtomInfo.find(tagId)->second.puller->Pull(data);
bool ret = kAllPullAtomInfo.find(tagId)->second.puller->Pull(timeNs, data);
VLOG("pulled %d items", (int)data->size());
return ret;
} else {
@@ -148,12 +197,14 @@ bool StatsPullerManagerImpl::PullerForMatcherExists(int tagId) const {
}
void StatsPullerManagerImpl::updateAlarmLocked() {
long currentTimeMs = getElapsedRealtimeMillis();
long nextAlarmTimeMs = currentTimeMs + mCurrentPullingInterval -
(currentTimeMs - mTimeBaseSec * 1000) % mCurrentPullingInterval;
if (mNextPullTimeNs == LONG_MAX) {
VLOG("No need to set alarms. Skipping");
return;
}
sp<IStatsCompanionService> statsCompanionServiceCopy = mStatsCompanionService;
if (statsCompanionServiceCopy != nullptr) {
statsCompanionServiceCopy->setPullingAlarms(nextAlarmTimeMs, mCurrentPullingInterval);
statsCompanionServiceCopy->setPullingAlarm(mNextPullTimeNs / 1000000);
} else {
VLOG("StatsCompanionService not available. Alarm not set.");
}
@@ -174,7 +225,7 @@ void StatsPullerManagerImpl::SetStatsCompanionService(
}
void StatsPullerManagerImpl::RegisterReceiver(int tagId, wp<PullDataReceiver> receiver,
long intervalMs) {
int64_t nextPullTimeNs, int64_t intervalNs) {
AutoMutex _l(mLock);
auto& receivers = mReceivers[tagId];
for (auto it = receivers.begin(); it != receivers.end(); it++) {
@@ -185,21 +236,24 @@ void StatsPullerManagerImpl::RegisterReceiver(int tagId, wp<PullDataReceiver> re
}
ReceiverInfo receiverInfo;
receiverInfo.receiver = receiver;
receiverInfo.timeInfo.first = intervalMs;
receivers.push_back(receiverInfo);
// Round it to the nearest minutes. This is the limit of alarm manager.
// In practice, we should limit it higher.
long roundedIntervalMs = intervalMs/1000/60 * 1000 * 60;
// In practice, we should always have larger buckets.
int64_t roundedIntervalNs = intervalNs / NS_PER_SEC / 60 * NS_PER_SEC * 60;
// Scheduled pulling should be at least 1 min apart.
// This can be lower in cts tests, in which case we round it to 1 min.
if (roundedIntervalMs < 60 * 1000) {
roundedIntervalMs = 60 * 1000;
if (roundedIntervalNs < 60 * (int64_t)NS_PER_SEC) {
roundedIntervalNs = 60 * (int64_t)NS_PER_SEC;
}
receiverInfo.intervalNs = roundedIntervalNs;
receiverInfo.nextPullTimeNs = nextPullTimeNs;
receivers.push_back(receiverInfo);
// There is only one alarm for all pulled events. So only set it to the smallest denom.
if (roundedIntervalMs < mCurrentPullingInterval) {
VLOG("Updating pulling interval %ld", intervalMs);
mCurrentPullingInterval = roundedIntervalMs;
if (nextPullTimeNs < mNextPullTimeNs) {
VLOG("Updating next pull time %lld", (long long)mNextPullTimeNs);
mNextPullTimeNs = nextPullTimeNs;
updateAlarmLocked();
}
VLOG("Puller for tagId %d registered of %d", tagId, (int)receivers.size());
@@ -224,16 +278,22 @@ void StatsPullerManagerImpl::UnRegisterReceiver(int tagId, wp<PullDataReceiver>
void StatsPullerManagerImpl::OnAlarmFired() {
AutoMutex _l(mLock);
uint64_t currentTimeMs = getElapsedRealtimeMillis();
int64_t currentTimeNs = getElapsedRealtimeNs();
int64_t minNextPullTimeNs = LONG_MAX;
vector<pair<int, vector<ReceiverInfo*>>> needToPull =
vector<pair<int, vector<ReceiverInfo*>>>();
for (auto& pair : mReceivers) {
vector<ReceiverInfo*> receivers = vector<ReceiverInfo*>();
if (pair.second.size() != 0) {
for (auto& receiverInfo : pair.second) {
if (receiverInfo.timeInfo.first + receiverInfo.timeInfo.second > currentTimeMs) {
for (ReceiverInfo& receiverInfo : pair.second) {
if (receiverInfo.nextPullTimeNs < currentTimeNs) {
receivers.push_back(&receiverInfo);
} else {
if (receiverInfo.nextPullTimeNs < minNextPullTimeNs) {
minNextPullTimeNs = receiverInfo.nextPullTimeNs;
}
}
}
if (receivers.size() > 0) {
@@ -244,18 +304,29 @@ void StatsPullerManagerImpl::OnAlarmFired() {
for (const auto& pullInfo : needToPull) {
vector<shared_ptr<LogEvent>> data;
if (Pull(pullInfo.first, &data)) {
if (Pull(pullInfo.first, currentTimeNs, &data)) {
for (const auto& receiverInfo : pullInfo.second) {
sp<PullDataReceiver> receiverPtr = receiverInfo->receiver.promote();
if (receiverPtr != nullptr) {
receiverPtr->onDataPulled(data);
receiverInfo->timeInfo.second = currentTimeMs;
// we may have just come out of a coma, compute next pull time
receiverInfo->nextPullTimeNs =
ceil((double_t)(currentTimeNs - receiverInfo->nextPullTimeNs) /
receiverInfo->intervalNs) *
receiverInfo->intervalNs +
receiverInfo->nextPullTimeNs;
if (receiverInfo->nextPullTimeNs < minNextPullTimeNs) {
minNextPullTimeNs = receiverInfo->nextPullTimeNs;
}
} else {
VLOG("receiver already gone.");
}
}
}
}
mNextPullTimeNs = minNextPullTimeNs;
updateAlarmLocked();
}
int StatsPullerManagerImpl::ForceClearPullerCache() {
@@ -266,10 +337,10 @@ int StatsPullerManagerImpl::ForceClearPullerCache() {
return totalCleared;
}
int StatsPullerManagerImpl::ClearPullerCacheIfNecessary(long timestampSec) {
int StatsPullerManagerImpl::ClearPullerCacheIfNecessary(int64_t timestampNs) {
int totalCleared = 0;
for (const auto& pulledAtom : kAllPullAtomInfo) {
totalCleared += pulledAtom.second.puller->ClearCacheIfNecessary(timestampSec);
totalCleared += pulledAtom.second.puller->ClearCacheIfNecessary(timestampNs);
}
return totalCleared;
}

View File

@@ -41,7 +41,7 @@ typedef struct {
std::vector<int> nonAdditiveFields;
// How long should the puller wait before doing an actual pull again. Default
// 1 sec. Set this to 0 if this is handled elsewhere.
long coolDownSec = 1;
int64_t coolDownNs = 1 * NS_PER_SEC;
// The actual puller
sp<StatsPuller> puller;
} PullAtomInfo;
@@ -50,7 +50,8 @@ class StatsPullerManagerImpl : public virtual RefBase {
public:
static StatsPullerManagerImpl& GetInstance();
void RegisterReceiver(int tagId, wp<PullDataReceiver> receiver, long intervalMs);
void RegisterReceiver(int tagId, wp<PullDataReceiver> receiver, int64_t nextPullTimeNs,
int64_t intervalNs);
void UnRegisterReceiver(int tagId, wp<PullDataReceiver> receiver);
@@ -59,13 +60,11 @@ public:
void OnAlarmFired();
bool Pull(const int tagId, vector<std::shared_ptr<LogEvent>>* data);
void SetTimeBaseSec(long timeBaseSec) {mTimeBaseSec = timeBaseSec;};
bool Pull(const int tagId, const int64_t timeNs, vector<std::shared_ptr<LogEvent>>* data);
int ForceClearPullerCache();
int ClearPullerCacheIfNecessary(long timestampSec);
int ClearPullerCacheIfNecessary(int64_t timestampNs);
void SetStatsCompanionService(sp<IStatsCompanionService> statsCompanionService);
@@ -77,8 +76,8 @@ public:
sp<IStatsCompanionService> mStatsCompanionService = nullptr;
typedef struct {
// pull_interval_sec : last_pull_time_sec
std::pair<uint64_t, uint64_t> timeInfo;
int64_t nextPullTimeNs;
int64_t intervalNs;
wp<PullDataReceiver> receiver;
} ReceiverInfo;
@@ -90,12 +89,7 @@ public:
void updateAlarmLocked();
long mCurrentPullingInterval;
// for pulled metrics, it is important for the buckets to be aligned to multiple of smallest
// bucket size. All pulled metrics start pulling based on this time, so that they can be
// correctly attributed to the correct buckets.
long mTimeBaseSec;
int64_t mNextPullTimeNs;
};
} // namespace statsd

View File

@@ -112,7 +112,8 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
// Kicks off the puller immediately.
if (mPullTagId != -1 && mSamplingType == GaugeMetric::RANDOM_ONE_SAMPLE) {
mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
mStatsPullerManager->RegisterReceiver(
mPullTagId, this, mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs);
}
VLOG("Gauge metric %lld created. bucket size %lld start_time: %lld sliced %d",
@@ -255,7 +256,7 @@ void GaugeMetricProducer::pullLocked() {
}
vector<std::shared_ptr<LogEvent>> allData;
if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
if (!mStatsPullerManager->Pull(mPullTagId, getElapsedRealtimeNs(), &allData)) {
ALOGE("Gauge Stats puller failed for tag: %d", mPullTagId);
return;
}

View File

@@ -110,10 +110,12 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
}
mConditionSliced = (metric.links().size() > 0) || (mDimensionsInCondition.size() > 0);
if (!metric.has_condition() && mPullTagId != -1) {
VLOG("Setting up periodic pulling for %d", mPullTagId);
mStatsPullerManager->RegisterReceiver(mPullTagId, this, bucketSizeMills);
// Kicks off the puller immediately.
if (mPullTagId != -1) {
mStatsPullerManager->RegisterReceiver(
mPullTagId, this, mCurrentBucketStartTimeNs + mBucketSizeNs, mBucketSizeNs);
}
VLOG("value metric %lld created. bucket size %lld start_time: %lld",
(long long)metric.id(), (long long)mBucketSizeNs, (long long)mStartTimeNs);
}
@@ -194,26 +196,21 @@ void ValueMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void ValueMetricProducer::onConditionChangedLocked(const bool condition, const uint64_t eventTime) {
void ValueMetricProducer::onConditionChangedLocked(const bool condition,
const uint64_t eventTimeNs) {
mCondition = condition;
if (eventTime < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTime,
if (eventTimeNs < mCurrentBucketStartTimeNs) {
VLOG("Skip event due to late arrival: %lld vs %lld", (long long)eventTimeNs,
(long long)mCurrentBucketStartTimeNs);
return;
}
flushIfNeededLocked(eventTime);
flushIfNeededLocked(eventTimeNs);
if (mPullTagId != -1) {
if (mCondition == true) {
mStatsPullerManager->RegisterReceiver(mPullTagId, this, mBucketSizeNs / 1000 / 1000);
} else if (mCondition == false) {
mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
}
vector<shared_ptr<LogEvent>> allData;
if (mStatsPullerManager->Pull(mPullTagId, &allData)) {
if (mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData)) {
if (allData.size() == 0) {
return;
}

View File

@@ -53,7 +53,7 @@ public:
if (mPullTagId != -1) {
vector<shared_ptr<LogEvent>> allData;
mStatsPullerManager->Pull(mPullTagId, &allData);
mStatsPullerManager->Pull(mPullTagId, eventTimeNs, &allData);
if (allData.size() == 0) {
// This shouldn't happen since this valuemetric is not useful now.
}

View File

@@ -221,7 +221,8 @@ void writeFieldValueTreeToStream(int tagId, const std::vector<FieldValue>& value
int64_t TimeUnitToBucketSizeInMillisGuardrailed(int uid, TimeUnit unit) {
int64_t bucketSizeMillis = TimeUnitToBucketSizeInMillis(unit);
if (bucketSizeMillis > 1000 && bucketSizeMillis < 5 * 60 * 1000LL && uid != AID_SHELL) {
if (bucketSizeMillis > 1000 && bucketSizeMillis < 5 * 60 * 1000LL && uid != AID_SHELL &&
uid != AID_ROOT) {
bucketSizeMillis = 5 * 60 * 1000LL;
}
return bucketSizeMillis;

View File

@@ -63,7 +63,7 @@ TEST(GaugeMetricProducerTest, TestNoCondition) {
// For now we still need this so that it doesn't do real pulling.
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
@@ -213,10 +213,11 @@ TEST(GaugeMetricProducerTest, TestPulledWithUpgrade) {
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _))
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, eventUpgradeTimeNs);
event->write("some value");
@@ -281,10 +282,11 @@ TEST(GaugeMetricProducerTest, TestWithCondition) {
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _))
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write("some value");
@@ -372,10 +374,11 @@ TEST(GaugeMetricProducerTest, TestWithSlicedCondition) {
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _))
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write(1000);
@@ -420,7 +423,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
GaugeMetric metric;

View File

@@ -62,7 +62,7 @@ TEST(ValueMetricProducerTest, TestNonDimensionalEvents) {
// For now we still need this so that it doesn't do real pulling.
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
ValueMetricProducer valueProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
@@ -141,11 +141,12 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillRepeatedly(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _))
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write(tagId);
@@ -154,7 +155,8 @@ TEST(ValueMetricProducerTest, TestEventsWithNonSlicedCondition) {
data->push_back(event);
return true;
}))
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 10);
event->write(tagId);
@@ -260,10 +262,11 @@ TEST(ValueMetricProducerTest, TestPulledValueWithUpgrade) {
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
shared_ptr<MockStatsPullerManager> pullerManager =
make_shared<StrictMock<MockStatsPullerManager>>();
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
EXPECT_CALL(*pullerManager, Pull(tagId, _))
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
EXPECT_CALL(*pullerManager, Pull(tagId, _, _))
.WillOnce(Invoke([](int tagId, int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
event->write(tagId);

View File

@@ -35,9 +35,11 @@ public:
class MockStatsPullerManager : public StatsPullerManager {
public:
MOCK_METHOD3(RegisterReceiver, void(int tagId, wp<PullDataReceiver> receiver, long intervalMs));
MOCK_METHOD4(RegisterReceiver, void(int tagId, wp<PullDataReceiver> receiver,
int64_t nextPulltimeNs, int64_t intervalNs));
MOCK_METHOD2(UnRegisterReceiver, void(int tagId, wp<PullDataReceiver> receiver));
MOCK_METHOD2(Pull, bool(const int pullCode, vector<std::shared_ptr<LogEvent>>* data));
MOCK_METHOD3(Pull, bool(const int pullCode, const int64_t timeNs,
vector<std::shared_ptr<LogEvent>>* data));
};
class MockUidMap : public UidMap {

View File

@@ -47,10 +47,10 @@ interface IStatsCompanionService {
* Uses AlarmManager.setRepeating API, so if the timestamp is in past, alarm fires immediately,
* and alarm is inexact.
*/
oneway void setPullingAlarms(long timestampMs, long intervalMs);
oneway void setPullingAlarm(long nextPullTimeMs);
/** Cancel any repeating pulling alarm. */
oneway void cancelPullingAlarms();
oneway void cancelPullingAlarm();
/**
* Register an alarm when we want to trigger subscribers at the given

View File

@@ -466,34 +466,32 @@ public class StatsCompanionService extends IStatsCompanionService.Stub {
}
@Override // Binder call
public void setPullingAlarms(long timestampMs, long intervalMs) {
enforceCallingPermission();
if (DEBUG)
Slog.d(TAG, "Setting pulling alarm for " + timestampMs + " every " + intervalMs + "ms");
final long callingToken = Binder.clearCallingIdentity();
try {
// using ELAPSED_REALTIME, not ELAPSED_REALTIME_WAKEUP, so if device is asleep, will
// only fire when it awakens.
// This alarm is inexact, leaving its exactness completely up to the OS optimizations.
// TODO: totally inexact means that stats per bucket could be quite off. Is this okay?
mAlarmManager.setRepeating(AlarmManager.ELAPSED_REALTIME, timestampMs, intervalMs,
mPullingAlarmIntent);
} finally {
Binder.restoreCallingIdentity(callingToken);
}
public void setPullingAlarm(long nextPullTimeMs) {
enforceCallingPermission();
if (DEBUG)
Slog.d(TAG,
"Setting pulling alarm in about " + (nextPullTimeMs - SystemClock.elapsedRealtime()));
final long callingToken = Binder.clearCallingIdentity();
try {
// using ELAPSED_REALTIME, not ELAPSED_REALTIME_WAKEUP, so if device is asleep, will
// only fire when it awakens.
mAlarmManager.setExact(AlarmManager.ELAPSED_REALTIME, nextPullTimeMs, mPullingAlarmIntent);
} finally {
Binder.restoreCallingIdentity(callingToken);
}
}
@Override // Binder call
public void cancelPullingAlarms() {
enforceCallingPermission();
if (DEBUG)
Slog.d(TAG, "Cancelling pulling alarm");
final long callingToken = Binder.clearCallingIdentity();
try {
mAlarmManager.cancel(mPullingAlarmIntent);
} finally {
Binder.restoreCallingIdentity(callingToken);
}
public void cancelPullingAlarm() {
enforceCallingPermission();
if (DEBUG)
Slog.d(TAG, "Cancelling pulling alarm");
final long callingToken = Binder.clearCallingIdentity();
try {
mAlarmManager.cancel(mPullingAlarmIntent);
} finally {
Binder.restoreCallingIdentity(callingToken);
}
}
private void addNetworkStats(
@@ -1109,7 +1107,7 @@ public class StatsCompanionService extends IStatsCompanionService.Stub {
mContext.unregisterReceiver(mUserUpdateReceiver);
mContext.unregisterReceiver(mShutdownEventReceiver);
cancelAnomalyAlarm();
cancelPullingAlarms();
cancelPullingAlarm();
}
}