1/ Only expose thread-safe interfaces in metric producer.

2/ Simplify lock logic.
3/ Add test for duration metric producer.

Test: all unit tests passed.
Change-Id: If6ee2e69a17f12406f4b3ea3553b14642cd636d6
This commit is contained in:
Yangster
2017-11-29 12:01:05 -08:00
committed by Yangster-mac
parent cf6e63b4db
commit f2bee6fec9
20 changed files with 396 additions and 204 deletions

View File

@@ -167,6 +167,7 @@ LOCAL_SRC_FILES := \
tests/metrics/OringDurationTracker_test.cpp \
tests/metrics/MaxDurationTracker_test.cpp \
tests/metrics/CountMetricProducer_test.cpp \
tests/metrics/DurationMetricProducer_test.cpp \
tests/metrics/EventMetricProducer_test.cpp \
tests/metrics/ValueMetricProducer_test.cpp \
tests/guardrail/StatsdStats_test.cpp

View File

@@ -36,7 +36,7 @@ public:
const std::function<void(const ConfigKey&)>& sendBroadcast);
virtual ~StatsLogProcessor();
virtual void OnLogEvent(const LogEvent& event);
void OnLogEvent(const LogEvent& event);
void OnConfigUpdated(const ConfigKey& key, const StatsdConfig& config);
void OnConfigRemoved(const ConfigKey& key);

View File

