Merge "Adds option to drop small buckets for statsd." into pi-dev

This commit is contained in:
TreeHugger Robot
2018-04-17 19:12:33 +00:00
committed by Android (Google) Code Review
9 changed files with 205 additions and 23 deletions

View File

@@ -272,6 +272,10 @@ private:
FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade);
FRIEND_TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval);
FRIEND_TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit);
FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket);
FRIEND_TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket);
FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket);
FRIEND_TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket);
};
} // namespace statsd

View File

@@ -47,6 +47,9 @@ const int FIELD_ID_ID = 1;
const int FIELD_ID_GAUGE_METRICS = 8;
// for GaugeMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
const int FIELD_ID_SKIPPED_START = 1;
const int FIELD_ID_SKIPPED_END = 2;
// for GaugeMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -66,6 +69,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
: MetricProducer(metric.id(), key, timeBaseNs, conditionIndex, wizard),
mStatsPullerManager(statsPullerManager),
mPullTagId(pullTagId),
mMinBucketSizeNs(metric.min_bucket_size_nanos()),
mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
StatsdStats::kAtomDimensionKeySizeLimitMap.end()
? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -174,6 +178,15 @@ void GaugeMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_GAUGE_METRICS);
for (const auto& pair : mSkippedBuckets) {
uint64_t wrapperToken =
protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
protoOutput->end(wrapperToken);
}
mSkippedBuckets.clear();
for (const auto& pair : mPastBuckets) {
const MetricDimensionKey& dimensionKey = pair.first;
@@ -440,12 +453,16 @@ void GaugeMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
info.mBucketEndNs = fullBucketEndTimeNs;
}
for (const auto& slice : *mCurrentSlicedBucket) {
info.mGaugeAtoms = slice.second;
auto& bucketList = mPastBuckets[slice.first];
bucketList.push_back(info);
VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
slice.first.toString().c_str());
if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
for (const auto& slice : *mCurrentSlicedBucket) {
info.mGaugeAtoms = slice.second;
auto& bucketList = mPastBuckets[slice.first];
bucketList.push_back(info);
VLOG("Gauge gauge metric %lld, dump key value: %s", (long long)mMetricId,
slice.first.toString().c_str());
}
} else {
mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
}
// If we have anomaly trackers, we need to update the partial bucket values.

View File

@@ -136,6 +136,11 @@ private:
// this slice (ie, for partial buckets, we use the last partial bucket in this full bucket).
std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly;
// Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
const int64_t mMinBucketSizeNs;
// Translate Atom based bucket to single numeric value bucket for anomaly and updates the map
// for each slice with the latest value.
void updateCurrentSlicedBucketForAnomaly();

View File

