Merge "Handle errors within LogEvent" into rvc-dev am: 3360c54cd6

Change-Id: I97273d19d981a6f817427071823929a80797e9ff
This commit is contained in:
Ruchir Rastogi
2020-04-21 17:12:11 +00:00
committed by Automerger Merge Worker
9 changed files with 144 additions and 19 deletions

View File

@@ -389,15 +389,24 @@ void StatsLogProcessor::OnLogEvent(LogEvent* event) {
void StatsLogProcessor::OnLogEvent(LogEvent* event, int64_t elapsedRealtimeNs) {
std::lock_guard<std::mutex> lock(mMetricsMutex);
// Tell StatsdStats about new event
const int64_t eventElapsedTimeNs = event->GetElapsedTimestampNs();
int atomId = event->GetTagId();
StatsdStats::getInstance().noteAtomLogged(atomId, eventElapsedTimeNs / NS_PER_SEC);
if (!event->isValid()) {
StatsdStats::getInstance().noteAtomError(atomId);
return;
}
// Hard-coded logic to update train info on disk and fill in any information
// this log event may be missing.
if (event->GetTagId() == android::os::statsd::util::BINARY_PUSH_STATE_CHANGED) {
if (atomId == android::os::statsd::util::BINARY_PUSH_STATE_CHANGED) {
onBinaryPushStateChangedEventLocked(event);
}
// Hard-coded logic to update experiment ids on disk for certain rollback
// types and fill the rollback atom with experiment ids
if (event->GetTagId() == android::os::statsd::util::WATCHDOG_ROLLBACK_OCCURRED) {
if (atomId == android::os::statsd::util::WATCHDOG_ROLLBACK_OCCURRED) {
onWatchdogRollbackOccurredLocked(event);
}
@@ -406,16 +415,11 @@ void StatsLogProcessor::OnLogEvent(LogEvent* event, int64_t elapsedRealtimeNs) {
ALOGI("%s", event->ToString().c_str());
}
#endif
const int64_t eventElapsedTimeNs = event->GetElapsedTimestampNs();
resetIfConfigTtlExpiredLocked(eventElapsedTimeNs);
StatsdStats::getInstance().noteAtomLogged(
event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);
// Hard-coded logic to update the isolated uid's in the uid-map.
// The field numbers need to be currently updated by hand with atoms.proto
if (event->GetTagId() == android::os::statsd::util::ISOLATED_UID_CHANGED) {
if (atomId == android::os::statsd::util::ISOLATED_UID_CHANGED) {
onIsolatedUidChangedEventLocked(*event);
}
@@ -432,7 +436,7 @@ void StatsLogProcessor::OnLogEvent(LogEvent* event, int64_t elapsedRealtimeNs) {
}
if (event->GetTagId() != android::os::statsd::util::ISOLATED_UID_CHANGED) {
if (atomId != android::os::statsd::util::ISOLATED_UID_CHANGED) {
// Map the isolated uid to host uid if necessary.
mapIsolatedUidToHostUidIfNecessaryLocked(event);
}

View File

@@ -67,8 +67,14 @@ bool StatsCallbackPuller::PullInternal(vector<shared_ptr<LogEvent>>* data) {
lock_guard<mutex> lk(*cv_mutex);
for (const StatsEventParcel& parcel: output) {
shared_ptr<LogEvent> event = make_shared<LogEvent>(/*uid=*/-1, /*pid=*/-1);
event->parseBuffer((uint8_t*)parcel.buffer.data(), parcel.buffer.size());
sharedData->push_back(event);
bool valid = event->parseBuffer((uint8_t*)parcel.buffer.data(),
parcel.buffer.size());
if (valid) {
sharedData->push_back(event);
} else {
StatsdStats::getInstance().noteAtomError(event->GetTagId(),
/*pull=*/true);
}
}
*pullSuccess = success;
*pullFinish = true;

View File

@@ -54,6 +54,7 @@ const int FIELD_ID_ACTIVATION_BROADCAST_GUARDRAIL = 19;
const int FIELD_ID_ATOM_STATS_TAG = 1;
const int FIELD_ID_ATOM_STATS_COUNT = 2;
const int FIELD_ID_ATOM_STATS_ERROR_COUNT = 3;
const int FIELD_ID_ANOMALY_ALARMS_REGISTERED = 1;
const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1;
@@ -549,6 +550,20 @@ void StatsdStats::noteBucketBoundaryDelayNs(int64_t metricId, int64_t timeDelayN
std::min(pullStats.minBucketBoundaryDelayNs, timeDelayNs);
}
void StatsdStats::noteAtomError(int atomTag, bool pull) {
lock_guard<std::mutex> lock(mLock);
if (pull) {
mPulledAtomStats[atomTag].atomErrorCount++;
return;
}
bool present = (mPushedAtomErrorStats.find(atomTag) != mPushedAtomErrorStats.end());
bool full = (mPushedAtomErrorStats.size() >= (size_t)kMaxPushedAtomErrorStatsSize);
if (!full || present) {
mPushedAtomErrorStats[atomTag]++;
}
}
StatsdStats::AtomMetricStats& StatsdStats::getAtomMetricStats(int64_t metricId) {
auto atomMetricStatsIter = mAtomMetricStats.find(metricId);
if (atomMetricStatsIter != mAtomMetricStats.end()) {
@@ -604,9 +619,11 @@ void StatsdStats::resetInternalLocked() {
pullStats.second.pullExceedMaxDelay = 0;
pullStats.second.registeredCount = 0;
pullStats.second.unregisteredCount = 0;
pullStats.second.atomErrorCount = 0;
}
mAtomMetricStats.clear();
mActivationBroadcastGuardrailStats.clear();
mPushedAtomErrorStats.clear();
}
string buildTimeString(int64_t timeSec) {
@@ -617,6 +634,15 @@ string buildTimeString(int64_t timeSec) {
return string(timeBuffer);
}
int StatsdStats::getPushedAtomErrors(int atomId) const {
const auto& it = mPushedAtomErrorStats.find(atomId);
if (it != mPushedAtomErrorStats.end()) {
return it->second;
} else {
return 0;
}
}
void StatsdStats::dumpStats(int out) const {
lock_guard<std::mutex> lock(mLock);
time_t t = mStartTimeSec;
@@ -721,11 +747,13 @@ void StatsdStats::dumpStats(int out) const {
const size_t atomCounts = mPushedAtomStats.size();
for (size_t i = 2; i < atomCounts; i++) {
if (mPushedAtomStats[i] > 0) {
dprintf(out, "Atom %lu->%d\n", (unsigned long)i, mPushedAtomStats[i]);
dprintf(out, "Atom %zu->(total count)%d, (error count)%d\n", i, mPushedAtomStats[i],
getPushedAtomErrors((int)i));
}
}
for (const auto& pair : mNonPlatformPushedAtomStats) {
dprintf(out, "Atom %lu->%d\n", (unsigned long)pair.first, pair.second);
dprintf(out, "Atom %d->(total count)%d, (error count)%d\n", pair.first, pair.second,
getPushedAtomErrors(pair.first));
}
dprintf(out, "********Pulled Atom stats***********\n");
@@ -737,13 +765,15 @@ void StatsdStats::dumpStats(int out) const {
"nanos)%lld, "
" (max pull delay nanos)%lld, (data error)%ld\n"
" (pull timeout)%ld, (pull exceed max delay)%ld\n"
" (registered count) %ld, (unregistered count) %ld\n",
" (registered count) %ld, (unregistered count) %ld\n"
" (atom error count) %d\n",
(int)pair.first, (long)pair.second.totalPull, (long)pair.second.totalPullFromCache,
(long)pair.second.pullFailed, (long)pair.second.minPullIntervalSec,
(long long)pair.second.avgPullTimeNs, (long long)pair.second.maxPullTimeNs,
(long long)pair.second.avgPullDelayNs, (long long)pair.second.maxPullDelayNs,
pair.second.dataError, pair.second.pullTimeout, pair.second.pullExceedMaxDelay,
pair.second.registeredCount, pair.second.unregisteredCount);
pair.second.registeredCount, pair.second.unregisteredCount,
pair.second.atomErrorCount);
}
if (mAnomalyAlarmRegisteredStats > 0) {
@@ -919,6 +949,10 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_STATS | FIELD_COUNT_REPEATED);
proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_TAG, (int32_t)i);
proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_COUNT, mPushedAtomStats[i]);
int errors = getPushedAtomErrors(i);
if (errors > 0) {
proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_ERROR_COUNT, errors);
}
proto.end(token);
}
}
@@ -928,6 +962,10 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_ATOM_STATS | FIELD_COUNT_REPEATED);
proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_TAG, pair.first);
proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_COUNT, pair.second);
int errors = getPushedAtomErrors(pair.first);
if (errors > 0) {
proto.write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_STATS_ERROR_COUNT, errors);
}
proto.end(token);
}