@@ -83,7 +83,7 @@ CountMetricProducer::CountMetricProducer(const ConfigKey& key, const CountMetric
mConditionSliced = true;
}
startNewProtoOutputStream(mStartTimeNs);
startNewProtoOutputStreamLocked(mStartTimeNs);
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -93,7 +93,7 @@ CountMetricProducer::~CountMetricProducer() {
VLOG("~CountMetricProducer() called");
}
void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
void CountMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -103,17 +103,17 @@ void CountMetricProducer::startNewProtoOutputStream(long long startTime) {
void CountMetricProducer::finish() {
}
void CountMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
void CountMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReportLocked() {
long long endTime = time(nullptr) * NS_PER_SEC;
// Dump current bucket if it's stale.
// If current bucket is still on-going, don't force dump current bucket.
// In finish(), We can force dump current bucket.
flushIfNeeded(endTime);
flushIfNeededLocked(endTime);
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& counter : mPastBuckets) {
@@ -165,9 +165,9 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
(long long)mCurrentBucketStartTimeNs);
VLOG("metric %s dump report now...", mMetric.name().c_str());
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
startNewProtoOutputStream(endTime);
startNewProtoOutputStreamLocked(endTime);
mPastBuckets.clear();
return buffer;
@@ -175,12 +175,13 @@ std::unique_ptr<std::vector<uint8_t>> CountMetricProducer::onDumpReport() {
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void CountMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
void CountMetricProducer::onConditionChangedLocked(const bool conditionMet,
const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
mCondition = conditionMet;
}
bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
bool CountMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
if (mCurrentSlicedCounter->find(newKey) != mCurrentSlicedCounter->end()) {
return false;
}
@@ -200,13 +201,14 @@ bool CountMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
return false;
}
void CountMetricProducer::onMatchedLogEventInternal(
void CountMetricProducer::onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
uint64_t eventTimeNs = event.GetTimestampNs();
flushIfNeeded(eventTimeNs);
flushIfNeededLocked(eventTimeNs);
if (condition == false) {
return;
@@ -216,7 +218,7 @@ void CountMetricProducer::onMatchedLogEventInternal(
if (it == mCurrentSlicedCounter->end()) {
// ===========GuardRail==============
if (hitGuardRail(eventKey)) {
if (hitGuardRailLocked(eventKey)) {
return;
}
@@ -239,7 +241,7 @@ void CountMetricProducer::onMatchedLogEventInternal(
// When a new matched event comes in, we check if event falls into the current
// bucket. If not, flush the old counter to past buckets and initialize the new bucket.
void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
void CountMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
@@ -272,7 +274,7 @@ void CountMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
// Rough estimate of CountMetricProducer buffer stored. This number will be
// greater than actual data size as it contains each dimension of
// CountMetricData is duplicated.
size_t CountMetricProducer::byteSize() const {
size_t CountMetricProducer::byteSizeLocked() const {
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;

View File

@@ -48,33 +48,38 @@ public:
virtual ~CountMetricProducer();
void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override;
void finish() override;
void flushIfNeeded(const uint64_t newEventTime) override;
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReport() override;
void onSlicedConditionMayChange(const uint64_t eventTime) override;
size_t byteSize() const override;
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
// TODO: Implement this later.
virtual void notifyAppRemoved(const string& apk, const int uid) override{};
protected:
void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey,
bool condition, const LogEvent& event,
bool scheduledPull) override;
void startNewProtoOutputStream(long long timestamp) override;
void onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) override;
private:
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
// Internal interface to handle sliced condition change.
void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override;
// Internal function to calculate the current used bytes.
size_t byteSizeLocked() const override;
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& newEventTime);
// Util function to init/reset the proto output stream.
void startNewProtoOutputStreamLocked(long long timestamp);
const CountMetric mMetric;
// TODO: Add a lock to mPastBuckets.
@@ -85,7 +90,7 @@ private:
static const size_t kBucketSize = sizeof(CountBucket{});
bool hitGuardRail(const HashableDimensionKey& newKey);
bool hitGuardRailLocked(const HashableDimensionKey& newKey);
FRIEND_TEST(CountMetricProducerTest, TestNonDimensionalEvents);
FRIEND_TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition);

View File

@@ -93,7 +93,7 @@ DurationMetricProducer::DurationMetricProducer(const ConfigKey& key, const Durat
mConditionSliced = true;
}
startNewProtoOutputStream(mStartTimeNs);
startNewProtoOutputStreamLocked(mStartTimeNs);
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -103,7 +103,7 @@ DurationMetricProducer::~DurationMetricProducer() {
VLOG("~DurationMetric() called");
}
void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
void DurationMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,7 +111,7 @@ void DurationMetricProducer::startNewProtoOutputStream(long long startTime) {
}
unique_ptr<DurationTracker> DurationMetricProducer::createDurationTracker(
const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) {
const HashableDimensionKey& eventKey, vector<DurationBucket>& bucket) const {
switch (mMetric.aggregation_type()) {
case DurationMetric_AggregationType_SUM:
return make_unique<OringDurationTracker>(
@@ -129,19 +129,20 @@ void DurationMetricProducer::finish() {
// DropboxWriter.
}
void DurationMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
void DurationMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
flushIfNeeded(eventTime);
flushIfNeededLocked(eventTime);
// Now for each of the on-going event, check if the condition has changed for them.
for (auto& pair : mCurrentSlicedDuration) {
pair.second->onSlicedConditionMayChange(eventTime);
}
}
void DurationMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
void DurationMetricProducer::onConditionChangedLocked(const bool conditionMet,
const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
mCondition = conditionMet;
flushIfNeeded(eventTime);
flushIfNeededLocked(eventTime);
// TODO: need to populate the condition change time from the event which triggers the condition
// change, instead of using current time.
for (auto& pair : mCurrentSlicedDuration) {
@@ -149,13 +150,13 @@ void DurationMetricProducer::onConditionChanged(const bool conditionMet, const u
}
}
std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReportLocked() {
long long endTime = time(nullptr) * NS_PER_SEC;
// Dump current bucket if it's stale.
// If current bucket is still on-going, don't force dump current bucket.
// In finish(), We can force dump current bucket.
flushIfNeeded(endTime);
flushIfNeededLocked(endTime);
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& pair : mPastBuckets) {
@@ -214,13 +215,13 @@ std::unique_ptr<std::vector<uint8_t>> DurationMetricProducer::onDumpReport() {
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
startNewProtoOutputStream(endTime);
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
startNewProtoOutputStreamLocked(endTime);
// TODO: Properly clear the old buckets.
return buffer;
}
void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
void DurationMetricProducer::flushIfNeededLocked(const uint64_t& eventTime) {
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTime) {
return;
}
@@ -239,7 +240,7 @@ void DurationMetricProducer::flushIfNeeded(uint64_t eventTime) {
mCurrentBucketNum += numBucketsForward;
}
bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
bool DurationMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
// the key is not new, we are good.
if (mCurrentSlicedDuration.find(newKey) != mCurrentSlicedDuration.end()) {
return false;
@@ -259,11 +260,11 @@ bool DurationMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
return false;
}
void DurationMetricProducer::onMatchedLogEventInternal(
void DurationMetricProducer::onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKeys, bool condition,
const LogEvent& event, bool scheduledPull) {
flushIfNeeded(event.GetTimestampNs());
flushIfNeededLocked(event.GetTimestampNs());
if (matcherIndex == mStopAllIndex) {
for (auto& pair : mCurrentSlicedDuration) {
@@ -275,7 +276,7 @@ void DurationMetricProducer::onMatchedLogEventInternal(
HashableDimensionKey atomKey = getHashableKey(getDimensionKey(event, mInternalDimension));
if (mCurrentSlicedDuration.find(eventKey) == mCurrentSlicedDuration.end()) {
if (hitGuardRail(eventKey)) {
if (hitGuardRailLocked(eventKey)) {
return;
}
mCurrentSlicedDuration[eventKey] = createDurationTracker(eventKey, mPastBuckets[eventKey]);
@@ -290,7 +291,7 @@ void DurationMetricProducer::onMatchedLogEventInternal(
}
}
size_t DurationMetricProducer::byteSize() const {
size_t DurationMetricProducer::byteSizeLocked() const {
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;

View File

@@ -45,17 +45,7 @@ public:
virtual ~DurationMetricProducer();
void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override;
void finish() override;
void flushIfNeeded(const uint64_t newEventTime) override;
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReport() override;
void onSlicedConditionMayChange(const uint64_t eventTime) override;
size_t byteSize() const override;
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
@@ -63,14 +53,30 @@ public:
virtual void notifyAppRemoved(const string& apk, const int uid) override{};
protected:
void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKeys,
bool condition, const LogEvent& event,
bool scheduledPull) override;
void startNewProtoOutputStream(long long timestamp) override;
void onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKeys, bool condition,
const LogEvent& event, bool scheduledPull) override;
private:
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
// Internal interface to handle sliced condition change.
void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override;
// Internal function to calculate the current used bytes.
size_t byteSizeLocked() const override;
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& eventTime);
// Util function to init/reset the proto output stream.
void startNewProtoOutputStreamLocked(long long timestamp);
const DurationMetric mMetric;
// Index of the SimpleLogEntryMatcher which defines the start.
@@ -96,11 +102,17 @@ private:
std::unordered_map<HashableDimensionKey, std::unique_ptr<DurationTracker>>
mCurrentSlicedDuration;
std::unique_ptr<DurationTracker> createDurationTracker(const HashableDimensionKey& eventKey,
std::vector<DurationBucket>& bucket);
bool hitGuardRail(const HashableDimensionKey& newKey);
// Helper function to create a duration tracker given the metric aggregation type.
std::unique_ptr<DurationTracker> createDurationTracker(
const HashableDimensionKey& eventKey, std::vector<DurationBucket>& bucket) const;
// Util function to check whether the specified dimension hits the guardrail.
bool hitGuardRailLocked(const HashableDimensionKey& newKey);
static const size_t kBucketSize = sizeof(DurationBucket{});
FRIEND_TEST(DurationMetricTrackerTest, TestNoCondition);
FRIEND_TEST(DurationMetricTrackerTest, TestNonSlicedCondition);
};
} // namespace statsd

View File

@@ -62,7 +62,7 @@ EventMetricProducer::EventMetricProducer(const ConfigKey& key, const EventMetric
mConditionSliced = true;
}
startNewProtoOutputStream(mStartTimeNs);
startNewProtoOutputStreamLocked(mStartTimeNs);
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -72,7 +72,7 @@ EventMetricProducer::~EventMetricProducer() {
VLOG("~EventMetricProducer() called");
}
void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
void EventMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
mProto = std::make_unique<ProtoOutputStream>();
// TODO: We need to auto-generate the field IDs for StatsLogReport, EventMetricData,
// and StatsEvent.
@@ -84,29 +84,30 @@ void EventMetricProducer::startNewProtoOutputStream(long long startTime) {
void EventMetricProducer::finish() {
}
void EventMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
void EventMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
}
std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReport() {
std::unique_ptr<std::vector<uint8_t>> EventMetricProducer::onDumpReportLocked() {
long long endTime = time(nullptr) * NS_PER_SEC;
mProto->end(mProtoToken);
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS, endTime);
size_t bufferSize = mProto->size();
VLOG("metric %s dump report now... proto size: %zu ", mMetric.name().c_str(), bufferSize);
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
startNewProtoOutputStream(endTime);
startNewProtoOutputStreamLocked(endTime);
return buffer;
}
void EventMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
void EventMetricProducer::onConditionChangedLocked(const bool conditionMet,
const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
mCondition = conditionMet;
}
void EventMetricProducer::onMatchedLogEventInternal(
void EventMetricProducer::onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
@@ -123,7 +124,7 @@ void EventMetricProducer::onMatchedLogEventInternal(
mProto->end(wrapperToken);
}
size_t EventMetricProducer::byteSize() const {
size_t EventMetricProducer::byteSizeLocked() const {
return mProto->bytesWritten();
}

View File

@@ -40,23 +40,7 @@ public:
virtual ~EventMetricProducer();
void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey,
bool condition, const LogEvent& event,
bool scheduledPull) override;
void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override;
void finish() override;
void flushIfNeeded(const uint64_t newEventTime) override {
}
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReport() override;
void onSlicedConditionMayChange(const uint64_t eventTime) override;
size_t byteSize() const override;
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
@@ -64,9 +48,26 @@ public:
virtual void notifyAppRemoved(const string& apk, const int uid) override{};
protected:
void startNewProtoOutputStream(long long timestamp) override;
void startNewProtoOutputStreamLocked(long long timestamp);
private:
void onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) override;
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
// Internal interface to handle sliced condition change.
void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override;
// Internal function to calculate the current used bytes.
size_t byteSizeLocked() const override;
const EventMetric mMetric;
};

View File

@@ -91,7 +91,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
metric.bucket().bucket_size_millis());
}
startNewProtoOutputStream(mStartTimeNs);
startNewProtoOutputStreamLocked(mStartTimeNs);
VLOG("metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -101,7 +101,7 @@ GaugeMetricProducer::~GaugeMetricProducer() {
VLOG("~GaugeMetricProducer() called");
}
void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
void GaugeMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -111,13 +111,13 @@ void GaugeMetricProducer::startNewProtoOutputStream(long long startTime) {
void GaugeMetricProducer::finish() {
}
std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReportLocked() {
VLOG("gauge metric %s dump report now...", mMetric.name().c_str());
// Dump current bucket if it's stale.
// If current bucket is still on-going, don't force dump current bucket.
// In finish(), We can force dump current bucket.
flushIfNeeded(time(nullptr) * NS_PER_SEC);
flushIfNeededLocked(time(nullptr) * NS_PER_SEC);
for (const auto& pair : mPastBuckets) {
const HashableDimensionKey& hashableKey = pair.first;
@@ -167,9 +167,9 @@ std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_END_REPORT_NANOS,
(long long)mCurrentBucketStartTimeNs);
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC);
mPastBuckets.clear();
return buffer;
@@ -177,10 +177,10 @@ std::unique_ptr<std::vector<uint8_t>> GaugeMetricProducer::onDumpReport() {
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint64_t eventTime) {
AutoMutex _l(mLock);
void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
const uint64_t eventTime) {
VLOG("Metric %s onConditionChanged", mMetric.name().c_str());
flushIfNeeded(eventTime);
flushIfNeededLocked(eventTime);
mCondition = conditionMet;
// Push mode. No need to proactively pull the gauge data.
@@ -202,10 +202,10 @@ void GaugeMetricProducer::onConditionChanged(const bool conditionMet, const uint
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, false /*scheduledPull*/);
}
flushIfNeeded(eventTime);
flushIfNeededLocked(eventTime);
}
void GaugeMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
@@ -221,13 +221,14 @@ int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
}
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
AutoMutex mutex(mLock);
std::lock_guard<std::mutex> lock(mMutex);
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, true /*scheduledPull*/);
}
}
bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
bool GaugeMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
if (mCurrentSlicedBucket->find(newKey) != mCurrentSlicedBucket->end()) {
return false;
}
@@ -247,7 +248,7 @@ bool GaugeMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
return false;
}
void GaugeMetricProducer::onMatchedLogEventInternal(
void GaugeMetricProducer::onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
@@ -263,7 +264,7 @@ void GaugeMetricProducer::onMatchedLogEventInternal(
// When the event happens in a new bucket, flush the old buckets.
if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
flushIfNeeded(eventTimeNs);
flushIfNeededLocked(eventTimeNs);
}
// For gauge metric, we just simply use the first gauge in the given bucket.
@@ -272,7 +273,7 @@ void GaugeMetricProducer::onMatchedLogEventInternal(
}
const long gauge = getGauge(event);
if (gauge >= 0) {
if (hitGuardRail(eventKey)) {
if (hitGuardRailLocked(eventKey)) {
return;
}
(*mCurrentSlicedBucket)[eventKey] = gauge;
@@ -287,7 +288,7 @@ void GaugeMetricProducer::onMatchedLogEventInternal(
// bucket.
// if data is pushed, onMatchedLogEvent will only be called through onConditionChanged() inside
// the GaugeMetricProducer while holding the lock.
void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
return;
}
@@ -320,7 +321,7 @@ void GaugeMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
(long long)mCurrentBucketStartTimeNs);
}
size_t GaugeMetricProducer::byteSize() const {
size_t GaugeMetricProducer::byteSizeLocked() const {
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;

View File

@@ -56,16 +56,7 @@ public:
// Handles when the pulled data arrives.
void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
void onConditionChanged(const bool conditionMet, const uint64_t eventTime) override;
void onSlicedConditionMayChange(const uint64_t eventTime) override;
void finish() override;
void flushIfNeeded(const uint64_t newEventTime) override;
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReport() override;
size_t byteSize() const override;
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
@@ -73,24 +64,39 @@ public:
virtual void notifyAppRemoved(const string& apk, const int uid) override{};
protected:
void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey,
bool condition, const LogEvent& event,
bool scheduledPull) override;
void startNewProtoOutputStream(long long timestamp) override;
void onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) override;
private:
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
// Internal interface to handle sliced condition change.
void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override;
// Internal function to calculate the current used bytes.
size_t byteSizeLocked() const override;
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& eventTime);
// Util function to init/reset the proto output stream.
void startNewProtoOutputStreamLocked(long long timestamp);
// The default bucket size for gauge metric is 1 second.
static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
const GaugeMetric mMetric;
StatsPullerManager mStatsPullerManager;
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
Mutex mLock;
// Save the past buckets and we can clear when the StatsLogReport is dumped.
// TODO: Add a lock to mPastBuckets.
std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
@@ -100,7 +106,8 @@ private:
int64_t getGauge(const LogEvent& event);
bool hitGuardRail(const HashableDimensionKey& newKey);
// Util function to check whether the specified dimension hits the guardrail.
bool hitGuardRailLocked(const HashableDimensionKey& newKey);
static const size_t kBucketSize = sizeof(GaugeBucket{});

View File

@@ -21,8 +21,8 @@ namespace statsd {
using std::map;
void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event,
bool scheduledPull) {
void MetricProducer::onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
bool scheduledPull) {
uint64_t eventTimeNs = event.GetTimestampNs();
// this is old event, maybe statsd restarted?
if (eventTimeNs < mStartTimeNs) {
@@ -60,11 +60,11 @@ void MetricProducer::onMatchedLogEvent(const size_t matcherIndex, const LogEvent
condition = mCondition;
}
onMatchedLogEventInternal(matcherIndex, eventKey, conditionKeys, condition, event,
scheduledPull);
onMatchedLogEventInternalLocked(matcherIndex, eventKey, conditionKeys, condition, event,
scheduledPull);
}
std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProto() {
std::unique_ptr<std::vector<uint8_t>> MetricProducer::serializeProtoLocked() {
size_t bufferSize = mProto->size();
std::unique_ptr<std::vector<uint8_t>> buffer(new std::vector<uint8_t>(bufferSize));

View File

@@ -17,6 +17,8 @@
#ifndef METRIC_PRODUCER_H
#define METRIC_PRODUCER_H
#include <shared_mutex>
#include "anomaly/AnomalyTracker.h"
#include "condition/ConditionWizard.h"
#include "config/ConfigKey.h"
@@ -52,39 +54,61 @@ public:
virtual ~MetricProducer(){};
// Consume the parsed stats log entry that already matched the "what" of the metric.
void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull);
void onMatchedLogEvent(const size_t matcherIndex, const LogEvent& event, bool scheduledPull) {
std::lock_guard<std::mutex> lock(mMutex);
onMatchedLogEventLocked(matcherIndex, event, scheduledPull);
}
virtual void onConditionChanged(const bool condition, const uint64_t eventTime) = 0;
void onConditionChanged(const bool condition, const uint64_t eventTime) {
std::lock_guard<std::mutex> lock(mMutex);
onConditionChangedLocked(condition, eventTime);
}
virtual void onSlicedConditionMayChange(const uint64_t eventTime) = 0;
void onSlicedConditionMayChange(const uint64_t eventTime) {
std::lock_guard<std::mutex> lock(mMutex);
onSlicedConditionMayChangeLocked(eventTime);
}
bool isConditionSliced() const {
std::lock_guard<std::mutex> lock(mMutex);
return mConditionSliced;
};
// This is called when the metric collecting is done, e.g., when there is a new configuration
// coming. MetricProducer should do the clean up, and dump existing data to dropbox.
virtual void finish() = 0;
virtual void flushIfNeeded(const uint64_t newEventTime) = 0;
// TODO: Pass a timestamp as a parameter in onDumpReport and update all its
// implementations.
// onDumpReport returns the proto-serialized output and clears the previously stored contents.
virtual std::unique_ptr<std::vector<uint8_t>> onDumpReport() = 0;
virtual bool isConditionSliced() const {
return mConditionSliced;
};
std::unique_ptr<std::vector<uint8_t>> onDumpReport() {
std::lock_guard<std::mutex> lock(mMutex);
return onDumpReportLocked();
}
// Returns the memory in bytes currently used to store this metric's data. Does not change
// state.
virtual size_t byteSize() const = 0;
size_t byteSize() const {
std::lock_guard<std::mutex> lock(mMutex);
return byteSizeLocked();
}
void addAnomalyTracker(sp<AnomalyTracker> tracker) {
std::lock_guard<std::mutex> lock(mMutex);
mAnomalyTrackers.push_back(tracker);
}
int64_t getBuckeSizeInNs() const {
std::lock_guard<std::mutex> lock(mMutex);
return mBucketSizeNs;
}
protected:
virtual void onConditionChangedLocked(const bool condition, const uint64_t eventTime) = 0;
virtual void onSlicedConditionMayChangeLocked(const uint64_t eventTime) = 0;
virtual std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() = 0;
virtual size_t byteSizeLocked() const = 0;
const ConfigKey mConfigKey;
const uint64_t mStartTimeNs;
@@ -128,18 +152,24 @@ protected:
* nonSlicedCondition.
* [event]: the log event, just in case the metric needs its data, e.g., EventMetric.
*/
virtual void onMatchedLogEventInternal(
virtual void onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) = 0;
// Consume the parsed stats log entry that already matched the "what" of the metric.
void onMatchedLogEventLocked(const size_t matcherIndex, const LogEvent& event,
bool scheduledPull);
std::unique_ptr<android::util::ProtoOutputStream> mProto;
long long mProtoToken;
virtual void startNewProtoOutputStream(long long timestamp) = 0;
// Read/Write mutex to make the producer thread-safe.
// TODO(yanglu): replace with std::shared_mutex when available in libc++.
mutable std::mutex mMutex;
std::unique_ptr<std::vector<uint8_t>> serializeProto();
std::unique_ptr<std::vector<uint8_t>> serializeProtoLocked();
};
} // namespace statsd

View File

@@ -142,7 +142,7 @@ void MetricsManager::onLogEvent(const LogEvent& event) {
// metric cares about sliced conditions, and it may have changed. Send
// notification, and the metric can query the sliced conditions that are
// interesting to it.
} else if (mAllMetricProducers[metricIndex]->isConditionSliced()) {
} else {
mAllMetricProducers[metricIndex]->onSlicedConditionMayChange(eventTime);
}
}
@@ -159,8 +159,8 @@ void MetricsManager::onLogEvent(const LogEvent& event) {
auto& metricList = pair->second;
for (const int metricIndex : metricList) {
// pushed metrics are never scheduled pulls
mAllMetricProducers[metricIndex]->onMatchedLogEvent(
i, event, false /* schedulePull */);
mAllMetricProducers[metricIndex]->onMatchedLogEvent(i, event,
false /* schedulePull */);
}
}
}

View File

@@ -98,7 +98,7 @@ ValueMetricProducer::ValueMetricProducer(const ConfigKey& key, const ValueMetric
metric.bucket().bucket_size_millis());
}
startNewProtoOutputStream(mStartTimeNs);
startNewProtoOutputStreamLocked(mStartTimeNs);
VLOG("value metric %s created. bucket size %lld start_time: %lld", metric.name().c_str(),
(long long)mBucketSizeNs, (long long)mStartTimeNs);
@@ -120,7 +120,7 @@ ValueMetricProducer::~ValueMetricProducer() {
}
}
void ValueMetricProducer::startNewProtoOutputStream(long long startTime) {
void ValueMetricProducer::startNewProtoOutputStreamLocked(long long startTime) {
mProto = std::make_unique<ProtoOutputStream>();
mProto->write(FIELD_TYPE_STRING | FIELD_ID_NAME, mMetric.name());
mProto->write(FIELD_TYPE_INT64 | FIELD_ID_START_REPORT_NANOS, startTime);
@@ -132,11 +132,11 @@ void ValueMetricProducer::finish() {
// DropboxWriter.
}
void ValueMetricProducer::onSlicedConditionMayChange(const uint64_t eventTime) {
void ValueMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventTime) {
VLOG("Metric %s onSlicedConditionMayChange", mMetric.name().c_str());
}
std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReportLocked() {
VLOG("metric %s dump report now...", mMetric.name().c_str());
for (const auto& pair : mPastBuckets) {
@@ -187,9 +187,9 @@ std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
(long long)mCurrentBucketStartTimeNs);
VLOG("metric %s dump report now...", mMetric.name().c_str());
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProto();
std::unique_ptr<std::vector<uint8_t>> buffer = serializeProtoLocked();
startNewProtoOutputStream(time(nullptr) * NS_PER_SEC);
startNewProtoOutputStreamLocked(time(nullptr) * NS_PER_SEC);
mPastBuckets.clear();
return buffer;
@@ -197,8 +197,7 @@ std::unique_ptr<std::vector<uint8_t>> ValueMetricProducer::onDumpReport() {
// TODO: Clear mDimensionKeyMap once the report is dumped.
}
void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_t eventTime) {
AutoMutex _l(mLock);
void ValueMetricProducer::onConditionChangedLocked(const bool condition, const uint64_t eventTime) {
mCondition = condition;
if (mPullTagId != -1) {
@@ -215,16 +214,17 @@ void ValueMetricProducer::onConditionChanged(const bool condition, const uint64_
return;
}
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, false);
onMatchedLogEventLocked(0, *data, false);
}
flushIfNeeded(eventTime);
flushIfNeededLocked(eventTime);
}
return;
}
}
void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
AutoMutex _l(mLock);
std::lock_guard<std::mutex> lock(mMutex);
if (mCondition == true || !mMetric.has_condition()) {
if (allData.size() == 0) {
return;
@@ -232,16 +232,16 @@ void ValueMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEven
uint64_t eventTime = allData.at(0)->GetTimestampNs();
// alarm is not accurate and might drift.
if (eventTime > mCurrentBucketStartTimeNs + mBucketSizeNs * 3 / 2) {
flushIfNeeded(eventTime);
flushIfNeededLocked(eventTime);
}
for (const auto& data : allData) {
onMatchedLogEvent(0, *data, true);
onMatchedLogEventLocked(0, *data, true);
}
flushIfNeeded(eventTime);
flushIfNeededLocked(eventTime);
}
}
bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
bool ValueMetricProducer::hitGuardRailLocked(const HashableDimensionKey& newKey) {
// ===========GuardRail==============
// 1. Report the tuple count if the tuple count > soft limit
if (mCurrentSlicedBucket.find(newKey) != mCurrentSlicedBucket.end()) {
@@ -262,7 +262,7 @@ bool ValueMetricProducer::hitGuardRail(const HashableDimensionKey& newKey) {
return false;
}
void ValueMetricProducer::onMatchedLogEventInternal(
void ValueMetricProducer::onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const map<string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) {
@@ -273,7 +273,7 @@ void ValueMetricProducer::onMatchedLogEventInternal(
return;
}
if (hitGuardRail(eventKey)) {
if (hitGuardRailLocked(eventKey)) {
return;
}
Interval& interval = mCurrentSlicedBucket[eventKey];
@@ -308,7 +308,7 @@ void ValueMetricProducer::onMatchedLogEventInternal(
}
}
} else {
flushIfNeeded(eventTimeNs);
flushIfNeededLocked(eventTimeNs);
interval.raw.push_back(make_pair(value, 0));
}
}
@@ -324,7 +324,7 @@ long ValueMetricProducer::get_value(const LogEvent& event) {
}
}
void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
void ValueMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
if (mCurrentBucketStartTimeNs + mBucketSizeNs > eventTimeNs) {
VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
(long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
@@ -372,7 +372,7 @@ void ValueMetricProducer::flushIfNeeded(const uint64_t eventTimeNs) {
(long long)mCurrentBucketStartTimeNs);
}
size_t ValueMetricProducer::byteSize() const {
size_t ValueMetricProducer::byteSizeLocked() const {
size_t totalSize = 0;
for (const auto& pair : mPastBuckets) {
totalSize += pair.second.size() * kBucketSize;

View File

@@ -44,34 +44,40 @@ public:
virtual ~ValueMetricProducer();
void onConditionChanged(const bool condition, const uint64_t eventTime) override;
void finish() override;
void flushIfNeeded(const uint64_t eventTimeNs) override;
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReport() override;
void onSlicedConditionMayChange(const uint64_t eventTime);
void onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& data) override;
size_t byteSize() const override;
// TODO: Implement this later.
virtual void notifyAppUpgrade(const string& apk, const int uid, const int version) override{};
// TODO: Implement this later.
virtual void notifyAppRemoved(const string& apk, const int uid) override{};
protected:
void onMatchedLogEventInternal(const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey,
bool condition, const LogEvent& event,
bool scheduledPull) override;
void startNewProtoOutputStream(long long timestamp) override;
void onMatchedLogEventInternalLocked(
const size_t matcherIndex, const HashableDimensionKey& eventKey,
const std::map<std::string, HashableDimensionKey>& conditionKey, bool condition,
const LogEvent& event, bool scheduledPull) override;
private:
// TODO: Pass a timestamp as a parameter in onDumpReport.
std::unique_ptr<std::vector<uint8_t>> onDumpReportLocked() override;
// Internal interface to handle condition change.
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
// Internal interface to handle sliced condition change.
void onSlicedConditionMayChangeLocked(const uint64_t eventTime) override;
// Internal function to calculate the current used bytes.
size_t byteSizeLocked() const override;
// Util function to flush the old packet.
void flushIfNeededLocked(const uint64_t& eventTime);
// Util function to init/reset the proto output stream.
void startNewProtoOutputStreamLocked(long long timestamp);
const ValueMetric mMetric;
std::shared_ptr<StatsPullerManager> mStatsPullerManager;
@@ -82,8 +88,6 @@ private:
const int pullTagId, const uint64_t startTimeNs,
std::shared_ptr<StatsPullerManager> statsPullerManager);
Mutex mLock;
// tagId for pulled data. -1 if this is not pulled
const int mPullTagId;
@@ -104,7 +108,8 @@ private:
long get_value(const LogEvent& event);
bool hitGuardRail(const HashableDimensionKey& newKey);
// Util function to check whether the specified dimension hits the guardrail.
bool hitGuardRailLocked(const HashableDimensionKey& newKey);
static const size_t kBucketSize = sizeof(ValueBucket{});

View File

@@ -469,11 +469,11 @@ bool initAlerts(const StatsdConfig& config, const unordered_map<string, int>& me
}
const int metricIndex = itr->second;
if (alert.trigger_if_sum_gt() >
(int64_t) alert.number_of_buckets()
* allMetricProducers[metricIndex]->getBuckeSizeInNs()) {
(int64_t)alert.number_of_buckets() *
allMetricProducers[metricIndex]->getBuckeSizeInNs()) {
ALOGW("invalid alert: threshold (%lld) > possible recordable value (%d x %lld)",
alert.trigger_if_sum_gt(), alert.number_of_buckets(),
(long long) allMetricProducers[metricIndex]->getBuckeSizeInNs());
(long long)allMetricProducers[metricIndex]->getBuckeSizeInNs());
return false;
}

View File

@@ -58,11 +58,11 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) {
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
// Flushes at event #2.
countProducer.flushIfNeeded(bucketStartTimeNs + 2);
countProducer.flushIfNeededLocked(bucketStartTimeNs + 2);
EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
// Flushes.
countProducer.flushIfNeeded(bucketStartTimeNs + bucketSizeNs + 1);
countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
countProducer.mPastBuckets.end());
@@ -75,7 +75,7 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) {
// 1 matched event happens in bucket 2.
LogEvent event3(tagId, bucketStartTimeNs + bucketSizeNs + 2);
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event3, false);
countProducer.flushIfNeeded(bucketStartTimeNs + 2 * bucketSizeNs + 1);
countProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
countProducer.mPastBuckets.end());
@@ -86,7 +86,7 @@ TEST(CountMetricProducerTest, TestNonDimensionalEvents) {
EXPECT_EQ(1LL, bucketInfo2.mCount);
// nothing happens in bucket 3. we should not record anything for bucket 3.
countProducer.flushIfNeeded(bucketStartTimeNs + 3 * bucketSizeNs + 1);
countProducer.flushIfNeededLocked(bucketStartTimeNs + 3 * bucketSizeNs + 1);
EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
countProducer.mPastBuckets.end());
@@ -119,7 +119,7 @@ TEST(CountMetricProducerTest, TestEventsWithNonSlicedCondition) {
countProducer.onMatchedLogEvent(1 /*matcher index*/, event2, false /*pulled*/);
EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
countProducer.flushIfNeeded(bucketStartTimeNs + bucketSizeNs + 1);
countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
countProducer.mPastBuckets.end());
@@ -167,11 +167,11 @@ TEST(CountMetricProducerTest, TestEventsWithSlicedCondition) {
bucketStartTimeNs);
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event1, false);
countProducer.flushIfNeeded(bucketStartTimeNs + 1);
countProducer.flushIfNeededLocked(bucketStartTimeNs + 1);
EXPECT_EQ(0UL, countProducer.mPastBuckets.size());
countProducer.onMatchedLogEvent(1 /*log matcher index*/, event2, false);
countProducer.flushIfNeeded(bucketStartTimeNs + bucketSizeNs + 1);
countProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
EXPECT_EQ(1UL, countProducer.mPastBuckets.size());
EXPECT_TRUE(countProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
countProducer.mPastBuckets.end());

