Merge "Handle logd reconnect." into pi-dev

This commit is contained in:
TreeHugger Robot
2018-04-11 16:45:11 +00:00
committed by Android (Google) Code Review
10 changed files with 225 additions and 50 deletions

View File

@@ -79,7 +79,8 @@ StatsLogProcessor::StatsLogProcessor(const sp<UidMap>& uidMap,
mPeriodicAlarmMonitor(periodicAlarmMonitor),
mSendBroadcast(sendBroadcast),
mTimeBaseNs(timeBaseNs),
mLastLogTimestamp(0) {
mLargestTimestampSeen(0),
mLastTimestampSeen(0) {
}
StatsLogProcessor::~StatsLogProcessor() {
@@ -156,18 +157,54 @@ void StatsLogProcessor::onIsolatedUidChangedEventLocked(const LogEvent& event) {
}
void StatsLogProcessor::OnLogEvent(LogEvent* event) {
OnLogEvent(event, false);
}
void StatsLogProcessor::OnLogEvent(LogEvent* event, bool reconnected) {
std::lock_guard<std::mutex> lock(mMetricsMutex);
const int64_t currentTimestampNs = event->GetElapsedTimestampNs();
if (currentTimestampNs < mLastLogTimestamp) {
StatsdStats::getInstance().noteLogEventSkipped(
event->GetTagId(), event->GetElapsedTimestampNs());
return;
if (reconnected && mLastTimestampSeen != 0) {
// LogReader tells us the connection has just been reset. Now we need
// to enter reconnection state to find the last CP.
mInReconnection = true;
}
if (mInReconnection) {
// We see the checkpoint
if (currentTimestampNs == mLastTimestampSeen) {
mInReconnection = false;
// Found the CP. ignore this event, and we will start to read from next event.
return;
}
if (currentTimestampNs > mLargestTimestampSeen) {
// We see a new log but CP has not been found yet. Give up now.
mLogLossCount++;
mInReconnection = false;
StatsdStats::getInstance().noteLogLost(currentTimestampNs);
// Persist the data before we reset. Do we want this?
WriteDataToDiskLocked();
// We see fresher event before we see the checkpoint. We might have lost data.
// The best we can do is to reset.
std::vector<ConfigKey> configKeys;
for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
configKeys.push_back(it->first);
}
resetConfigsLocked(currentTimestampNs, configKeys);
} else {
// Still in search of the CP. Keep going.
return;
}
}
mLogCount++;
mLastTimestampSeen = currentTimestampNs;
if (mLargestTimestampSeen < currentTimestampNs) {
mLargestTimestampSeen = currentTimestampNs;
}
resetIfConfigTtlExpiredLocked(currentTimestampNs);
mLastLogTimestamp = currentTimestampNs;
StatsdStats::getInstance().noteAtomLogged(
event->GetTagId(), event->GetElapsedTimestampNs() / NS_PER_SEC);
@@ -339,15 +376,9 @@ void StatsLogProcessor::onConfigMetricsReportLocked(const ConfigKey& key,
(long long)getWallClockNs());
}
void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
std::vector<ConfigKey> configKeysTtlExpired;
for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
configKeysTtlExpired.push_back(it->first);
}
}
for (const auto& key : configKeysTtlExpired) {
void StatsLogProcessor::resetConfigsLocked(const int64_t timestampNs,
const std::vector<ConfigKey>& configs) {
for (const auto& key : configs) {
StatsdConfig config;
if (StorageManager::readConfigFromDisk(key, &config)) {
OnConfigUpdatedLocked(timestampNs, key, config);
@@ -362,6 +393,18 @@ void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs)
}
}
void StatsLogProcessor::resetIfConfigTtlExpiredLocked(const int64_t timestampNs) {
std::vector<ConfigKey> configKeysTtlExpired;
for (auto it = mMetricsManagers.begin(); it != mMetricsManagers.end(); it++) {
if (it->second != nullptr && !it->second->isInTtl(timestampNs)) {
configKeysTtlExpired.push_back(it->first);
}
}
if (configKeysTtlExpired.size() > 0) {
resetConfigsLocked(timestampNs, configKeysTtlExpired);
}
}
void StatsLogProcessor::OnConfigRemoved(const ConfigKey& key) {
std::lock_guard<std::mutex> lock(mMetricsMutex);
auto it = mMetricsManagers.find(key);

View File

@@ -40,6 +40,9 @@ public:
const std::function<void(const ConfigKey&)>& sendBroadcast);
virtual ~StatsLogProcessor();
void OnLogEvent(LogEvent* event, bool reconnectionStarts);
// for testing only.
void OnLogEvent(LogEvent* event);
void OnConfigUpdated(const int64_t timestampNs, const ConfigKey& key,
@@ -122,16 +125,30 @@ private:
// Handler over the isolated uid change event.
void onIsolatedUidChangedEventLocked(const LogEvent& event);
void resetConfigsLocked(const int64_t timestampNs, const std::vector<ConfigKey>& configs);
// Function used to send a broadcast so that receiver for the config key can call getData
// to retrieve the stored data.
std::function<void(const ConfigKey& key)> mSendBroadcast;
const int64_t mTimeBaseNs;
int64_t mLastLogTimestamp;
// Largest timestamp of the events that we have processed.
int64_t mLargestTimestampSeen = 0;
int64_t mLastTimestampSeen = 0;
bool mInReconnection = false;
// Processed log count
uint64_t mLogCount = 0;
// Log loss detected count
int mLogLossCount = 0;
long mLastPullerCacheClearTimeSec = 0;
FRIEND_TEST(StatsLogProcessorTest, TestOutOfOrderLogs);
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitByteSize);
FRIEND_TEST(StatsLogProcessorTest, TestRateLimitBroadcast);
FRIEND_TEST(StatsLogProcessorTest, TestDropWhenByteSizeTooLarge);

View File

@@ -818,8 +818,8 @@ void StatsService::Startup() {
mConfigManager->Startup();
}
void StatsService::OnLogEvent(LogEvent* event) {
mProcessor->OnLogEvent(event);
void StatsService::OnLogEvent(LogEvent* event, bool reconnectionStarts) {
mProcessor->OnLogEvent(event, reconnectionStarts);
}
Status StatsService::getData(int64_t key, vector<uint8_t>* output) {

View File

@@ -76,7 +76,7 @@ public:
/**
* Called by LogReader when there's a log event to process.
*/
virtual void OnLogEvent(LogEvent* event);
virtual void OnLogEvent(LogEvent* event, bool reconnectionStarts);
/**
* Binder call for clients to request data for this configuration key.

View File

@@ -50,7 +50,7 @@ const int FIELD_ID_ANOMALY_ALARM_STATS = 9;
// const int FIELD_ID_PULLED_ATOM_STATS = 10; // The proto is written in stats_log_util.cpp
const int FIELD_ID_LOGGER_ERROR_STATS = 11;
const int FIELD_ID_PERIODIC_ALARM_STATS = 12;
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS = 13;
const int FIELD_ID_LOG_LOSS_STATS = 14;
const int FIELD_ID_ATOM_STATS_TAG = 1;
const int FIELD_ID_ATOM_STATS_COUNT = 2;
@@ -61,9 +61,6 @@ const int FIELD_ID_PERIODIC_ALARMS_REGISTERED = 1;
const int FIELD_ID_LOGGER_STATS_TIME = 1;
const int FIELD_ID_LOGGER_STATS_ERROR_CODE = 2;
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG = 1;
const int FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP = 2;
const int FIELD_ID_CONFIG_STATS_UID = 1;
const int FIELD_ID_CONFIG_STATS_ID = 2;
const int FIELD_ID_CONFIG_STATS_CREATION = 3;
@@ -182,6 +179,14 @@ void StatsdStats::noteConfigReset(const ConfigKey& key) {
noteConfigResetInternalLocked(key);
}
void StatsdStats::noteLogLost(int64_t timestampNs) {
lock_guard<std::mutex> lock(mLock);
if (mLogLossTimestampNs.size() == kMaxLoggerErrors) {
mLogLossTimestampNs.pop_front();
}
mLogLossTimestampNs.push_back(timestampNs);
}
void StatsdStats::noteBroadcastSent(const ConfigKey& key) {
noteBroadcastSent(key, getWallClockSec());
}
@@ -350,15 +355,6 @@ void StatsdStats::noteAtomLogged(int atomId, int32_t timeSec) {
mPushedAtomStats[atomId]++;
}
void StatsdStats::noteLogEventSkipped(int tag, int64_t timestamp) {
lock_guard<std::mutex> lock(mLock);
// grows strictly one at a time. so it won't > kMaxSkippedLogEvents
if (mSkippedLogEvents.size() == kMaxSkippedLogEvents) {
mSkippedLogEvents.pop_front();
}
mSkippedLogEvents.push_back(std::make_pair(tag, timestamp));
}
void StatsdStats::noteLoggerError(int error) {
lock_guard<std::mutex> lock(mLock);
// grows strictly one at a time. so it won't > kMaxLoggerErrors
@@ -381,7 +377,7 @@ void StatsdStats::resetInternalLocked() {
mAnomalyAlarmRegisteredStats = 0;
mPeriodicAlarmRegisteredStats = 0;
mLoggerErrors.clear();
mSkippedLogEvents.clear();
mLogLossTimestampNs.clear();
for (auto& config : mConfigStats) {
config.second->broadcast_sent_time_sec.clear();
config.second->data_drop_time_sec.clear();
@@ -515,8 +511,8 @@ void StatsdStats::dumpStats(FILE* out) const {
strftime(buffer, sizeof(buffer), "%Y-%m-%d %I:%M%p\n", error_tm);
fprintf(out, "Logger error %d at %s\n", error.second, buffer);
}
for (const auto& skipped : mSkippedLogEvents) {
fprintf(out, "Log event (%d) skipped at %lld\n", skipped.first, (long long)skipped.second);
for (const auto& loss : mLogLossTimestampNs) {
fprintf(out, "Log loss detected at %lld (elapsedRealtimeNs)\n", (long long)loss);
}
}
@@ -672,13 +668,9 @@ void StatsdStats::dumpStats(std::vector<uint8_t>* output, bool reset) {
proto.end(token);
}
for (const auto& skipped : mSkippedLogEvents) {
uint64_t token = proto.start(FIELD_TYPE_MESSAGE | FIELD_ID_SKIPPED_LOG_EVENT_STATS |
FIELD_COUNT_REPEATED);
proto.write(FIELD_TYPE_INT32 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TAG, skipped.first);
proto.write(FIELD_TYPE_INT64 | FIELD_ID_SKIPPED_LOG_EVENT_STATS_TIMESTAMP,
(long long)skipped.second);
proto.end(token);
for (const auto& loss : mLogLossTimestampNs) {
proto.write(FIELD_TYPE_INT64 | FIELD_ID_LOG_LOSS_STATS | FIELD_COUNT_REPEATED,
(long long)loss);
}
output->clear();

View File

@@ -102,9 +102,7 @@ public:
// The max number of old config stats we keep.
const static int kMaxIceBoxSize = 20;
const static int kMaxLoggerErrors = 10;
const static int kMaxSkippedLogEvents = 200;
const static int kMaxLoggerErrors = 20;
const static int kMaxTimestampCount = 20;
@@ -280,7 +278,7 @@ public:
/**
* Records statsd skipped an event.
*/
void noteLogEventSkipped(int tag, int64_t timestamp);
void noteLogLost(int64_t timestamp);
/**
* Reset the historical stats. Including all stats in icebox, and the tracked stats about
@@ -337,8 +335,8 @@ private:
// Logd errors. Size capped by kMaxLoggerErrors.
std::list<const std::pair<int, int>> mLoggerErrors;
// Skipped log events.
std::list<const std::pair<int, int64_t>> mSkippedLogEvents;
// Timestamps when we detect log loss after logd reconnect.
std::list<int64_t> mLogLossTimestampNs;
// Stores the number of times statsd modified the anomaly alarm registered with
// StatsCompanionService.

View File

@@ -33,7 +33,7 @@ public:
LogListener();
virtual ~LogListener();
virtual void OnLogEvent(LogEvent* msg) = 0;
virtual void OnLogEvent(LogEvent* msg, bool reconnectionStarts) = 0;
};
} // namespace statsd

View File

@@ -113,7 +113,8 @@ int LogReader::connect_and_read() {
LogEvent event(msg);
// Call the listener
mListener->OnLogEvent(&event);
mListener->OnLogEvent(&event,
lineCount == 1 /* indicate whether it's a new connection */);
}
}

View File

@@ -305,4 +305,6 @@ message StatsdStatsReport {
optional int64 elapsed_timestamp_nanos = 2;
}
repeated SkippedLogEventStats skipped_log_event_stats = 13;
repeated int64 log_loss_stats = 14;
}

