Merge "Implement new perfd<->statsd ShellSubscriber comm." into rvc-dev am: a799cdba86 am: a39c329538 am: fb4eb5c24f
Change-Id: I9a96bc49c1ca5e3c23e5b3a9236c9f12adb42895
This commit is contained in:
@@ -18,6 +18,7 @@
|
||||
|
||||
#include "ShellSubscriber.h"
|
||||
|
||||
#include <android-base/file.h>
|
||||
#include "matchers/matcher_util.h"
|
||||
#include "stats_log_util.h"
|
||||
|
||||
@@ -30,154 +31,129 @@ namespace statsd {
|
||||
const static int FIELD_ID_ATOM = 1;
|
||||
|
||||
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
|
||||
VLOG("start new shell subscription");
|
||||
int64_t subscriberId = getElapsedRealtimeNs();
|
||||
int myToken = claimToken();
|
||||
mSubscriptionShouldEnd.notify_one();
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
if (mSubscriberId> 0) {
|
||||
VLOG("Only one shell subscriber is allowed.");
|
||||
return;
|
||||
}
|
||||
mSubscriberId = subscriberId;
|
||||
mInput = in;
|
||||
mOutput = out;
|
||||
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
|
||||
if (!readConfig(mySubscriptionInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool success = readConfig();
|
||||
if (!success) {
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
cleanUpLocked();
|
||||
// critical-section
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
if (myToken < mToken) {
|
||||
// Some other subscription has already come in. Stop.
|
||||
return;
|
||||
}
|
||||
mSubscriptionInfo = mySubscriptionInfo;
|
||||
|
||||
if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
|
||||
// This thread terminates after it detects that mToken has changed.
|
||||
std::thread puller([this, myToken] { startPull(myToken); });
|
||||
puller.detach();
|
||||
}
|
||||
|
||||
VLOG("Wait for client to exit or timeout (%d sec)", timeoutSec);
|
||||
std::unique_lock<std::mutex> lk(mMutex);
|
||||
|
||||
// Note that the following is blocking, and it's intended as we cannot return until the shell
|
||||
// cmd exits or we time out.
|
||||
// Block until subscription has ended.
|
||||
if (timeoutSec > 0) {
|
||||
mShellDied.wait_for(lk, timeoutSec * 1s,
|
||||
[this, subscriberId] { return mSubscriberId != subscriberId; });
|
||||
mSubscriptionShouldEnd.wait_for(
|
||||
lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
|
||||
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
|
||||
});
|
||||
} else {
|
||||
mShellDied.wait(lk, [this, subscriberId] { return mSubscriberId != subscriberId; });
|
||||
mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
|
||||
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
|
||||
});
|
||||
}
|
||||
|
||||
if (mSubscriptionInfo == mySubscriptionInfo) {
|
||||
mSubscriptionInfo = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// Atomically claim the next token. Token numbers denote subscriber ordering.
|
||||
int ShellSubscriber::claimToken() {
|
||||
std::unique_lock<std::mutex> lock(mMutex);
|
||||
int myToken = ++mToken;
|
||||
return myToken;
|
||||
}
|
||||
|
||||
// Read configs until EOF is reached. There may be multiple configs in the input
|
||||
// -- each new config should replace the previous one.
|
||||
//
|
||||
// Returns a boolean indicating whether the input was read successfully.
|
||||
bool ShellSubscriber::readConfig() {
|
||||
if (mInput < 0) {
|
||||
// Read and parse single config. There should only one config per input.
|
||||
bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo) {
|
||||
// Read the size of the config.
|
||||
size_t bufferSize;
|
||||
if (!android::base::ReadFully(subscriptionInfo->mInputFd, &bufferSize, sizeof(bufferSize))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
// Read the size of the config.
|
||||
size_t bufferSize = 0;
|
||||
ssize_t bytesRead = read(mInput, &bufferSize, sizeof(bufferSize));
|
||||
if (bytesRead == 0) {
|
||||
VLOG("We have reached the end of the input.");
|
||||
return true;
|
||||
} else if (bytesRead < 0 || (size_t)bytesRead != sizeof(bufferSize)) {
|
||||
ALOGE("Error reading config size");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Read and parse the config.
|
||||
vector<uint8_t> buffer(bufferSize);
|
||||
bytesRead = read(mInput, buffer.data(), bufferSize);
|
||||
if (bytesRead > 0 && (size_t)bytesRead == bufferSize) {
|
||||
ShellSubscription config;
|
||||
if (config.ParseFromArray(buffer.data(), bufferSize)) {
|
||||
updateConfig(config);
|
||||
} else {
|
||||
ALOGE("Error parsing the config");
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
VLOG("Error reading the config, expected bytes: %zu, actual bytes: %zu", bufferSize,
|
||||
bytesRead);
|
||||
return false;
|
||||
}
|
||||
// Read the config.
|
||||
vector<uint8_t> buffer(bufferSize);
|
||||
if (!android::base::ReadFully(subscriptionInfo->mInputFd, buffer.data(), bufferSize)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void ShellSubscriber::updateConfig(const ShellSubscription& config) {
|
||||
mPushedMatchers.clear();
|
||||
mPulledInfo.clear();
|
||||
// Parse the config.
|
||||
ShellSubscription config;
|
||||
if (!config.ParseFromArray(buffer.data(), bufferSize)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Update SubscriptionInfo with state from config
|
||||
for (const auto& pushed : config.pushed()) {
|
||||
mPushedMatchers.push_back(pushed);
|
||||
VLOG("adding matcher for pushed atom %d", pushed.atom_id());
|
||||
subscriptionInfo->mPushedMatchers.push_back(pushed);
|
||||
}
|
||||
|
||||
int64_t token = getElapsedRealtimeNs();
|
||||
mPullToken = token;
|
||||
|
||||
int64_t minInterval = -1;
|
||||
int minInterval = -1;
|
||||
for (const auto& pulled : config.pulled()) {
|
||||
// All intervals need to be multiples of the min interval.
|
||||
if (minInterval < 0 || pulled.freq_millis() < minInterval) {
|
||||
minInterval = pulled.freq_millis();
|
||||
}
|
||||
|
||||
mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
|
||||
VLOG("adding matcher for pulled atom %d", pulled.matcher().atom_id());
|
||||
subscriptionInfo->mPulledInfo.emplace_back(pulled.matcher(), pulled.freq_millis());
|
||||
}
|
||||
subscriptionInfo->mPullIntervalMin = minInterval;
|
||||
|
||||
if (mPulledInfo.size() > 0 && minInterval > 0) {
|
||||
// This thread is guaranteed to terminate after it detects the token is
|
||||
// different.
|
||||
std::thread puller([token, minInterval, this] { startPull(token, minInterval); });
|
||||
puller.detach();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void ShellSubscriber::startPull(int64_t token, int64_t intervalMillis) {
|
||||
void ShellSubscriber::startPull(int64_t myToken) {
|
||||
while (true) {
|
||||
int64_t nowMillis = getElapsedRealtimeMillis();
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
// If the token has changed, the config has changed, so this
|
||||
// puller can now stop.
|
||||
if (mPulledInfo.size() == 0 || mPullToken != token) {
|
||||
VLOG("Pulling thread %lld done!", (long long)token);
|
||||
return;
|
||||
}
|
||||
for (auto& pullInfo : mPulledInfo) {
|
||||
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
|
||||
VLOG("pull atom %d now", pullInfo.mPullerMatcher.atom_id());
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
if (!mSubscriptionInfo || mToken != myToken) {
|
||||
VLOG("Pulling thread %lld done!", (long long)myToken);
|
||||
return;
|
||||
}
|
||||
|
||||
vector<std::shared_ptr<LogEvent>> data;
|
||||
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
|
||||
VLOG("pulled %zu atoms", data.size());
|
||||
if (data.size() > 0) {
|
||||
writeToOutputLocked(data, pullInfo.mPullerMatcher);
|
||||
}
|
||||
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
|
||||
int64_t nowMillis = getElapsedRealtimeMillis();
|
||||
for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
|
||||
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
|
||||
vector<std::shared_ptr<LogEvent>> data;
|
||||
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), &data);
|
||||
VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
|
||||
|
||||
// TODO(b/150969574): Don't write to a pipe while holding a lock.
|
||||
if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
|
||||
mSubscriptionInfo->mClientAlive = false;
|
||||
mSubscriptionShouldEnd.notify_one();
|
||||
return;
|
||||
}
|
||||
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
|
||||
}
|
||||
}
|
||||
VLOG("Pulling thread %lld sleep....", (long long)token);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(intervalMillis));
|
||||
|
||||
VLOG("Pulling thread %lld sleep....", (long long)myToken);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
|
||||
}
|
||||
}
|
||||
|
||||
// Must be called with the lock acquired, so that mProto isn't being written to
|
||||
// at the same time by multiple threads.
|
||||
void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
||||
const SimpleAtomMatcher& matcher) {
|
||||
if (mOutput < 0) {
|
||||
return;
|
||||
}
|
||||
int count = 0;
|
||||
// \return boolean indicating if writes were successful (will return false if
|
||||
// client dies)
|
||||
bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
||||
const SimpleAtomMatcher& matcher) {
|
||||
mProto.clear();
|
||||
int count = 0;
|
||||
for (const auto& event : data) {
|
||||
VLOG("%s", event->ToString().c_str());
|
||||
if (matchesSimple(*mUidMap, matcher, *event)) {
|
||||
VLOG("matched");
|
||||
count++;
|
||||
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
|
||||
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
|
||||
@@ -189,24 +165,29 @@ void ShellSubscriber::writeToOutputLocked(const vector<std::shared_ptr<LogEvent>
|
||||
if (count > 0) {
|
||||
// First write the payload size.
|
||||
size_t bufferSize = mProto.size();
|
||||
write(mOutput, &bufferSize, sizeof(bufferSize));
|
||||
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
|
||||
sizeof(bufferSize))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
VLOG("%d atoms, proto size: %zu", count, bufferSize);
|
||||
// Then write the payload.
|
||||
mProto.flush(mOutput);
|
||||
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void ShellSubscriber::onLogEvent(const LogEvent& event) {
|
||||
// Acquire a lock to prevent corruption from multiple threads writing to
|
||||
// mProto.
|
||||
std::lock_guard<std::mutex> lock(mMutex);
|
||||
if (mOutput < 0) {
|
||||
if (!mSubscriptionInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
mProto.clear();
|
||||
for (const auto& matcher : mPushedMatchers) {
|
||||
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
|
||||
if (matchesSimple(*mUidMap, matcher, event)) {
|
||||
VLOG("%s", event.ToString().c_str());
|
||||
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
|
||||
@@ -216,26 +197,23 @@ void ShellSubscriber::onLogEvent(const LogEvent& event) {
|
||||
|
||||
// First write the payload size.
|
||||
size_t bufferSize = mProto.size();
|
||||
write(mOutput, &bufferSize, sizeof(bufferSize));
|
||||
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
|
||||
sizeof(bufferSize))) {
|
||||
mSubscriptionInfo->mClientAlive = false;
|
||||
mSubscriptionShouldEnd.notify_one();
|
||||
return;
|
||||
}
|
||||
|
||||
// Then write the payload.
|
||||
mProto.flush(mOutput);
|
||||
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
|
||||
mSubscriptionInfo->mClientAlive = false;
|
||||
mSubscriptionShouldEnd.notify_one();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ShellSubscriber::cleanUpLocked() {
|
||||
// The file descriptors will be closed by binder.
|
||||
mInput = -1;
|
||||
mOutput = -1;
|
||||
mSubscriberId = 0;
|
||||
mPushedMatchers.clear();
|
||||
mPulledInfo.clear();
|
||||
// Setting mPullToken == 0 tells pull thread that its work is done.
|
||||
mPullToken = 0;
|
||||
VLOG("done clean up");
|
||||
}
|
||||
|
||||
} // namespace statsd
|
||||
} // namespace os
|
||||
} // namespace android
|
||||
|
||||
@@ -60,9 +60,6 @@ public:
|
||||
ShellSubscriber(sp<UidMap> uidMap, sp<StatsPullerManager> pullerMgr)
|
||||
: mUidMap(uidMap), mPullerMgr(pullerMgr){};
|
||||
|
||||
/**
|
||||
* Start a new subscription.
|
||||
*/
|
||||
void startNewSubscription(int inFd, int outFd, int timeoutSec);
|
||||
|
||||
void onLogEvent(const LogEvent& event);
|
||||
@@ -76,16 +73,28 @@ private:
|
||||
int64_t mInterval;
|
||||
int64_t mPrevPullElapsedRealtimeMs;
|
||||
};
|
||||
bool readConfig();
|
||||
|
||||
void updateConfig(const ShellSubscription& config);
|
||||
struct SubscriptionInfo {
|
||||
SubscriptionInfo(const int& inputFd, const int& outputFd)
|
||||
: mInputFd(inputFd), mOutputFd(outputFd), mClientAlive(true) {
|
||||
}
|
||||
|
||||
void startPull(int64_t token, int64_t intervalMillis);
|
||||
int mInputFd;
|
||||
int mOutputFd;
|
||||
std::vector<SimpleAtomMatcher> mPushedMatchers;
|
||||
std::vector<PullInfo> mPulledInfo;
|
||||
int mPullIntervalMin;
|
||||
bool mClientAlive;
|
||||
};
|
||||
|
||||
void cleanUpLocked();
|
||||
int claimToken();
|
||||
|
||||
void writeToOutputLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
||||
const SimpleAtomMatcher& matcher);
|
||||
bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
|
||||
|
||||
void startPull(int64_t myToken);
|
||||
|
||||
bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
||||
const SimpleAtomMatcher& matcher);
|
||||
|
||||
sp<UidMap> mUidMap;
|
||||
|
||||
@@ -95,19 +104,11 @@ private:
|
||||
|
||||
mutable std::mutex mMutex;
|
||||
|
||||
std::condition_variable mShellDied; // semaphore for waiting until shell exits.
|
||||
std::condition_variable mSubscriptionShouldEnd;
|
||||
|
||||
int mInput = -1; // The input file descriptor
|
||||
std::shared_ptr<SubscriptionInfo> mSubscriptionInfo = nullptr;
|
||||
|
||||
int mOutput = -1; // The output file descriptor
|
||||
|
||||
std::vector<SimpleAtomMatcher> mPushedMatchers;
|
||||
|
||||
std::vector<PullInfo> mPulledInfo;
|
||||
|
||||
int64_t mSubscriberId = 0; // A unique id to identify a subscriber.
|
||||
|
||||
int64_t mPullToken = 0; // A unique token to identify a puller thread.
|
||||
int mToken;
|
||||
};
|
||||
|
||||
} // namespace statsd
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
#include "frameworks/base/cmds/statsd/src/shell/shell_config.pb.h"
|
||||
#include "frameworks/base/cmds/statsd/src/shell/shell_data.pb.h"
|
||||
#include "src/shell/ShellSubscriber.h"
|
||||
#include "stats_event.h"
|
||||
#include "tests/metrics/metrics_test_helper.h"
|
||||
|
||||
#include <stdio.h>
|
||||
@@ -88,6 +89,7 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
|
||||
// now read from the pipe. firstly read the atom size.
|
||||
size_t dataSize = 0;
|
||||
EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));
|
||||
|
||||
EXPECT_EQ(expected_data_size, (int)dataSize);
|
||||
|
||||
// then read that much data which is the atom in proto binary format
|
||||
@@ -103,32 +105,43 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
|
||||
expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
|
||||
EXPECT_EQ(atomBuffer, dataBuffer);
|
||||
close(fds_data[0]);
|
||||
|
||||
if (reader.joinable()) {
|
||||
reader.join();
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(b/149590301): Update this test to use new socket schema.
|
||||
//TEST(ShellSubscriberTest, testPushedSubscription) {
|
||||
// sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
|
||||
//
|
||||
// sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
|
||||
// vector<std::shared_ptr<LogEvent>> pushedList;
|
||||
//
|
||||
// std::shared_ptr<LogEvent> event1 =
|
||||
// std::make_shared<LogEvent>(29 /*screen_state_atom_id*/, 1000 /*timestamp*/);
|
||||
// event1->write(::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
|
||||
// event1->init();
|
||||
// pushedList.push_back(event1);
|
||||
//
|
||||
// // create a simple config to get screen events
|
||||
// ShellSubscription config;
|
||||
// config.add_pushed()->set_atom_id(29);
|
||||
//
|
||||
// // this is the expected screen event atom.
|
||||
// ShellData shellData;
|
||||
// shellData.add_atom()->mutable_screen_state_changed()->set_state(
|
||||
// ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
|
||||
//
|
||||
// runShellTest(config, uidMap, pullerManager, pushedList, shellData);
|
||||
//}
|
||||
TEST(ShellSubscriberTest, testPushedSubscription) {
|
||||
sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
|
||||
|
||||
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
|
||||
vector<std::shared_ptr<LogEvent>> pushedList;
|
||||
|
||||
// Create the LogEvent from an AStatsEvent
|
||||
AStatsEvent* statsEvent = AStatsEvent_obtain();
|
||||
AStatsEvent_setAtomId(statsEvent, 29 /*screen_state_atom_id*/);
|
||||
AStatsEvent_overwriteTimestamp(statsEvent, 1000);
|
||||
AStatsEvent_writeInt32(statsEvent, ::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
|
||||
AStatsEvent_build(statsEvent);
|
||||
size_t size;
|
||||
uint8_t* buffer = AStatsEvent_getBuffer(statsEvent, &size);
|
||||
std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0);
|
||||
logEvent->parseBuffer(buffer, size);
|
||||
AStatsEvent_release(statsEvent);
|
||||
|
||||
pushedList.push_back(logEvent);
|
||||
|
||||
// create a simple config to get screen events
|
||||
ShellSubscription config;
|
||||
config.add_pushed()->set_atom_id(29);
|
||||
|
||||
// this is the expected screen event atom.
|
||||
ShellData shellData;
|
||||
shellData.add_atom()->mutable_screen_state_changed()->set_state(
|
||||
::android::view::DisplayStateEnum::DISPLAY_STATE_ON);
|
||||
|
||||
runShellTest(config, uidMap, pullerManager, pushedList, shellData);
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
@@ -159,33 +172,38 @@ ShellSubscription getPulledConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
shared_ptr<LogEvent> makeCpuActiveTimeAtom(int32_t uid, int64_t timeMillis) {
|
||||
AStatsEvent* statsEvent = AStatsEvent_obtain();
|
||||
AStatsEvent_setAtomId(statsEvent, 10016);
|
||||
AStatsEvent_overwriteTimestamp(statsEvent, 1111L);
|
||||
AStatsEvent_writeInt32(statsEvent, uid);
|
||||
AStatsEvent_writeInt64(statsEvent, timeMillis);
|
||||
AStatsEvent_build(statsEvent);
|
||||
|
||||
size_t size;
|
||||
uint8_t* buf = AStatsEvent_getBuffer(statsEvent, &size);
|
||||
|
||||
std::shared_ptr<LogEvent> logEvent = std::make_shared<LogEvent>(/*uid=*/0, /*pid=*/0);
|
||||
logEvent->parseBuffer(buf, size);
|
||||
return logEvent;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
// TODO(b/149590301): Update this test to use new socket schema.
|
||||
//TEST(ShellSubscriberTest, testPulledSubscription) {
|
||||
// sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
|
||||
//
|
||||
// sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
|
||||
// EXPECT_CALL(*pullerManager, Pull(10016, _))
|
||||
// .WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
|
||||
// data->clear();
|
||||
// shared_ptr<LogEvent> event = make_shared<LogEvent>(tagId, 1111L);
|
||||
// event->write(kUid1);
|
||||
// event->write(kCpuTime1);
|
||||
// event->init();
|
||||
// data->push_back(event);
|
||||
// // another event
|
||||
// event = make_shared<LogEvent>(tagId, 1111L);
|
||||
// event->write(kUid2);
|
||||
// event->write(kCpuTime2);
|
||||
// event->init();
|
||||
// data->push_back(event);
|
||||
// return true;
|
||||
// }));
|
||||
//
|
||||
// runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(),
|
||||
// getExpectedShellData());
|
||||
//}
|
||||
TEST(ShellSubscriberTest, testPulledSubscription) {
|
||||
sp<MockUidMap> uidMap = new NaggyMock<MockUidMap>();
|
||||
|
||||
sp<MockStatsPullerManager> pullerManager = new StrictMock<MockStatsPullerManager>();
|
||||
EXPECT_CALL(*pullerManager, Pull(10016, _))
|
||||
.WillRepeatedly(Invoke([](int tagId, vector<std::shared_ptr<LogEvent>>* data) {
|
||||
data->clear();
|
||||
data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid1, /*timeMillis=*/kCpuTime1));
|
||||
data->push_back(makeCpuActiveTimeAtom(/*uid=*/kUid2, /*timeMillis=*/kCpuTime2));
|
||||
return true;
|
||||
}));
|
||||
runShellTest(getPulledConfig(), uidMap, pullerManager, vector<std::shared_ptr<LogEvent>>(),
|
||||
getExpectedShellData());
|
||||
}
|
||||
|
||||
#else
|
||||
GTEST_LOG_(INFO) << "This test does nothing.\n";
|
||||
|
||||
Reference in New Issue
Block a user