Merge "Implement new perfd<->statsd ShellSubscriber comm." into rvc-dev am: a799cdba86 am: a39c329538

Change-Id: I2321a85b6e4077adc14eb931924794cd7a281647
This commit is contained in:
TreeHugger Robot
2020-03-20 18:12:07 +00:00
committed by Automerger Merge Worker
3 changed files with 193 additions and 196 deletions

View File

@@ -18,6 +18,7 @@
#include "ShellSubscriber.h"
#include <android-base/file.h>
#include "matchers/matcher_util.h"
#include "stats_log_util.h"
@@ -30,154 +31,129 @@ namespace statsd {
const static int FIELD_ID_ATOM = 1;
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
VLOG("start new shell subscription");
int64_t subscriberId = getElapsedRealtimeNs();
int myToken = claimToken();
mSubscriptionShouldEnd.notify_one();
{
std::lock_guard<std::mutex> lock(mMutex);
if (mSubscriberId> 0) {
VLOG("Only one shell subscriber is allowed.");
return;
}
mSubscriberId = subscriberId;
mInput = in;
mOutput = out;
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
if (!readConfig(mySubscriptionInfo)) {
return;
}
bool success = readConfig();
if (!success) {
std::lock_guard<std::mutex> lock(mMutex);
cleanUpLocked();
// critical-section
std::unique_lock<std::mutex> lock(mMutex);
if (myToken < mToken) {
// Some other subscription has already come in. Stop.
return;
}
mSubscriptionInfo = mySubscriptionInfo;
if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
// This thread terminates after it detects that mToken has changed.
std::thread puller([this, myToken] { startPull(myToken); });
puller.detach();
}
VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
std::unique_lock<std::mutex> lk(mMutex);
// Note that the following is blocking, and it's intended as we cannot return until the shell
// cmd exits or we time out.
// Block until subscription has ended.
if (timeoutSec > 0) {
mShellDied.wait_for(lk, timeoutSec * 1s,
[this, subscriberId] { return mSubscriberId != subscriberId; });
mSubscriptionShouldEnd.wait_for(
lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
});
} else {
mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
});
}
if (mSubscriptionInfo == mySubscriptionInfo) {
mSubscriptionInfo = nullptr;
}
}
// Atomically claim the next token. Token numbers denote subscriber ordering.
int ShellSubscriber::claimToken() {
std::unique_lock<std::mutex> lock(mMutex);
int myToken = ++mToken;
return myToken;
}
// Read configs until EOF is reached. There may be multiple configs in the input
// -- each new config should replace the previous one.
//
// Returns a boolean indicating whether the input was read successfully.
bool ShellSubscriber::readConfig() {
if (mInput < 0) {
// Read and parse single config. There should only one config per input.
bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
// Read the size of the config.
size_t bufferSize;
if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
return false;
}
while (true) {
// Read the size of the config.
size_t bufferSize = 0;
ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
if (bytesRead == 0) {
VLOG("We have reached the end of the input.");
return true;
} else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
ALOGE("Error reading config size");
return false;
}
// Read and parse the config.
vector<uint8_t> buffer(bufferSize);
bytesRead = read(mInput, buffer.data(), bufferSize);
if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
ShellSubscription config;
if (config.ParseFromArray(buffer.data(), bufferSize)) {
updateConfig(config);
} else {
ALOGE("Error parsing the config");
return false;
}
} else {
VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize,
bytesRead);
return false;
}
// Read the config.
vector<uint8_t> buffer(bufferSize);
if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
return false;
}
}
void ShellSubscriber::updateConfig(const ShellSubscription& config) {
mPushedMatchers.clear();
mPulledInfo.clear();
// Parse the config.
ShellSubscription config;
if (!config.ParseFromArray(buffer.data(), bufferSize)) {
return false;
}
// Update SubscriptionInfo with state from config
for (const auto& pushed : config.pushed()) {
mPushedMatchers.push_back(pushed);
VLOG("adding matcher for pushed atom %d", pushed.atom_id());
subscriptionInfo->mPushedMatchers.push_back(pushed);
}
int64_t token = getElapsedRealtimeNs();
mPullToken = token;
int64_t minInterval = -1;
int minInterval = -1;
for (const auto& pulled : config.pulled()) {
// All intervals need to be multiples of the min interval.
if (minInterval < 0 || pulled.freq_millis() < minInterval) {
minInterval = pulled.freq_millis();
}
mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
}
subscriptionInfo->mPullIntervalMin = minInterval;
if (mPulledInfo.size() > 0 && minInterval > 0) {
// This thread is guaranteed to terminate after it detects the token is
// different.
std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
puller.detach();
}
return true;
}
void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
void ShellSubscriber::startPull(int64_t myToken) {
while (true) {
int64_t nowMillis = getElapsedRealtimeMillis();
{
std::lock_guard<std::mutex> lock(mMutex);
// If the token has changed, the config has changed, so this
// puller can now stop.
if (mPulledInfo.size() == 0 || mPullToken != token) {
VLOG("Pulling thread %lld done!", (long long)token);
return;
}
for (auto& pullInfo : mPulledInfo) {
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
std::lock_guard<std::mutex> lock(mMutex);
if (!mSubscriptionInfo || mToken != myToken) {
VLOG("Pulling thread %lld done!", (long long)myToken);
return;
}
vector<std::shared_ptr<LogEvent>> data;
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
VLOG("pulled %zu atoms", data.size());
if (data.size() > 0) {
writeToOutputLocked(data, pullInfo.mPullerMatcher);
}
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
int64_t nowMillis = getElapsedRealtimeMillis();
for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
vector<std::shared_ptr<LogEvent>> data;
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
// TODO(b/150969574): Don't write to a pipe while holding a lock.
if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
}
}
VLOG("Pulling thread %lld sleep....", (long long)token);
std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
VLOG("Pulling thread %lld sleep....", (long long)myToken);
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
}
}
// Must be called with the lock acquired, so that mProto isn't being written to
// at the same time by multiple threads.
void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) {
if (mOutput < 0) {
return;
}
int count = 0;
// \return boolean indicating if writes were successful (will return false if
// client dies)
bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher) {
mProto.clear();
int count = 0;
for (const auto& event : data) {
VLOG("%s", event->ToString().c_str());
if (matchesSimple(*mUidMap, matcher, *event)) {
VLOG("matched");
count++;
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
@@ -189,24 +165,29 @@ void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>
if (count > 0) {
// First write the payload size.
size_t bufferSize = mProto.size();
write(mOutput, &bufferSize, sizeof(bufferSize));
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
sizeof(bufferSize))) {
return false;
}
VLOG("%d atoms, proto size: %zu", count, bufferSize);
// Then write the payload.
mProto.flush(mOutput);
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
return false;
}
}
return true;
}
void ShellSubscriber::onLogEvent(const LogEvent& event) {
// Acquire a lock to prevent corruption from multiple threads writing to
// mProto.
std::lock_guard<std::mutex> lock(mMutex);
if (mOutput < 0) {
if (!mSubscriptionInfo) {
return;
}
mProto.clear();
for (const auto& matcher : mPushedMatchers) {
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
if (matchesSimple(*mUidMap, matcher, event)) {
VLOG("%s", event.ToString().c_str());
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
@@ -216,26 +197,23 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) {
// First write the payload size.
size_t bufferSize = mProto.size();
write(mOutput, &bufferSize, sizeof(bufferSize));
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
sizeof(bufferSize))) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
// Then write the payload.
mProto.flush(mOutput);
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
mSubscriptionInfo->mClientAlive = false;
mSubscriptionShouldEnd.notify_one();
return;
}
}
}
}
void ShellSubscriber::cleanUpLocked() {
// The file descriptors will be closed by binder.
mInput = -1;
mOutput = -1;
mSubscriberId = 0;
mPushedMatchers.clear();
mPulledInfo.clear();
// Setting mPullToken == 0 tells pull thread that its work is done.
mPullToken = 0;
VLOG("done clean up");
}
} // namespace statsd
} // namespace os
} // namespace android