View File

@@ -0,0 +1,126 @@
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "src/metrics/DurationMetricProducer.h"
#include "metrics_test_helper.h"
#include "src/condition/ConditionWizard.h"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <stdio.h>
#include <set>
#include <unordered_map>
#include <vector>
using namespace android::os::statsd;
using namespace testing;
using android::sp;
using std::set;
using std::unordered_map;
using std::vector;
#ifdef __ANDROID__
namespace android {
namespace os {
namespace statsd {
const ConfigKey kConfigKey(0, "test");
// Verifies SUM aggregation across a bucket boundary when the metric has no
// condition: a duration that starts in bucket #1 and stops in bucket #2 must
// be split between the two past buckets.
TEST(DurationMetricTrackerTest, TestNoCondition) {
    sp<MockConditionWizard> conditionWizard = new NaggyMock<MockConditionWizard>();
    const uint64_t bucketStartTimeNs = 10000000000;
    const uint64_t bucketSizeNs = 30 * 1000 * 1000 * 1000LL;

    DurationMetric metric;
    metric.set_name("1");
    metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
    metric.set_aggregation_type(DurationMetric_AggregationType_SUM);

    const int tagId = 1;
    // Start 1ns into bucket #1; stop 2ns into bucket #2.
    LogEvent startEvent(tagId, bucketStartTimeNs + 1);
    LogEvent stopEvent(tagId, bucketStartTimeNs + bucketSizeNs + 2);

    DurationMetricProducer durationProducer(
            kConfigKey, metric, -1 /*no condition*/, 1 /* start index */, 2 /* stop index */,
            3 /* stop_all index */, false /*nesting*/, conditionWizard, {}, bucketStartTimeNs);

    durationProducer.onMatchedLogEvent(1 /* start index*/, startEvent, false /* scheduledPull */);
    durationProducer.onMatchedLogEvent(2 /* stop index*/, stopEvent, false /* scheduledPull */);
    // Flushing past the end of bucket #2 finalizes both buckets.
    durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);

