Merge "Fix ShellSubscriber concurrency issues" into rvc-dev am: ca7fc3be0f am: 0a60d240e5 am: f283d69da8

Change-Id: Idd66ecb29489169eaff4e4cc0d6d4ed9f19ba67f
This commit is contained in:
Ruchir Rastogi
2020-05-08 00:20:39 +00:00
committed by Automerger Merge Worker
3 changed files with 164 additions and 108 deletions

View File

@@ -19,6 +19,7 @@
#include "ShellSubscriber.h" #include "ShellSubscriber.h"
#include <android-base/file.h> #include <android-base/file.h>
#include "matchers/matcher_util.h" #include "matchers/matcher_util.h"
#include "stats_log_util.h" #include "stats_log_util.h"
@@ -32,41 +33,52 @@ const static int FIELD_ID_ATOM = 1;
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) { void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
int myToken = claimToken(); int myToken = claimToken();
VLOG("ShellSubscriber: new subscription %d has come in", myToken);
mSubscriptionShouldEnd.notify_one(); mSubscriptionShouldEnd.notify_one();
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out); shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
if (!readConfig(mySubscriptionInfo)) { if (!readConfig(mySubscriptionInfo)) return;
return;
}
// critical-section {
std::unique_lock<std::mutex> lock(mMutex); std::unique_lock<std::mutex> lock(mMutex);
if (myToken != mToken) { if (myToken != mToken) {
// Some other subscription has already come in. Stop. // Some other subscription has already come in. Stop.
return; return;
} }
mSubscriptionInfo = mySubscriptionInfo; mSubscriptionInfo = mySubscriptionInfo;
if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) { spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
// This thread terminates after it detects that mToken has changed. waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
if (mSubscriptionInfo == mySubscriptionInfo) {
mSubscriptionInfo = nullptr;
}
}
}
void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
std::thread puller([this, myToken] { startPull(myToken); }); std::thread puller([this, myToken] { startPull(myToken); });
puller.detach(); puller.detach();
} }
// Block until subscription has ended. std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
if (timeoutSec > 0) { heartbeatSender.detach();
mSubscriptionShouldEnd.wait_for( }
lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
});
} else {
mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
});
}
if (mSubscriptionInfo == mySubscriptionInfo) { void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
mSubscriptionInfo = nullptr; int myToken,
std::unique_lock<std::mutex>& lock,
int timeoutSec) {
if (timeoutSec > 0) {
mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
return mToken != myToken || !myInfo->mClientAlive;
});
} else {
mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
return mToken != myToken || !myInfo->mClientAlive;
});
} }
} }
@@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
return true; return true;
} }
void ShellSubscriber::startPull(int64_t myToken) { void ShellSubscriber::startPull(int myToken) {
VLOG("ShellSubscriber: pull thread %d starting", myToken);
while (true) { while (true) {
std::lock_guard<std::mutex> lock(mMutex); {
if (!mSubscriptionInfo || mToken != myToken) { std::lock_guard<std::mutex> lock(mMutex);
VLOG("Pulling thread %lld done!", (long long)myToken); if (!mSubscriptionInfo || mToken != myToken) {
return; VLOG("ShellSubscriber: pulling thread %d done!", myToken);
} return;
}
int64_t nowMillis = getElapsedRealtimeMillis();
for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
continue;
}
int64_t nowMillis = getElapsedRealtimeMillis();
for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
vector<std::shared_ptr<LogEvent>> data;
vector<int32_t> uids; vector<int32_t> uids;
uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end()); getUidsForPullAtom(&uids, pullInfo);
// This is slow. Consider storing the uids per app and listening to uidmap updates.
for (const string& pkg : pullInfo.mPullPackages) { vector<std::shared_ptr<LogEvent>> data;
set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg); mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end()); VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
} writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
uids.push_back(DEFAULT_PULL_UID);
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis; pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
} }
} }
VLOG("Pulling thread %lld sleep....", (long long)myToken); VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
mSubscriptionInfo->mPullIntervalMin);
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin)); std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
} }
} }
// \return boolean indicating if writes were successful (will return false if void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
// client dies) uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, // This is slow. Consider storing the uids per app and listening to uidmap updates.
for (const string& pkg : pullInfo.mPullPackages) {
set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
}
uids->push_back(DEFAULT_PULL_UID);
}
void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) { const SimpleAtomMatcher& matcher) {
mProto.clear(); mProto.clear();
int count = 0; int count = 0;
for (const auto& event : data) { for (const auto& event : data) {
VLOG("%s", event->ToString().c_str());
if (matchesSimple(*mUidMap, matcher, *event)) { if (matchesSimple(*mUidMap, matcher, *event)) {
count++; count++;
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
} }
} }
if (count > 0) { if (count > 0) attemptWriteToSocketLocked(mProto.size());
// First write the payload size.
size_t bufferSize = mProto.size();
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
sizeof(bufferSize))) {
return false;
}
VLOG("%d atoms, proto size: %zu", count, bufferSize);
// Then write the payload.
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
return false;
}
}
return true;
} }
void ShellSubscriber::onLogEvent(const LogEvent& event) { void ShellSubscriber::onLogEvent(const LogEvent& event) {
std::lock_guard<std::mutex> lock(mMutex); std::lock_guard<std::mutex> lock(mMutex);
if (!mSubscriptionInfo) { if (!mSubscriptionInfo) return;
return;
}
mProto.clear(); mProto.clear();
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) { for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) { if (matchesSimple(*mUidMap, matcher, event)) {
VLOG("%s", event.ToString().c_str());
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE | uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM); util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
event.ToProto(mProto); event.ToProto(mProto);
mProto.end(atomToken); mProto.end(atomToken);
attemptWriteToSocketLocked(mProto.size());
}
}
}
// First write the payload size. // Tries to write the atom encoded in mProto to the socket. If the write fails
size_t bufferSize = mProto.size(); // because the read end of the pipe has closed, signals to other threads that
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize, // the subscription should end.
sizeof(bufferSize))) { void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
mSubscriptionInfo->mClientAlive = false; // First write the payload size.
mSubscriptionShouldEnd.notify_one(); if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
if (dataSize == 0) return;
// Then, write the payload.
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
mLastWriteMs = getElapsedRealtimeMillis();
}
// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
// recently received any writes from statsd. When it receives the data size of
// 0, perfd will not expect any data and recheck whether the shell command is
// still running.
void ShellSubscriber::sendHeartbeats(int myToken) {
while (true) {
{
std::lock_guard<std::mutex> lock(mMutex);
if (!mSubscriptionInfo || myToken != mToken) {
VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
return; return;
} }
// Then write the payload. if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) { VLOG("ShellSubscriber: sending a heartbeat to perfd");
mSubscriptionInfo->mClientAlive = false; attemptWriteToSocketLocked(/*dataSize=*/0);
mSubscriptionShouldEnd.notify_one();
return;
} }
} }
std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
} }
} }