View File

@@ -60,9 +60,6 @@ public:
ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
: mUidMap(uidMap), mPullerMgr(pullerMgr){};
/**
* Start a new subscription.
*/
void startNewSubscription(int inFd, int outFd, int timeoutSec);
void onLogEvent(const LogEvent& event);
@@ -76,16 +73,28 @@ private:
int64_t mInterval;
int64_t mPrevPullElapsedRealtimeMs;
};
bool readConfig();
void updateConfig(const ShellSubscription& config);
struct SubscriptionInfo {
SubscriptionInfo(const int& inputFd, const int& outputFd)
: mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) {
}
void startPull(int64_t token, int64_t intervalMillis);
int mInputFd;
int mOutputFd;
std::vector<SimpleAtomMatcher> mPushedMatchers;
std::vector<PullInfo> mPulledInfo;
int mPullIntervalMin;
bool mClientAlive;
};
void cleanUpLocked();
int claimToken();
void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher);
bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
void startPull(int64_t myToken);
bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
const SimpleAtomMatcher& matcher);
sp<UidMap> mUidMap;
@@ -95,19 +104,11 @@ private:
mutable std::mutex mMutex;
std::condition_variable mShellDied; // semaphore for waiting until shell exits.
std::condition_variable mSubscriptionShouldEnd;
int mInput = -1; // The input file descriptor
std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr;
int mOutput = -1; // The output file descriptor
std::vector<SimpleAtomMatcher> mPushedMatchers;
std::vector<PullInfo> mPulledInfo;
int64_t mSubscriberId = 0; // A unique id to identify a subscriber.
int64_t mPullToken = 0; // A unique token to identify a puller thread.
int mToken;
};
} // namespace statsd