    EXPECT_EQ(1UL, durationProducer.mPastBuckets.size());
    EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
                durationProducer.mPastBuckets.end());
    const auto& buckets = durationProducer.mPastBuckets[DEFAULT_DIMENSION_KEY];
    EXPECT_EQ(2UL, buckets.size());
    // Bucket #1 accrues from the start event (+1) to the bucket boundary.
    EXPECT_EQ(bucketStartTimeNs, buckets[0].mBucketStartNs);
    EXPECT_EQ(bucketStartTimeNs + bucketSizeNs, buckets[0].mBucketEndNs);
    EXPECT_EQ(bucketSizeNs - 1ULL, buckets[0].mDuration);
    // Bucket #2 accrues the 2ns between the boundary and the stop event.
    EXPECT_EQ(bucketStartTimeNs + bucketSizeNs, buckets[1].mBucketStartNs);
    EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs, buckets[1].mBucketEndNs);
    EXPECT_EQ(2ULL, buckets[1].mDuration);
}
// Verifies that duration only accrues while the (non-sliced) condition is true:
// a start/stop pair in bucket #1 happens entirely with the condition false and
// records nothing; in bucket #2 only the window between onConditionChanged(true)
// and the stop event counts toward the bucket's duration.
TEST(DurationMetricTrackerTest, TestNonSlicedCondition) {
    sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
    uint64_t bucketStartTimeNs = 10000000000;
    uint64_t bucketSizeNs = 30 * 1000 * 1000 * 1000LL;
    DurationMetric metric;
    metric.set_name("1");
    metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
    metric.set_aggregation_type(DurationMetric_AggregationType_SUM);
    int tagId = 1;
    LogEvent event1(tagId, bucketStartTimeNs + 1);  // start, bucket #1 (condition still false)
    LogEvent event2(tagId, bucketStartTimeNs + 2);  // stop, bucket #1
    LogEvent event3(tagId, bucketStartTimeNs + bucketSizeNs + 1);  // start, bucket #2
    LogEvent event4(tagId, bucketStartTimeNs + bucketSizeNs + 3);  // stop, bucket #2
    // Index 0 is a valid condition-tracker index: unlike TestNoCondition's -1,
    // this producer DOES have a (non-sliced) condition, initially false.
    DurationMetricProducer durationProducer(
            kConfigKey, metric, 0 /* condition index (non-sliced) */, 1 /* start index */, 2 /* stop index */,
            3 /* stop_all index */, false /*nesting*/, wizard, {}, bucketStartTimeNs);
    EXPECT_FALSE(durationProducer.mCondition);
    EXPECT_FALSE(durationProducer.isConditionSliced());
    durationProducer.onMatchedLogEvent(1 /* start index*/, event1, false /* scheduledPull */);
    durationProducer.onMatchedLogEvent(2 /* stop index*/, event2, false /* scheduledPull */);
    durationProducer.flushIfNeededLocked(bucketStartTimeNs + bucketSizeNs + 1);
    // Condition was false for all of bucket #1, so no duration is recorded.
    EXPECT_EQ(1UL, durationProducer.mPastBuckets.size());
    EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
                durationProducer.mPastBuckets.end());
    const auto& buckets1 = durationProducer.mPastBuckets[DEFAULT_DIMENSION_KEY];
    EXPECT_EQ(0UL, buckets1.size());
    durationProducer.onMatchedLogEvent(1 /* start index*/, event3, false /* scheduledPull */);
    durationProducer.onConditionChanged(true /* condition */, bucketStartTimeNs + bucketSizeNs + 2);
    durationProducer.onMatchedLogEvent(2 /* stop index*/, event4, false /* scheduledPull */);
    durationProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 1);
    EXPECT_EQ(1UL, durationProducer.mPastBuckets.size());
    EXPECT_TRUE(durationProducer.mPastBuckets.find(DEFAULT_DIMENSION_KEY) !=
                durationProducer.mPastBuckets.end());
    const auto& buckets2 = durationProducer.mPastBuckets[DEFAULT_DIMENSION_KEY];
    // Only the 1ns between the condition turning true (+2) and the stop (+3) counts.
    EXPECT_EQ(1UL, buckets2.size());
    EXPECT_EQ(bucketStartTimeNs + bucketSizeNs, buckets2[0].mBucketStartNs);
    EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs, buckets2[0].mBucketEndNs);
    EXPECT_EQ(1ULL, buckets2[0].mDuration);
}
} // namespace statsd
} // namespace os
} // namespace android
#else
GTEST_LOG_(INFO) << "This test does nothing.\n";
#endif

