From f2bee6fec965fd42ab223f1a3aa705f07ba79aea Mon Sep 17 00:00:00 2001 From: Yangster Date: Wed, 29 Nov 2017 12:01:05 -0800 Subject: [PATCH] 1/ Only expose thread-safe interfaces in metric producer. 2/ Simplify lock logic. 3/ Add test for duration metric producer. Test: all unit test passsed. Change-Id: If6ee2e69a17f12406f4b3ea3553b14642cd636d6 --- cmds/statsd/Android.mk | 1 + cmds/statsd/src/StatsLogProcessor.h | 2 +- .../src/metrics/CountMetricProducer.cpp | 30 +++-- cmds/statsd/src/metrics/CountMetricProducer.h | 41 +++--- .../src/metrics/DurationMetricProducer.cpp | 35 ++--- .../src/metrics/DurationMetricProducer.h | 50 ++++--- .../src/metrics/EventMetricProducer.cpp | 19 +-- cmds/statsd/src/metrics/EventMetricProducer.h | 35 ++--- .../src/metrics/GaugeMetricProducer.cpp | 37 ++--- cmds/statsd/src/metrics/GaugeMetricProducer.h | 43 +++--- cmds/statsd/src/metrics/MetricProducer.cpp | 10 +- cmds/statsd/src/metrics/MetricProducer.h | 56 ++++++-- cmds/statsd/src/metrics/MetricsManager.cpp | 6 +- .../src/metrics/ValueMetricProducer.cpp | 40 +++--- cmds/statsd/src/metrics/ValueMetricProducer.h | 43 +++--- .../src/metrics/metrics_manager_util.cpp | 6 +- .../metrics/CountMetricProducer_test.cpp | 14 +- .../metrics/DurationMetricProducer_test.cpp | 126 ++++++++++++++++++ .../metrics/GaugeMetricProducer_test.cpp | 4 +- .../metrics/ValueMetricProducer_test.cpp | 2 +- 20 files changed, 396 insertions(+), 204 deletions(-) create mode 100644 cmds/statsd/tests/metrics/DurationMetricProducer_test.cpp diff --git a/cmds/statsd/Android.mk b/cmds/statsd/Android.mk index 1f15c5e97e0dc..87cde032ba795 100644 --- a/cmds/statsd/Android.mk +++ b/cmds/statsd/Android.mk @@ -167,6 +167,7 @@ LOCAL_SRC_FILES := \ tests/metrics/OringDurationTracker_test.cpp \ tests/metrics/MaxDurationTracker_test.cpp \ tests/metrics/CountMetricProducer_test.cpp \ + tests/metrics/DurationMetricProducer_test.cpp \ tests/metrics/EventMetricProducer_test.cpp \ tests/metrics/ValueMetricProducer_test.cpp \ tests/guardrail/StatsdStats_test.cpp diff --git a/cmds/statsd/src/StatsLogProcessor.h b/cmds/statsd/src/StatsLogProcessor.h index 27e3854b91d62..e9ac01536b423 100644 --- a/cmds/statsd/src/StatsLogProcessor.h +++ b/cmds/statsd/src/StatsLogProcessor.h @@ -36,7 +36,7 @@ public: const std::function& sendBroadcast); virtual ~StatsLogProcessor(); - virtual void OnLogEvent(const LogEvent& event); + void OnLogEvent(const LogEvent& event); void OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config); void OnConfigRemoved(const ConfigKey& key); diff --git a/cmds/statsd/src/metrics/CountMetricProducer.cpp b/cmds/statsd/src/metrics/CountMetricProducer.cpp index 00642400b3e8f..36ec6b98b2d7b 100644 --- a/cmds/statsd/src/metrics/CountMetricProducer.cpp +++ b/cmds/statsd/src/metrics/CountMetricProducer.cpp @@ -83,7 +83,7 @@ CountMetricProducer::CountMetricProducer(const ConfigKey& key, const CountMetric mConditionSliced = true; } - startNewProtoOutputStream(mStartTimeNs); + startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); @@ -93,7 +93,7 @@ CountMetricProducer::~CountMetricProducer() { VLOG("~CountMetricProducer() called"); } -void CountMetricProducer::startNewProtoOutputStream(long long startTime) { +void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); @@ -103,17 +103,17 @@ void CountMetricProducer::startNewProtoOutputStream(long long startTime) { void CountMetricProducer::finish() { } -void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { +void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } -std::unique_ptr> CountMetricProducer::onDumpReport() { +std::unique_ptr> CountMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. - flushIfNeeded(endTime); + flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& counter : mPastBuckets) { @@ -165,9 +165,9 @@ std::unique_ptr> CountMetricProducer::onDumpReport() { (long long)mCurrentBucketStartTimeNs); VLOG("metric %s dump report now...", mMetric.name().c_str()); - std::unique_ptr> buffer = serializeProto(); + std::unique_ptr> buffer = serializeProtoLocked(); - startNewProtoOutputStream(endTime); + startNewProtoOutputStreamLocked(endTime); mPastBuckets.clear(); return buffer; @@ -175,12 +175,13 @@ std::unique_ptr> CountMetricProducer::onDumpReport() { // TODO: Clear mDimensionKeyMap once the report is dumped. } -void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { +void CountMetricProducer::onConditionChangedLocked(const bool conditionMet, + const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; } -bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { +bool CountMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) { return false; } @@ -200,13 +201,14 @@ bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } -void CountMetricProducer::onMatchedLogEventInternal( + +void CountMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { uint64_t eventTimeNs = event.GetTimestampNs(); - flushIfNeeded(eventTimeNs); + flushIfNeededLocked(eventTimeNs); if (condition == false) { return; @@ -216,7 +218,7 @@ void CountMetricProducer::onMatchedLogEventInternal( if (it == mCurrentSlicedCounter->end()) { // ===========GuardRail============== - if (hitGuardRail(eventKey)) { + if (hitGuardRailLocked(eventKey)) { return; } @@ -239,7 +241,7 @@ void CountMetricProducer::onMatchedLogEventInternal( // When a new matched event comes in, we check if event falls into the current // bucket. If not, flush the old counter to past buckets and initialize the new bucket. -void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { +void CountMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; } @@ -272,7 +274,7 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { // Rough estimate of CountMetricProducer buffer stored. This number will be // greater than actual data size as it contains each dimension of // CountMetricData is duplicated. -size_t CountMetricProducer::byteSize() const { +size_t CountMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; diff --git a/cmds/statsd/src/metrics/CountMetricProducer.h b/cmds/statsd/src/metrics/CountMetricProducer.h index f78a199de1033..ce38f5832047d 100644 --- a/cmds/statsd/src/metrics/CountMetricProducer.h +++ b/cmds/statsd/src/metrics/CountMetricProducer.h @@ -48,33 +48,38 @@ public: virtual ~CountMetricProducer(); - void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override; - void finish() override; - void flushIfNeeded(const uint64_t newEventTime) override; - - // TODO: Pass a timestamp as a parameter in onDumpReport. - std::unique_ptr> onDumpReport() override; - - void onSlicedConditionMayChange(const uint64_t eventTime) override; - - size_t byteSize() const override; - // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{}; // TODO: Implement this later. virtual void notifyAppRemoved(const string& apk, const int uid) override{}; protected: - void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey, - const std::map& conditionKey, - bool condition, const LogEvent& event, - bool scheduledPull) override; - - void startNewProtoOutputStream(long long timestamp) override; + void onMatchedLogEventInternalLocked( + const size_t matcherIndex, const HashableDimensionKey& eventKey, + const std::map& conditionKey, bool condition, + const LogEvent& event, bool scheduledPull) override; private: + // TODO: Pass a timestamp as a parameter in onDumpReport. + std::unique_ptr> onDumpReportLocked() override; + + // Internal interface to handle condition change. + void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; + + // Internal interface to handle sliced condition change. + void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override; + + // Internal function to calculate the current used bytes. + size_t byteSizeLocked() const override; + + // Util function to flush the old packet. + void flushIfNeededLocked(const uint64_t& newEventTime); + + // Util function to init/reset the proto output stream. + void startNewProtoOutputStreamLocked(long long timestamp); + const CountMetric mMetric; // TODO: Add a lock to mPastBuckets. @@ -85,7 +90,7 @@ private: static const size_t kBucketSize = sizeof(CountBucket{}); - bool hitGuardRail(const HashableDimensionKey& newKey); + bool hitGuardRailLocked(const HashableDimensionKey& newKey); FRIEND_TEST(CountMetricProducerTest, TestNonDimensionalEvents); FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition); diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.cpp b/cmds/statsd/src/metrics/DurationMetricProducer.cpp index a0374c0ba67c3..66e8c6103ae86 100644 --- a/cmds/statsd/src/metrics/DurationMetricProducer.cpp +++ b/cmds/statsd/src/metrics/DurationMetricProducer.cpp @@ -93,7 +93,7 @@ DurationMetricProducer::DurationMetricProducer(const ConfigKey& key, const Durat mConditionSliced = true; } - startNewProtoOutputStream(mStartTimeNs); + startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); @@ -103,7 +103,7 @@ DurationMetricProducer::~DurationMetricProducer() { VLOG("~DurationMetric() called"); } -void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { +void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); @@ -111,7 +111,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) { } unique_ptr DurationMetricProducer::createDurationTracker( - const HashableDimensionKey& eventKey, vector& bucket) { + const HashableDimensionKey& eventKey, vector& bucket) const { switch (mMetric.aggregation_type()) { case DurationMetric_AggregationType_SUM: return make_unique( @@ -129,19 +129,20 @@ void DurationMetricProducer::finish() { // DropboxWriter. } -void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { +void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); - flushIfNeeded(eventTime); + flushIfNeededLocked(eventTime); // Now for each of the on-going event, check if the condition has changed for them. for (auto& pair : mCurrentSlicedDuration) { pair.second->onSlicedConditionMayChange(eventTime); } } -void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { +void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet, + const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; - flushIfNeeded(eventTime); + flushIfNeededLocked(eventTime); // TODO: need to populate the condition change time from the event which triggers the condition // change, instead of using current time. for (auto& pair : mCurrentSlicedDuration) { @@ -149,13 +150,13 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u } } -std::unique_ptr> DurationMetricProducer::onDumpReport() { +std::unique_ptr> DurationMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. - flushIfNeeded(endTime); + flushIfNeededLocked(endTime); VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { @@ -214,13 +215,13 @@ std::unique_ptr> DurationMetricProducer::onDumpReport() { mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); - std::unique_ptr> buffer = serializeProto(); - startNewProtoOutputStream(endTime); + std::unique_ptr> buffer = serializeProtoLocked(); + startNewProtoOutputStreamLocked(endTime); // TODO: Properly clear the old buckets. return buffer; } -void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { +void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) { return; } @@ -239,7 +240,7 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) { mCurrentBucketNum += numBucketsForward; } -bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { +bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { // the key is not new, we are good. if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) { return false; @@ -259,11 +260,11 @@ bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } -void DurationMetricProducer::onMatchedLogEventInternal( +void DurationMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map& conditionKeys, bool condition, const LogEvent& event, bool scheduledPull) { - flushIfNeeded(event.GetTimestampNs()); + flushIfNeededLocked(event.GetTimestampNs()); if (matcherIndex == mStopAllIndex) { for (auto& pair : mCurrentSlicedDuration) { @@ -275,7 +276,7 @@ void DurationMetricProducer::onMatchedLogEventInternal( HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension)); if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) { - if (hitGuardRail(eventKey)) { + if (hitGuardRailLocked(eventKey)) { return; } mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]); @@ -290,7 +291,7 @@ void DurationMetricProducer::onMatchedLogEventInternal( } } -size_t DurationMetricProducer::byteSize() const { +size_t DurationMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; diff --git a/cmds/statsd/src/metrics/DurationMetricProducer.h b/cmds/statsd/src/metrics/DurationMetricProducer.h index 5b5373ec9aeb1..8e32d148e221a 100644 --- a/cmds/statsd/src/metrics/DurationMetricProducer.h +++ b/cmds/statsd/src/metrics/DurationMetricProducer.h @@ -45,17 +45,7 @@ public: virtual ~DurationMetricProducer(); - void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override; - void finish() override; - void flushIfNeeded(const uint64_t newEventTime) override; - - // TODO: Pass a timestamp as a parameter in onDumpReport. - std::unique_ptr> onDumpReport() override; - - void onSlicedConditionMayChange(const uint64_t eventTime) override; - - size_t byteSize() const override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{}; @@ -63,14 +53,30 @@ public: virtual void notifyAppRemoved(const string& apk, const int uid) override{}; protected: - void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey, - const std::map& conditionKeys, - bool condition, const LogEvent& event, - bool scheduledPull) override; - - void startNewProtoOutputStream(long long timestamp) override; + void onMatchedLogEventInternalLocked( + const size_t matcherIndex, const HashableDimensionKey& eventKey, + const std::map& conditionKeys, bool condition, + const LogEvent& event, bool scheduledPull) override; private: + // TODO: Pass a timestamp as a parameter in onDumpReport. + std::unique_ptr> onDumpReportLocked() override; + + // Internal interface to handle condition change. + void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; + + // Internal interface to handle sliced condition change. + void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override; + + // Internal function to calculate the current used bytes. + size_t byteSizeLocked() const override; + + // Util function to flush the old packet. + void flushIfNeededLocked(const uint64_t& eventTime); + + // Util function to init/reset the proto output stream. + void startNewProtoOutputStreamLocked(long long timestamp); + const DurationMetric mMetric; // Index of the SimpleLogEntryMatcher which defines the start. @@ -96,11 +102,17 @@ private: std::unordered_map> mCurrentSlicedDuration; - std::unique_ptr createDurationTracker(const HashableDimensionKey& eventKey, - std::vector& bucket); - bool hitGuardRail(const HashableDimensionKey& newKey); + // Helper function to create a duration tracker given the metric aggregation type. + std::unique_ptr createDurationTracker( + const HashableDimensionKey& eventKey, std::vector& bucket) const; + + // Util function to check whether the specified dimension hits the guardrail. + bool hitGuardRailLocked(const HashableDimensionKey& newKey); static const size_t kBucketSize = sizeof(DurationBucket{}); + + FRIEND_TEST(DurationMetricTrackerTest, TestNoCondition); + FRIEND_TEST(DurationMetricTrackerTest, TestNonSlicedCondition); }; } // namespace statsd diff --git a/cmds/statsd/src/metrics/EventMetricProducer.cpp b/cmds/statsd/src/metrics/EventMetricProducer.cpp index 95a18f7241f0e..8bdc9e3bf9fcd 100644 --- a/cmds/statsd/src/metrics/EventMetricProducer.cpp +++ b/cmds/statsd/src/metrics/EventMetricProducer.cpp @@ -62,7 +62,7 @@ EventMetricProducer::EventMetricProducer(const ConfigKey& key, const EventMetric mConditionSliced = true; } - startNewProtoOutputStream(mStartTimeNs); + startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); @@ -72,7 +72,7 @@ EventMetricProducer::~EventMetricProducer() { VLOG("~EventMetricProducer() called"); } -void EventMetricProducer::startNewProtoOutputStream(long long startTime) { +void EventMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique(); // TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData, // and StatsEvent. @@ -84,29 +84,30 @@ void EventMetricProducer::startNewProtoOutputStream(long long startTime) { void EventMetricProducer::finish() { } -void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { +void EventMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { } -std::unique_ptr> EventMetricProducer::onDumpReport() { +std::unique_ptr> EventMetricProducer::onDumpReportLocked() { long long endTime = time(nullptr) * NS_PER_SEC; mProto->end(mProtoToken); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime); size_t bufferSize = mProto->size(); VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize); - std::unique_ptr> buffer = serializeProto(); + std::unique_ptr> buffer = serializeProtoLocked(); - startNewProtoOutputStream(endTime); + startNewProtoOutputStreamLocked(endTime); return buffer; } -void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { +void EventMetricProducer::onConditionChangedLocked(const bool conditionMet, + const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); mCondition = conditionMet; } -void EventMetricProducer::onMatchedLogEventInternal( +void EventMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { @@ -123,7 +124,7 @@ void EventMetricProducer::onMatchedLogEventInternal( mProto->end(wrapperToken); } -size_t EventMetricProducer::byteSize() const { +size_t EventMetricProducer::byteSizeLocked() const { return mProto->bytesWritten(); } diff --git a/cmds/statsd/src/metrics/EventMetricProducer.h b/cmds/statsd/src/metrics/EventMetricProducer.h index 33a951035dca1..afb48c4b9cefd 100644 --- a/cmds/statsd/src/metrics/EventMetricProducer.h +++ b/cmds/statsd/src/metrics/EventMetricProducer.h @@ -40,23 +40,7 @@ public: virtual ~EventMetricProducer(); - void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey, - const std::map& conditionKey, - bool condition, const LogEvent& event, - bool scheduledPull) override; - - void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override; - void finish() override; - void flushIfNeeded(const uint64_t newEventTime) override { - } - - // TODO: Pass a timestamp as a parameter in onDumpReport. - std::unique_ptr> onDumpReport() override; - - void onSlicedConditionMayChange(const uint64_t eventTime) override; - - size_t byteSize() const override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{}; @@ -64,9 +48,26 @@ public: virtual void notifyAppRemoved(const string& apk, const int uid) override{}; protected: - void startNewProtoOutputStream(long long timestamp) override; + void startNewProtoOutputStreamLocked(long long timestamp); private: + void onMatchedLogEventInternalLocked( + const size_t matcherIndex, const HashableDimensionKey& eventKey, + const std::map& conditionKey, bool condition, + const LogEvent& event, bool scheduledPull) override; + + // TODO: Pass a timestamp as a parameter in onDumpReport. + std::unique_ptr> onDumpReportLocked() override; + + // Internal interface to handle condition change. + void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; + + // Internal interface to handle sliced condition change. + void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override; + + // Internal function to calculate the current used bytes. + size_t byteSizeLocked() const override; + const EventMetric mMetric; }; diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp index 1791654ba7ccc..2284ff174c350 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.cpp +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.cpp @@ -91,7 +91,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric metric.bucket().bucket_size_millis()); } - startNewProtoOutputStream(mStartTimeNs); + startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); @@ -101,7 +101,7 @@ GaugeMetricProducer::~GaugeMetricProducer() { VLOG("~GaugeMetricProducer() called"); } -void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) { +void GaugeMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); @@ -111,13 +111,13 @@ void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) { void GaugeMetricProducer::finish() { } -std::unique_ptr> GaugeMetricProducer::onDumpReport() { +std::unique_ptr> GaugeMetricProducer::onDumpReportLocked() { VLOG("gauge metric %s dump report now...", mMetric.name().c_str()); // Dump current bucket if it's stale. // If current bucket is still on-going, don't force dump current bucket. // In finish(), We can force dump current bucket. - flushIfNeeded(time(nullptr) * NS_PER_SEC); + flushIfNeededLocked(time(nullptr) * NS_PER_SEC); for (const auto& pair : mPastBuckets) { const HashableDimensionKey& hashableKey = pair.first; @@ -167,9 +167,9 @@ std::unique_ptr> GaugeMetricProducer::onDumpReport() { mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, (long long)mCurrentBucketStartTimeNs); - std::unique_ptr> buffer = serializeProto(); + std::unique_ptr> buffer = serializeProtoLocked(); - startNewProtoOutputStream(time(nullptr) * NS_PER_SEC); + startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC); mPastBuckets.clear(); return buffer; @@ -177,10 +177,10 @@ std::unique_ptr> GaugeMetricProducer::onDumpReport() { // TODO: Clear mDimensionKeyMap once the report is dumped. } -void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) { - AutoMutex _l(mLock); +void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet, + const uint64_t eventTime) { VLOG("Metric %s onConditionChanged", mMetric.name().c_str()); - flushIfNeeded(eventTime); + flushIfNeededLocked(eventTime); mCondition = conditionMet; // Push mode. No need to proactively pull the gauge data. @@ -202,10 +202,10 @@ void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint for (const auto& data : allData) { onMatchedLogEvent(0, *data, false /*scheduledPull*/); } - flushIfNeeded(eventTime); + flushIfNeededLocked(eventTime); } -void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { +void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } @@ -221,13 +221,14 @@ int64_t GaugeMetricProducer::getGauge(const LogEvent& event) { } void GaugeMetricProducer::onDataPulled(const std::vector>& allData) { - AutoMutex mutex(mLock); + std::lock_guard lock(mMutex); + for (const auto& data : allData) { onMatchedLogEvent(0, *data, true /*scheduledPull*/); } } -bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { +bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) { return false; } @@ -247,7 +248,7 @@ bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } -void GaugeMetricProducer::onMatchedLogEventInternal( +void GaugeMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { @@ -263,7 +264,7 @@ void GaugeMetricProducer::onMatchedLogEventInternal( // When the event happens in a new bucket, flush the old buckets. if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) { - flushIfNeeded(eventTimeNs); + flushIfNeededLocked(eventTimeNs); } // For gauge metric, we just simply use the first gauge in the given bucket. @@ -272,7 +273,7 @@ void GaugeMetricProducer::onMatchedLogEventInternal( } const long gauge = getGauge(event); if (gauge >= 0) { - if (hitGuardRail(eventKey)) { + if (hitGuardRailLocked(eventKey)) { return; } (*mCurrentSlicedBucket)[eventKey] = gauge; @@ -287,7 +288,7 @@ void GaugeMetricProducer::onMatchedLogEventInternal( // bucket. // if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside // the GaugeMetricProducer while holding the lock. -void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { +void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) { return; } @@ -320,7 +321,7 @@ void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { (long long)mCurrentBucketStartTimeNs); } -size_t GaugeMetricProducer::byteSize() const { +size_t GaugeMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; diff --git a/cmds/statsd/src/metrics/GaugeMetricProducer.h b/cmds/statsd/src/metrics/GaugeMetricProducer.h index f344303179e8a..c839c09f953cf 100644 --- a/cmds/statsd/src/metrics/GaugeMetricProducer.h +++ b/cmds/statsd/src/metrics/GaugeMetricProducer.h @@ -56,16 +56,7 @@ public: // Handles when the pulled data arrives. void onDataPulled(const std::vector>& data) override; - void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override; - void onSlicedConditionMayChange(const uint64_t eventTime) override; - void finish() override; - void flushIfNeeded(const uint64_t newEventTime) override; - - // TODO: Pass a timestamp as a parameter in onDumpReport. - std::unique_ptr> onDumpReport() override; - - size_t byteSize() const override; // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{}; @@ -73,24 +64,39 @@ public: virtual void notifyAppRemoved(const string& apk, const int uid) override{}; protected: - void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey, - const std::map& conditionKey, - bool condition, const LogEvent& event, - bool scheduledPull) override; - - void startNewProtoOutputStream(long long timestamp) override; + void onMatchedLogEventInternalLocked( + const size_t matcherIndex, const HashableDimensionKey& eventKey, + const std::map& conditionKey, bool condition, + const LogEvent& event, bool scheduledPull) override; private: + // TODO: Pass a timestamp as a parameter in onDumpReport. + std::unique_ptr> onDumpReportLocked() override; + + // Internal interface to handle condition change. + void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; + + // Internal interface to handle sliced condition change. + void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override; + + // Internal function to calculate the current used bytes. + size_t byteSizeLocked() const override; + + // Util function to flush the old packet. + void flushIfNeededLocked(const uint64_t& eventTime); + + // Util function to init/reset the proto output stream. + void startNewProtoOutputStreamLocked(long long timestamp); + // The default bucket size for gauge metric is 1 second. static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000; + const GaugeMetric mMetric; StatsPullerManager mStatsPullerManager; // tagId for pulled data. -1 if this is not pulled const int mPullTagId; - Mutex mLock; - // Save the past buckets and we can clear when the StatsLogReport is dumped. // TODO: Add a lock to mPastBuckets. std::unordered_map> mPastBuckets; @@ -100,7 +106,8 @@ private: int64_t getGauge(const LogEvent& event); - bool hitGuardRail(const HashableDimensionKey& newKey); + // Util function to check whether the specified dimension hits the guardrail. + bool hitGuardRailLocked(const HashableDimensionKey& newKey); static const size_t kBucketSize = sizeof(GaugeBucket{}); diff --git a/cmds/statsd/src/metrics/MetricProducer.cpp b/cmds/statsd/src/metrics/MetricProducer.cpp index 62fb632faaaf2..7e78a8558c4ab 100644 --- a/cmds/statsd/src/metrics/MetricProducer.cpp +++ b/cmds/statsd/src/metrics/MetricProducer.cpp @@ -21,8 +21,8 @@ namespace statsd { using std::map; -void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, - bool scheduledPull) { +void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event, + bool scheduledPull) { uint64_t eventTimeNs = event.GetTimestampNs(); // this is old event, maybe statsd restarted? if (eventTimeNs < mStartTimeNs) { @@ -60,11 +60,11 @@ void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent condition = mCondition; } - onMatchedLogEventInternal(matcherIndex, eventKey, conditionKeys, condition, event, - scheduledPull); + onMatchedLogEventInternalLocked(matcherIndex, eventKey, conditionKeys, condition, event, + scheduledPull); } -std::unique_ptr> MetricProducer::serializeProto() { +std::unique_ptr> MetricProducer::serializeProtoLocked() { size_t bufferSize = mProto->size(); std::unique_ptr> buffer(new std::vector(bufferSize)); diff --git a/cmds/statsd/src/metrics/MetricProducer.h b/cmds/statsd/src/metrics/MetricProducer.h index b22ff6f3348cc..5df712c4bc661 100644 --- a/cmds/statsd/src/metrics/MetricProducer.h +++ b/cmds/statsd/src/metrics/MetricProducer.h @@ -17,6 +17,8 @@ #ifndef METRIC_PRODUCER_H #define METRIC_PRODUCER_H +#include + #include "anomaly/AnomalyTracker.h" #include "condition/ConditionWizard.h" #include "config/ConfigKey.h" @@ -52,39 +54,61 @@ public: virtual ~MetricProducer(){}; // Consume the parsed stats log entry that already matched the "what" of the metric. - void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull); + void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull) { + std::lock_guard lock(mMutex); + onMatchedLogEventLocked(matcherIndex, event, scheduledPull); + } - virtual void onConditionChanged(const bool condition, const uint64_t eventTime) = 0; + void onConditionChanged(const bool condition, const uint64_t eventTime) { + std::lock_guard lock(mMutex); + onConditionChangedLocked(condition, eventTime); + } - virtual void onSlicedConditionMayChange(const uint64_t eventTime) = 0; + void onSlicedConditionMayChange(const uint64_t eventTime) { + std::lock_guard lock(mMutex); + onSlicedConditionMayChangeLocked(eventTime); + } + + bool isConditionSliced() const { + std::lock_guard lock(mMutex); + return mConditionSliced; + }; // This is called when the metric collecting is done, e.g., when there is a new configuration // coming. MetricProducer should do the clean up, and dump existing data to dropbox. virtual void finish() = 0; - virtual void flushIfNeeded(const uint64_t newEventTime) = 0; // TODO: Pass a timestamp as a parameter in onDumpReport and update all its // implementations. // onDumpReport returns the proto-serialized output and clears the previously stored contents. - virtual std::unique_ptr> onDumpReport() = 0; - - virtual bool isConditionSliced() const { - return mConditionSliced; - }; + std::unique_ptr> onDumpReport() { + std::lock_guard lock(mMutex); + return onDumpReportLocked(); + } // Returns the memory in bytes currently used to store this metric's data. Does not change // state. - virtual size_t byteSize() const = 0; + size_t byteSize() const { + std::lock_guard lock(mMutex); + return byteSizeLocked(); + } void addAnomalyTracker(sp tracker) { + std::lock_guard lock(mMutex); mAnomalyTrackers.push_back(tracker); } int64_t getBuckeSizeInNs() const { + std::lock_guard lock(mMutex); return mBucketSizeNs; } protected: + virtual void onConditionChangedLocked(const bool condition, const uint64_t eventTime) = 0; + virtual void onSlicedConditionMayChangeLocked(const uint64_t eventTime) = 0; + virtual std::unique_ptr> onDumpReportLocked() = 0; + virtual size_t byteSizeLocked() const = 0; + const ConfigKey mConfigKey; const uint64_t mStartTimeNs; @@ -128,18 +152,24 @@ protected: * nonSlicedCondition. * [event]: the log event, just in case the metric needs its data, e.g., EventMetric. */ - virtual void onMatchedLogEventInternal( + virtual void onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const std::map& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) = 0; + // Consume the parsed stats log entry that already matched the "what" of the metric. + void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event, + bool scheduledPull); + std::unique_ptr mProto; long long mProtoToken; - virtual void startNewProtoOutputStream(long long timestamp) = 0; + // Read/Write mutex to make the producer thread-safe. + // TODO(yanglu): replace with std::shared_mutex when available in libc++. + mutable std::mutex mMutex; - std::unique_ptr> serializeProto(); + std::unique_ptr> serializeProtoLocked(); }; } // namespace statsd diff --git a/cmds/statsd/src/metrics/MetricsManager.cpp b/cmds/statsd/src/metrics/MetricsManager.cpp index e986c1ab2ddbe..8da8316a72258 100644 --- a/cmds/statsd/src/metrics/MetricsManager.cpp +++ b/cmds/statsd/src/metrics/MetricsManager.cpp @@ -142,7 +142,7 @@ void MetricsManager::onLogEvent(const LogEvent& event) { // metric cares about sliced conditions, and it may have changed. Send // notification, and the metric can query the sliced conditions that are // interesting to it. - } else if (mAllMetricProducers[metricIndex]->isConditionSliced()) { + } else { mAllMetricProducers[metricIndex]->onSlicedConditionMayChange(eventTime); } } @@ -159,8 +159,8 @@ void MetricsManager::onLogEvent(const LogEvent& event) { auto& metricList = pair->second; for (const int metricIndex : metricList) { // pushed metrics are never scheduled pulls - mAllMetricProducers[metricIndex]->onMatchedLogEvent( - i, event, false /* schedulePull */); + mAllMetricProducers[metricIndex]->onMatchedLogEvent(i, event, + false /* schedulePull */); } } } diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.cpp b/cmds/statsd/src/metrics/ValueMetricProducer.cpp index 66c8419de6621..977aa88a14d28 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.cpp +++ b/cmds/statsd/src/metrics/ValueMetricProducer.cpp @@ -98,7 +98,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric metric.bucket().bucket_size_millis()); } - startNewProtoOutputStream(mStartTimeNs); + startNewProtoOutputStreamLocked(mStartTimeNs); VLOG("value metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(), (long long)mBucketSizeNs, (long long)mStartTimeNs); @@ -120,7 +120,7 @@ ValueMetricProducer::~ValueMetricProducer() { } } -void ValueMetricProducer::startNewProtoOutputStream(long long startTime) { +void ValueMetricProducer::startNewProtoOutputStreamLocked(long long startTime) { mProto = std::make_unique(); mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name()); mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime); @@ -132,11 +132,11 @@ void ValueMetricProducer::finish() { // DropboxWriter. } -void ValueMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) { +void ValueMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) { VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str()); } -std::unique_ptr> ValueMetricProducer::onDumpReport() { +std::unique_ptr> ValueMetricProducer::onDumpReportLocked() { VLOG("metric %s dump report now...", mMetric.name().c_str()); for (const auto& pair : mPastBuckets) { @@ -187,9 +187,9 @@ std::unique_ptr> ValueMetricProducer::onDumpReport() { (long long)mCurrentBucketStartTimeNs); VLOG("metric %s dump report now...", mMetric.name().c_str()); - std::unique_ptr> buffer = serializeProto(); + std::unique_ptr> buffer = serializeProtoLocked(); - startNewProtoOutputStream(time(nullptr) * NS_PER_SEC); + startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC); mPastBuckets.clear(); return buffer; @@ -197,8 +197,7 @@ std::unique_ptr> ValueMetricProducer::onDumpReport() { // TODO: Clear mDimensionKeyMap once the report is dumped. } -void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) { - AutoMutex _l(mLock); +void ValueMetricProducer::onConditionChangedLocked(const bool condition, const uint64_t eventTime) { mCondition = condition; if (mPullTagId != -1) { @@ -215,16 +214,17 @@ void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_ return; } for (const auto& data : allData) { - onMatchedLogEvent(0, *data, false); + onMatchedLogEventLocked(0, *data, false); } - flushIfNeeded(eventTime); + flushIfNeededLocked(eventTime); } return; } } void ValueMetricProducer::onDataPulled(const std::vector>& allData) { - AutoMutex _l(mLock); + std::lock_guard lock(mMutex); + if (mCondition == true || !mMetric.has_condition()) { if (allData.size() == 0) { return; @@ -232,16 +232,16 @@ void ValueMetricProducer::onDataPulled(const std::vectorGetTimestampNs(); // alarm is not accurate and might drift. if (eventTime > mCurrentBucketStartTimeNs + mBucketSizeNs * 3 / 2) { - flushIfNeeded(eventTime); + flushIfNeededLocked(eventTime); } for (const auto& data : allData) { - onMatchedLogEvent(0, *data, true); + onMatchedLogEventLocked(0, *data, true); } - flushIfNeeded(eventTime); + flushIfNeededLocked(eventTime); } } -bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { +bool ValueMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) { // ===========GuardRail============== // 1. Report the tuple count if the tuple count > soft limit if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) { @@ -262,7 +262,7 @@ bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) { return false; } -void ValueMetricProducer::onMatchedLogEventInternal( +void ValueMetricProducer::onMatchedLogEventInternalLocked( const size_t matcherIndex, const HashableDimensionKey& eventKey, const map& conditionKey, bool condition, const LogEvent& event, bool scheduledPull) { @@ -273,7 +273,7 @@ void ValueMetricProducer::onMatchedLogEventInternal( return; } - if (hitGuardRail(eventKey)) { + if (hitGuardRailLocked(eventKey)) { return; } Interval& interval = mCurrentSlicedBucket[eventKey]; @@ -308,7 +308,7 @@ void ValueMetricProducer::onMatchedLogEventInternal( } } } else { - flushIfNeeded(eventTimeNs); + flushIfNeededLocked(eventTimeNs); interval.raw.push_back(make_pair(value, 0)); } } @@ -324,7 +324,7 @@ long ValueMetricProducer::get_value(const LogEvent& event) { } } -void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { +void ValueMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) { if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) { VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs, (long long)(mCurrentBucketStartTimeNs + mBucketSizeNs)); @@ -372,7 +372,7 @@ void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) { (long long)mCurrentBucketStartTimeNs); } -size_t ValueMetricProducer::byteSize() const { +size_t ValueMetricProducer::byteSizeLocked() const { size_t totalSize = 0; for (const auto& pair : mPastBuckets) { totalSize += pair.second.size() * kBucketSize; diff --git a/cmds/statsd/src/metrics/ValueMetricProducer.h b/cmds/statsd/src/metrics/ValueMetricProducer.h index a024bd804d835..60b725d2ab55c 100644 --- a/cmds/statsd/src/metrics/ValueMetricProducer.h +++ b/cmds/statsd/src/metrics/ValueMetricProducer.h @@ -44,34 +44,40 @@ public: virtual ~ValueMetricProducer(); - void onConditionChanged(const bool condition, const uint64_t eventTime) override; - void finish() override; - void flushIfNeeded(const uint64_t eventTimeNs) override; - - // TODO: Pass a timestamp as a parameter in onDumpReport. - std::unique_ptr> onDumpReport() override; - - void onSlicedConditionMayChange(const uint64_t eventTime); void onDataPulled(const std::vector>& data) override; - size_t byteSize() const override; - // TODO: Implement this later. virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{}; // TODO: Implement this later. virtual void notifyAppRemoved(const string& apk, const int uid) override{}; protected: - void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey, - const std::map& conditionKey, - bool condition, const LogEvent& event, - bool scheduledPull) override; - - void startNewProtoOutputStream(long long timestamp) override; + void onMatchedLogEventInternalLocked( + const size_t matcherIndex, const HashableDimensionKey& eventKey, + const std::map& conditionKey, bool condition, + const LogEvent& event, bool scheduledPull) override; private: + // TODO: Pass a timestamp as a parameter in onDumpReport. + std::unique_ptr> onDumpReportLocked() override; + + // Internal interface to handle condition change. + void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override; + + // Internal interface to handle sliced condition change. + void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override; + + // Internal function to calculate the current used bytes. + size_t byteSizeLocked() const override; + + // Util function to flush the old packet. + void flushIfNeededLocked(const uint64_t& eventTime); + + // Util function to init/reset the proto output stream. + void startNewProtoOutputStreamLocked(long long timestamp); + const ValueMetric mMetric; std::shared_ptr mStatsPullerManager; @@ -82,8 +88,6 @@ private: const int pullTagId, const uint64_t startTimeNs, std::shared_ptr statsPullerManager); - Mutex mLock; - // tagId for pulled data. -1 if this is not pulled const int mPullTagId; @@ -104,7 +108,8 @@ private: long get_value(const LogEvent& event); - bool hitGuardRail(const HashableDimensionKey& newKey); + // Util function to check whether the specified dimension hits the guardrail. + bool hitGuardRailLocked(const HashableDimensionKey& newKey); static const size_t kBucketSize = sizeof(ValueBucket{}); diff --git a/cmds/statsd/src/metrics/metrics_manager_util.cpp b/cmds/statsd/src/metrics/metrics_manager_util.cpp index 9e5163fe9db5d..33683f04cdd4f 100644 --- a/cmds/statsd/src/metrics/metrics_manager_util.cpp +++ b/cmds/statsd/src/metrics/metrics_manager_util.cpp @@ -469,11 +469,11 @@ bool initAlerts(const StatsdConfig& config, const unordered_map& me } const int metricIndex = itr->second; if (alert.trigger_if_sum_gt() > - (int64_t) alert.number_of_buckets() - * allMetricProducers[metricIndex]->getBuckeSizeInNs()) { + (int64_t)alert.number_of_buckets() * + allMetricProducers[metricIndex]->getBuckeSizeInNs()) { ALOGW("invalid alert: threshold (%lld) > possible recordable value (%d x %lld)", alert.trigger_if_sum_gt(), alert.number_of_buckets(), - (long long) allMetricProducers[metricIndex]->getBuckeSizeInNs()); + (long long)allMetricProducers[metricIndex]->getBuckeSizeInNs()); return false; } diff --git a/cmds/statsd/tests/metrics/CountMetricProducer_test.cpp b/cmds/statsd/tests/metrics/CountMetricProducer_test.cpp index 2cbeaaa8601d0..dc42943e82339 100644 --- a/cmds/statsd/tests/metrics/CountMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/CountMetricProducer_test.cpp @@ -58,11 +58,11 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) { countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false); // Flushes at event #2. - countProducer.flushIfNeeded(bucketStartTimeNs + 2); + countProducer.flushIfNeededLocked(bucketStartTimeNs + 2); EXPECT_EQ(0UL, countProducer.mPastBuckets.size()); // Flushes. - countProducer.flushIfNeeded(bucketStartTimeNs + bucketSizeNs + 1); + countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1); EXPECT_EQ(1UL, countProducer.mPastBuckets.size()); EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != countProducer.mPastBuckets.end()); @@ -75,7 +75,7 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) { // 1 matched event happens in bucket 2. LogEvent event3(tagId, bucketStartTimeNs + bucketSizeNs + 2); countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3, false); - countProducer.flushIfNeeded(bucketStartTimeNs + 2 * bucketSizeNs + 1); + countProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1); EXPECT_EQ(1UL, countProducer.mPastBuckets.size()); EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != countProducer.mPastBuckets.end()); @@ -86,7 +86,7 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) { EXPECT_EQ(1LL, bucketInfo2.mCount); // nothing happens in bucket 3. we should not record anything for bucket 3. - countProducer.flushIfNeeded(bucketStartTimeNs + 3 * bucketSizeNs + 1); + countProducer.flushIfNeededLocked(bucketStartTimeNs + 3 * bucketSizeNs + 1); EXPECT_EQ(1UL, countProducer.mPastBuckets.size()); EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != countProducer.mPastBuckets.end()); @@ -119,7 +119,7 @@ TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition) { countProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/); EXPECT_EQ(0UL, countProducer.mPastBuckets.size()); - countProducer.flushIfNeeded(bucketStartTimeNs + bucketSizeNs + 1); + countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1); EXPECT_EQ(1UL, countProducer.mPastBuckets.size()); EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != countProducer.mPastBuckets.end()); @@ -167,11 +167,11 @@ TEST(CountMetricProducerTest, TestEventsWithSlicedCondition) { bucketStartTimeNs); countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false); - countProducer.flushIfNeeded(bucketStartTimeNs + 1); + countProducer.flushIfNeededLocked(bucketStartTimeNs + 1); EXPECT_EQ(0UL, countProducer.mPastBuckets.size()); countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false); - countProducer.flushIfNeeded(bucketStartTimeNs + bucketSizeNs + 1); + countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1); EXPECT_EQ(1UL, countProducer.mPastBuckets.size()); EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != countProducer.mPastBuckets.end()); diff --git a/cmds/statsd/tests/metrics/DurationMetricProducer_test.cpp b/cmds/statsd/tests/metrics/DurationMetricProducer_test.cpp new file mode 100644 index 0000000000000..3f2b7cdd22ee7 --- /dev/null +++ b/cmds/statsd/tests/metrics/DurationMetricProducer_test.cpp @@ -0,0 +1,126 @@ +// Copyright (C) 2017 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "src/metrics/DurationMetricProducer.h" +#include "metrics_test_helper.h" +#include "src/condition/ConditionWizard.h" + +#include +#include +#include +#include +#include +#include + +using namespace android::os::statsd; +using namespace testing; +using android::sp; +using std::set; +using std::unordered_map; +using std::vector; + +#ifdef __ANDROID__ + +namespace android { +namespace os { +namespace statsd { + +const ConfigKey kConfigKey(0, "test"); + +TEST(DurationMetricTrackerTest, TestNoCondition) { + sp wizard = new NaggyMock(); + uint64_t bucketStartTimeNs = 10000000000; + uint64_t bucketSizeNs = 30 * 1000 * 1000 * 1000LL; + + DurationMetric metric; + metric.set_name("1"); + metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000); + metric.set_aggregation_type(DurationMetric_AggregationType_SUM); + + int tagId = 1; + LogEvent event1(tagId, bucketStartTimeNs + 1); + LogEvent event2(tagId, bucketStartTimeNs + bucketSizeNs + 2); + + DurationMetricProducer durationProducer( + kConfigKey, metric, -1 /*no condition*/, 1 /* start index */, 2 /* stop index */, + 3 /* stop_all index */, false /*nesting*/, wizard, {}, bucketStartTimeNs); + + durationProducer.onMatchedLogEvent(1 /* start index*/, event1, false /* scheduledPull */); + durationProducer.onMatchedLogEvent(2 /* stop index*/, event2, false /* scheduledPull */); + durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1); + EXPECT_EQ(1UL, durationProducer.mPastBuckets.size()); + EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != + durationProducer.mPastBuckets.end()); + const auto& buckets = durationProducer.mPastBuckets[DEFAULT_DIMENSION_KEY]; + EXPECT_EQ(2UL, buckets.size()); + EXPECT_EQ(bucketStartTimeNs, buckets[0].mBucketStartNs); + EXPECT_EQ(bucketStartTimeNs + bucketSizeNs, buckets[0].mBucketEndNs); + EXPECT_EQ(bucketSizeNs - 1ULL, buckets[0].mDuration); + EXPECT_EQ(bucketStartTimeNs + bucketSizeNs, buckets[1].mBucketStartNs); + EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs, buckets[1].mBucketEndNs); + EXPECT_EQ(2ULL, buckets[1].mDuration); +} + +TEST(DurationMetricTrackerTest, TestNonSlicedCondition) { + sp wizard = new NaggyMock(); + uint64_t bucketStartTimeNs = 10000000000; + uint64_t bucketSizeNs = 30 * 1000 * 1000 * 1000LL; + + DurationMetric metric; + metric.set_name("1"); + metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000); + metric.set_aggregation_type(DurationMetric_AggregationType_SUM); + + int tagId = 1; + LogEvent event1(tagId, bucketStartTimeNs + 1); + LogEvent event2(tagId, bucketStartTimeNs + 2); + LogEvent event3(tagId, bucketStartTimeNs + bucketSizeNs + 1); + LogEvent event4(tagId, bucketStartTimeNs + bucketSizeNs + 3); + + DurationMetricProducer durationProducer( + kConfigKey, metric, 0 /*no condition*/, 1 /* start index */, 2 /* stop index */, + 3 /* stop_all index */, false /*nesting*/, wizard, {}, bucketStartTimeNs); + EXPECT_FALSE(durationProducer.mCondition); + EXPECT_FALSE(durationProducer.isConditionSliced()); + + durationProducer.onMatchedLogEvent(1 /* start index*/, event1, false /* scheduledPull */); + durationProducer.onMatchedLogEvent(2 /* stop index*/, event2, false /* scheduledPull */); + durationProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1); + EXPECT_EQ(1UL, durationProducer.mPastBuckets.size()); + EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != + durationProducer.mPastBuckets.end()); + const auto& buckets1 = durationProducer.mPastBuckets[DEFAULT_DIMENSION_KEY]; + EXPECT_EQ(0UL, buckets1.size()); + + + durationProducer.onMatchedLogEvent(1 /* start index*/, event3, false /* scheduledPull */); + durationProducer.onConditionChanged(true /* condition */, bucketStartTimeNs + bucketSizeNs + 2); + durationProducer.onMatchedLogEvent(2 /* stop index*/, event4, false /* scheduledPull */); + durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1); + EXPECT_EQ(1UL, durationProducer.mPastBuckets.size()); + EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) != + durationProducer.mPastBuckets.end()); + const auto& buckets2 = durationProducer.mPastBuckets[DEFAULT_DIMENSION_KEY]; + EXPECT_EQ(1UL, buckets2.size()); + EXPECT_EQ(bucketStartTimeNs + bucketSizeNs, buckets2[0].mBucketStartNs); + EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs, buckets2[0].mBucketEndNs); + EXPECT_EQ(1ULL, buckets2[0].mDuration); +} + +} // namespace statsd +} // namespace os +} // namespace android +#else +GTEST_LOG_(INFO) << "This test does nothing.\n"; +#endif diff --git a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp index 85f50081ca1eb..ed13db25397ac 100644 --- a/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/GaugeMetricProducer_test.cpp @@ -60,7 +60,7 @@ TEST(GaugeMetricProducerTest, TestWithCondition) { allData.push_back(event2); gaugeProducer.onDataPulled(allData); - gaugeProducer.flushIfNeeded(event2->GetTimestampNs() + 1); + gaugeProducer.flushIfNeededLocked(event2->GetTimestampNs() + 1); EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size()); @@ -74,7 +74,7 @@ TEST(GaugeMetricProducerTest, TestWithCondition) { event3->init(); allData.push_back(event3); gaugeProducer.onDataPulled(allData); - gaugeProducer.flushIfNeeded(bucketStartTimeNs + 2 * bucketSizeNs + 10); + gaugeProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 10); EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size()); EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second); // One dimension. diff --git a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp index 9d784663de03c..d3206977a89d3 100644 --- a/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp +++ b/cmds/statsd/tests/metrics/ValueMetricProducer_test.cpp @@ -282,7 +282,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) { EXPECT_EQ(20, curInterval.raw.back().first); EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size()); - valueProducer.flushIfNeeded(bucket3StartTimeNs); + valueProducer.flushIfNeededLocked(bucket3StartTimeNs); EXPECT_EQ(1UL, valueProducer.mPastBuckets.size()); EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size()); EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValue);