Merge "Handle logd reconnect." into pi-dev
This commit is contained in:
committed by
Android (Google) Code Review
commit
2e0f45f087
@@ -79,7 +79,8 @@ StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
|
||||
mPeriodicAlarmMonitor(periodicAlarmMonitor),
|
||||
mSendBroadcast(sendBroadcast),
|
||||
mTimeBaseNs(timeBaseNs),
|
||||
mLastLogTimestamp(0) {
|
||||
mLargestTimestampSeen(0),
|
||||
mLastTimestampSeen(0) {
|
||||
}
|
||||
|
||||
StatsLogProcessor::~StatsLogProcessor() {
|
||||
@@ -156,18 +157,54 @@ void StatsLogProcessor::onIsolatedUidChangedEventLocked(const LogEvent& event) {
|
||||
}
|
||||
|
||||
// Single-argument overload: treats the event as a normal (non-reconnect) read
// by forwarding to OnLogEvent(event, /*reconnected=*/false).
void StatsLogProcessor::OnLogEvent(LogEvent* event) {
|
||||
OnLogEvent(event, false);
|
||||
}
|
||||
|
||||
void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnected) {
|
||||
std::lock_guard<std::mutex> lock(mMetricsMutex);
|
||||
const int64_t currentTimestampNs = event->GetElapsedTimestampNs();
|
||||
|
||||
if (currentTimestampNs < mLastLogTimestamp) {
|
||||
StatsdStats::getInstance().noteLogEventSkipped(
|
||||
event->GetTagId(), event->GetElapsedTimestampNs());
|
||||
return;
|
||||
if (reconnected && mLastTimestampSeen != 0) {
|
||||
// LogReader tells us the connection has just been reset. Now we need
|
||||
// to enter reconnection state to find the last CP.
|
||||
mInReconnection = true;
|
||||
}
|
||||
|
||||
if (mInReconnection) {
|
||||
// We see the checkpoint
|
||||
if (currentTimestampNs == mLastTimestampSeen) {
|
||||
mInReconnection = false;
|
||||
// Found the CP. ignore this event, and we will start to read from next event.
|
||||
return;
|
||||
}
|
||||
if (currentTimestampNs > mLargestTimestampSeen) {
|
||||
// We see a new log but CP has not been found yet. Give up now.
|
||||
mLogLossCount++;
|
||||
mInReconnection = false;
|
||||
StatsdStats::getInstance().noteLogLost(currentTimestampNs);
|
||||
// Persist the data before we reset. Do we want this?
|
||||
WriteDataToDiskLocked();
|
||||
// We see fresher event before we see the checkpoint. We might have lost data.
|
||||
// The best we can do is to reset.
|
||||
std::vector<ConfigKey> configKeys;
|
||||
for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
|
||||
configKeys.push_back(it->first);
|
||||
}
|
||||
resetConfigsLocked(currentTimestampNs, configKeys);
|
||||
} else {
|
||||
// Still in search of the CP. Keep going.
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
mLogCount++;
|
||||
mLastTimestampSeen = currentTimestampNs;
|
||||
if (mLargestTimestampSeen < currentTimestampNs) {
|
||||
mLargestTimestampSeen = currentTimestampNs;
|
||||
}
|
||||
|
||||
resetIfConfigTtlExpiredLocked(currentTimestampNs);
|
||||
|
||||
mLastLogTimestamp = currentTimestampNs;
|
||||
StatsdStats::getInstance().noteAtomLogged(
|
||||
event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);
|
||||
|
||||
@@ -339,15 +376,9 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key,
|
||||
(long long)getWallClockNs());
|
||||
}
|
||||
|
||||
void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
|
||||
std::vector<ConfigKey> configKeysTtlExpired;
|
||||
for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
|
||||
if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
|
||||
configKeysTtlExpired.push_back(it->first);
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& key : configKeysTtlExpired) {
|
||||
void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs,
|
||||
const std::vector<ConfigKey>& configs) {
|
||||
for (const auto& key : configs) {
|
||||
StatsdConfig config;
|
||||
if (StorageManager::readConfigFromDisk(key, &config)) {
|
||||
OnConfigUpdatedLocked(timestampNs, key, config);
|
||||
@@ -362,6 +393,18 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs)
|
||||
}
|
||||
}
|
||||
|
||||
// Resets every config whose metrics manager reports that timestampNs is no
// longer within its TTL. Takes no lock itself; OnLogEvent calls it while
// holding mMetricsMutex.
void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
|
||||
// Gather the expired keys first; the reset happens in one batch afterwards.
std::vector<ConfigKey> configKeysTtlExpired;
|
||||
for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
|
||||
// Entries with a null manager are skipped.
if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
|
||||
configKeysTtlExpired.push_back(it->first);
|
||||
}
|
||||
}
|
||||
// Only call resetConfigsLocked when at least one config has expired.
if (configKeysTtlExpired.size() > 0) {
|
||||
resetConfigsLocked(timestampNs, configKeysTtlExpired);
|
||||
}
|
||||
}
|
||||
|
||||
void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
|
||||
std::lock_guard<std::mutex> lock(mMetricsMutex);
|
||||
auto it = mMetricsManagers.find(key);
|
||||
|
||||
@@ -40,6 +40,9 @@ public:
|
||||
const std::function<void(const ConfigKey&)>& sendBroadcast);
|
||||
virtual ~StatsLogProcessor();
|
||||
|
||||
void OnLogEvent(LogEvent* event, bool reconnectionStarts);
|
||||
|
||||
// for testing only.
|
||||
void OnLogEvent(LogEvent* event);
|
||||
|
||||
void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
|
||||
@@ -122,16 +125,30 @@ private:
|
||||
// Handler over the isolated uid change event.
|
||||
void onIsolatedUidChangedEventLocked(const LogEvent& event);
|
||||
|
||||
void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs);
|
||||
|
||||
// Function used to send a broadcast so that receiver for the config key can call getData
|
||||
// to retrieve the stored data.
|
||||
std::function<void(const ConfigKey& key)> mSendBroadcast;
|
||||
|
||||
const int64_t mTimeBaseNs;
|
||||
|
||||
int64_t mLastLogTimestamp;
|
||||
// Largest timestamp of the events that we have processed.
|
||||
int64_t mLargestTimestampSeen = 0;
|
||||
|
||||
// Timestamp of the most recently processed event (not necessarily the largest).
// After a logd reconnect it is the checkpoint that replayed events are
// compared against; 0 means no event has been processed yet.
int64_t mLastTimestampSeen = 0;
|
||||
|
||||
// True while OnLogEvent is skipping replayed events after a reconnect. Cleared
// when the checkpoint event is found, or when an event newer than
// mLargestTimestampSeen arrives first (treated as log loss).
bool mInReconnection = false;
|
||||
|
||||
// Processed log count
|
||||
uint64_t mLogCount = 0;
|
||||
|
||||
// Log loss detected count
|
||||
int mLogLossCount = 0;
|
||||
|
||||
long mLastPullerCacheClearTimeSec = 0;
|
||||
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs);
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize);
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
|
||||
FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);
|
||||
|
||||
@@ -818,8 +818,8 @@ void StatsService::Startup() {
|
||||
mConfigManager->Startup();
|
||||
}
|
||||
|
||||
void StatsService::OnLogEvent(LogEvent* event) {
|
||||
mProcessor->OnLogEvent(event);
|
||||
// Passes the event and the reconnect flag through to the processor unchanged.
// reconnectionStarts: LogReader passes true for the first line it reads on a
// connection (lineCount == 1).
void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
|
||||
mProcessor->OnLogEvent(event, reconnectionStarts);
|
||||
}
|
||||
|
||||
Status StatsService::getData(int64_t key, vector<uint8_t>* output) {
|
||||
|
||||
@@ -76,7 +76,7 @@ public:
|
||||
/**
|
||||
* Called by LogReader when there's a log event to process.
|
||||
*/
|
||||
virtual void OnLogEvent(LogEvent* event);
|
||||
virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts);
|
||||
|
||||
/**
|
||||
* Binder call for clients to request data for this configuration key.
|
||||
|
||||
@@ -50,7 +50,7 @@ const int FIELD_ID_ANOMALY_ALARM_STATS = 9;
|
||||
// const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp
|
||||
const int FIELD_ID_LOGGER_ERROR_STATS = 11;
|
||||
const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
|
||||
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13;
|
||||
const int FIELD_ID_LOG_LOSS_STATS = 14;
|
||||
|
||||
const int FIELD_ID_ATOM_STATS_TAG = 1;
|
||||
const int FIELD_ID_ATOM_STATS_COUNT = 2;
|
||||
@@ -61,9 +61,6 @@ const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1;
|
||||
const int FIELD_ID_LOGGER_STATS_TIME = 1;
|
||||
const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2;
|
||||
|
||||
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1;
|
||||
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2;
|
||||
|
||||
const int FIELD_ID_CONFIG_STATS_UID = 1;
|
||||
const int FIELD_ID_CONFIG_STATS_ID = 2;
|
||||
const int FIELD_ID_CONFIG_STATS_CREATION = 3;
|
||||
@@ -182,6 +179,14 @@ void StatsdStats::noteConfigReset(const ConfigKey& key) {
|
||||
noteConfigResetInternalLocked(key);
|
||||
}
|
||||
|
||||
// Records the timestamp at which log loss was detected. Under mLock,
// mLogLossTimestampNs is kept as a bounded FIFO: once it holds
// kMaxLoggerErrors entries the oldest one is dropped before appending.
// NOTE(review): the cap reuses kMaxLoggerErrors although kMaxTimestampCount
// also exists -- confirm which limit was intended.
void StatsdStats::noteLogLost(int64_t timestampNs) {
|
||||
lock_guard<std::mutex> lock(mLock);
|
||||
// Evict the oldest entry when the list is already at capacity.
if (mLogLossTimestampNs.size() == kMaxLoggerErrors) {
|
||||
mLogLossTimestampNs.pop_front();
|
||||
}
|
||||
mLogLossTimestampNs.push_back(timestampNs);
|
||||
}
|
||||
|
||||
// Records a broadcast for `key`, stamped with getWallClockSec(), by delegating
// to the two-argument overload.
void StatsdStats::noteBroadcastSent(const ConfigKey& key) {
|
||||
noteBroadcastSent(key, getWallClockSec());
|
||||
}
|
||||
@@ -350,15 +355,6 @@ void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
|
||||
mPushedAtomStats[atomId]++;
|
||||
}
|
||||
|
||||
// Records that an event with the given tag and timestamp was skipped. Under
// mLock, mSkippedLogEvents is kept as a bounded FIFO of (tag, timestamp) pairs
// capped at kMaxSkippedLogEvents.
void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) {
|
||||
lock_guard<std::mutex> lock(mLock);
|
||||
// The list grows by one entry at a time, so its size never exceeds
// kMaxSkippedLogEvents and an equality check is enough to detect "full".
if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) {
|
||||
mSkippedLogEvents.pop_front();
|
||||
}
|
||||
mSkippedLogEvents.push_back(std::make_pair(tag, timestamp));
|
||||
}
|
||||
|
||||
void StatsdStats::noteLoggerError(int error) {
|
||||
lock_guard<std::mutex> lock(mLock);
|
||||
// grows strictly one at a time. so it won't > kMaxLoggerErrors
|
||||
@@ -381,7 +377,7 @@ void StatsdStats::resetInternalLocked() {
|
||||
mAnomalyAlarmRegisteredStats = 0;
|
||||
mPeriodicAlarmRegisteredStats = 0;
|
||||
mLoggerErrors.clear();
|
||||
mSkippedLogEvents.clear();
|
||||
mLogLossTimestampNs.clear();
|
||||
for (auto& config : mConfigStats) {
|
||||
config.second->broadcast_sent_time_sec.clear();
|
||||
config.second->data_drop_time_sec.clear();
|
||||
@@ -515,8 +511,8 @@ void StatsdStats::dumpStats(FILE* out) const {
|
||||
strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm);
|
||||
fprintf(out, "Logger error %d at %s\n", error.second, buffer);
|
||||
}
|
||||
for (const auto& skipped : mSkippedLogEvents) {
|
||||
fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second);
|
||||
for (const auto& loss : mLogLossTimestampNs) {
|
||||
fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -672,13 +668,9 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
|
||||
proto.end(token);
|
||||
}
|
||||
|
||||
for (const auto& skipped : mSkippedLogEvents) {
|
||||
uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS |
|
||||
FIELD_COUNT_REPEATED);
|
||||
proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first);
|
||||
proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP,
|
||||
(long long)skipped.second);
|
||||
proto.end(token);
|
||||
for (const auto& loss : mLogLossTimestampNs) {
|
||||
proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED,
|
||||
(long long)loss);
|
||||
}
|
||||
|
||||
output->clear();
|
||||
|
||||
@@ -102,9 +102,7 @@ public:
|
||||
// The max number of old config stats we keep.
|
||||
const static int kMaxIceBoxSize = 20;
|
||||
|
||||
const static int kMaxLoggerErrors = 10;
|
||||
|
||||
const static int kMaxSkippedLogEvents = 200;
|
||||
const static int kMaxLoggerErrors = 20;
|
||||
|
||||
const static int kMaxTimestampCount = 20;
|
||||
|
||||
@@ -280,7 +278,7 @@ public:
|
||||
/**
|
||||
* Records statsd skipped an event.
|
||||
*/
|
||||
void noteLogEventSkipped(int tag, int64_t timestamp);
|
||||
void noteLogLost(int64_t timestamp);
|
||||
|
||||
/**
|
||||
* Reset the historical stats. Including all stats in icebox, and the tracked stats about
|
||||
@@ -337,8 +335,8 @@ private:
|
||||
// Logd errors. Size capped by kMaxLoggerErrors.
|
||||
std::list<const std::pair<int, int>> mLoggerErrors;
|
||||
|
||||
// Skipped log events.
|
||||
std::list<const std::pair<int, int64_t>> mSkippedLogEvents;
|
||||
// Timestamps when we detect log loss after logd reconnect.
|
||||
std::list<int64_t> mLogLossTimestampNs;
|
||||
|
||||
// Stores the number of times statsd modified the anomaly alarm registered with
|
||||
// StatsCompanionService.
|
||||
|
||||
@@ -33,7 +33,7 @@ public:
|
||||
LogListener();
|
||||
virtual ~LogListener();
|
||||
|
||||
virtual void OnLogEvent(LogEvent* msg) = 0;
|
||||
virtual void OnLogEvent(LogEvent* msg, bool reconnectionStarts) = 0;
|
||||
};
|
||||
|
||||
} // namespace statsd
|
||||
|
||||
@@ -113,7 +113,8 @@ int LogReader::connect_and_read() {
|
||||
LogEvent event(msg);
|
||||
|
||||
// Call the listener
|
||||
mListener->OnLogEvent(&event);
|
||||
mListener->OnLogEvent(&event,
|
||||
lineCount == 1 /* indicate whether it's a new connection */);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -305,4 +305,6 @@ message StatsdStatsReport {
|
||||
optional int64 elapsed_timestamp_nanos = 2;
|
||||
}
|
||||
repeated SkippedLogEventStats skipped_log_event_stats = 13;
|
||||
|
||||
repeated int64 log_loss_stats = 14;
|
||||
}
|
||||
|
||||
@@ -178,6 +178,128 @@ TEST(StatsLogProcessorTest, TestReportIncludesSubConfig) {
|
||||
EXPECT_EQ(2, report.annotation(0).field_int32());
|
||||
}
|
||||
|
||||
// Exercises the logd-reconnect handling of OnLogEvent(event, reconnected):
// out-of-order events in normal operation, replay skipping until the
// checkpoint (mLastTimestampSeen) is found, a reconnect during a reconnect,
// and log loss when the checkpoint never shows up.
TEST(StatsLogProcessorTest, TestOutOfOrderLogs) {
|
||||
// Build a processor with default-constructed alarm monitors, time base 0 and
// a broadcast callback that only counts invocations. No config is added.
sp<UidMap> m = new UidMap();
|
||||
sp<AlarmMonitor> anomalyAlarmMonitor;
|
||||
sp<AlarmMonitor> subscriberAlarmMonitor;
|
||||
int broadcastCount = 0;
|
||||
StatsLogProcessor p(m, anomalyAlarmMonitor, subscriberAlarmMonitor, 0,
|
||||
[&broadcastCount](const ConfigKey& key) { broadcastCount++; });
|
||||
|
||||
// The third constructor argument is the elapsed-realtime timestamp that
// OnLogEvent compares; the second is the logd timestamp.
LogEvent event1(0, 1 /*logd timestamp*/, 1001 /*elapsedRealtime*/);
|
||||
event1.init();
|
||||
|
||||
LogEvent event2(0, 2, 1002);
|
||||
event2.init();
|
||||
|
||||
// event3 and event4 are deliberately out of order by elapsed time (1005, 1004).
LogEvent event3(0, 3, 1005);
|
||||
event3.init();
|
||||
|
||||
LogEvent event4(0, 4, 1004);
|
||||
event4.init();
|
||||
|
||||
// <----- Reconnection happens
|
||||
|
||||
// event5 is older (999) than every earlier event.
LogEvent event5(0, 5, 999);
|
||||
event5.init();
|
||||
|
||||
LogEvent event6(0, 6, 2000);
|
||||
event6.init();
|
||||
|
||||
// <----- Reconnection happens
|
||||
|
||||
LogEvent event7(0, 7, 3000);
|
||||
event7.init();
|
||||
|
||||
// first event ever
// reconnected=true is ignored here because mLastTimestampSeen is still 0.
p.OnLogEvent(&event1, true);
|
||||
EXPECT_EQ(1UL, p.mLogCount);
|
||||
EXPECT_EQ(1001LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(1001LL, p.mLastTimestampSeen);
|
||||
|
||||
p.OnLogEvent(&event2, false);
|
||||
EXPECT_EQ(2UL, p.mLogCount);
|
||||
EXPECT_EQ(1002LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(1002LL, p.mLastTimestampSeen);
|
||||
|
||||
p.OnLogEvent(&event3, false);
|
||||
EXPECT_EQ(3UL, p.mLogCount);
|
||||
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(1005LL, p.mLastTimestampSeen);
|
||||
|
||||
// An out-of-order event is still processed: "last" moves back to 1004 while
// "largest" stays at 1005.
p.OnLogEvent(&event4, false);
|
||||
EXPECT_EQ(4UL, p.mLogCount);
|
||||
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
|
||||
EXPECT_FALSE(p.mInReconnection);
|
||||
|
||||
// Reconnect happens, event1 out of buffer. Read event2
// Replayed events are skipped (counters unchanged) until the checkpoint.
p.OnLogEvent(&event2, true);
|
||||
EXPECT_EQ(4UL, p.mLogCount);
|
||||
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
|
||||
EXPECT_TRUE(p.mInReconnection);
|
||||
|
||||
p.OnLogEvent(&event3, false);
|
||||
EXPECT_EQ(4UL, p.mLogCount);
|
||||
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
|
||||
EXPECT_TRUE(p.mInReconnection);
|
||||
|
||||
// event4 (1004) matches mLastTimestampSeen: checkpoint found. The event
// itself is dropped and reconnection mode ends.
p.OnLogEvent(&event4, false);
|
||||
EXPECT_EQ(4UL, p.mLogCount);
|
||||
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
|
||||
EXPECT_FALSE(p.mInReconnection);
|
||||
|
||||
// Fresh event comes.
// Although 999 is older than anything seen, it is processed because the
// processor is no longer in reconnection mode.
p.OnLogEvent(&event5, false);
|
||||
EXPECT_EQ(5UL, p.mLogCount);
|
||||
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(999LL, p.mLastTimestampSeen);
|
||||
|
||||
p.OnLogEvent(&event6, false);
|
||||
EXPECT_EQ(6UL, p.mLogCount);
|
||||
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
|
||||
|
||||
// Reconnect happens, read from event4
// The checkpoint is now event6 (2000).
p.OnLogEvent(&event4, true);
|
||||
EXPECT_EQ(6UL, p.mLogCount);
|
||||
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
|
||||
EXPECT_TRUE(p.mInReconnection);
|
||||
|
||||
p.OnLogEvent(&event5, false);
|
||||
EXPECT_EQ(6UL, p.mLogCount);
|
||||
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
|
||||
EXPECT_TRUE(p.mInReconnection);
|
||||
|
||||
// Before we get out of reconnection state, it reconnects again.
p.OnLogEvent(&event5, true);
|
||||
EXPECT_EQ(6UL, p.mLogCount);
|
||||
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
|
||||
EXPECT_TRUE(p.mInReconnection);
|
||||
|
||||
// Checkpoint (2000) found again; no loss has been recorded so far.
p.OnLogEvent(&event6, false);
|
||||
EXPECT_EQ(6UL, p.mLogCount);
|
||||
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
|
||||
EXPECT_FALSE(p.mInReconnection);
|
||||
EXPECT_EQ(0, p.mLogLossCount);
|
||||
|
||||
// it reconnects again. All old events are gone. We lose CP.
// 3000 exceeds mLargestTimestampSeen before the checkpoint is seen, so one
// loss is counted and event7 itself is then processed normally.
p.OnLogEvent(&event7, true);
|
||||
EXPECT_EQ(7UL, p.mLogCount);
|
||||
EXPECT_EQ(3000LL, p.mLargestTimestampSeen);
|
||||
EXPECT_EQ(3000LL, p.mLastTimestampSeen);
|
||||
EXPECT_EQ(1, p.mLogLossCount);
|
||||
EXPECT_FALSE(p.mInReconnection);
|
||||
|
||||
}
|
||||
|
||||
#else
|
||||
GTEST_LOG_(INFO) << "This test does nothing.\n";
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user