View File

@@ -60,7 +60,7 @@ TEST(GaugeMetricProducerTest, TestWithCondition) {
allData.push_back(event2);
gaugeProducer.onDataPulled(allData);
gaugeProducer.flushIfNeeded(event2->GetTimestampNs() + 1);
gaugeProducer.flushIfNeededLocked(event2->GetTimestampNs() + 1);
EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size());
@@ -74,7 +74,7 @@ TEST(GaugeMetricProducerTest, TestWithCondition) {
event3->init();
allData.push_back(event3);
gaugeProducer.onDataPulled(allData);
gaugeProducer.flushIfNeeded(bucketStartTimeNs + 2 * bucketSizeNs + 10);
gaugeProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 10);
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
// One dimension.

View File

@@ -282,7 +282,7 @@ TEST(ValueMetricProducerTest, TestPushedEventsWithoutCondition) {
EXPECT_EQ(20, curInterval.raw.back().first);
EXPECT_EQ(0UL, valueProducer.mNextSlicedBucket.size());
valueProducer.flushIfNeeded(bucket3StartTimeNs);
valueProducer.flushIfNeededLocked(bucket3StartTimeNs);
EXPECT_EQ(1UL, valueProducer.mPastBuckets.size());
EXPECT_EQ(1UL, valueProducer.mPastBuckets.begin()->second.size());
EXPECT_EQ(30, valueProducer.mPastBuckets.begin()->second.back().mValue);