From a5e4bb5aa63d3e371193b024c6cfc665ff674203 Mon Sep 17 00:00:00 2001 From: Ruchir Rastogi Date: Wed, 4 Mar 2020 17:11:58 -0800 Subject: [PATCH] Implement new perfd<->statsd ShellSubscriber comm. Because we no longer linkToDeath against a binder object to detect if the cmd process has died, we detect deaths by checking if writes fail. ag/10476582 proves that writes fail if the cmd process dies. Test: m statsd Test: bit statsd_test:ShellSubscriberTest.testPushedSubscription Test: bit statsd_test:ShellSubscriberTest.testPulledSubscription Bug: 150619687 Change-Id: I44a777ffff11e5b9298912b2906063c65e9009eb --- cmds/statsd/src/shell/ShellSubscriber.cpp | 232 ++++++++---------- cmds/statsd/src/shell/ShellSubscriber.h | 41 ++-- .../tests/shell/ShellSubscriber_test.cpp | 116 +++++---- 3 files changed, 193 insertions(+), 196 deletions(-) diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index a861a3b76868e..6fa165475029c 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -18,6 +18,7 @@ #include "ShellSubscriber.h" +#include #include "matchers/matcher_util.h" #include "stats_log_util.h" @@ -30,154 +31,129 @@ namespace statsd { const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { - VLOG("start new shell subscription"); - int64_t subscriberId = getElapsedRealtimeNs(); + int myToken = claimToken(); + mSubscriptionShouldEnd.notify_one(); - { - std::lock_guard lock(mMutex); - if (mSubscriberId> 0) { - VLOG("Only one shell subscriber is allowed."); - return; - } - mSubscriberId = subscriberId; - mInput = in; - mOutput = out; + shared_ptr mySubscriptionInfo = make_shared(in, out); + if (!readConfig(mySubscriptionInfo)) { + return; } - bool success = readConfig(); - if (!success) { - std::lock_guard lock(mMutex); - cleanUpLocked(); + // critical-section + std::unique_lock lock(mMutex); + if (myToken < mToken) { + // Some other subscription has already come in. Stop. + return; + } + mSubscriptionInfo = mySubscriptionInfo; + + if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { + // This thread terminates after it detects that mToken has changed. + std::thread puller([this, myToken] { startPull(myToken); }); + puller.detach(); } - VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec); - std::unique_lock lk(mMutex); - - // Note that the following is blocking, and it's intended as we cannot return until the shell - // cmd exits or we time out. + // Block until subscription has ended. if (timeoutSec > 0) { - mShellDied.wait_for(lk, timeoutSec * 1s, - [this, subscriberId] { return mSubscriberId != subscriberId; }); + mSubscriptionShouldEnd.wait_for( + lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] { + return mToken != myToken || !mySubscriptionInfo->mClientAlive; + }); } else { - mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; }); + mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] { + return mToken != myToken || !mySubscriptionInfo->mClientAlive; + }); + } + + if (mSubscriptionInfo == mySubscriptionInfo) { + mSubscriptionInfo = nullptr; } } +// Atomically claim the next token. Token numbers denote subscriber ordering. +int ShellSubscriber::claimToken() { + std::unique_lock lock(mMutex); + int myToken = ++mToken; + return myToken; +} -// Read configs until EOF is reached. There may be multiple configs in the input -// -- each new config should replace the previous one. -// -// Returns a boolean indicating whether the input was read successfully. -bool ShellSubscriber::readConfig() { - if (mInput < 0) { +// Read and parse single config. There should only one config per input. +bool ShellSubscriber::readConfig(shared_ptr subscriptionInfo) { + // Read the size of the config. + size_t bufferSize; + if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) { return false; } - while (true) { - // Read the size of the config. - size_t bufferSize = 0; - ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize)); - if (bytesRead == 0) { - VLOG("We have reached the end of the input."); - return true; - } else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) { - ALOGE("Error reading config size"); - return false; - } - - // Read and parse the config. - vector buffer(bufferSize); - bytesRead = read(mInput, buffer.data(), bufferSize); - if (bytesRead > 0 && (size_t)bytesRead == bufferSize) { - ShellSubscription config; - if (config.ParseFromArray(buffer.data(), bufferSize)) { - updateConfig(config); - } else { - ALOGE("Error parsing the config"); - return false; - } - } else { - VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize, - bytesRead); - return false; - } + // Read the config. + vector buffer(bufferSize); + if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) { + return false; } -} -void ShellSubscriber::updateConfig(const ShellSubscription& config) { - mPushedMatchers.clear(); - mPulledInfo.clear(); + // Parse the config. + ShellSubscription config; + if (!config.ParseFromArray(buffer.data(), bufferSize)) { + return false; + } + // Update SubscriptionInfo with state from config for (const auto& pushed : config.pushed()) { - mPushedMatchers.push_back(pushed); - VLOG("adding matcher for pushed atom %d", pushed.atom_id()); + subscriptionInfo->mPushedMatchers.push_back(pushed); } - int64_t token = getElapsedRealtimeNs(); - mPullToken = token; - - int64_t minInterval = -1; + int minInterval = -1; for (const auto& pulled : config.pulled()) { // All intervals need to be multiples of the min interval. if (minInterval < 0 || pulled.freq_millis() < minInterval) { minInterval = pulled.freq_millis(); } - - mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); - VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id()); + subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis()); } + subscriptionInfo->mPullIntervalMin = minInterval; - if (mPulledInfo.size() > 0 && minInterval > 0) { - // This thread is guaranteed to terminate after it detects the token is - // different. - std::thread puller([token, minInterval, this] { startPull(token, minInterval); }); - puller.detach(); - } + return true; } -void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) { +void ShellSubscriber::startPull(int64_t myToken) { while (true) { - int64_t nowMillis = getElapsedRealtimeMillis(); - { - std::lock_guard lock(mMutex); - // If the token has changed, the config has changed, so this - // puller can now stop. - if (mPulledInfo.size() == 0 || mPullToken != token) { - VLOG("Pulling thread %lld done!", (long long)token); - return; - } - for (auto& pullInfo : mPulledInfo) { - if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { - VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id()); + std::lock_guard lock(mMutex); + if (!mSubscriptionInfo || mToken != myToken) { + VLOG("Pulling thread %lld done!", (long long)myToken); + return; + } - vector> data; - mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); - VLOG("pulled %zu atoms", data.size()); - if (data.size() > 0) { - writeToOutputLocked(data, pullInfo.mPullerMatcher); - } - pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; + int64_t nowMillis = getElapsedRealtimeMillis(); + for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) { + if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { + vector> data; + mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data); + VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); + + // TODO(b/150969574): Don't write to a pipe while holding a lock. + if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; } + pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } - VLOG("Pulling thread %lld sleep....", (long long)token); - std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis)); + + VLOG("Pulling thread %lld sleep....", (long long)myToken); + std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } -// Must be called with the lock acquired, so that mProto isn't being written to -// at the same time by multiple threads. -void ShellSubscriber::writeToOutputLocked(const vector>& data, - const SimpleAtomMatcher& matcher) { - if (mOutput < 0) { - return; - } - int count = 0; +// \return boolean indicating if writes were successful (will return false if +// client dies) +bool ShellSubscriber::writePulledAtomsLocked(const vector>& data, + const SimpleAtomMatcher& matcher) { mProto.clear(); + int count = 0; for (const auto& event : data) { VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { - VLOG("matched"); count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); @@ -189,24 +165,29 @@ void ShellSubscriber::writeToOutputLocked(const vector if (count > 0) { // First write the payload size. size_t bufferSize = mProto.size(); - write(mOutput, &bufferSize, sizeof(bufferSize)); + if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, + sizeof(bufferSize))) { + return false; + } VLOG("%d atoms, proto size: %zu", count, bufferSize); // Then write the payload. - mProto.flush(mOutput); + if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { + return false; + } } + + return true; } void ShellSubscriber::onLogEvent(const LogEvent& event) { - // Acquire a lock to prevent corruption from multiple threads writing to - // mProto. std::lock_guard lock(mMutex); - if (mOutput < 0) { + if (!mSubscriptionInfo) { return; } mProto.clear(); - for (const auto& matcher : mPushedMatchers) { + for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | @@ -216,26 +197,23 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) { // First write the payload size. size_t bufferSize = mProto.size(); - write(mOutput, &bufferSize, sizeof(bufferSize)); + if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, + sizeof(bufferSize))) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } // Then write the payload. - mProto.flush(mOutput); + if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } } } } -void ShellSubscriber::cleanUpLocked() { - // The file descriptors will be closed by binder. - mInput = -1; - mOutput = -1; - mSubscriberId = 0; - mPushedMatchers.clear(); - mPulledInfo.clear(); - // Setting mPullToken == 0 tells pull thread that its work is done. - mPullToken = 0; - VLOG("done clean up"); -} - } // namespace statsd } // namespace os } // namespace android diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h index eaf2ad141c8e7..7fd625e472b40 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.h +++ b/cmds/statsd/src/shell/ShellSubscriber.h @@ -60,9 +60,6 @@ public: ShellSubscriber(sp uidMap, sp pullerMgr) : mUidMap(uidMap), mPullerMgr(pullerMgr){}; - /** - * Start a new subscription. - */ void startNewSubscription(int inFd, int outFd, int timeoutSec); void onLogEvent(const LogEvent& event); @@ -76,16 +73,28 @@ private: int64_t mInterval; int64_t mPrevPullElapsedRealtimeMs; }; - bool readConfig(); - void updateConfig(const ShellSubscription& config); + struct SubscriptionInfo { + SubscriptionInfo(const int& inputFd, const int& outputFd) + : mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) { + } - void startPull(int64_t token, int64_t intervalMillis); + int mInputFd; + int mOutputFd; + std::vector mPushedMatchers; + std::vector mPulledInfo; + int mPullIntervalMin; + bool mClientAlive; + }; - void cleanUpLocked(); + int claimToken(); - void writeToOutputLocked(const vector>& data, - const SimpleAtomMatcher& matcher); + bool readConfig(std::shared_ptr subscriptionInfo); + + void startPull(int64_t myToken); + + bool writePulledAtomsLocked(const vector>& data, + const SimpleAtomMatcher& matcher); sp mUidMap; @@ -95,19 +104,11 @@ private: mutable std::mutex mMutex; - std::condition_variable mShellDied; // semaphore for waiting until shell exits. + std::condition_variable mSubscriptionShouldEnd; - int mInput = -1; // The input file descriptor + std::shared_ptr mSubscriptionInfo = nullptr; - int mOutput = -1; // The output file descriptor - - std::vector mPushedMatchers; - - std::vector mPulledInfo; - - int64_t mSubscriberId = 0; // A unique id to identify a subscriber. - - int64_t mPullToken = 0; // A unique token to identify a puller thread. + int mToken; }; } // namespace statsd diff --git a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp index 57e426507b752..4c55683d909c4 100644 --- a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp +++ b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp @@ -19,6 +19,7 @@ #include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h" #include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h" #include "src/shell/ShellSubscriber.h" +#include "stats_event.h" #include "tests/metrics/metrics_test_helper.h" #include @@ -88,6 +89,7 @@ void runShellTest(ShellSubscription config, sp uidMap, // now read from the pipe. firstly read the atom size. size_t dataSize = 0; EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize))); + EXPECT_EQ(expected_data_size, (int)dataSize); // then read that much data which is the atom in proto binary format @@ -103,32 +105,43 @@ void runShellTest(ShellSubscription config, sp uidMap, expectedData.SerializeToArray(&atomBuffer[0], expected_data_size); EXPECT_EQ(atomBuffer, dataBuffer); close(fds_data[0]); + + if (reader.joinable()) { + reader.join(); + } } -// TODO(b/149590301): Update this test to use new socket schema. -//TEST(ShellSubscriberTest, testPushedSubscription) { -// sp uidMap = new NaggyMock(); -// -// sp pullerManager = new StrictMock(); -// vector> pushedList; -// -// std::shared_ptr event1 = -// std::make_shared(29 /*screen_state_atom_id*/, 1000 /*timestamp*/); -// event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON); -// event1->init(); -// pushedList.push_back(event1); -// -// // create a simple config to get screen events -// ShellSubscription config; -// config.add_pushed()->set_atom_id(29); -// -// // this is the expected screen event atom. -// ShellData shellData; -// shellData.add_atom()->mutable_screen_state_changed()->set_state( -// ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); -// -// runShellTest(config, uidMap, pullerManager, pushedList, shellData); -//} +TEST(ShellSubscriberTest, testPushedSubscription) { + sp uidMap = new NaggyMock(); + + sp pullerManager = new StrictMock(); + vector> pushedList; + + // Create the LogEvent from an AStatsEvent + AStatsEvent* statsEvent = AStatsEvent_obtain(); + AStatsEvent_setAtomId(statsEvent, 29 /*screen_state_atom_id*/); + AStatsEvent_overwriteTimestamp(statsEvent, 1000); + AStatsEvent_writeInt32(statsEvent, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); + AStatsEvent_build(statsEvent); + size_t size; + uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &size); + std::shared_ptr logEvent = std::make_shared(/*uid=*/0, /*pid=*/0); + logEvent->parseBuffer(buffer, size); + AStatsEvent_release(statsEvent); + + pushedList.push_back(logEvent); + + // create a simple config to get screen events + ShellSubscription config; + config.add_pushed()->set_atom_id(29); + + // this is the expected screen event atom. + ShellData shellData; + shellData.add_atom()->mutable_screen_state_changed()->set_state( + ::android::view::DisplayStateEnum::DISPLAY_STATE_ON); + + runShellTest(config, uidMap, pullerManager, pushedList, shellData); +} namespace { @@ -159,33 +172,38 @@ ShellSubscription getPulledConfig() { return config; } +shared_ptr makeCpuActiveTimeAtom(int32_t uid, int64_t timeMillis) { + AStatsEvent* statsEvent = AStatsEvent_obtain(); + AStatsEvent_setAtomId(statsEvent, 10016); + AStatsEvent_overwriteTimestamp(statsEvent, 1111L); + AStatsEvent_writeInt32(statsEvent, uid); + AStatsEvent_writeInt64(statsEvent, timeMillis); + AStatsEvent_build(statsEvent); + + size_t size; + uint8_t* buf = AStatsEvent_getBuffer(statsEvent, &size); + + std::shared_ptr logEvent = std::make_shared(/*uid=*/0, /*pid=*/0); + logEvent->parseBuffer(buf, size); + return logEvent; +} + } // namespace -// TODO(b/149590301): Update this test to use new socket schema. -//TEST(ShellSubscriberTest, testPulledSubscription) { -// sp uidMap = new NaggyMock(); -// -// sp pullerManager = new StrictMock(); -// EXPECT_CALL(*pullerManager, Pull(10016, _)) -// .WillRepeatedly(Invoke([](int tagId, vector>* data) { -// data->clear(); -// shared_ptr event = make_shared(tagId, 1111L); -// event->write(kUid1); -// event->write(kCpuTime1); -// event->init(); -// data->push_back(event); -// // another event -// event = make_shared(tagId, 1111L); -// event->write(kUid2); -// event->write(kCpuTime2); -// event->init(); -// data->push_back(event); -// return true; -// })); -// -// runShellTest(getPulledConfig(), uidMap, pullerManager, vector>(), -// getExpectedShellData()); -//} +TEST(ShellSubscriberTest, testPulledSubscription) { + sp uidMap = new NaggyMock(); + + sp pullerManager = new StrictMock(); + EXPECT_CALL(*pullerManager, Pull(10016, _)) + .WillRepeatedly(Invoke([](int tagId, vector>* data) { + data->clear(); + data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid1, /*timeMillis=*/kCpuTime1)); + data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid2, /*timeMillis=*/kCpuTime2)); + return true; + })); + runShellTest(getPulledConfig(), uidMap, pullerManager, vector>(), + getExpectedShellData()); +} #else GTEST_LOG_(INFO) << "This test does nothing.\n";