diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index b50756a5fe219..71df710ee6479 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -951,6 +951,11 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, StatsdStats::getInstance().noteBucketUnknownCondition(mMetricId); } + VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, + (int)mCurrentSlicedBucket.size()); + + int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); + int64_t bucketEndTime = fullBucketEndTimeNs; int64_t numBucketsForward = calcBucketsForwardCount(eventTimeNs); if (numBucketsForward > 1) { VLOG("Skipping forward %lld buckets", (long long)numBucketsForward); @@ -959,20 +964,20 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, // to mark the current bucket as invalid. The last pull might have been successful through. invalidateCurrentBucketWithoutResetBase(eventTimeNs, BucketDropReason::MULTIPLE_BUCKETS_SKIPPED); + // End the bucket at the next bucket start time so the entire interval is skipped. + bucketEndTime = nextBucketStartTimeNs; + } else if (eventTimeNs < fullBucketEndTimeNs) { + bucketEndTime = eventTimeNs; } - VLOG("finalizing bucket for %ld, dumping %d slices", (long)mCurrentBucketStartTimeNs, - (int)mCurrentSlicedBucket.size()); - int64_t fullBucketEndTimeNs = getCurrentBucketEndTimeNs(); - int64_t bucketEndTime = eventTimeNs < fullBucketEndTimeNs ? eventTimeNs : fullBucketEndTimeNs; // Close the current bucket. int64_t conditionTrueDuration = mConditionTimer.newBucketStart(bucketEndTime); bool isBucketLargeEnough = bucketEndTime - mCurrentBucketStartTimeNs >= mMinBucketSizeNs; if (!isBucketLargeEnough) { skipCurrentBucket(eventTimeNs, BucketDropReason::BUCKET_TOO_SMALL); } - bool bucketHasData = false; if (!mCurrentBucketIsSkipped) { + bool bucketHasData = false; // The current bucket is large enough to keep. for (const auto& slice : mCurrentSlicedBucket) { ValueBucket bucket = buildPartialBucket(bucketEndTime, slice.second); @@ -984,22 +989,22 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, bucketHasData = true; } } - } - - if (!bucketHasData && !mCurrentBucketIsSkipped) { - skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); + if (!bucketHasData) { + skipCurrentBucket(eventTimeNs, BucketDropReason::NO_DATA); + } } if (mCurrentBucketIsSkipped) { mCurrentSkippedBucket.bucketStartTimeNs = mCurrentBucketStartTimeNs; - // Fill in the gap if we skipped multiple buckets. - mCurrentSkippedBucket.bucketEndTimeNs = - numBucketsForward > 1 ? nextBucketStartTimeNs : bucketEndTime; + mCurrentSkippedBucket.bucketEndTimeNs = bucketEndTime; mSkippedBuckets.emplace_back(mCurrentSkippedBucket); } // This means that the current bucket was not flushed before a forced bucket split. - if (bucketEndTime < nextBucketStartTimeNs && numBucketsForward <= 1) { + // This can happen if an app update or a dump report with include_current_partial_bucket is + // requested before we get a chance to flush the bucket due to receiving new data, either from + // the statsd socket or the StatsPullerManager. + if (bucketEndTime < nextBucketStartTimeNs) { SkippedBucket bucketInGap; bucketInGap.bucketStartTimeNs = bucketEndTime; bucketInGap.bucketEndTimeNs = nextBucketStartTimeNs; @@ -1008,7 +1013,7 @@ void ValueMetricProducer::flushCurrentBucketLocked(const int64_t& eventTimeNs, mSkippedBuckets.emplace_back(bucketInGap); } - appendToFullBucket(eventTimeNs, fullBucketEndTimeNs); + appendToFullBucket(eventTimeNs > fullBucketEndTimeNs); initCurrentSlicedBucket(nextBucketStartTimeNs); // Update the condition timer again, in case we skipped buckets. mConditionTimer.newBucketStart(nextBucketStartTimeNs); @@ -1074,8 +1079,7 @@ void ValueMetricProducer::initCurrentSlicedBucket(int64_t nextBucketStartTimeNs) (long long)mCurrentBucketStartTimeNs); } -void ValueMetricProducer::appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs) { - bool isFullBucketReached = eventTimeNs > fullBucketEndTimeNs; +void ValueMetricProducer::appendToFullBucket(const bool isFullBucketReached) { if (mCurrentBucketIsSkipped) { if (isFullBucketReached) { // If the bucket is invalid, we ignore the full bucket since it contains invalid data. diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index bb4a66164860a..aaf7df5c97717 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -142,8 +142,10 @@ private: // Mark the data as invalid. void invalidateCurrentBucket(const int64_t dropTimeNs, const BucketDropReason reason); + void invalidateCurrentBucketWithoutResetBase(const int64_t dropTimeNs, const BucketDropReason reason); + // Skips the current bucket without notifying StatsdStats of the skipped bucket. // This should only be called from #flushCurrentBucketLocked. Otherwise, a future event that // causes the bucket to be invalidated will not notify StatsdStats. @@ -209,6 +211,7 @@ private: // Util function to check whether the specified dimension hits the guardrail. bool hitGuardRailLocked(const MetricDimensionKey& newKey); + bool hasReachedGuardRailLimit() const; bool hitFullBucketGuardRailLocked(const MetricDimensionKey& newKey); @@ -220,8 +223,10 @@ private: ValueBucket buildPartialBucket(int64_t bucketEndTime, const std::vector& intervals); + void initCurrentSlicedBucket(int64_t nextBucketStartTimeNs); - void appendToFullBucket(int64_t eventTimeNs, int64_t fullBucketEndTimeNs); + + void appendToFullBucket(const bool isFullBucketReached); // Reset diff base and mHasGlobalBase void resetBase(); diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 0ba318ecf1851..52eb7409aa5dd 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -3612,10 +3612,32 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor ValueMetric metric = ValueMetricProducerTestHelper::createMetricWithCondition(); sp pullerManager = new StrictMock(); + EXPECT_CALL(*pullerManager, Pull(tagId, kConfigKey, _, _, _)) + // Condition change to true. + .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, + vector>* data, bool) { + EXPECT_EQ(eventTimeNs, bucketStartTimeNs + 10); + data->clear(); + data->push_back(CreateRepeatedValueLogEvent(tagId, bucketStartTimeNs + 10, 10)); + return true; + })) + // App Update. + .WillOnce(Invoke([](int tagId, const ConfigKey&, const int64_t eventTimeNs, + vector>* data, bool) { + EXPECT_EQ(eventTimeNs, bucket2StartTimeNs + 1000); + data->clear(); + data->push_back( + CreateRepeatedValueLogEvent(tagId, bucket2StartTimeNs + 1000, 15)); + return true; + })); sp valueProducer = ValueMetricProducerTestHelper::createValueProducerWithCondition(pullerManager, metric); + // Condition changed event + int64_t conditionChangeTimeNs = bucketStartTimeNs + 10; + valueProducer->onConditionChanged(true, conditionChangeTimeNs); + // App update event. int64_t appUpdateTimeNs = bucket2StartTimeNs + 1000; valueProducer->notifyAppUpgrade(appUpdateTimeNs); @@ -3629,28 +3651,23 @@ TEST(ValueMetricProducerTest_BucketDrop, TestBucketDropWhenForceBucketSplitBefor StatsLogReport report = outputStreamToProto(&output); EXPECT_TRUE(report.has_value_metrics()); - ASSERT_EQ(0, report.value_metrics().data_size()); - ASSERT_EQ(2, report.value_metrics().skipped_size()); + ASSERT_EQ(1, report.value_metrics().data_size()); + ASSERT_EQ(1, report.value_metrics().skipped_size()); + + ASSERT_EQ(1, report.value_metrics().data(0).bucket_info_size()); + auto data = report.value_metrics().data(0); + ASSERT_EQ(0, data.bucket_info(0).bucket_num()); + EXPECT_EQ(5, data.bucket_info(0).values(0).value_long()); - EXPECT_EQ(NanoToMillis(bucketStartTimeNs), - report.value_metrics().skipped(0).start_bucket_elapsed_millis()); EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), + report.value_metrics().skipped(0).start_bucket_elapsed_millis()); + EXPECT_EQ(NanoToMillis(appUpdateTimeNs), report.value_metrics().skipped(0).end_bucket_elapsed_millis()); ASSERT_EQ(1, report.value_metrics().skipped(0).drop_event_size()); auto dropEvent = report.value_metrics().skipped(0).drop_event(0); EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); - - EXPECT_EQ(NanoToMillis(bucket2StartTimeNs), - report.value_metrics().skipped(1).start_bucket_elapsed_millis()); - EXPECT_EQ(NanoToMillis(appUpdateTimeNs), - report.value_metrics().skipped(1).end_bucket_elapsed_millis()); - ASSERT_EQ(1, report.value_metrics().skipped(1).drop_event_size()); - - dropEvent = report.value_metrics().skipped(1).drop_event(0); - EXPECT_EQ(BucketDropReason::NO_DATA, dropEvent.drop_reason()); - EXPECT_EQ(NanoToMillis(appUpdateTimeNs), dropEvent.drop_time_millis()); } /*