add feature: GaugeMetricProducer now takes repeated list of fields
bug fix: GaugeMetricProducer now works better with pulled events. unit test also includes GaugeMetricProducer_test Test: unit test Change-Id: Ic60f09342d14cfb107be2130d445b323a56909e0
This commit is contained in:
@@ -173,6 +173,7 @@ LOCAL_SRC_FILES := \
|
||||
tests/metrics/DurationMetricProducer_test.cpp \
|
||||
tests/metrics/EventMetricProducer_test.cpp \
|
||||
tests/metrics/ValueMetricProducer_test.cpp \
|
||||
tests/metrics/GaugeMetricProducer_test.cpp \
|
||||
tests/guardrail/StatsdStats_test.cpp
|
||||
|
||||
LOCAL_STATIC_LIBRARIES := \
|
||||
|
||||
@@ -369,7 +369,7 @@ StatsdConfig build_fake_config() {
|
||||
GaugeMetric* gaugeMetric = config.add_gauge_metric();
|
||||
gaugeMetric->set_name("METRIC_10");
|
||||
gaugeMetric->set_what("DEVICE_TEMPERATURE");
|
||||
gaugeMetric->set_gauge_field(DEVICE_TEMPERATURE_KEY);
|
||||
gaugeMetric->mutable_gauge_fields()->add_field_num(DEVICE_TEMPERATURE_KEY);
|
||||
gaugeMetric->mutable_bucket()->set_bucket_size_millis(60 * 1000L);
|
||||
|
||||
// Event matchers............
|
||||
|
||||
@@ -69,6 +69,13 @@ bool LogEvent::write(uint32_t value) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool LogEvent::write(int64_t value) {
|
||||
if (mContext) {
|
||||
return android_log_write_int64(mContext, value) >= 0;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool LogEvent::write(uint64_t value) {
|
||||
if (mContext) {
|
||||
return android_log_write_int64(mContext, value) >= 0;
|
||||
@@ -224,7 +231,7 @@ KeyValuePair LogEvent::GetKeyValueProto(size_t key) const {
|
||||
if (elem.type == EVENT_TYPE_INT) {
|
||||
pair.set_value_int(elem.data.int32);
|
||||
} else if (elem.type == EVENT_TYPE_LONG) {
|
||||
pair.set_value_int(elem.data.int64);
|
||||
pair.set_value_long(elem.data.int64);
|
||||
} else if (elem.type == EVENT_TYPE_STRING) {
|
||||
pair.set_value_str(elem.data.string);
|
||||
} else if (elem.type == EVENT_TYPE_FLOAT) {
|
||||
|
||||
@@ -110,6 +110,10 @@ public:
|
||||
*/
|
||||
void setTimestampNs(uint64_t timestampNs) {mTimestampNs = timestampNs;}
|
||||
|
||||
int size() const {
|
||||
return mElements.size();
|
||||
}
|
||||
|
||||
private:
|
||||
/**
|
||||
* Don't copy, it's slower. If we really need this we can add it but let's try to
|
||||
|
||||
@@ -19,11 +19,8 @@
|
||||
|
||||
#include "GaugeMetricProducer.h"
|
||||
#include "guardrail/StatsdStats.h"
|
||||
#include "stats_util.h"
|
||||
|
||||
#include <cutils/log.h>
|
||||
#include <limits.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
using android::util::FIELD_COUNT_REPEATED;
|
||||
using android::util::FIELD_TYPE_BOOL;
|
||||
@@ -37,6 +34,8 @@ using std::map;
|
||||
using std::string;
|
||||
using std::unordered_map;
|
||||
using std::vector;
|
||||
using std::make_shared;
|
||||
using std::shared_ptr;
|
||||
|
||||
namespace android {
|
||||
namespace os {
|
||||
@@ -61,21 +60,27 @@ const int FIELD_ID_VALUE_FLOAT = 5;
|
||||
// for GaugeBucketInfo
|
||||
const int FIELD_ID_START_BUCKET_NANOS = 1;
|
||||
const int FIELD_ID_END_BUCKET_NANOS = 2;
|
||||
const int FIELD_ID_GAUGE = 3;
|
||||
const int FIELD_ID_ATOM = 3;
|
||||
|
||||
GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
|
||||
const int conditionIndex,
|
||||
const sp<ConditionWizard>& wizard, const int pullTagId,
|
||||
const int64_t startTimeNs)
|
||||
const sp<ConditionWizard>& wizard, const int atomTagId,
|
||||
const int pullTagId, const uint64_t startTimeNs,
|
||||
shared_ptr<StatsPullerManager> statsPullerManager)
|
||||
: MetricProducer(metric.name(), key, startTimeNs, conditionIndex, wizard),
|
||||
mGaugeField(metric.gauge_field()),
|
||||
mPullTagId(pullTagId) {
|
||||
mStatsPullerManager(statsPullerManager),
|
||||
mPullTagId(pullTagId),
|
||||
mAtomTagId(atomTagId) {
|
||||
if (metric.has_bucket() && metric.bucket().has_bucket_size_millis()) {
|
||||
mBucketSizeNs = metric.bucket().bucket_size_millis() * 1000 * 1000;
|
||||
} else {
|
||||
mBucketSizeNs = kDefaultGaugemBucketSizeNs;
|
||||
}
|
||||
|
||||
for (int i = 0; i < metric.gauge_fields().field_num_size(); i++) {
|
||||
mGaugeFields.push_back(metric.gauge_fields().field_num(i));
|
||||
}
|
||||
|
||||
// TODO: use UidMap if uid->pkg_name is required
|
||||
mDimension.insert(mDimension.begin(), metric.dimension().begin(), metric.dimension().end());
|
||||
|
||||
@@ -87,7 +92,7 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
|
||||
|
||||
// Kicks off the puller immediately.
|
||||
if (mPullTagId != -1) {
|
||||
mStatsPullerManager.RegisterReceiver(mPullTagId, this,
|
||||
mStatsPullerManager->RegisterReceiver(mPullTagId, this,
|
||||
metric.bucket().bucket_size_millis());
|
||||
}
|
||||
|
||||
@@ -95,10 +100,19 @@ GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric
|
||||
(long long)mBucketSizeNs, (long long)mStartTimeNs);
|
||||
}
|
||||
|
||||
// for testing
|
||||
GaugeMetricProducer::GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& metric,
|
||||
const int conditionIndex,
|
||||
const sp<ConditionWizard>& wizard, const int pullTagId,
|
||||
const int atomTagId, const int64_t startTimeNs)
|
||||
: GaugeMetricProducer(key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs,
|
||||
make_shared<StatsPullerManager>()) {
|
||||
}
|
||||
|
||||
GaugeMetricProducer::~GaugeMetricProducer() {
|
||||
VLOG("~GaugeMetricProducer() called");
|
||||
if (mPullTagId != -1) {
|
||||
mStatsPullerManager.UnRegisterReceiver(mPullTagId, this);
|
||||
mStatsPullerManager->UnRegisterReceiver(mPullTagId, this);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,10 +163,26 @@ void GaugeMetricProducer::onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
(long long)bucket.mBucketStartNs);
|
||||
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_END_BUCKET_NANOS,
|
||||
(long long)bucket.mBucketEndNs);
|
||||
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_GAUGE, (long long)bucket.mGauge);
|
||||
long long atomToken = protoOutput->start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM);
|
||||
long long eventToken = protoOutput->start(FIELD_TYPE_MESSAGE | mAtomTagId);
|
||||
for (const auto& pair : bucket.mEvent->kv) {
|
||||
if (pair.has_value_int()) {
|
||||
protoOutput->write(FIELD_TYPE_INT32 | pair.key(), pair.value_int());
|
||||
} else if (pair.has_value_long()) {
|
||||
protoOutput->write(FIELD_TYPE_INT64 | pair.key(), pair.value_long());
|
||||
} else if (pair.has_value_str()) {
|
||||
protoOutput->write(FIELD_TYPE_STRING | pair.key(), pair.value_str());
|
||||
} else if (pair.has_value_long()) {
|
||||
protoOutput->write(FIELD_TYPE_FLOAT | pair.key(), pair.value_float());
|
||||
} else if (pair.has_value_bool()) {
|
||||
protoOutput->write(FIELD_TYPE_BOOL | pair.key(), pair.value_bool());
|
||||
}
|
||||
}
|
||||
protoOutput->end(eventToken);
|
||||
protoOutput->end(atomToken);
|
||||
protoOutput->end(bucketInfoToken);
|
||||
VLOG("\t bucket [%lld - %lld] count: %lld", (long long)bucket.mBucketStartNs,
|
||||
(long long)bucket.mBucketEndNs, (long long)bucket.mGauge);
|
||||
VLOG("\t bucket [%lld - %lld] content: %s", (long long)bucket.mBucketStartNs,
|
||||
(long long)bucket.mBucketEndNs, bucket.mEvent->ToString().c_str());
|
||||
}
|
||||
protoOutput->end(wrapperToken);
|
||||
}
|
||||
@@ -174,6 +204,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
|
||||
if (mPullTagId == -1) {
|
||||
return;
|
||||
}
|
||||
// No need to pull again. Either scheduled pull or condition on true happened
|
||||
if (!mCondition) {
|
||||
return;
|
||||
}
|
||||
@@ -182,7 +213,7 @@ void GaugeMetricProducer::onConditionChangedLocked(const bool conditionMet,
|
||||
return;
|
||||
}
|
||||
vector<std::shared_ptr<LogEvent>> allData;
|
||||
if (!mStatsPullerManager.Pull(mPullTagId, &allData)) {
|
||||
if (!mStatsPullerManager->Pull(mPullTagId, &allData)) {
|
||||
ALOGE("Stats puller failed for tag: %d", mPullTagId);
|
||||
return;
|
||||
}
|
||||
@@ -196,20 +227,25 @@ void GaugeMetricProducer::onSlicedConditionMayChangeLocked(const uint64_t eventT
|
||||
VLOG("Metric %s onSlicedConditionMayChange", mName.c_str());
|
||||
}
|
||||
|
||||
int64_t GaugeMetricProducer::getGauge(const LogEvent& event) {
|
||||
status_t err = NO_ERROR;
|
||||
int64_t val = event.GetLong(mGaugeField, &err);
|
||||
if (err == NO_ERROR) {
|
||||
return val;
|
||||
shared_ptr<EventKV> GaugeMetricProducer::getGauge(const LogEvent& event) {
|
||||
shared_ptr<EventKV> ret = make_shared<EventKV>();
|
||||
if (mGaugeFields.size() == 0) {
|
||||
for (int i = 1; i <= event.size(); i++) {
|
||||
ret->kv.push_back(event.GetKeyValueProto(i));
|
||||
}
|
||||
} else {
|
||||
VLOG("Can't find value in message.");
|
||||
return -1;
|
||||
for (int i = 0; i < (int)mGaugeFields.size(); i++) {
|
||||
ret->kv.push_back(event.GetKeyValueProto(mGaugeFields[i]));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void GaugeMetricProducer::onDataPulled(const std::vector<std::shared_ptr<LogEvent>>& allData) {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
|
||||
if (allData.size() == 0) {
|
||||
return;
|
||||
}
|
||||
for (const auto& data : allData) {
|
||||
onMatchedLogEventLocked(0, *data);
|
||||
}
|
||||
@@ -247,25 +283,48 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked(
|
||||
(long long)mCurrentBucketStartTimeNs);
|
||||
return;
|
||||
}
|
||||
|
||||
// When the event happens in a new bucket, flush the old buckets.
|
||||
if (eventTimeNs >= mCurrentBucketStartTimeNs + mBucketSizeNs) {
|
||||
flushIfNeededLocked(eventTimeNs);
|
||||
}
|
||||
flushIfNeededLocked(eventTimeNs);
|
||||
|
||||
// For gauge metric, we just simply use the first gauge in the given bucket.
|
||||
if (!mCurrentSlicedBucket->empty()) {
|
||||
if (mCurrentSlicedBucket->find(eventKey) != mCurrentSlicedBucket->end()) {
|
||||
return;
|
||||
}
|
||||
const long gauge = getGauge(event);
|
||||
if (gauge >= 0) {
|
||||
if (hitGuardRailLocked(eventKey)) {
|
||||
return;
|
||||
}
|
||||
(*mCurrentSlicedBucket)[eventKey] = gauge;
|
||||
shared_ptr<EventKV> gauge = getGauge(event);
|
||||
if (hitGuardRailLocked(eventKey)) {
|
||||
return;
|
||||
}
|
||||
for (auto& tracker : mAnomalyTrackers) {
|
||||
tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey, gauge);
|
||||
(*mCurrentSlicedBucket)[eventKey] = gauge;
|
||||
// Anomaly detection on gauge metric only works when there is one numeric
|
||||
// field specified.
|
||||
if (mAnomalyTrackers.size() > 0) {
|
||||
if (gauge->kv.size() == 1) {
|
||||
KeyValuePair pair = gauge->kv[0];
|
||||
long gaugeVal = 0;
|
||||
if (pair.has_value_int()) {
|
||||
gaugeVal = (long)pair.value_int();
|
||||
} else if (pair.has_value_long()) {
|
||||
gaugeVal = pair.value_long();
|
||||
}
|
||||
for (auto& tracker : mAnomalyTrackers) {
|
||||
tracker->detectAndDeclareAnomaly(eventTimeNs, mCurrentBucketNum, eventKey,
|
||||
gaugeVal);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void GaugeMetricProducer::updateCurrentSlicedBucketForAnomaly() {
|
||||
mCurrentSlicedBucketForAnomaly->clear();
|
||||
status_t err = NO_ERROR;
|
||||
for (const auto& slice : *mCurrentSlicedBucket) {
|
||||
KeyValuePair pair = slice.second->kv[0];
|
||||
long gaugeVal = 0;
|
||||
if (pair.has_value_int()) {
|
||||
gaugeVal = (long)pair.value_int();
|
||||
} else if (pair.has_value_long()) {
|
||||
gaugeVal = pair.value_long();
|
||||
}
|
||||
(*mCurrentSlicedBucketForAnomaly)[slice.first] = gaugeVal;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -276,6 +335,8 @@ void GaugeMetricProducer::onMatchedLogEventInternalLocked(
|
||||
// the GaugeMetricProducer while holding the lock.
|
||||
void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
|
||||
if (eventTimeNs < mCurrentBucketStartTimeNs + mBucketSizeNs) {
|
||||
VLOG("eventTime is %lld, less than next bucket start time %lld", (long long)eventTimeNs,
|
||||
(long long)(mCurrentBucketStartTimeNs + mBucketSizeNs));
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -285,19 +346,22 @@ void GaugeMetricProducer::flushIfNeededLocked(const uint64_t& eventTimeNs) {
|
||||
info.mBucketNum = mCurrentBucketNum;
|
||||
|
||||
for (const auto& slice : *mCurrentSlicedBucket) {
|
||||
info.mGauge = slice.second;
|
||||
info.mEvent = slice.second;
|
||||
auto& bucketList = mPastBuckets[slice.first];
|
||||
bucketList.push_back(info);
|
||||
VLOG("gauge metric %s, dump key value: %s -> %lld", mName.c_str(), slice.first.c_str(),
|
||||
(long long)slice.second);
|
||||
VLOG("gauge metric %s, dump key value: %s -> %s", mName.c_str(),
|
||||
slice.first.c_str(), slice.second->ToString().c_str());
|
||||
}
|
||||
|
||||
// Reset counters
|
||||
for (auto& tracker : mAnomalyTrackers) {
|
||||
tracker->addPastBucket(mCurrentSlicedBucket, mCurrentBucketNum);
|
||||
if (mAnomalyTrackers.size() > 0) {
|
||||
updateCurrentSlicedBucketForAnomaly();
|
||||
for (auto& tracker : mAnomalyTrackers) {
|
||||
tracker->addPastBucket(mCurrentSlicedBucketForAnomaly, mCurrentBucketNum);
|
||||
}
|
||||
}
|
||||
|
||||
mCurrentSlicedBucket = std::make_shared<DimToValMap>();
|
||||
mCurrentSlicedBucket = std::make_shared<DimToEventKVMap>();
|
||||
|
||||
// Adjusts the bucket start time
|
||||
int64_t numBucketsForward = (eventTimeNs - mCurrentBucketStartTimeNs) / mBucketSizeNs;
|
||||
|
||||
@@ -26,7 +26,7 @@
|
||||
#include "../matchers/matcher_util.h"
|
||||
#include "MetricProducer.h"
|
||||
#include "frameworks/base/cmds/statsd/src/statsd_config.pb.h"
|
||||
#include "stats_util.h"
|
||||
#include "../stats_util.h"
|
||||
|
||||
namespace android {
|
||||
namespace os {
|
||||
@@ -35,7 +35,7 @@ namespace statsd {
|
||||
struct GaugeBucket {
|
||||
int64_t mBucketStartNs;
|
||||
int64_t mBucketEndNs;
|
||||
int64_t mGauge;
|
||||
std::shared_ptr<EventKV> mEvent;
|
||||
uint64_t mBucketNum;
|
||||
};
|
||||
|
||||
@@ -49,7 +49,7 @@ public:
|
||||
// for all metrics.
|
||||
GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& countMetric,
|
||||
const int conditionIndex, const sp<ConditionWizard>& wizard,
|
||||
const int pullTagId, const int64_t startTimeNs);
|
||||
const int pullTagId, const int atomTagId, const int64_t startTimeNs);
|
||||
|
||||
virtual ~GaugeMetricProducer();
|
||||
|
||||
@@ -72,6 +72,12 @@ private:
|
||||
void onDumpReportLocked(const uint64_t dumpTimeNs,
|
||||
android::util::ProtoOutputStream* protoOutput) override;
|
||||
|
||||
// for testing
|
||||
GaugeMetricProducer(const ConfigKey& key, const GaugeMetric& gaugeMetric,
|
||||
const int conditionIndex, const sp<ConditionWizard>& wizard,
|
||||
const int pullTagId, const int atomTagId, const uint64_t startTimeNs,
|
||||
std::shared_ptr<StatsPullerManager> statsPullerManager);
|
||||
|
||||
// Internal interface to handle condition change.
|
||||
void onConditionChangedLocked(const bool conditionMet, const uint64_t eventTime) override;
|
||||
|
||||
@@ -84,12 +90,10 @@ private:
|
||||
// Util function to flush the old packet.
|
||||
void flushIfNeededLocked(const uint64_t& eventTime);
|
||||
|
||||
// The default bucket size for gauge metric is 1 second.
|
||||
static const uint64_t kDefaultGaugemBucketSizeNs = 1000 * 1000 * 1000;
|
||||
// The default bucket size for gauge metric is 1 hr.
|
||||
static const uint64_t kDefaultGaugemBucketSizeNs = 60ULL * 60 * 1000 * 1000 * 1000;
|
||||
|
||||
const int32_t mGaugeField;
|
||||
|
||||
StatsPullerManager mStatsPullerManager;
|
||||
std::shared_ptr<StatsPullerManager> mStatsPullerManager;
|
||||
// tagId for pulled data. -1 if this is not pulled
|
||||
const int mPullTagId;
|
||||
|
||||
@@ -98,9 +102,21 @@ private:
|
||||
std::unordered_map<HashableDimensionKey, std::vector<GaugeBucket>> mPastBuckets;
|
||||
|
||||
// The current bucket.
|
||||
std::shared_ptr<DimToValMap> mCurrentSlicedBucket = std::make_shared<DimToValMap>();
|
||||
std::shared_ptr<DimToEventKVMap> mCurrentSlicedBucket = std::make_shared<DimToEventKVMap>();
|
||||
|
||||
int64_t getGauge(const LogEvent& event);
|
||||
// The current bucket for anomaly detection.
|
||||
std::shared_ptr<DimToValMap> mCurrentSlicedBucketForAnomaly = std::make_shared<DimToValMap>();
|
||||
|
||||
// Translate Atom based bucket to single numeric value bucket for anomaly
|
||||
void updateCurrentSlicedBucketForAnomaly();
|
||||
|
||||
int mAtomTagId;
|
||||
|
||||
// Whitelist of fields to report. Empty means all are reported.
|
||||
std::vector<int> mGaugeFields;
|
||||
|
||||
// apply a whitelist on the original input
|
||||
std::shared_ptr<EventKV> getGauge(const LogEvent& event);
|
||||
|
||||
// Util function to check whether the specified dimension hits the guardrail.
|
||||
bool hitGuardRailLocked(const HashableDimensionKey& newKey);
|
||||
|
||||
@@ -370,15 +370,11 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
|
||||
|
||||
sp<LogMatchingTracker> atomMatcher = allAtomMatchers.at(trackerIndex);
|
||||
// If it is pulled atom, it should be simple matcher with one tagId.
|
||||
int pullTagId = -1;
|
||||
for (int tagId : atomMatcher->getTagIds()) {
|
||||
if (statsPullerManager.PullerForMatcherExists(tagId)) {
|
||||
if (atomMatcher->getTagIds().size() != 1) {
|
||||
return false;
|
||||
}
|
||||
pullTagId = tagId;
|
||||
}
|
||||
if (atomMatcher->getTagIds().size() != 1) {
|
||||
return false;
|
||||
}
|
||||
int atomTagId = *(atomMatcher->getTagIds().begin());
|
||||
int pullTagId = statsPullerManager.PullerForMatcherExists(atomTagId) ? atomTagId : -1;
|
||||
|
||||
int conditionIndex = -1;
|
||||
if (metric.has_condition()) {
|
||||
@@ -404,7 +400,17 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
|
||||
for (int i = 0; i < config.gauge_metric_size(); i++) {
|
||||
const GaugeMetric& metric = config.gauge_metric(i);
|
||||
if (!metric.has_what()) {
|
||||
ALOGW("cannot find \"what\" in ValueMetric \"%s\"", metric.name().c_str());
|
||||
ALOGW("cannot find \"what\" in GaugeMetric \"%s\"", metric.name().c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
if (((!metric.gauge_fields().has_include_all() ||
|
||||
(metric.gauge_fields().has_include_all() &&
|
||||
metric.gauge_fields().include_all() == false)) &&
|
||||
metric.gauge_fields().field_num_size() == 0) ||
|
||||
(metric.gauge_fields().has_include_all() && metric.gauge_fields().include_all() == true &&
|
||||
metric.gauge_fields().field_num_size() > 0)) {
|
||||
ALOGW("Incorrect field filter setting in GaugeMetric %s", metric.name().c_str());
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -419,15 +425,11 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
|
||||
|
||||
sp<LogMatchingTracker> atomMatcher = allAtomMatchers.at(trackerIndex);
|
||||
// If it is pulled atom, it should be simple matcher with one tagId.
|
||||
int pullTagId = -1;
|
||||
for (int tagId : atomMatcher->getTagIds()) {
|
||||
if (statsPullerManager.PullerForMatcherExists(tagId)) {
|
||||
if (atomMatcher->getTagIds().size() != 1) {
|
||||
return false;
|
||||
}
|
||||
pullTagId = tagId;
|
||||
}
|
||||
if (atomMatcher->getTagIds().size() != 1) {
|
||||
return false;
|
||||
}
|
||||
int atomTagId = *(atomMatcher->getTagIds().begin());
|
||||
int pullTagId = statsPullerManager.PullerForMatcherExists(atomTagId) ? atomTagId : -1;
|
||||
|
||||
int conditionIndex = -1;
|
||||
if (metric.has_condition()) {
|
||||
@@ -444,8 +446,8 @@ bool initMetrics(const ConfigKey& key, const StatsdConfig& config,
|
||||
}
|
||||
}
|
||||
|
||||
sp<MetricProducer> gaugeProducer = new GaugeMetricProducer(key, metric, conditionIndex,
|
||||
wizard, pullTagId, startTimeNs);
|
||||
sp<MetricProducer> gaugeProducer = new GaugeMetricProducer(
|
||||
key, metric, conditionIndex, wizard, pullTagId, atomTagId, startTimeNs);
|
||||
allMetricProducers.push_back(gaugeProducer);
|
||||
}
|
||||
return true;
|
||||
|
||||
@@ -29,9 +29,10 @@ message KeyValuePair {
|
||||
|
||||
oneof value {
|
||||
string value_str = 2;
|
||||
int64 value_int = 3;
|
||||
bool value_bool = 4;
|
||||
float value_float = 5;
|
||||
int32 value_int = 3;
|
||||
int64 value_long = 4;
|
||||
bool value_bool = 5;
|
||||
float value_float = 6;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -88,7 +89,7 @@ message GaugeBucketInfo {
|
||||
|
||||
optional int64 end_bucket_nanos = 2;
|
||||
|
||||
optional int64 gauge = 3;
|
||||
optional Atom atom = 3;
|
||||
}
|
||||
|
||||
message GaugeMetricData {
|
||||
|
||||
@@ -35,6 +35,9 @@ HashableDimensionKey getHashableKey(std::vector<KeyValuePair> keys) {
|
||||
case KeyValuePair::ValueCase::kValueInt:
|
||||
flattened += std::to_string(pair.value_int());
|
||||
break;
|
||||
case KeyValuePair::ValueCase::kValueLong:
|
||||
flattened += std::to_string(pair.value_long());
|
||||
break;
|
||||
case KeyValuePair::ValueCase::kValueBool:
|
||||
flattened += std::to_string(pair.value_bool());
|
||||
break;
|
||||
|
||||
@@ -17,6 +17,8 @@
|
||||
#pragma once
|
||||
|
||||
#include "frameworks/base/cmds/statsd/src/stats_log.pb.h"
|
||||
#include <sstream>
|
||||
#include "logd/LogReader.h"
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
@@ -32,6 +34,41 @@ typedef std::map<std::string, HashableDimensionKey> ConditionKey;
|
||||
|
||||
typedef std::unordered_map<HashableDimensionKey, int64_t> DimToValMap;
|
||||
|
||||
/*
|
||||
* In memory rep for LogEvent. Uses much less memory than LogEvent
|
||||
*/
|
||||
typedef struct EventKV {
|
||||
std::vector<KeyValuePair> kv;
|
||||
string ToString() const {
|
||||
std::ostringstream result;
|
||||
result << "{ ";
|
||||
const size_t N = kv.size();
|
||||
for (size_t i = 0; i < N; i++) {
|
||||
result << " ";
|
||||
result << (i + 1);
|
||||
result << "->";
|
||||
const auto& pair = kv[i];
|
||||
if (pair.has_value_int()) {
|
||||
result << pair.value_int();
|
||||
} else if (pair.has_value_long()) {
|
||||
result << pair.value_long();
|
||||
} else if (pair.has_value_float()) {
|
||||
result << pair.value_float();
|
||||
} else if (pair.has_value_str()) {
|
||||
result << pair.value_str().c_str();
|
||||
}
|
||||
}
|
||||
result << " }";
|
||||
return result.str();
|
||||
}
|
||||
} EventKV;
|
||||
|
||||
typedef std::unordered_map<HashableDimensionKey, std::shared_ptr<EventKV>> DimToEventKVMap;
|
||||
|
||||
EventMetricData parse(log_msg msg);
|
||||
|
||||
int getTagId(log_msg msg);
|
||||
|
||||
std::string getHashableKey(std::vector<KeyValuePair> key);
|
||||
|
||||
} // namespace statsd
|
||||
|
||||
@@ -120,6 +120,11 @@ message MetricConditionLink {
|
||||
repeated KeyMatcher key_in_condition = 3;
|
||||
}
|
||||
|
||||
message FieldFilter {
|
||||
optional bool include_all = 1;
|
||||
repeated int32 field_num = 2;
|
||||
}
|
||||
|
||||
message EventMetric {
|
||||
optional string name = 1;
|
||||
|
||||
@@ -170,7 +175,7 @@ message GaugeMetric {
|
||||
|
||||
optional string what = 2;
|
||||
|
||||
optional int32 gauge_field = 3;
|
||||
optional FieldFilter gauge_fields = 3;
|
||||
|
||||
optional string condition = 4;
|
||||
|
||||
|
||||
@@ -26,6 +26,7 @@ using android::sp;
|
||||
using std::set;
|
||||
using std::unordered_map;
|
||||
using std::vector;
|
||||
using std::make_shared;
|
||||
|
||||
#ifdef __ANDROID__
|
||||
|
||||
@@ -34,120 +35,142 @@ namespace os {
|
||||
namespace statsd {
|
||||
|
||||
const ConfigKey kConfigKey(0, "test");
|
||||
const int tagId = 1;
|
||||
const string metricName = "test_metric";
|
||||
const int64_t bucketStartTimeNs = 10000000000;
|
||||
const int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
const int64_t bucket2StartTimeNs = bucketStartTimeNs + bucketSizeNs;
|
||||
const int64_t bucket3StartTimeNs = bucketStartTimeNs + 2 * bucketSizeNs;
|
||||
const int64_t bucket4StartTimeNs = bucketStartTimeNs + 3 * bucketSizeNs;
|
||||
|
||||
TEST(GaugeMetricProducerTest, TestWithCondition) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
|
||||
TEST(GaugeMetricProducerTest, TestNoCondition) {
|
||||
GaugeMetric metric;
|
||||
metric.set_name("1");
|
||||
metric.set_name(metricName);
|
||||
metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
|
||||
metric.set_gauge_field(2);
|
||||
metric.mutable_gauge_fields()->add_field_num(2);
|
||||
|
||||
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
|
||||
|
||||
GaugeMetricProducer gaugeProducer(metric, 1 /*has condition*/, wizard, -1, bucketStartTimeNs);
|
||||
// TODO: pending refactor of StatsPullerManager
|
||||
// For now we still need this so that it doesn't do real pulling.
|
||||
shared_ptr<MockStatsPullerManager> pullerManager =
|
||||
make_shared<StrictMock<MockStatsPullerManager>>();
|
||||
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
|
||||
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
|
||||
|
||||
vector<std::shared_ptr<LogEvent>> allData;
|
||||
std::shared_ptr<LogEvent> event1 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 1);
|
||||
event1->write(1);
|
||||
event1->write(13);
|
||||
event1->init();
|
||||
allData.push_back(event1);
|
||||
GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
|
||||
tagId, tagId, bucketStartTimeNs, pullerManager);
|
||||
|
||||
std::shared_ptr<LogEvent> event2 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 10);
|
||||
event2->write(1);
|
||||
event2->write(15);
|
||||
event2->init();
|
||||
allData.push_back(event2);
|
||||
vector<shared_ptr<LogEvent>> allData;
|
||||
allData.clear();
|
||||
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
|
||||
event->write(tagId);
|
||||
event->write(11);
|
||||
event->init();
|
||||
allData.push_back(event);
|
||||
|
||||
gaugeProducer.onDataPulled(allData);
|
||||
gaugeProducer.flushIfNeededLocked(event2->GetTimestampNs() + 1);
|
||||
EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(11, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size());
|
||||
|
||||
gaugeProducer.onConditionChanged(true, bucketStartTimeNs + 11);
|
||||
gaugeProducer.onConditionChanged(false, bucketStartTimeNs + 21);
|
||||
gaugeProducer.onConditionChanged(true, bucketStartTimeNs + bucketSizeNs + 11);
|
||||
std::shared_ptr<LogEvent> event3 =
|
||||
std::make_shared<LogEvent>(1, bucketStartTimeNs + 2 * bucketSizeNs + 10);
|
||||
event3->write(1);
|
||||
event3->write(25);
|
||||
event3->init();
|
||||
allData.push_back(event3);
|
||||
allData.clear();
|
||||
std::shared_ptr<LogEvent> event2 =
|
||||
std::make_shared<LogEvent>(tagId, bucket3StartTimeNs + 10);
|
||||
event2->write(tagId);
|
||||
event2->write(25);
|
||||
event2->init();
|
||||
allData.push_back(event2);
|
||||
gaugeProducer.onDataPulled(allData);
|
||||
gaugeProducer.flushIfNeededLocked(bucketStartTimeNs + 2 * bucketSizeNs + 10);
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
|
||||
EXPECT_EQ(25, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
// One dimension.
|
||||
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.front().mGauge);
|
||||
EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.front().mBucketNum);
|
||||
EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs,
|
||||
gaugeProducer.mPastBuckets.begin()->second.front().mBucketStartNs);
|
||||
EXPECT_EQ(11L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
|
||||
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
|
||||
|
||||
gaugeProducer.flushIfNeededLocked(bucket4StartTimeNs);
|
||||
EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
// One dimension.
|
||||
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
|
||||
EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
|
||||
}
|
||||
|
||||
TEST(GaugeMetricProducerTest, TestNoCondition) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
|
||||
TEST(GaugeMetricProducerTest, TestWithCondition) {
|
||||
GaugeMetric metric;
|
||||
metric.set_name("1");
|
||||
metric.set_name(metricName);
|
||||
metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
|
||||
metric.set_gauge_field(2);
|
||||
metric.mutable_gauge_fields()->add_field_num(2);
|
||||
metric.set_condition("SCREEN_ON");
|
||||
|
||||
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
|
||||
|
||||
GaugeMetricProducer gaugeProducer(metric, -1 /*no condition*/, wizard, -1, bucketStartTimeNs);
|
||||
shared_ptr<MockStatsPullerManager> pullerManager =
|
||||
make_shared<StrictMock<MockStatsPullerManager>>();
|
||||
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
|
||||
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
|
||||
EXPECT_CALL(*pullerManager, Pull(tagId, _))
|
||||
.WillOnce(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
|
||||
data->clear();
|
||||
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucketStartTimeNs + 10);
|
||||
event->write(tagId);
|
||||
event->write(100);
|
||||
event->init();
|
||||
data->push_back(event);
|
||||
return true;
|
||||
}));
|
||||
|
||||
vector<std::shared_ptr<LogEvent>> allData;
|
||||
std::shared_ptr<LogEvent> event1 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 1);
|
||||
event1->write(1);
|
||||
event1->write(13);
|
||||
event1->init();
|
||||
allData.push_back(event1);
|
||||
GaugeMetricProducer gaugeProducer(kConfigKey, metric, 1, wizard, tagId, tagId,
|
||||
bucketStartTimeNs, pullerManager);
|
||||
|
||||
std::shared_ptr<LogEvent> event2 = std::make_shared<LogEvent>(1, bucketStartTimeNs + 10);
|
||||
event2->write(1);
|
||||
event2->write(15);
|
||||
event2->init();
|
||||
allData.push_back(event2);
|
||||
|
||||
std::shared_ptr<LogEvent> event3 =
|
||||
std::make_shared<LogEvent>(1, bucketStartTimeNs + 2 * bucketSizeNs + 10);
|
||||
event3->write(1);
|
||||
event3->write(25);
|
||||
event3->init();
|
||||
allData.push_back(event3);
|
||||
|
||||
gaugeProducer.onDataPulled(allData);
|
||||
// Has one slice
|
||||
gaugeProducer.onConditionChanged(true, bucketStartTimeNs + 8);
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(25L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
|
||||
EXPECT_EQ(100, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.size());
|
||||
|
||||
vector<shared_ptr<LogEvent>> allData;
|
||||
allData.clear();
|
||||
shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, bucket2StartTimeNs + 1);
|
||||
event->write(1);
|
||||
event->write(110);
|
||||
event->init();
|
||||
allData.push_back(event);
|
||||
gaugeProducer.onDataPulled(allData);
|
||||
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(110, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(100, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
|
||||
|
||||
gaugeProducer.onConditionChanged(false, bucket2StartTimeNs + 10);
|
||||
gaugeProducer.flushIfNeededLocked(bucket3StartTimeNs + 10);
|
||||
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.size());
|
||||
EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.size());
|
||||
EXPECT_EQ(13L, gaugeProducer.mPastBuckets.begin()->second.front().mGauge);
|
||||
EXPECT_EQ(0UL, gaugeProducer.mPastBuckets.begin()->second.front().mBucketNum);
|
||||
EXPECT_EQ(25L, gaugeProducer.mPastBuckets.begin()->second.back().mGauge);
|
||||
EXPECT_EQ(2UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
|
||||
EXPECT_EQ(bucketStartTimeNs + 2 * bucketSizeNs,
|
||||
gaugeProducer.mPastBuckets.begin()->second.back().mBucketStartNs);
|
||||
EXPECT_EQ(110L, gaugeProducer.mPastBuckets.begin()->second.back().mEvent->kv[0].value_int());
|
||||
EXPECT_EQ(1UL, gaugeProducer.mPastBuckets.begin()->second.back().mBucketNum);
|
||||
}
|
||||
|
||||
TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
|
||||
int64_t bucketStartTimeNs = 10000000000;
|
||||
int64_t bucketSizeNs = 60 * 1000 * 1000 * 1000LL;
|
||||
sp<MockConditionWizard> wizard = new NaggyMock<MockConditionWizard>();
|
||||
|
||||
shared_ptr<MockStatsPullerManager> pullerManager =
|
||||
make_shared<StrictMock<MockStatsPullerManager>>();
|
||||
EXPECT_CALL(*pullerManager, RegisterReceiver(tagId, _, _)).WillOnce(Return());
|
||||
EXPECT_CALL(*pullerManager, UnRegisterReceiver(tagId, _)).WillOnce(Return());
|
||||
|
||||
GaugeMetric metric;
|
||||
metric.set_name("1");
|
||||
metric.set_name(metricName);
|
||||
metric.mutable_bucket()->set_bucket_size_millis(bucketSizeNs / 1000000);
|
||||
metric.set_gauge_field(2);
|
||||
GaugeMetricProducer gaugeProducer(metric, -1 /*no condition*/, wizard, -1, bucketStartTimeNs);
|
||||
metric.mutable_gauge_fields()->add_field_num(2);
|
||||
GaugeMetricProducer gaugeProducer(kConfigKey, metric, -1 /*-1 meaning no condition*/, wizard,
|
||||
tagId, tagId, bucketStartTimeNs, pullerManager);
|
||||
|
||||
Alert alert;
|
||||
alert.set_name("alert");
|
||||
alert.set_metric_name("1");
|
||||
alert.set_metric_name(metricName);
|
||||
alert.set_trigger_if_sum_gt(25);
|
||||
alert.set_number_of_buckets(2);
|
||||
sp<AnomalyTracker> anomalyTracker = new AnomalyTracker(alert, kConfigKey);
|
||||
@@ -160,7 +183,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
|
||||
|
||||
gaugeProducer.onDataPulled({event1});
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
|
||||
EXPECT_EQ(13L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), -1LL);
|
||||
|
||||
std::shared_ptr<LogEvent> event2 =
|
||||
@@ -171,7 +194,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
|
||||
|
||||
gaugeProducer.onDataPulled({event2});
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
|
||||
EXPECT_EQ(15L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event2->GetTimestampNs());
|
||||
|
||||
std::shared_ptr<LogEvent> event3 =
|
||||
@@ -182,7 +205,7 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
|
||||
|
||||
gaugeProducer.onDataPulled({event3});
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(24L, gaugeProducer.mCurrentSlicedBucket->begin()->second);
|
||||
EXPECT_EQ(24L, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event3->GetTimestampNs());
|
||||
|
||||
// The event4 does not have the gauge field. Thus the current bucket value is 0.
|
||||
@@ -191,7 +214,8 @@ TEST(GaugeMetricProducerTest, TestAnomalyDetection) {
|
||||
event4->write(1);
|
||||
event4->init();
|
||||
gaugeProducer.onDataPulled({event4});
|
||||
EXPECT_EQ(0UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(1UL, gaugeProducer.mCurrentSlicedBucket->size());
|
||||
EXPECT_EQ(0, gaugeProducer.mCurrentSlicedBucket->begin()->second->kv[0].value_int());
|
||||
EXPECT_EQ(anomalyTracker->getLastAlarmTimestampNs(), (long long)event3->GetTimestampNs());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user