View File

@@ -38,11 +38,11 @@ namespace statsd {
* *
* A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client * A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
* communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms. * communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
* The atoms are sent back to the client in real time, as opposed to * The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
* keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are * Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
* responsible for doing the aggregation after receiving the atom events. * aggregation after receiving the atom events.
* *
* Shell client pass ShellSubscription in the proto binary format. Client can update the * Shell clients pass ShellSubscription in the proto binary format. Clients can update the
* subscription by sending a new subscription. The new subscription would replace the old one. * subscription by sending a new subscription. The new subscription would replace the old one.
* Input data stream format is: * Input data stream format is:
* *
@@ -54,7 +54,7 @@ namespace statsd {
* The stream would be in the following format: * The stream would be in the following format:
* |size_t|shellData proto|size_t|shellData proto|.... * |size_t|shellData proto|size_t|shellData proto|....
* *
* Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread * Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
* until it exits. * until it exits.
*/ */
class ShellSubscriber : public virtual RefBase { class ShellSubscriber : public virtual RefBase {
@@ -100,11 +100,28 @@ private:
bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo); bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
void startPull(int64_t myToken); void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);
bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data, void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
int myToken,
std::unique_lock<std::mutex>& lock,
int timeoutSec);
void startPull(int myToken);
void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher); const SimpleAtomMatcher& matcher);
void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);
void attemptWriteToSocketLocked(size_t dataSize);
// Send occasional heartbeats for two reasons: (a) for statsd to detect when
// the read end of the pipe has closed and (b) for perfd to escape a
// blocking read call and recheck if the user has terminated the
// subscription.
void sendHeartbeats(int myToken);
sp<UidMap> mUidMap; sp<UidMap> mUidMap;
sp<StatsPullerManager> mPullerMgr; sp<StatsPullerManager> mPullerMgr;
@@ -120,6 +137,11 @@ private:
int mToken = 0; int mToken = 0;
const int32_t DEFAULT_PULL_UID = AID_SYSTEM; const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
// Tracks when we last sent data to perfd. We need that time to determine
// when next to send a heartbeat.
int64_t mLastWriteMs = 0;
const int64_t kMsBetweenHeartbeats = 1000;
}; };
} // namespace statsd } // namespace statsd

View File

@@ -86,28 +86,34 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
// wait for the data to be written. // wait for the data to be written.
std::this_thread::sleep_for(100ms); std::this_thread::sleep_for(100ms);
int expected_data_size = expectedData.ByteSize(); // Because we might receive heartbeats from statsd, consisting of data sizes
// of 0, encapsulate reads within a while loop.
bool readAtom = false;
while (!readAtom) {
// Read the atom size.
size_t dataSize = 0;
read(fds_data[0], &dataSize, sizeof(dataSize));
if (dataSize == 0) continue;
EXPECT_EQ(expectedData.ByteSize(), int(dataSize));
// now read from the pipe. firstly read the atom size. // Read that much data in proto binary format.
size_t dataSize = 0; vector<uint8_t> dataBuffer(dataSize);
EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize))); EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
EXPECT_EQ(expected_data_size, (int)dataSize); // Make sure the received bytes can be parsed to an atom.
ShellData receivedAtom;
EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
// then read that much data which is the atom in proto binary format // Serialize the expected atom to byte array and compare to make sure
vector<uint8_t> dataBuffer(dataSize); // they are the same.
EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize)); vector<uint8_t> expectedAtomBuffer(expectedData.ByteSize());
expectedData.SerializeToArray(expectedAtomBuffer.data(), expectedData.ByteSize());
EXPECT_EQ(expectedAtomBuffer, dataBuffer);
// make sure the received bytes can be parsed to an atom readAtom = true;
ShellData receivedAtom; }
EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
// serialze the expected atom to bytes. and compare. to make sure they are the same.
vector<uint8_t> atomBuffer(expected_data_size);
expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
EXPECT_EQ(atomBuffer, dataBuffer);
close(fds_data[0]); close(fds_data[0]);
if (reader.joinable()) { if (reader.joinable()) {
reader.join(); reader.join();
} }