From 81245fd53a0bd627fa87e3a69dd667c7d6696ede Mon Sep 17 00:00:00 2001 From: David Chen Date: Thu, 12 Apr 2018 14:33:37 -0700 Subject: [PATCH] Adds option to drop small buckets for statsd. We notice that some of the pulled metrics have a ton of data, and during app upgrades, we're forming partial buckets that represent small periods of time but require many bytes of data. We now have an option to drop these buckets that are too short. Note that we still have to pull the data to keep the metrics for the next bucket correct. We include a new field in the value and gauge metric outputs so that it's easy to tell when a bucket was dropped. We drop the partial buckets also from anomaly detection since we should be computing anomalies from the same data that is reported. Test: Added unit-tests for value and gauge metrics. Bug: 77925710 Change-Id: Ic370496377c6afd380e02278a6c1ed8b521a2731 --- cmds/statsd/src/StatsService.h | 4 + .../src/metrics/GaugeMetricProducer.cpp | 29 +++- cmds/statsd/src/metrics/GaugeMetricProducer.h | 5 + .../src/metrics/ValueMetricProducer.cpp | 40 ++++-- cmds/statsd/src/metrics/ValueMetricProducer.h | 5 + cmds/statsd/src/packages/UidMap.cpp | 2 + cmds/statsd/src/stats_log.proto | 7 + cmds/statsd/src/statsd_config.proto | 4 + .../tests/e2e/PartialBucket_e2e_test.cpp | 132 +++++++++++++++++- 9 files changed, 205 insertions(+), 23 deletions(-) diff --git a/cmds/statsd/src/StatsService.h b/cmds/statsd/src/StatsService.h index 774a3e94e05bc..b5957b57a4c7b 100644 --- a/cmds/statsd/src/StatsService.h +++ b/cmds/statsd/src/StatsService.h @@ -272,6 +272,10 @@ private: FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval); FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit); + FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket); + FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket); + FRIEND_TEST(PartialBucketE2eTest, 
TestGaugeMetricWithoutMinPartialBucket); + FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket); }; } // namespace statsd diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index 6886f7c2e87a9..12708567043bf 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -47,6 +47,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_GAUGE_METRICS = 8; // for GaugeMetricDataWrapper const int FIELD_ID_DATA = 1; +const int FIELD_ID_SKIPPED = 2; +const int FIELD_ID_SKIPPED_START = 1; +const int FIELD_ID_SKIPPED_END = 2; // for GaugeMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; @@ -66,6 +69,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric : MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), + mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? 
StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first @@ -174,6 +178,15 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS); + for (const auto& pair : mSkippedBuckets) { + uint64_t wrapperToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); + protoOutput->end(wrapperToken); + } + mSkippedBuckets.clear(); + for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; @@ -440,12 +453,16 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } - for (const auto& slice : *mCurrentSlicedBucket) { - info.mGaugeAtoms = slice.second; - auto& bucketList = mPastBuckets[slice.first]; - bucketList.push_back(info); - VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId, - slice.first.toString().c_str()); + if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { + for (const auto& slice : *mCurrentSlicedBucket) { + info.mGaugeAtoms = slice.second; + auto& bucketList = mPastBuckets[slice.first]; + bucketList.push_back(info); + VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId, + slice.first.toString().c_str()); + } + } else { + mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } // If we have anomaly trackers, we need to update the partial bucket values. 
diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h index 08765c28dbc39..71d5912df6ae1 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.h +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h @@ -136,6 +136,11 @@ private: // this slice (ie, for partial buckets, we use the last partial bucket in this full bucket). std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly; + // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. + std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; + + const int64_t mMinBucketSizeNs; + // Translate Atom based bucket to single numeric value bucket for anomaly and updates the map // for each slice with the latest value. void updateCurrentSlicedBucketForAnomaly(); diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 844c72801ea61..27fd78f4ed94d 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -50,6 +50,9 @@ const int FIELD_ID_ID = 1; const int FIELD_ID_VALUE_METRICS = 7; // for ValueMetricDataWrapper const int FIELD_ID_DATA = 1; +const int FIELD_ID_SKIPPED = 2; +const int FIELD_ID_SKIPPED_START = 1; +const int FIELD_ID_SKIPPED_END = 2; // for ValueMetricData const int FIELD_ID_DIMENSION_IN_WHAT = 1; const int FIELD_ID_DIMENSION_IN_CONDITION = 2; @@ -69,6 +72,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric mValueField(metric.value_field()), mStatsPullerManager(statsPullerManager), mPullTagId(pullTagId), + mMinBucketSizeNs(metric.min_bucket_size_nanos()), mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) != StatsdStats::kAtomDimensionKeySizeLimitMap.end() ? 
StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first @@ -156,12 +160,21 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs, } else { flushIfNeededLocked(dumpTimeNs); } - if (mPastBuckets.empty()) { + if (mPastBuckets.empty() && mSkippedBuckets.empty()) { return; } protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId); uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS); + for (const auto& pair : mSkippedBuckets) { + uint64_t wrapperToken = + protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first); + protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second); + protoOutput->end(wrapperToken); + } + mSkippedBuckets.clear(); + for (const auto& pair : mPastBuckets) { const MetricDimensionKey& dimensionKey = pair.first; VLOG(" dimension key %s", dimensionKey.toString().c_str()); @@ -391,18 +404,23 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) { info.mBucketEndNs = fullBucketEndTimeNs; } - int tainted = 0; - for (const auto& slice : mCurrentSlicedBucket) { - tainted += slice.second.tainted; - tainted += slice.second.startUpdated; - if (slice.second.hasValue) { - info.mValue = slice.second.sum; - // it will auto create new vector of ValuebucketInfo if the key is not found. - auto& bucketList = mPastBuckets[slice.first]; - bucketList.push_back(info); + if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) { + // The current bucket is large enough to keep. + int tainted = 0; + for (const auto& slice : mCurrentSlicedBucket) { + tainted += slice.second.tainted; + tainted += slice.second.startUpdated; + if (slice.second.hasValue) { + info.mValue = slice.second.sum; + // it will auto create new vector of ValuebucketInfo if the key is not found. 
+ auto& bucketList = mPastBuckets[slice.first]; + bucketList.push_back(info); + } } + VLOG("%d tainted pairs in the bucket", tainted); + } else { + mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs); } - VLOG("%d tainted pairs in the bucket", tainted); if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker. // Accumulate partial buckets with current value and then send to anomaly tracker. diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index 9c5a56c7b40ed..8df30d3de2844 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -148,6 +148,11 @@ private: // TODO: Add a lock to mPastBuckets. std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets; + // Pairs of (elapsed start, elapsed end) denoting buckets that were skipped. + std::list<std::pair<int64_t, int64_t>> mSkippedBuckets; + + const int64_t mMinBucketSizeNs; + // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); diff --git a/cmds/statsd/src/packages/UidMap.cpp b/cmds/statsd/src/packages/UidMap.cpp index b3425a4d7a8f0..8eb53276e7507 100644 --- a/cmds/statsd/src/packages/UidMap.cpp +++ b/cmds/statsd/src/packages/UidMap.cpp @@ -166,6 +166,8 @@ void UidMap::updateApp(const int64_t& timestamp, const String16& app_16, const i } else { // Only notify the listeners if this is an app upgrade. If this app is being installed // for the first time, then we don't notify the listeners. + // It's also OK to split again if we're forming a partial bucket after re-installing an + // app after deletion. 
getListenerListCopyLocked(&broadcastList); } mChanges.emplace_back(false, timestamp, appName, uid, versionCode, prevVersion); diff --git a/cmds/statsd/src/stats_log.proto b/cmds/statsd/src/stats_log.proto index eaa7bf1343e04..e1b6386a55c14 100644 --- a/cmds/statsd/src/stats_log.proto +++ b/cmds/statsd/src/stats_log.proto @@ -121,6 +121,11 @@ message StatsLogReport { // Fields 2 and 3 are reserved. + message SkippedBuckets { + optional int64 start_elapsed_nanos = 1; + optional int64 end_elapsed_nanos = 2; + } + message EventMetricDataWrapper { repeated EventMetricData data = 1; } @@ -132,10 +137,12 @@ message StatsLogReport { } message ValueMetricDataWrapper { repeated ValueMetricData data = 1; + repeated SkippedBuckets skipped = 2; } message GaugeMetricDataWrapper { repeated GaugeMetricData data = 1; + repeated SkippedBuckets skipped = 2; } oneof data { diff --git a/cmds/statsd/src/statsd_config.proto b/cmds/statsd/src/statsd_config.proto index 870f92e11c0a4..fd365602427d3 100644 --- a/cmds/statsd/src/statsd_config.proto +++ b/cmds/statsd/src/statsd_config.proto @@ -236,6 +236,8 @@ message GaugeMetric { ALL_CONDITION_CHANGES = 2; } optional SamplingType sampling_type = 9 [default = RANDOM_ONE_SAMPLE] ; + + optional int64 min_bucket_size_nanos = 10; } message ValueMetric { @@ -259,6 +261,8 @@ message ValueMetric { SUM = 1; } optional AggregationType aggregation_type = 8 [default = SUM]; + + optional int64 min_bucket_size_nanos = 10; } message Alert { diff --git a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp index af07f6bee3509..a34aaf059ce53 100644 --- a/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp +++ b/cmds/statsd/tests/e2e/PartialBucket_e2e_test.cpp @@ -14,6 +14,7 @@ #include <gtest/gtest.h> +#include <binder/IPCThreadState.h> #include "src/StatsLogProcessor.h" #include "src/StatsService.h" #include "src/stats_log_util.h" @@ -39,9 +40,13 @@ void SendConfig(StatsService& service, const StatsdConfig& config) { service.addConfiguration(kConfigKey, 
configAsVec, String16(kAndroid.c_str())); } -ConfigMetricsReport GetReports(StatsService& service) { +ConfigMetricsReport GetReports(sp<StatsLogProcessor> processor, int64_t timestamp, + bool include_current = false) { vector<uint8_t> output; - service.getData(kConfigKey, String16(kAndroid.c_str()), &output); + IPCThreadState* ipc = IPCThreadState::self(); + ConfigKey configKey(ipc->getCallingUid(), kConfigKey); + processor->onDumpReport(configKey, timestamp, include_current /* include_current_bucket*/, + &output); ConfigMetricsReportList reports; reports.ParseFromArray(output.data(), output.size()); EXPECT_EQ(1, reports.reports_size()); @@ -61,6 +66,47 @@ StatsdConfig MakeConfig() { return config; } +StatsdConfig MakeValueMetricConfig(int64_t minTime) { + StatsdConfig config; + config.add_allowed_log_source("AID_ROOT"); // LogEvent defaults to UID of root. + + auto temperatureAtomMatcher = CreateTemperatureAtomMatcher(); + *config.add_atom_matcher() = temperatureAtomMatcher; + *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher(); + *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher(); + + auto valueMetric = config.add_value_metric(); + valueMetric->set_id(123456); + valueMetric->set_what(temperatureAtomMatcher.id()); + *valueMetric->mutable_value_field() = + CreateDimensions(android::util::TEMPERATURE, {3 /* temperature degree field */}); + *valueMetric->mutable_dimensions_in_what() = + CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */}); + valueMetric->set_bucket(FIVE_MINUTES); + valueMetric->set_min_bucket_size_nanos(minTime); + return config; +} + +StatsdConfig MakeGaugeMetricConfig(int64_t minTime) { + StatsdConfig config; + config.add_allowed_log_source("AID_ROOT"); // LogEvent defaults to UID of root. 
+ + auto temperatureAtomMatcher = CreateTemperatureAtomMatcher(); + *config.add_atom_matcher() = temperatureAtomMatcher; + *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher(); + *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher(); + + auto gaugeMetric = config.add_gauge_metric(); + gaugeMetric->set_id(123456); + gaugeMetric->set_what(temperatureAtomMatcher.id()); + gaugeMetric->mutable_gauge_fields_filter()->set_include_all(true); + *gaugeMetric->mutable_dimensions_in_what() = + CreateDimensions(android::util::TEMPERATURE, {2 /* sensor name field */}); + gaugeMetric->set_bucket(FIVE_MINUTES); + gaugeMetric->set_min_bucket_size_nanos(minTime); + return config; +} + TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) { StatsService service(nullptr); SendConfig(service, MakeConfig()); @@ -70,7 +116,7 @@ TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) { service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 1).get()); service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 2).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 3); // Expect no metrics since the bucket has not finished yet. EXPECT_EQ(0, report.metrics_size()); } @@ -89,7 +135,7 @@ TEST(PartialBucketE2eTest, TestCountMetricNoSplitOnNewApp) { // Goes into the second bucket. service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 4); EXPECT_EQ(0, report.metrics_size()); } @@ -106,7 +152,7 @@ TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade) { // Goes into the second bucket. 
service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 4); EXPECT_EQ(1, report.metrics_size()); EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count()); } @@ -124,11 +170,85 @@ TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval) { // Goes into the second bucket. service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get()); - ConfigMetricsReport report = GetReports(service); + ConfigMetricsReport report = GetReports(service.mProcessor, start + 4); EXPECT_EQ(1, report.metrics_size()); EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count()); } +TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeValueMetricConfig(0)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. + + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(0, report.metrics(0).value_metrics().skipped_size()); +} + +TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeValueMetricConfig(60 * NS_PER_SEC /* One minute */)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. 
+ + const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2; + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(1, report.metrics(0).value_metrics().skipped_size()); + // Can't test the start time since it will be based on the actual time when the pulling occurs. + EXPECT_EQ(endSkipped, report.metrics(0).value_metrics().skipped(0).end_elapsed_nanos()); +} + +TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeGaugeMetricConfig(0)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. + + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(0, report.metrics(0).gauge_metrics().skipped_size()); +} + +TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket) { + StatsService service(nullptr); + // Partial buckets don't occur when app is first installed. + service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1); + SendConfig(service, MakeGaugeMetricConfig(60 * NS_PER_SEC /* One minute */)); + const long start = getElapsedRealtimeNs(); // This is the start-time the metrics producers are + // initialized with. 
+ + const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2; + service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start); + service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2); + + ConfigMetricsReport report = + GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true); + EXPECT_EQ(1, report.metrics_size()); + EXPECT_EQ(1, report.metrics(0).gauge_metrics().skipped_size()); + // Can't test the start time since it will be based on the actual time when the pulling occurs. + EXPECT_EQ(endSkipped, report.metrics(0).gauge_metrics().skipped(0).end_elapsed_nanos()); +} + #else GTEST_LOG_(INFO) << "This test does nothing.\n"; #endif