@@ -50,6 +50,9 @@ const int FIELD_ID_ID = 1;
const int FIELD_ID_VALUE_METRICS = 7;
// for ValueMetricDataWrapper
const int FIELD_ID_DATA = 1;
const int FIELD_ID_SKIPPED = 2;
const int FIELD_ID_SKIPPED_START = 1;
const int FIELD_ID_SKIPPED_END = 2;
// for ValueMetricData
const int FIELD_ID_DIMENSION_IN_WHAT = 1;
const int FIELD_ID_DIMENSION_IN_CONDITION = 2;
@@ -69,6 +72,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
mValueField(metric.value_field()),
mStatsPullerManager(statsPullerManager),
mPullTagId(pullTagId),
mMinBucketSizeNs(metric.min_bucket_size_nanos()),
mDimensionSoftLimit(StatsdStats::kAtomDimensionKeySizeLimitMap.find(pullTagId) !=
StatsdStats::kAtomDimensionKeySizeLimitMap.end()
? StatsdStats::kAtomDimensionKeySizeLimitMap.at(pullTagId).first
@@ -156,12 +160,21 @@ void ValueMetricProducer::onDumpReportLocked(const int64_t dumpTimeNs,
} else {
flushIfNeededLocked(dumpTimeNs);
}
if (mPastBuckets.empty()) {
if (mPastBuckets.empty() && mSkippedBuckets.empty()) {
return;
}
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_ID, (long long)mMetricId);
uint64_t protoToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_VALUE_METRICS);
for (const auto& pair : mSkippedBuckets) {
uint64_t wrapperToken =
protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_COUNT_REPEATED | FIELD_ID_SKIPPED);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_START, (long long)pair.first);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_END, (long long)pair.second);
protoOutput->end(wrapperToken);
}
mSkippedBuckets.clear();
for (const auto& pair : mPastBuckets) {
const MetricDimensionKey& dimensionKey = pair.first;
VLOG(" dimension key %s", dimensionKey.toString().c_str());
@@ -391,18 +404,23 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs) {
info.mBucketEndNs = fullBucketEndTimeNs;
}
int tainted = 0;
for (const auto& slice : mCurrentSlicedBucket) {
tainted += slice.second.tainted;
tainted += slice.second.startUpdated;
if (slice.second.hasValue) {
info.mValue = slice.second.sum;
// it will auto create new vector of ValuebucketInfo if the key is not found.
auto& bucketList = mPastBuckets[slice.first];
bucketList.push_back(info);
if (info.mBucketEndNs - mCurrentBucketStartTimeNs >= mMinBucketSizeNs) {
// The current bucket is large enough to keep.
int tainted = 0;
for (const auto& slice : mCurrentSlicedBucket) {
tainted += slice.second.tainted;
tainted += slice.second.startUpdated;
if (slice.second.hasValue) {
info.mValue = slice.second.sum;
// it will auto create new vector of ValuebucketInfo if the key is not found.
auto& bucketList = mPastBuckets[slice.first];
bucketList.push_back(info);
}
}
VLOG("%d tainted pairs in the bucket", tainted);
} else {
mSkippedBuckets.emplace_back(info.mBucketStartNs, info.mBucketEndNs);
}
VLOG("%d tainted pairs in the bucket", tainted);
if (eventTimeNs > fullBucketEndTimeNs) { // If full bucket, send to anomaly tracker.
// Accumulate partial buckets with current value and then send to anomaly tracker.

View File

@@ -148,6 +148,11 @@ private:
// TODO: Add a lock to mPastBuckets.
std::unordered_map<MetricDimensionKey, std::vector<ValueBucket>> mPastBuckets;
// Pairs of (elapsed start, elapsed end) denoting buckets that were skipped.
std::list<std::pair<int64_t, int64_t>> mSkippedBuckets;
const int64_t mMinBucketSizeNs;
// Util function to check whether the specified dimension hits the guardrail.
bool hitGuardRailLocked(const MetricDimensionKey& newKey);

View File

@@ -166,6 +166,8 @@ void UidMap::updateApp(const int64_t& timestamp, const String16& app_16, const i
} else {
// Only notify the listeners if this is an app upgrade. If this app is being installed
// for the first time, then we don't notify the listeners.
// It's also OK to split again if we're forming a partial bucket after re-installing an
// app after deletion.
getListenerListCopyLocked(&broadcastList);
}
mChanges.emplace_back(false, timestamp, appName, uid, versionCode, prevVersion);

View File

@@ -121,6 +121,11 @@ message StatsLogReport {
// Fields 2 and 3 are reserved.
// Elapsed-time range of one bucket that a metric producer skipped instead of reporting.
message SkippedBuckets {
// Elapsed start time of the skipped bucket, in nanoseconds.
optional int64 start_elapsed_nanos = 1;
// Elapsed end time of the skipped bucket, in nanoseconds.
optional int64 end_elapsed_nanos = 2;
}
// Wrapper holding the repeated EventMetricData entries of a report.
message EventMetricDataWrapper {
repeated EventMetricData data = 1;
}
@@ -132,10 +137,12 @@ message StatsLogReport {
}
// Wrapper holding the reported value metric data plus the buckets that were skipped.
message ValueMetricDataWrapper {
// Reported value metric data entries.
repeated ValueMetricData data = 1;
// Time ranges of buckets that were dropped rather than reported (e.g. partial buckets
// shorter than the metric's min_bucket_size_nanos).
repeated SkippedBuckets skipped = 2;
}
// Wrapper holding the reported gauge metric data plus the buckets that were skipped.
message GaugeMetricDataWrapper {
// Reported gauge metric data entries.
repeated GaugeMetricData data = 1;
// Time ranges of buckets that were dropped rather than reported (e.g. partial buckets
// shorter than the metric's min_bucket_size_nanos).
repeated SkippedBuckets skipped = 2;
}
oneof data {

View File

@@ -236,6 +236,8 @@ message GaugeMetric {
ALL_CONDITION_CHANGES = 2;
}
optional SamplingType sampling_type = 9 [default = RANDOM_ONE_SAMPLE] ;
optional int64 min_bucket_size_nanos = 10;
}
message ValueMetric {
@@ -259,6 +261,8 @@ message ValueMetric {
SUM = 1;
}
optional AggregationType aggregation_type = 8 [default = SUM];
optional int64 min_bucket_size_nanos = 10;
}
message Alert {

View File

@@ -14,6 +14,7 @@
#include <gtest/gtest.h>
#include <binder/IPCThreadState.h>
#include "src/StatsLogProcessor.h"
#include "src/StatsService.h"
#include "src/stats_log_util.h"
@@ -39,9 +40,13 @@ void SendConfig(StatsService& service, const StatsdConfig& config) {
service.addConfiguration(kConfigKey, configAsVec, String16(kAndroid.c_str()));
}
ConfigMetricsReport GetReports(StatsService& service) {
ConfigMetricsReport GetReports(sp<StatsLogProcessor> processor, int64_t timestamp,
bool include_current = false) {
vector<uint8_t> output;
service.getData(kConfigKey, String16(kAndroid.c_str()), &output);
IPCThreadState* ipc = IPCThreadState::self();
ConfigKey configKey(ipc->getCallingUid(), kConfigKey);
processor->onDumpReport(configKey, timestamp, include_current /* include_current_bucket*/,
&output);
ConfigMetricsReportList reports;
reports.ParseFromArray(output.data(), output.size());
EXPECT_EQ(1, reports.reports_size());
@@ -61,6 +66,47 @@ StatsdConfig MakeConfig() {
return config;
}
// Builds a test config with the temperature, screen-on and screen-off atom matchers and a single
// five-minute value metric (id 123456) on the temperature atom. The metric aggregates field 3
// (temperature degree), is sliced by field 2 (sensor name), and uses |minTime| as its
// min_bucket_size_nanos.
StatsdConfig MakeValueMetricConfig(int64_t minTime) {
    StatsdConfig config;
    // LogEvent defaults to UID of root, so root must be an allowed log source.
    config.add_allowed_log_source("AID_ROOT");

    const auto tempMatcher = CreateTemperatureAtomMatcher();
    *config.add_atom_matcher() = tempMatcher;
    *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher();
    *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher();

    auto& metric = *config.add_value_metric();
    metric.set_id(123456);
    metric.set_what(tempMatcher.id());
    metric.set_bucket(FIVE_MINUTES);
    metric.set_min_bucket_size_nanos(minTime);
    // Field 3 of the temperature atom is the temperature degree.
    *metric.mutable_value_field() = CreateDimensions(android::util::TEMPERATURE, {3});
    // Field 2 of the temperature atom is the sensor name.
    *metric.mutable_dimensions_in_what() = CreateDimensions(android::util::TEMPERATURE, {2});
    return config;
}
// Builds a test config with the temperature, screen-on and screen-off atom matchers and a single
// five-minute gauge metric (id 123456) on the temperature atom. The metric includes all gauge
// fields, is sliced by field 2 (sensor name), and uses |minTime| as its min_bucket_size_nanos.
StatsdConfig MakeGaugeMetricConfig(int64_t minTime) {
    StatsdConfig config;
    // LogEvent defaults to UID of root, so root must be an allowed log source.
    config.add_allowed_log_source("AID_ROOT");

    const auto tempMatcher = CreateTemperatureAtomMatcher();
    *config.add_atom_matcher() = tempMatcher;
    *config.add_atom_matcher() = CreateScreenTurnedOnAtomMatcher();
    *config.add_atom_matcher() = CreateScreenTurnedOffAtomMatcher();

    auto& gauge = *config.add_gauge_metric();
    gauge.set_id(123456);
    gauge.set_what(tempMatcher.id());
    gauge.set_bucket(FIVE_MINUTES);
    gauge.set_min_bucket_size_nanos(minTime);
    gauge.mutable_gauge_fields_filter()->set_include_all(true);
    // Field 2 of the temperature atom is the sensor name.
    *gauge.mutable_dimensions_in_what() = CreateDimensions(android::util::TEMPERATURE, {2});
    return config;
}
TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) {
StatsService service(nullptr);
SendConfig(service, MakeConfig());
@@ -70,7 +116,7 @@ TEST(PartialBucketE2eTest, TestCountMetricWithoutSplit) {
service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 1).get());
service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 2).get());
ConfigMetricsReport report = GetReports(service);
ConfigMetricsReport report = GetReports(service.mProcessor, start + 3);
// Expect no metrics since the bucket has not finished yet.
EXPECT_EQ(0, report.metrics_size());
}
@@ -89,7 +135,7 @@ TEST(PartialBucketE2eTest, TestCountMetricNoSplitOnNewApp) {
// Goes into the second bucket.
service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
ConfigMetricsReport report = GetReports(service);
ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
EXPECT_EQ(0, report.metrics_size());
}
@@ -106,7 +152,7 @@ TEST(PartialBucketE2eTest, TestCountMetricSplitOnUpgrade) {
// Goes into the second bucket.
service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
ConfigMetricsReport report = GetReports(service);
ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
EXPECT_EQ(1, report.metrics_size());
EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count());
}
@@ -124,11 +170,85 @@ TEST(PartialBucketE2eTest, TestCountMetricSplitOnRemoval) {
// Goes into the second bucket.
service.mProcessor->OnLogEvent(CreateAppCrashEvent(100, start + 3).get());
ConfigMetricsReport report = GetReports(service);
ConfigMetricsReport report = GetReports(service.mProcessor, start + 4);
EXPECT_EQ(1, report.metrics_size());
EXPECT_EQ(1, report.metrics(0).count_metrics().data(0).bucket_info(0).count());
}
// With min_bucket_size_nanos set to 0, the short partial bucket created by an app upgrade must
// not be recorded as skipped in the value metric report.
TEST(PartialBucketE2eTest, TestValueMetricWithoutMinPartialBucket) {
    StatsService service(nullptr);
    // Partial buckets don't occur when app is first installed.
    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
    SendConfig(service, MakeValueMetricConfig(0));
    // This is the start-time the metrics producers are initialized with. It is held as int64_t
    // (not long) so the nanosecond timestamp is not truncated where long is 32 bits wide.
    const int64_t start = getElapsedRealtimeNs();

    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
    // Upgrade the app (version 1 -> 2) two nanoseconds after the pull alarm.
    service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2);

    ConfigMetricsReport report =
            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true);
    EXPECT_EQ(1, report.metrics_size());
    if (report.metrics_size() < 1) {
        // The failure is already recorded above; metrics(0) must not be read from an empty list.
        return;
    }
    EXPECT_EQ(0, report.metrics(0).value_metrics().skipped_size());
}
// With min_bucket_size_nanos set to one minute, the short partial bucket created by an app
// upgrade must be reported as a skipped bucket ending at the upgrade time.
TEST(PartialBucketE2eTest, TestValueMetricWithMinPartialBucket) {
    StatsService service(nullptr);
    // Partial buckets don't occur when app is first installed.
    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
    SendConfig(service, MakeValueMetricConfig(60 * NS_PER_SEC /* One minute */));
    // This is the start-time the metrics producers are initialized with. It is held as int64_t
    // (not long) so the nanosecond timestamp is not truncated where long is 32 bits wide.
    const int64_t start = getElapsedRealtimeNs();

    // The app upgrade two nanoseconds after the pull alarm ends the bucket that gets skipped.
    const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2;
    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
    service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2);

    ConfigMetricsReport report =
            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true);
    EXPECT_EQ(1, report.metrics_size());
    if (report.metrics_size() < 1) {
        // The failure is already recorded above; metrics(0) must not be read from an empty list.
        return;
    }
    EXPECT_EQ(1, report.metrics(0).value_metrics().skipped_size());
    if (report.metrics(0).value_metrics().skipped_size() < 1) {
        // Likewise, skipped(0) must not be read when nothing was skipped.
        return;
    }
    // Can't test the start time since it will be based on the actual time when the pulling occurs.
    EXPECT_EQ(endSkipped, report.metrics(0).value_metrics().skipped(0).end_elapsed_nanos());
}
// With min_bucket_size_nanos set to 0, the short partial bucket created by an app upgrade must
// not be recorded as skipped in the gauge metric report.
TEST(PartialBucketE2eTest, TestGaugeMetricWithoutMinPartialBucket) {
    StatsService service(nullptr);
    // Partial buckets don't occur when app is first installed.
    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
    SendConfig(service, MakeGaugeMetricConfig(0));
    // This is the start-time the metrics producers are initialized with. It is held as int64_t
    // (not long) so the nanosecond timestamp is not truncated where long is 32 bits wide.
    const int64_t start = getElapsedRealtimeNs();

    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
    // Upgrade the app (version 1 -> 2) two nanoseconds after the pull alarm.
    service.mUidMap->updateApp(5 * 60 * NS_PER_SEC + start + 2, String16(kApp1.c_str()), 1, 2);

    ConfigMetricsReport report =
            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100, true);
    EXPECT_EQ(1, report.metrics_size());
    if (report.metrics_size() < 1) {
        // The failure is already recorded above; metrics(0) must not be read from an empty list.
        return;
    }
    EXPECT_EQ(0, report.metrics(0).gauge_metrics().skipped_size());
}
// With min_bucket_size_nanos set to one minute, the short partial bucket created by an app
// upgrade must be reported as a skipped bucket ending at the upgrade time.
TEST(PartialBucketE2eTest, TestGaugeMetricWithMinPartialBucket) {
    StatsService service(nullptr);
    // Partial buckets don't occur when app is first installed.
    service.mUidMap->updateApp(1, String16(kApp1.c_str()), 1, 1);
    SendConfig(service, MakeGaugeMetricConfig(60 * NS_PER_SEC /* One minute */));
    // This is the start-time the metrics producers are initialized with. It is held as int64_t
    // (not long) so the nanosecond timestamp is not truncated where long is 32 bits wide.
    const int64_t start = getElapsedRealtimeNs();

    // The app upgrade two nanoseconds after the pull alarm ends the bucket that gets skipped.
    const int64_t endSkipped = 5 * 60 * NS_PER_SEC + start + 2;
    service.mProcessor->informPullAlarmFired(5 * 60 * NS_PER_SEC + start);
    service.mUidMap->updateApp(endSkipped, String16(kApp1.c_str()), 1, 2);

    ConfigMetricsReport report =
            GetReports(service.mProcessor, 5 * 60 * NS_PER_SEC + start + 100 * NS_PER_SEC, true);
    EXPECT_EQ(1, report.metrics_size());
    if (report.metrics_size() < 1) {
        // The failure is already recorded above; metrics(0) must not be read from an empty list.
        return;
    }
    EXPECT_EQ(1, report.metrics(0).gauge_metrics().skipped_size());
    if (report.metrics(0).gauge_metrics().skipped_size() < 1) {
        // Likewise, skipped(0) must not be read when nothing was skipped.
        return;
    }
    // Can't test the start time since it will be based on the actual time when the pulling occurs.
    EXPECT_EQ(endSkipped, report.metrics(0).gauge_metrics().skipped(0).end_elapsed_nanos());
}
#else
GTEST_LOG_(INFO) << "This test does nothing.\n";
#endif