View File

@@ -461,6 +461,16 @@ public:
*/
void noteActivationBroadcastGuardrailHit(const int uid);
/**
* Reports that an atom is erroneous or cannot be parsed successfully by
* statsd. An atom tag of 0 indicates that the client did not supply the
* atom id within the encoding.
*
* For pushed atoms only, this call should be preceded by a call to
* noteAtomLogged.
*/
void noteAtomError(int atomTag, bool pull=false);
/**
* Reset the historical stats. Including all stats in icebox, and the tracked stats about
* metrics, matchers, and atoms. The active configs will be kept and StatsdStats will continue
@@ -499,6 +509,7 @@ public:
long emptyData = 0;
long registeredCount = 0;
long unregisteredCount = 0;
int32_t atomErrorCount = 0;
} PulledAtomStats;
typedef struct {
@@ -546,6 +557,12 @@ private:
// Maps PullAtomId to its stats. The size is capped by the puller atom counts.
std::map<int, PulledAtomStats> mPulledAtomStats;
// Stores the number of times a pushed atom was logged erroneously. The
// corresponding counts for pulled atoms are stored in PulledAtomStats.
// The max size of this map is kMaxAtomErrorsStatsSize.
std::map<int, int> mPushedAtomErrorStats;
int kMaxPushedAtomErrorStatsSize = 100;
// Maps metric ID to its stats. The size is capped by the number of metrics.
std::map<int64_t, AtomMetricStats> mAtomMetricStats;
@@ -613,6 +630,8 @@ private:
void addToIceBoxLocked(std::shared_ptr<ConfigStats>& stats);
int getPushedAtomErrors(int atomId) const;
/**
* Get a reference to AtomMetricStats for a metric. If none exists, create it. The reference
* will live as long as `this`.
@@ -631,6 +650,7 @@ private:
FRIEND_TEST(StatsdStatsTest, TestPullAtomStats);
FRIEND_TEST(StatsdStatsTest, TestAtomMetricsStats);
FRIEND_TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit);
FRIEND_TEST(StatsdStatsTest, TestAtomErrorStats);
};
} // namespace statsd

View File

@@ -377,7 +377,6 @@ bool LogEvent::parseBuffer(uint8_t* buf, size_t len) {
typeInfo = readNextValue<uint8_t>();
uint8_t typeId = getTypeId(typeInfo);
// TODO(b/144373276): handle errors passed to the socket
switch (typeId) {
case BOOL_TYPE:
parseBool(pos, /*depth=*/0, last, getNumAnnotations(typeInfo));
@@ -404,8 +403,13 @@ bool LogEvent::parseBuffer(uint8_t* buf, size_t len) {
parseAttributionChain(pos, /*depth=*/0, last, getNumAnnotations(typeInfo));
if (mAttributionChainIndex == -1) mAttributionChainIndex = pos[0];
break;
case ERROR_TYPE:
mErrorBitmask = readNextValue<int32_t>();
mValid = false;
break;
default:
mValid = false;
break;
}
}

