diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h index 774a3e94e05bc..b5957b57a4c7b 100644 --- a/cmds/statsd/src/StatsService.h +++ b/cmds/statsd/src/StatsService.h @@ -272,6 +272,10 @@ private: FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit); + FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket); + FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket); + FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket); + FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket); }; } // namespace statsd diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index 6886f7c2e87a9..12708567043bf 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -47,6 +47,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_GAUGE_METRICS = 8; // for GaugeMetricDataWrapper const int FIELD_ID_DATA = 1; +const int FIELD_ID_SKIPPED = 2; +const int FIELD_ID_SKIPPED_START = 1; +const int FIELD_ID_SKIPPED_END = 2; // for GaugeMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; @@ -66,6 +69,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), + mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? 
StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first @@ -174,6 +178,15 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); + for (const auto& pair : mSkippedBuckets) { + uint64_t wrapperToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); + protoOutput->end(wrapperToken); + } + mSkippedBuckets.clear(); + for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; @@ -440,12 +453,16 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } - for (const auto& slice : *mCurrentSlicedBucket) { - info.mGaugeAtoms = slice.second; - auto& bucketList = mPastBuckets[slice.first]; - bucketList.push_back(info); - VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId, - slice.first.toString().c_str()); + if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { + for (const auto& slice : *mCurrentSlicedBucket) { + info.mGaugeAtoms = slice.second; + auto& bucketList = mPastBuckets[slice.first]; + bucketList.push_back(info); + VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId, + slice.first.toString().c_str()); + } + } else { + mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } // If we have anomaly trackers, we need to update the partial bucket values. 
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h index 08765c28dbc39..71d5912df6ae1 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.h +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h @@ -136,6 +136,11 @@ private: // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket). std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly; + // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. + std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; + + const int64_t mMinBucketSizeNs; + // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map // for each slice with the latest value. void updateCurrentSlicedBucketForAnomaly(); diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 844c72801ea61..27fd78f4ed94d 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -50,6 +50,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_VALUE_METRICS = 7; // for ValueMetricDataWrapper const int FIELD_ID_DATA = 1; +const int FIELD_ID_SKIPPED = 2; +const int FIELD_ID_SKIPPED_START = 1; +const int FIELD_ID_SKIPPED_END = 2; // for ValueMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; @@ -69,6 +72,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mValueField(metric.value_field()), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), + mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? 
StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first @@ -156,12 +160,21 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } else { flushIfNeededLocked(dumpTimeNs); } - if (mPastBuckets.empty()) { + if (mPastBuckets.empty() && mSkippedBuckets.empty()) { return; } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS); + for (const auto& pair : mSkippedBuckets) { + uint64_t wrapperToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); + protoOutput->end(wrapperToken); + } + mSkippedBuckets.clear(); + for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; VLOG(" dimension key %s", dimensionKey.toString().c_str()); @@ -391,18 +404,23 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } - int tainted = 0; - for (const auto& slice : mCurrentSlicedBucket) { - tainted += slice.second.tainted; - tainted += slice.second.startUpdated; - if (slice.second.hasValue) { - info.mValue = slice.second.sum; - // it will auto create new vector of ValuebucketInfo if the key is not found. - auto& bucketList = mPastBuckets[slice.first]; - bucketList.push_back(info); + if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { + // The current bucket is large enough to keep. + int tainted = 0; + for (const auto& slice : mCurrentSlicedBucket) { + tainted += slice.second.tainted; + tainted += slice.second.startUpdated; + if (slice.second.hasValue) { + info.mValue = slice.second.sum; + // it will auto create new vector of ValuebucketInfo if the key is not found. 
+ auto& bucketList = mPastBuckets[slice.first]; + bucketList.push_back(info); + } } + VLOG("%d tainted pairs in the bucket", tainted); + } else { + mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } - VLOG("%d tainted pairs in the bucket", tainted); if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 9c5a56c7b40ed..8df30d3de2844 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -148,6 +148,11 @@ private: // TODO: Add a lock to mPastBuckets. std::unordered_map<MetricDimensionKey, std::vector<ValueBucketInfo>> mPastBuckets; + // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. + std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; + + const int64_t mMinBucketSizeNs; + // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); diff --git a/cmds/statsd/src/packages/UidMap.cpp b/cmds/statsd/src/packages/UidMap.cpp index b3425a4d7a8f0..8eb53276e7507 100644 --- a/cmds/statsd/src/packages/UidMap.cpp +++ b/cmds/statsd/src/packages/UidMap.cpp @@ -166,6 +166,8 @@ void UidMap::updateApp(const int64_t& timestamp, const String16& app_16, const i } else { // Only notify the listeners if this is an app upgrade. If this app is being installed // for the first time, then we don't notify the listeners. + // It's also OK to split again if we're forming a partial bucket after re-installing an + // app after deletion. 
getListenerListCopyLocked(&broadcastList); } mChanges.emplace_back(false, timestamp, appName, uid, versionCode, prevVersion); diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto index eaa7bf1343e04..e1b6386a55c14 100644 --- a/cmds/statsd/src/stats_log.proto +++ b/cmds/statsd/src/stats_log.proto @@ -121,6 +121,11 @@ message StatsLogReport { // Fields 2 and 3 are reserved. + message SkippedBuckets { + optional int64 start_elapsed_nanos = 1; + optional int64 end_elapsed_nanos = 2; + } + message EventMetricDataWrapper { repeated EventMetricData data = 1; } @@ -132,10 +137,12 @@ message StatsLogReport { } message ValueMetricDataWrapper { repeated ValueMetricData data = 1; + repeated SkippedBuckets skipped = 2; } message GaugeMetricDataWrapper { repeated GaugeMetricData data = 1; + repeated SkippedBuckets skipped = 2; } oneof data { diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto index 870f92e11c0a4..fd365602427d3 100644 --- a/cmds/statsd/src/statsd_config.proto +++ b/cmds/statsd/src/statsd_config.proto @@ -236,6 +236,8 @@ message GaugeMetric { ALL_CONDITION_CHANGES = 2; } optional SamplingType sampling_type = 9 [default = RANDOM_ONE_SAMPLE] ; + + optional int64 min_bucket_size_nanos = 10; } message ValueMetric { @@ -259,6 +261,8 @@ message ValueMetric { SUM = 1; } optional AggregationType aggregation_type = 8 [default = SUM]; + + optional int64 min_bucket_size_nanos = 10; } message Alert { diff --git a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp index af07f6bee3509..a34aaf059ce53 100644 --- a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp @@ -14,6 +14,7 @@ #include +#include #include "src/StatsLogProcessor.h" #include "src/StatsService.h" #include "src/stats_log_util.h" @@ -39,9 +40,13 @@ void SendConfig(StatsService& service, const StatsdConfig& config) { service.addConfiguration(kConfigKey, 
configAsVec, String16(kAndroid.c_str())); } -ConfigMetricsReport GetReports(StatsService& service) { +ConfigMetricsReport GetReports(sp<StatsLogProcessor> processor, int64_t timestamp, + bool include_current = false) { vector<uint8_t> output; - service.getData(kConfigKey, String16(kAndroid.c_str()), &output); + IPCThreadState* ipc = IPCThreadState::self(); + ConfigKey configKey(ipc->getCallingUid(), kConfigKey); + processor->onDumpReport(configKey, timestamp, include_current /* include_current_bucket*/, + &output); ConfigMetricsReportList reports; reports.ParseFromArray(output.data(), output.size()); EXPECT_EQ(1, reports.reports_size()); @@ -61,6 +66,47 @@ StatsdConfig MakeConfig() { return config; } +StatsdConfig MakeValueMetricConfig(int64_t minTime) { + StatsdConfig config; + config.add_allowed_log_source("AID_ROOT"); // LogEvent defaults to UID of root. + + auto temperatureAtomMatcher = CreateTemperatureAtomMatcher(); + *config.add_atom_matcher() = temperatureAtomMatcher; + *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher(); + *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher(); + + auto valueMetric = config.add_value_metric(); + valueMetric->set_id(123456); + valueMetric->set_what(temperatureAtomMatcher.id()); + *valueMetric->mutable_value_field() = + CreateDimensions(android::util::TEMPERATURE, {3 /* temperature degree field */}); + *valueMetric->mutable_dimensions_in_what() = + CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */}); + valueMetric->set_bucket(FIVE_MINUTES); + valueMetric->set_min_bucket_size_nanos(minTime); + return config; +} + +StatsdConfig MakeGaugeMetricConfig(int64_t minTime) { + StatsdConfig config; + config.add_allowed_log_source("AID_ROOT"); // LogEvent defaults to UID of root. 
+ + auto temperatureAtomMatcher = CreateTemperatureAtomMatcher(); + *config.add_atom_matcher() = temperatureAtomMatcher; + *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher(); + *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher(); + + auto gaugeMetric = config.add_gauge_metric(); + gaugeMetric->set_id(123456); + gaugeMetric->set_what(temperatureAtomMatcher.id()); + gaugeMetric->mutable_gauge_fields_filter()->set_include_all(true); + *gaugeMetric->mutable_dimensions_in_what() = + CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */}); + gaugeMetric->set_bucket(FIVE_MINUTES); + gaugeMetric->set_min_bucket_size_nanos(minTime); + return config; +} + TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) { StatsService service(nullptr); SendConfig(service, MakeConfig()); @@ -70,7 +116,7 @@ TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) { service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 1).get()); service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 2).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 3); // Expect no metrics since the bucket has not finished yet. EXPECT_EQ(0, report.metrics_size()); } @@ -89,7 +135,7 @@ TEST(PartialBucketE2eTest, TestCountMetricNoSplitOnNewApp) { // Goes into the second bucket. service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 4); EXPECT_EQ(0, report.metrics_size()); } @@ -106,7 +152,7 @@ TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade) { // Goes into the second bucket. 
service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 4); EXPECT_EQ(1, report.metrics_size()); EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count()); } @@ -124,11 +170,85 @@ TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval) { // Goes into the second bucket. service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 4); EXPECT_EQ(1, report.metrics_size()); EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count()); } +TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeValueMetricConfig(0)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. + + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(0, report.metrics(0).value_metrics().skipped_size()); +} + +TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeValueMetricConfig(60 * NS_PER_SEC /* One minute */)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. 
+ + const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2; + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(1, report.metrics(0).value_metrics().skipped_size()); + // Can't test the start time since it will be based on the actual time when the pulling occurs. + EXPECT_EQ(endSkipped, report.metrics(0).value_metrics().skipped(0).end_elapsed_nanos()); +} + +TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeGaugeMetricConfig(0)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. + + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(0, report.metrics(0).gauge_metrics().skipped_size()); +} + +TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeGaugeMetricConfig(60 * NS_PER_SEC /* One minute */)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. 
+ + const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2; + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(1, report.metrics(0).gauge_metrics().skipped_size()); + // Can't test the start time since it will be based on the actual time when the pulling occurs. + EXPECT_EQ(endSkipped, report.metrics(0).gauge_metrics().skipped(0).end_elapsed_nanos()); +} + #else GTEST_LOG_(INFO) << "This test does nothing.\n"; #endif