View File

@@ -19,6 +19,7 @@
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
#include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h"
#include "src/shell/ShellSubscriber.h"
#include "stats_event.h"
#include "tests/metrics/metrics_test_helper.h"
#include <stdio.h>
@@ -88,6 +89,7 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
// now read from the pipe. firstly read the atom size.
size_t dataSize = 0;
EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));
EXPECT_EQ(expected_data_size, (int)dataSize);
// then read that much data which is the atom in proto binary format
@@ -103,32 +105,43 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
EXPECT_EQ(atomBuffer, dataBuffer);
close(fds_data[0]);
if (reader.joinable()) {
reader.join();
}
}
// TODO(b/149590301): Update this test to use new socket schema.
//TEST(ShellSubscriberTest, testPushedSubscription) {
// sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
//
// sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
// vector<std::shared_ptr<LogEvent>> pushedList;
//
// std::shared_ptr<LogEvent> event1 =
// std::make_shared<LogEvent>(29 /*screen_state_atom_id*/, 1000 /*timestamp*/);
// event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
// event1->init();
// pushedList.push_back(event1);
//
// // create a simple config to get screen events
// ShellSubscription config;
// config.add_pushed()->set_atom_id(29);
//
// // this is the expected screen event atom.
// ShellData shellData;
// shellData.add_atom()->mutable_screen_state_changed()->set_state(
// ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
//
// runShellTest(config, uidMap, pullerManager, pushedList, shellData);
//}
TEST(ShellSubscriberTest, testPushedSubscription) {
sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
vector<std::shared_ptr<LogEvent>> pushedList;
// Create the LogEvent from an AStatsEvent
AStatsEvent* statsEvent = AStatsEvent_obtain();
AStatsEvent_setAtomId(statsEvent, 29 /*screen_state_atom_id*/);
AStatsEvent_overwriteTimestamp(statsEvent, 1000);
AStatsEvent_writeInt32(statsEvent, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
AStatsEvent_build(statsEvent);
size_t size;
uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &size);
std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0);
logEvent->parseBuffer(buffer, size);
AStatsEvent_release(statsEvent);
pushedList.push_back(logEvent);
// create a simple config to get screen events
ShellSubscription config;
config.add_pushed()->set_atom_id(29);
// this is the expected screen event atom.
ShellData shellData;
shellData.add_atom()->mutable_screen_state_changed()->set_state(
::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
runShellTest(config, uidMap, pullerManager, pushedList, shellData);
}
namespace {
@@ -159,33 +172,38 @@ ShellSubscription getPulledConfig() {
return config;
}
shared_ptr<LogEvent> makeCpuActiveTimeAtom(int32_t uid, int64_t timeMillis) {
AStatsEvent* statsEvent = AStatsEvent_obtain();
AStatsEvent_setAtomId(statsEvent, 10016);
AStatsEvent_overwriteTimestamp(statsEvent, 1111L);
AStatsEvent_writeInt32(statsEvent, uid);
AStatsEvent_writeInt64(statsEvent, timeMillis);
AStatsEvent_build(statsEvent);
size_t size;
uint8_t* buf = AStatsEvent_getBuffer(statsEvent, &size);
std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0);
logEvent->parseBuffer(buf, size);
return logEvent;
}
} // namespace
// TODO(b/149590301): Update this test to use new socket schema.
//TEST(ShellSubscriberTest, testPulledSubscription) {
// sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
//
// sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
// EXPECT_CALL(*pullerManager, Pull(10016, _))
// .WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
// data->clear();
// shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, 1111L);
// event->write(kUid1);
// event->write(kCpuTime1);
// event->init();
// data->push_back(event);
// // another event
// event = make_shared<LogEvent>(tagId, 1111L);
// event->write(kUid2);
// event->write(kCpuTime2);
// event->init();
// data->push_back(event);
// return true;
// }));
//
// runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(),
// getExpectedShellData());
//}
TEST(ShellSubscriberTest, testPulledSubscription) {
sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
EXPECT_CALL(*pullerManager, Pull(10016, _))
.WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
data->clear();
data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid1, /*timeMillis=*/kCpuTime1));
data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid2, /*timeMillis=*/kCpuTime2));
return true;
}));
runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(),
getExpectedShellData());
}
#else
GTEST_LOG_(INFO) << "This test does nothing.\n";