Merge "Refactor ValueMetricProducer bucket flush" into rvc-dev
This commit is contained in:
committed by
Android (Google) Code Review
commit
eaf58a156b
@@ -951,6 +951,11 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
|
||||
StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId);
|
||||
}
|
||||
|
||||
VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
|
||||
(int)mCurrentSlicedBucket.size());
|
||||
|
||||
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
|
||||
int64_t bucketEndTime = fullBucketEndTimeNs;
|
||||
int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs);
|
||||
if (numBucketsForward > 1) {
|
||||
VLOG("Skipping forward %lld buckets", (long long)numBucketsForward);
|
||||
@@ -959,20 +964,20 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
|
||||
// to mark the current bucket as invalid. The last pull might have been successful through.
|
||||
invalidateCurrentBucketWithoutResetBase(eventTimeNs,
|
||||
BucketDropReason::MULTIPLE_BUCKETS_SKIPPED);
|
||||
// End the bucket at the next bucket start time so the entire interval is skipped.
|
||||
bucketEndTime = nextBucketStartTimeNs;
|
||||
} else if (eventTimeNs < fullBucketEndTimeNs) {
|
||||
bucketEndTime = eventTimeNs;
|
||||
}
|
||||
|
||||
VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs,
|
||||
(int)mCurrentSlicedBucket.size());
|
||||
int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs();
|
||||
int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs;
|
||||
// Close the current bucket.
|
||||
int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime);
|
||||
bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs;
|
||||
if (!isBucketLargeEnough) {
|
||||
skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL);
|
||||
}
|
||||
bool bucketHasData = false;
|
||||
if (!mCurrentBucketIsSkipped) {
|
||||
bool bucketHasData = false;
|
||||
// The current bucket is large enough to keep.
|
||||
for (const auto& slice : mCurrentSlicedBucket) {
|
||||
ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second);
|
||||
@@ -984,22 +989,22 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
|
||||
bucketHasData = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!bucketHasData && !mCurrentBucketIsSkipped) {
|
||||
skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
|
||||
if (!bucketHasData) {
|
||||
skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA);
|
||||
}
|
||||
}
|
||||
|
||||
if (mCurrentBucketIsSkipped) {
|
||||
mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs;
|
||||
// Fill in the gap if we skipped multiple buckets.
|
||||
mCurrentSkippedBucket.bucketEndTimeNs =
|
||||
numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime;
|
||||
mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime;
|
||||
mSkippedBuckets.emplace_back(mCurrentSkippedBucket);
|
||||
}
|
||||
|
||||
// This means that the current bucket was not flushed before a forced bucket split.
|
||||
if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) {
|
||||
// This can happen if an app update or a dump report with include_current_partial_bucket is
|
||||
// requested before we get a chance to flush the bucket due to receiving new data, either from
|
||||
// the statsd socket or the StatsPullerManager.
|
||||
if (bucketEndTime < nextBucketStartTimeNs) {
|
||||
SkippedBucket bucketInGap;
|
||||
bucketInGap.bucketStartTimeNs = bucketEndTime;
|
||||
bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs;
|
||||
@@ -1008,7 +1013,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs,
|
||||
mSkippedBuckets.emplace_back(bucketInGap);
|
||||
}
|
||||
|
||||
appendToFullBucket(eventTimeNs, fullBucketEndTimeNs);
|
||||
appendToFullBucket(eventTimeNs > fullBucketEndTimeNs);
|
||||
initCurrentSlicedBucket(nextBucketStartTimeNs);
|
||||
// Update the condition timer again, in case we skipped buckets.
|
||||
mConditionTimer.newBucketStart(nextBucketStartTimeNs);
|
||||
@@ -1074,8 +1079,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs)
|
||||
(long long)mCurrentBucketStartTimeNs);
|
||||
}
|
||||
|
||||
void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) {
|
||||
bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs;
|
||||
void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) {
|
||||
if (mCurrentBucketIsSkipped) {
|
||||
if (isFullBucketReached) {
|
||||
// If the bucket is invalid, we ignore the full bucket since it contains invalid data.
|
||||
|
||||
@@ -142,8 +142,10 @@ private:
|
||||
|
||||
// Mark the data as invalid.
|
||||
void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason);
|
||||
|
||||
void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs,
|
||||
const BucketDropReason reason);
|
||||
|
||||
// Skips the current bucket without notifying StatsdStats of the skipped bucket.
|
||||
// This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that
|
||||
// causes the bucket to be invalidated will not notify StatsdStats.
|
||||
@@ -209,6 +211,7 @@ private:
|
||||
|
||||
// Util function to check whether the specified dimension hits the guardrail.
|
||||
bool hitGuardRailLocked(const MetricDimensionKey& newKey);
|
||||
|
||||
bool hasReachedGuardRailLimit() const;
|
||||
|
||||
bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey);
|
||||
@@ -220,8 +223,10 @@ private:
|
||||
|
||||
ValueBucket buildPartialBucket(int64_t bucketEndTime,
|
||||
const std::vector<Interval>& intervals);
|
||||
|
||||
void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs);
|
||||
void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs);
|
||||
|
||||
void appendToFullBucket(const bool isFullBucketReached);
|
||||
|
||||
// Reset diff base and mHasGlobalBase
|
||||
void resetBase();
|
||||
|
||||
@@ -3612,10 +3612,32 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor
|
||||
ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition();
|
||||
|
||||
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
|
||||
EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _, _))
|
||||
// Condition change to true.
|
||||
.WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
|
||||
vector<std::shared_ptr<LogEvent>>* data, bool) {
|
||||
EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10);
|
||||
data->clear();
|
||||
data->push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 10, 10));
|
||||
return true;
|
||||
}))
|
||||
// App Update.
|
||||
.WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs,
|
||||
vector<std::shared_ptr<LogEvent>>* data, bool) {
|
||||
EXPECT_EQ(eventTimeNs, bucket2StartTimeNs + 1000);
|
||||
data->clear();
|
||||
data->push_back(
|
||||
CreateRepeatedValueLogEvent(tagId, bucket2StartTimeNs + 1000, 15));
|
||||
return true;
|
||||
}));
|
||||
|
||||
sp<ValueMetricProducer> valueProducer =
|
||||
ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric);
|
||||
|
||||
// Condition changed event
|
||||
int64_t conditionChangeTimeNs = bucketStartTimeNs + 10;
|
||||
valueProducer->onConditionChanged(true, conditionChangeTimeNs);
|
||||
|
||||
// App update event.
|
||||
int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000;
|
||||
valueProducer->notifyAppUpgrade(appUpdateTimeNs);
|
||||
@@ -3629,28 +3651,23 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor
|
||||
|
||||
StatsLogReport report = outputStreamToProto(&output);
|
||||
EXPECT_TRUE(report.has_value_metrics());
|
||||
ASSERT_EQ(0, report.value_metrics().data_size());
|
||||
ASSERT_EQ(2, report.value_metrics().skipped_size());
|
||||
ASSERT_EQ(1, report.value_metrics().data_size());
|
||||
ASSERT_EQ(1, report.value_metrics().skipped_size());
|
||||
|
||||
ASSERT_EQ(1, report.value_metrics().data(0).bucket_info_size());
|
||||
auto data = report.value_metrics().data(0);
|
||||
ASSERT_EQ(0, data.bucket_info(0).bucket_num());
|
||||
EXPECT_EQ(5, data.bucket_info(0).values(0).value_long());
|
||||
|
||||
EXPECT_EQ(NanoToMillis(bucketStartTimeNs),
|
||||
report.value_metrics().skipped(0).start_bucket_elapsed_millis());
|
||||
EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
|
||||
report.value_metrics().skipped(0).start_bucket_elapsed_millis());
|
||||
EXPECT_EQ(NanoToMillis(appUpdateTimeNs),
|
||||
report.value_metrics().skipped(0).end_bucket_elapsed_millis());
|
||||
ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size());
|
||||
|
||||
auto dropEvent = report.value_metrics().skipped(0).drop_event(0);
|
||||
EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason());
|
||||
EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis());
|
||||
|
||||
EXPECT_EQ(NanoToMillis(bucket2StartTimeNs),
|
||||
report.value_metrics().skipped(1).start_bucket_elapsed_millis());
|
||||
EXPECT_EQ(NanoToMillis(appUpdateTimeNs),
|
||||
report.value_metrics().skipped(1).end_bucket_elapsed_millis());
|
||||
ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size());
|
||||
|
||||
dropEvent = report.value_metrics().skipped(1).drop_event(0);
|
||||
EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason());
|
||||
EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis());
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user