View File

@@ -178,6 +178,128 @@ TEST(StatsLogProcessorTest, TestReportIncludesSubConfig) {
EXPECT_EQ(2, report.annotation(0).field_int32());
}
TEST(StatsLogProcessorTest, TestOutOfOrderLogs) {
// Setup simple config key corresponding to empty config.
sp<UidMap> m = new UidMap();
sp<AlarmMonitor> anomalyAlarmMonitor;
sp<AlarmMonitor> subscriberAlarmMonitor;
int broadcastCount = 0;
StatsLogProcessor p(m, anomalyAlarmMonitor, subscriberAlarmMonitor, 0,
[&broadcastCount](const ConfigKey& key) { broadcastCount++; });
LogEvent event1(0, 1 /*logd timestamp*/, 1001 /*elapsedRealtime*/);
event1.init();
LogEvent event2(0, 2, 1002);
event2.init();
LogEvent event3(0, 3, 1005);
event3.init();
LogEvent event4(0, 4, 1004);
event4.init();
// <----- Reconnection happens
LogEvent event5(0, 5, 999);
event5.init();
LogEvent event6(0, 6, 2000);
event6.init();
// <----- Reconnection happens
LogEvent event7(0, 7, 3000);
event7.init();
// first event ever
p.OnLogEvent(&event1, true);
EXPECT_EQ(1UL, p.mLogCount);
EXPECT_EQ(1001LL, p.mLargestTimestampSeen);
EXPECT_EQ(1001LL, p.mLastTimestampSeen);
p.OnLogEvent(&event2, false);
EXPECT_EQ(2UL, p.mLogCount);
EXPECT_EQ(1002LL, p.mLargestTimestampSeen);
EXPECT_EQ(1002LL, p.mLastTimestampSeen);
p.OnLogEvent(&event3, false);
EXPECT_EQ(3UL, p.mLogCount);
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
EXPECT_EQ(1005LL, p.mLastTimestampSeen);
p.OnLogEvent(&event4, false);
EXPECT_EQ(4UL, p.mLogCount);
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
EXPECT_FALSE(p.mInReconnection);
// Reconnect happens, event1 out of buffer. Read event2
p.OnLogEvent(&event2, true);
EXPECT_EQ(4UL, p.mLogCount);
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
EXPECT_TRUE(p.mInReconnection);
p.OnLogEvent(&event3, false);
EXPECT_EQ(4UL, p.mLogCount);
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
EXPECT_TRUE(p.mInReconnection);
p.OnLogEvent(&event4, false);
EXPECT_EQ(4UL, p.mLogCount);
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
EXPECT_EQ(1004LL, p.mLastTimestampSeen);
EXPECT_FALSE(p.mInReconnection);
// Fresh event comes.
p.OnLogEvent(&event5, false);
EXPECT_EQ(5UL, p.mLogCount);
EXPECT_EQ(1005LL, p.mLargestTimestampSeen);
EXPECT_EQ(999LL, p.mLastTimestampSeen);
p.OnLogEvent(&event6, false);
EXPECT_EQ(6UL, p.mLogCount);
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
// Reconnect happens, read from event4
p.OnLogEvent(&event4, true);
EXPECT_EQ(6UL, p.mLogCount);
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
EXPECT_TRUE(p.mInReconnection);
p.OnLogEvent(&event5, false);
EXPECT_EQ(6UL, p.mLogCount);
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
EXPECT_TRUE(p.mInReconnection);
// Before we get out of reconnection state, it reconnects again.
p.OnLogEvent(&event5, true);
EXPECT_EQ(6UL, p.mLogCount);
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
EXPECT_TRUE(p.mInReconnection);
p.OnLogEvent(&event6, false);
EXPECT_EQ(6UL, p.mLogCount);
EXPECT_EQ(2000LL, p.mLargestTimestampSeen);
EXPECT_EQ(2000LL, p.mLastTimestampSeen);
EXPECT_FALSE(p.mInReconnection);
EXPECT_EQ(0, p.mLogLossCount);
// it reconnects again. All old events are gone. We lose CP.
p.OnLogEvent(&event7, true);
EXPECT_EQ(7UL, p.mLogCount);
EXPECT_EQ(3000LL, p.mLargestTimestampSeen);
EXPECT_EQ(3000LL, p.mLastTimestampSeen);
EXPECT_EQ(1, p.mLogLossCount);
EXPECT_FALSE(p.mInReconnection);
}
#else
GTEST_LOG_(INFO) << "This test does nothing.\n";
#endif