View File

@@ -210,6 +210,14 @@ public:
return BAD_INDEX;
}
bool isValid() const {
return mValid;
}
int32_t getErrorBitmask() const {
return mErrorBitmask;
}
private:
/**
* Only use this if copy is absolutely needed.
@@ -236,12 +244,13 @@ private:
bool checkPreviousValueType(Type expected);
/**
* The below three variables are only valid during the execution of
* The below two variables are only valid during the execution of
* parseBuffer. There are no guarantees about the state of these variables
* before/after.
*/
uint8_t* mBuf;
uint32_t mRemainingLen; // number of valid bytes left in the buffer being parsed
bool mValid = true; // stores whether the event we received from the socket is valid
/**
@@ -299,8 +308,9 @@ private:
// The elapsed timestamp set by statsd log writer.
int64_t mElapsedTimestampNs;
// The atom tag of the event.
int mTagId;
// The atom tag of the event (defaults to 0 if client does not
// appropriately set the atom id).
int mTagId = 0;
// The uid of the logging client (defaults to -1).
int32_t mLogUid = -1;
@@ -308,6 +318,9 @@ private:
// The pid of the logging client (defaults to -1).
int32_t mLogPid = -1;
// Bitmask of errors sent by StatsEvent/AStatsEvent.
int32_t mErrorBitmask = 0;
// Annotations
bool mTruncateTimestamp = false;
int mUidFieldIndex = -1;

View File

@@ -425,6 +425,7 @@ message StatsdStatsReport {
message AtomStats {
optional int32 tag = 1;
optional int32 count = 2;
optional int32 error_count = 3;
}
repeated AtomStats atom_stats = 7;
@@ -460,6 +461,7 @@ message StatsdStatsReport {
optional int64 empty_data = 15;
optional int64 registered_count = 16;
optional int64 unregistered_count = 17;
optional int32 atom_error_count = 18;
}
repeated PulledAtomStats pulled_atom_stats = 10;

View File

@@ -80,6 +80,8 @@ const int FIELD_ID_STATS_COMPANION_BINDER_TRANSACTION_FAILED = 14;
const int FIELD_ID_EMPTY_DATA = 15;
const int FIELD_ID_PULL_REGISTERED_COUNT = 16;
const int FIELD_ID_PULL_UNREGISTERED_COUNT = 17;
const int FIELD_ID_ATOM_ERROR_COUNT = 18;
// for AtomMetricStats proto
const int FIELD_ID_ATOM_METRIC_STATS = 17;
const int FIELD_ID_METRIC_ID = 1;
@@ -492,6 +494,7 @@ void writePullerStatsToStream(const std::pair<int, StatsdStats::PulledAtomStats>
(long long) pair.second.registeredCount);
protoOutput->write(FIELD_TYPE_INT64 | FIELD_ID_PULL_UNREGISTERED_COUNT,
(long long) pair.second.unregisteredCount);
protoOutput->write(FIELD_TYPE_INT32 | FIELD_ID_ATOM_ERROR_COUNT, pair.second.atomErrorCount);
protoOutput->end(token);
}

View File

@@ -486,6 +486,41 @@ TEST(StatsdStatsTest, TestActivationBroadcastGuardrailHit) {
EXPECT_TRUE(uid2Good);
}
TEST(StatsdStatsTest, TestAtomErrorStats) {
StatsdStats stats;
int pushAtomTag = 100;
int pullAtomTag = 1000;
int numErrors = 10;
for (int i = 0; i < numErrors; i++) {
// We must call noteAtomLogged as well because only those pushed atoms
// that have been logged will have stats printed about them in the
// proto.
stats.noteAtomLogged(pushAtomTag, /*timeSec=*/0);
stats.noteAtomError(pushAtomTag, /*pull=*/false);
stats.noteAtomError(pullAtomTag, /*pull=*/true);
}
vector<uint8_t> output;
stats.dumpStats(&output, false);
StatsdStatsReport report;
EXPECT_TRUE(report.ParseFromArray(&output[0], output.size()));
// Check error count = numErrors for push atom
EXPECT_EQ(1, report.atom_stats_size());
const auto& pushedAtomStats = report.atom_stats(0);
EXPECT_EQ(pushAtomTag, pushedAtomStats.tag());
EXPECT_EQ(numErrors, pushedAtomStats.error_count());
// Check error count = numErrors for pull atom
EXPECT_EQ(1, report.pulled_atom_stats_size());
const auto& pulledAtomStats = report.pulled_atom_stats(0);
EXPECT_EQ(pullAtomTag, pulledAtomStats.atom_id());
EXPECT_EQ(numErrors, pulledAtomStats.atom_error_count());
}
} // namespace statsd
} // namespace os
} // namespace android