From 1e2405160447116c8fd9b6f09e2245381c55f23e Mon Sep 17 00:00:00 2001 From: Ruchir Rastogi Date: Wed, 22 Apr 2020 09:03:22 -0700 Subject: [PATCH] Fix ShellSubscriber concurrency issues This CL creates a sendHeartbeat thread that occasionally sends heartbeats, consisting of a dataSize of 0, to perfd. perfd will discard those heartbeats, recheck if the user has canceled the subscription, and if not, wait for more data from statsd. Sending heartbeats solves two big problems: (1) Allows statsd to robustly check if writes to the socket fail because the read end of the pipe has closed. Previously, if no atoms were pushed or pulled, statsd never attempted to write to perfd, so statsd could never detect the end of the subscription. However, now, writes are regularly made regardless of whether statsd receives data. Note that even if atoms were pushed or pulled, there is no guarantee that they would have matched the atom matchers sent in perfd's config. (2) Allows perfd to escape a blocking read call and recheck whether the user has canceled the subscription. If no data is sent to perfd, perfd will block in this read call and the AndroidStudio UI will freeze up. Heartbeats are only sent if statsd has not sent any data to perfd within the last second, so we do not spam perfd with writes. 
+ decomposes the startNewSubscription function + prevents startPull from holding the lock while sleeping Test: atest stastd_test Test: atest CtsStatsdHostTestCases Test: manually confirm that AndroidStudio is not freezing Bug: 153595161 Change-Id: I78f0818e8ed29bdadd02c151444ee7c9555623a4 --- cmds/statsd/src/shell/ShellSubscriber.cpp | 198 ++++++++++-------- cmds/statsd/src/shell/ShellSubscriber.h | 36 +++- .../tests/shell/ShellSubscriber_test.cpp | 38 ++-- 3 files changed, 164 insertions(+), 108 deletions(-) diff --git a/cmds/statsd/src/shell/ShellSubscriber.cpp b/cmds/statsd/src/shell/ShellSubscriber.cpp index bed836a1bd90b..7b687210ce33b 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.cpp +++ b/cmds/statsd/src/shell/ShellSubscriber.cpp @@ -19,6 +19,7 @@ #include "ShellSubscriber.h" #include + #include "matchers/matcher_util.h" #include "stats_log_util.h" @@ -32,41 +33,52 @@ const static int FIELD_ID_ATOM = 1; void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { int myToken = claimToken(); + VLOG("ShellSubscriber: new subscription %d has come in", myToken); mSubscriptionShouldEnd.notify_one(); shared_ptr mySubscriptionInfo = make_shared(in, out); - if (!readConfig(mySubscriptionInfo)) { - return; - } + if (!readConfig(mySubscriptionInfo)) return; - // critical-section - std::unique_lock lock(mMutex); - if (myToken != mToken) { - // Some other subscription has already come in. Stop. - return; - } - mSubscriptionInfo = mySubscriptionInfo; + { + std::unique_lock lock(mMutex); + if (myToken != mToken) { + // Some other subscription has already come in. Stop. + return; + } + mSubscriptionInfo = mySubscriptionInfo; - if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { - // This thread terminates after it detects that mToken has changed. 
+ spawnHelperThreadsLocked(mySubscriptionInfo, myToken); + waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec); + + if (mSubscriptionInfo == mySubscriptionInfo) { + mSubscriptionInfo = nullptr; + } + + } +} + +void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr myInfo, int myToken) { + if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) { std::thread puller([this, myToken] { startPull(myToken); }); puller.detach(); } - // Block until subscription has ended. - if (timeoutSec > 0) { - mSubscriptionShouldEnd.wait_for( - lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] { - return mToken != myToken || !mySubscriptionInfo->mClientAlive; - }); - } else { - mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] { - return mToken != myToken || !mySubscriptionInfo->mClientAlive; - }); - } + std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); }); + heartbeatSender.detach(); +} - if (mSubscriptionInfo == mySubscriptionInfo) { - mSubscriptionInfo = nullptr; +void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr myInfo, + int myToken, + std::unique_lock& lock, + int timeoutSec) { + if (timeoutSec > 0) { + mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] { + return mToken != myToken || !myInfo->mClientAlive; + }); + } else { + mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] { + return mToken != myToken || !myInfo->mClientAlive; + }); } } @@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr subscriptionInfo) return true; } -void ShellSubscriber::startPull(int64_t myToken) { +void ShellSubscriber::startPull(int myToken) { + VLOG("ShellSubscriber: pull thread %d starting", myToken); while (true) { - std::lock_guard lock(mMutex); - if (!mSubscriptionInfo || mToken != myToken) { - VLOG("Pulling thread %lld done!", (long long)myToken); - return; - } + { + std::lock_guard lock(mMutex); + if (!mSubscriptionInfo || mToken != myToken) 
{ + VLOG("ShellSubscriber: pulling thread %d done!", myToken); + return; + } + + int64_t nowMillis = getElapsedRealtimeMillis(); + for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) { + if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) { + continue; + } - int64_t nowMillis = getElapsedRealtimeMillis(); - for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) { - if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) { - vector> data; vector uids; - uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); - // This is slow. Consider storing the uids per app and listening to uidmap updates. - for (const string& pkg : pullInfo.mPullPackages) { - set uidsForPkg = mUidMap->getAppUid(pkg); - uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end()); - } - uids.push_back(DEFAULT_PULL_UID); - mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data); - VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); + getUidsForPullAtom(&uids, pullInfo); + + vector> data; + mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data); + VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id()); + writePulledAtomsLocked(data, pullInfo.mPullerMatcher); - if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); - return; - } pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; } } - VLOG("Pulling thread %lld sleep....", (long long)myToken); + VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken, + mSubscriptionInfo->mPullIntervalMin); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); } } -// \return boolean indicating if writes were successful (will return false if -// client dies) -bool ShellSubscriber::writePulledAtomsLocked(const vector>& data, +void ShellSubscriber::getUidsForPullAtom(vector* uids, 
const PullInfo& pullInfo) { + uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); + // This is slow. Consider storing the uids per app and listening to uidmap updates. + for (const string& pkg : pullInfo.mPullPackages) { + set uidsForPkg = mUidMap->getAppUid(pkg); + uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end()); + } + uids->push_back(DEFAULT_PULL_UID); +} + +void ShellSubscriber::writePulledAtomsLocked(const vector>& data, const SimpleAtomMatcher& matcher) { mProto.clear(); int count = 0; for (const auto& event : data) { - VLOG("%s", event->ToString().c_str()); if (matchesSimple(*mUidMap, matcher, *event)) { count++; uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | @@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector 0) { - // First write the payload size. - size_t bufferSize = mProto.size(); - if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, - sizeof(bufferSize))) { - return false; - } - - VLOG("%d atoms, proto size: %zu", count, bufferSize); - // Then write the payload. - if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { - return false; - } - } - - return true; + if (count > 0) attemptWriteToSocketLocked(mProto.size()); } void ShellSubscriber::onLogEvent(const LogEvent& event) { std::lock_guard lock(mMutex); - if (!mSubscriptionInfo) { - return; - } + if (!mSubscriptionInfo) return; mProto.clear(); for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { if (matchesSimple(*mUidMap, matcher, event)) { - VLOG("%s", event.ToString().c_str()); uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); event.ToProto(mProto); mProto.end(atomToken); + attemptWriteToSocketLocked(mProto.size()); + } + } +} - // First write the payload size. 
- size_t bufferSize = mProto.size(); - if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, - sizeof(bufferSize))) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); +// Tries to write the atom encoded in mProto to the socket. If the write fails +// because the read end of the pipe has closed, signals to other threads that +// the subscription should end. +void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) { + // First write the payload size. + if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } + + if (dataSize == 0) return; + + // Then, write the payload. + if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { + mSubscriptionInfo->mClientAlive = false; + mSubscriptionShouldEnd.notify_one(); + return; + } + + mLastWriteMs = getElapsedRealtimeMillis(); +} + +// Send a heartbeat, consisting solely of a data size of 0, if perfd has not +// recently received any writes from statsd. When it receives the data size of +// 0, perfd will not expect any data and recheck whether the shell command is +// still running. +void ShellSubscriber::sendHeartbeats(int myToken) { + while (true) { + { + std::lock_guard lock(mMutex); + if (!mSubscriptionInfo || myToken != mToken) { + VLOG("ShellSubscriber: heartbeat thread %d done!", myToken); return; } - // Then write the payload. 
- if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { - mSubscriptionInfo->mClientAlive = false; - mSubscriptionShouldEnd.notify_one(); - return; + if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) { + VLOG("ShellSubscriber: sending a heartbeat to perfd"); + attemptWriteToSocketLocked(/*dataSize=*/0); } } + std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats)); } } diff --git a/cmds/statsd/src/shell/ShellSubscriber.h b/cmds/statsd/src/shell/ShellSubscriber.h index 61457d89f2242..26c8a2a0b683f 100644 --- a/cmds/statsd/src/shell/ShellSubscriber.h +++ b/cmds/statsd/src/shell/ShellSubscriber.h @@ -38,11 +38,11 @@ namespace statsd { * * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms. - * The atoms are sent back to the client in real time, as opposed to - * keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are - * responsible for doing the aggregation after receiving the atom events. + * The atoms are sent back to the client in real time, as opposed to keeping the data in memory. + * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the + * aggregation after receiving the atom events. * - * Shell client pass ShellSubscription in the proto binary format. Client can update the + * Shell clients pass ShellSubscription in the proto binary format. Clients can update the * subscription by sending a new subscription. The new subscription would replace the old one. * Input data stream format is: * @@ -54,7 +54,7 @@ namespace statsd { * The stream would be in the following format: * |size_t|shellData proto|size_t|shellData proto|.... 
* - * Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread + * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread * until it exits. */ class ShellSubscriber : public virtual RefBase { @@ -100,11 +100,28 @@ private: bool readConfig(std::shared_ptr subscriptionInfo); - void startPull(int64_t myToken); + void spawnHelperThreadsLocked(std::shared_ptr myInfo, int myToken); - bool writePulledAtomsLocked(const vector>& data, + void waitForSubscriptionToEndLocked(std::shared_ptr myInfo, + int myToken, + std::unique_lock& lock, + int timeoutSec); + + void startPull(int myToken); + + void writePulledAtomsLocked(const vector>& data, const SimpleAtomMatcher& matcher); + void getUidsForPullAtom(vector* uids, const PullInfo& pullInfo); + + void attemptWriteToSocketLocked(size_t dataSize); + + // Send occasional heartbeats for two reasons: (a) for statsd to detect when + // the read end of the pipe has closed and (b) for perfd to escape a + // blocking read call and recheck if the user has terminated the + // subscription. + void sendHeartbeats(int myToken); + sp mUidMap; sp mPullerMgr; @@ -120,6 +137,11 @@ private: int mToken = 0; const int32_t DEFAULT_PULL_UID = AID_SYSTEM; + + // Tracks when we last sent data to perfd. We need that time to determine + // when next to send a heartbeat. + int64_t mLastWriteMs = 0; + const int64_t kMsBetweenHeartbeats = 1000; }; } // namespace statsd diff --git a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp index 7b952d7a392e2..363fcb4bf1938 100644 --- a/cmds/statsd/tests/shell/ShellSubscriber_test.cpp +++ b/cmds/statsd/tests/shell/ShellSubscriber_test.cpp @@ -86,28 +86,34 @@ void runShellTest(ShellSubscription config, sp uidMap, // wait for the data to be written. 
std::this_thread::sleep_for(100ms); - int expected_data_size = expectedData.ByteSize(); + // Because we might receive heartbeats from statsd, consisting of data sizes + // of 0, encapsulate reads within a while loop. + bool readAtom = false; + while (!readAtom) { + // Read the atom size. + size_t dataSize = 0; + read(fds_data[0], &dataSize, sizeof(dataSize)); + if (dataSize == 0) continue; + EXPECT_EQ(expectedData.ByteSize(), int(dataSize)); - // now read from the pipe. firstly read the atom size. - size_t dataSize = 0; - EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize))); + // Read that much data in proto binary format. + vector dataBuffer(dataSize); + EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize)); - EXPECT_EQ(expected_data_size, (int)dataSize); + // Make sure the received bytes can be parsed to an atom. + ShellData receivedAtom; + EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0); - // then read that much data which is the atom in proto binary format - vector dataBuffer(dataSize); - EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize)); + // Serialize the expected atom to byte array and compare to make sure + // they are the same. + vector expectedAtomBuffer(expectedData.ByteSize()); + expectedData.SerializeToArray(expectedAtomBuffer.data(), expectedData.ByteSize()); + EXPECT_EQ(expectedAtomBuffer, dataBuffer); - // make sure the received bytes can be parsed to an atom - ShellData receivedAtom; - EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0); + readAtom = true; + } - // serialze the expected atom to bytes. and compare. to make sure they are the same. - vector atomBuffer(expected_data_size); - expectedData.SerializeToArray(&atomBuffer[0], expected_data_size); - EXPECT_EQ(atomBuffer, dataBuffer); close(fds_data[0]); - if (reader.joinable()) { reader.join(); }