Merge "Refactor ValueMetricProducer bucket flush" into rvc-dev

This commit is contained in:
TreeHugger Robot
2020-05-30 09:01:49 +00:00
committed by Android (Google) Code Review
3 changed files with 57 additions and 31 deletions

View File

@@ -951,6 +951,11 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
}
VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
(int)mCurrentSlicedBucket.size());
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
int64_t bucketEndTime = fullBucketEndTimeNs;
int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
if (numBucketsForward > 1) {
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
@@ -959,20 +964,20 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
// to mark the current bucket as invalid. The last pull might have been successful through.
invalidateCurrentBucketWithoutResetBase(eventTimeNs,
BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
// End the bucket at the next bucket start time so the entire interval is skipped.
bucketEndTime = nextBucketStartTimeNs;
} else if (eventTimeNs < fullBucketEndTimeNs) {
bucketEndTime = eventTimeNs;
}
VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
(int)mCurrentSlicedBucket.size());
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
// Close the current bucket.
int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
if (!isBucketLargeEnough) {
skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
}
bool bucketHasData = false;
if (!mCurrentBucketIsSkipped) {
bool bucketHasData = false;
// The current bucket is large enough to keep.
for (const auto& slice : mCurrentSlicedBucket) {
ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
@@ -984,22 +989,22 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
bucketHasData = true;
}
}
}
if (!bucketHasData && !mCurrentBucketIsSkipped) {
skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
if (!bucketHasData) {
skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
}
}
if (mCurrentBucketIsSkipped) {
mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
// Fill in the gap if we skipped multiple buckets.
mCurrentSkippedBucket.bucketEndTimeNs =
numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime;
mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
}
// This means that the current bucket was not flushed before a forced bucket split.
if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) {
// This can happen if an app update or a dump report with include_current_partial_bucket is
// requested before we get a chance to flush the bucket due to receiving new data, either from
// the statsd socket or the StatsPullerManager.
if (bucketEndTime < nextBucketStartTimeNs) {
SkippedBucket bucketInGap;
bucketInGap.bucketStartTimeNs = bucketEndTime;
bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
@@ -1008,7 +1013,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
mSkippedBuckets.emplace_back(bucketInGap);
}
appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
initCurrentSlicedBucket(nextBucketStartTimeNs);
// Update the condition timer again, in case we skipped buckets.
mConditionTimer.newBucketStart(nextBucketStartTimeNs);
@@ -1074,8 +1079,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs)
(long long)mCurrentBucketStartTimeNs);
}
void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
if (mCurrentBucketIsSkipped) {
if (isFullBucketReached) {
// If the bucket is invalid, we ignore the full bucket since it contains invalid data.

View File

@@ -142,8 +142,10 @@ private:
// Mark the data as invalid.
void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
const BucketDropReason reason);
// Skips the current bucket without notifying StatsdStats of the skipped bucket.
// This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that
// causes the bucket to be invalidated will not notify StatsdStats.
@@ -209,6 +211,7 @@ private:
// Util function to check whether the specified dimension hits the guardrail.
bool hitGuardRailLocked(const MetricDimensionKey& newKey);
bool hasReachedGuardRailLimit() const;
bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey);
@@ -220,8 +223,10 @@ private:
ValueBucket buildPartialBucket(int64_t bucketEndTime,
const std::vector<Interval>& intervals);
void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs);
void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);
void appendToFullBucket(const bool isFullBucketReached);
// Reset diff base and mHasGlobalBase
void resetBase();

View File

@@ -3612,10 +3612,32 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor
ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _, _))
// Condition change to true.
.WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
vector<std::shared_ptr<LogEvent>>* data, bool) {
EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10);
data->clear();
data->push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 10, 10));
return true;
}))
// App Update.
.WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
vector<std::shared_ptr<LogEvent>>* data, bool) {
EXPECT_EQ(eventTimeNs, bucket2StartTimeNs + 1000);
data->clear();
data->push_back(
CreateRepeatedValueLogEvent(tagId, bucket2StartTimeNs + 1000, 15));
return true;
}));
sp<ValueMetricProducer> valueProducer =
ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
// Condition changed event
int64_t conditionChangeTimeNs = bucketStartTimeNs + 10;
valueProducer->onConditionChanged(true, conditionChangeTimeNs);
// App update event.
int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000;
valueProducer->notifyAppUpgrade(appUpdateTimeNs);
@@ -3629,28 +3651,23 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor
StatsLogReport report = outputStreamToProto(&output);
EXPECT_TRUE(report.has_value_metrics());
ASSERT_EQ(0, report.value_metrics().data_size());
ASSERT_EQ(2, report.value_metrics().skipped_size());
ASSERT_EQ(1, report.value_metrics().data_size());
ASSERT_EQ(1, report.value_metrics().skipped_size());
ASSERT_EQ(1, report.value_metrics().data(0).bucket_info_size());
auto data = report.value_metrics().data(0);
ASSERT_EQ(0, data.bucket_info(0).bucket_num());
EXPECT_EQ(5, data.bucket_info(0).values(0).value_long());
EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
report.value_metrics().skipped(0).start_bucket_elapsed_millis());
EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
report.value_metrics().skipped(0).start_bucket_elapsed_millis());
EXPECT_EQ(NanoToMillis(appUpdateTimeNs),
report.value_metrics().skipped(0).end_bucket_elapsed_millis());
ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason());
EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis());
EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
report.value_metrics().skipped(1).start_bucket_elapsed_millis());
EXPECT_EQ(NanoToMillis(appUpdateTimeNs),
report.value_metrics().skipped(1).end_bucket_elapsed_millis());
ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size());
dropEvent = report.value_metrics().skipped(1).drop_event(0);
EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason());
EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis());
}
/*