Merge "Merge "Healthcheck: proper job allocation and test." into rvc-dev am: c4ac9d2321 am: fb0b107b82 am: 3257d65ec1" into rvc-qpr-dev-plus-aosp
This commit is contained in:
committed by
Android (Google) Code Review
commit
d4b71d5752
@@ -268,22 +268,14 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v
|
||||
mAppOpsManager(sm.getAppOpsManager()),
|
||||
mJni(sm.getJni()),
|
||||
mLooper(sm.getLooper()),
|
||||
mTimedQueue(sm.getTimedQueue()),
|
||||
mIncrementalDir(rootDir) {
|
||||
if (!mVold) {
|
||||
LOG(FATAL) << "Vold service is unavailable";
|
||||
}
|
||||
if (!mDataLoaderManager) {
|
||||
LOG(FATAL) << "DataLoaderManagerService is unavailable";
|
||||
}
|
||||
if (!mAppOpsManager) {
|
||||
LOG(FATAL) << "AppOpsManager is unavailable";
|
||||
}
|
||||
if (!mJni) {
|
||||
LOG(FATAL) << "JNI is unavailable";
|
||||
}
|
||||
if (!mLooper) {
|
||||
LOG(FATAL) << "Looper is unavailable";
|
||||
}
|
||||
CHECK(mVold) << "Vold service is unavailable";
|
||||
CHECK(mDataLoaderManager) << "DataLoaderManagerService is unavailable";
|
||||
CHECK(mAppOpsManager) << "AppOpsManager is unavailable";
|
||||
CHECK(mJni) << "JNI is unavailable";
|
||||
CHECK(mLooper) << "Looper is unavailable";
|
||||
CHECK(mTimedQueue) << "TimedQueue is unavailable";
|
||||
|
||||
mJobQueue.reserve(16);
|
||||
mJobProcessor = std::thread([this]() {
|
||||
@@ -294,10 +286,6 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v
|
||||
mJni->initializeForCurrentThread();
|
||||
runCmdLooper();
|
||||
});
|
||||
mTimerThread = std::thread([this]() {
|
||||
mJni->initializeForCurrentThread();
|
||||
runTimers();
|
||||
});
|
||||
|
||||
const auto mountedRootNames = adoptMountedInstances();
|
||||
mountExistingImages(mountedRootNames);
|
||||
@@ -310,10 +298,8 @@ IncrementalService::~IncrementalService() {
|
||||
}
|
||||
mJobCondition.notify_all();
|
||||
mJobProcessor.join();
|
||||
mTimerCondition.notify_all();
|
||||
mTimerThread.join();
|
||||
mCmdLooperThread.join();
|
||||
mTimedJobs.clear();
|
||||
mTimedQueue->stop();
|
||||
// Ensure that mounts are destroyed while the service is still valid.
|
||||
mBindsByPath.clear();
|
||||
mMounts.clear();
|
||||
@@ -1710,53 +1696,18 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) {
|
||||
}
|
||||
}
|
||||
|
||||
void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
|
||||
void IncrementalService::addTimedJob(MountId id, Milliseconds after, Job what) {
|
||||
if (id == kInvalidStorageId) {
|
||||
return;
|
||||
}
|
||||
{
|
||||
std::unique_lock lock(mTimerMutex);
|
||||
mTimedJobs.insert(TimedJob{id, when, std::move(what)});
|
||||
}
|
||||
mTimerCondition.notify_all();
|
||||
mTimedQueue->addJob(id, after, std::move(what));
|
||||
}
|
||||
|
||||
void IncrementalService::removeTimedJobs(MountId id) {
|
||||
if (id == kInvalidStorageId) {
|
||||
return;
|
||||
}
|
||||
{
|
||||
std::unique_lock lock(mTimerMutex);
|
||||
std::erase_if(mTimedJobs, [id](auto&& item) { return item.id == id; });
|
||||
}
|
||||
}
|
||||
|
||||
void IncrementalService::runTimers() {
|
||||
static constexpr TimePoint kInfinityTs{Clock::duration::max()};
|
||||
TimePoint nextTaskTs = kInfinityTs;
|
||||
for (;;) {
|
||||
std::unique_lock lock(mTimerMutex);
|
||||
mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
|
||||
auto now = Clock::now();
|
||||
return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
|
||||
});
|
||||
if (!mRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto now = Clock::now();
|
||||
auto it = mTimedJobs.begin();
|
||||
// Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
|
||||
for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
|
||||
auto job = it->what;
|
||||
mTimedJobs.erase(it);
|
||||
|
||||
lock.unlock();
|
||||
job();
|
||||
lock.lock();
|
||||
}
|
||||
nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
|
||||
}
|
||||
mTimedQueue->removeJobs(id);
|
||||
}
|
||||
|
||||
IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
|
||||
@@ -2029,8 +1980,8 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
|
||||
mHealthBase = {now, kernelTsUs};
|
||||
}
|
||||
|
||||
if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
|
||||
mHealthBase.kernelTsUs > kernelTsUs) {
|
||||
if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.kernelTsUs == kMaxBootClockTsUs ||
|
||||
mHealthBase.userTs > now) {
|
||||
LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
|
||||
registerForPendingReads();
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
|
||||
@@ -2056,6 +2007,9 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Don't schedule timer job less than 500ms in advance.
|
||||
static constexpr auto kTolerance = 500ms;
|
||||
|
||||
const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
|
||||
const auto unhealthyTimeout =
|
||||
std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
|
||||
@@ -2065,31 +2019,28 @@ void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
|
||||
|
||||
const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
|
||||
const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
|
||||
const auto delta = now - userTs;
|
||||
const auto delta = std::chrono::duration_cast<std::chrono::milliseconds>(now - userTs);
|
||||
|
||||
TimePoint whenToCheckBack;
|
||||
if (delta < blockedTimeout) {
|
||||
Milliseconds checkBackAfter;
|
||||
if (delta + kTolerance < blockedTimeout) {
|
||||
LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
|
||||
whenToCheckBack = userTs + blockedTimeout;
|
||||
checkBackAfter = blockedTimeout - delta;
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
|
||||
} else if (delta < unhealthyTimeout) {
|
||||
} else if (delta + kTolerance < unhealthyTimeout) {
|
||||
LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
|
||||
whenToCheckBack = userTs + unhealthyTimeout;
|
||||
checkBackAfter = unhealthyTimeout - delta;
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
|
||||
} else {
|
||||
LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
|
||||
whenToCheckBack = now + unhealthyMonitoring;
|
||||
checkBackAfter = unhealthyMonitoring;
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
|
||||
}
|
||||
LOG(DEBUG) << id() << ": updateHealthStatus in "
|
||||
<< double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
|
||||
now)
|
||||
.count()) /
|
||||
1000.0
|
||||
LOG(DEBUG) << id() << ": updateHealthStatus in " << double(checkBackAfter.count()) / 1000.0
|
||||
<< "secs";
|
||||
mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
|
||||
mService.addTimedJob(id(), checkBackAfter, [this]() { updateHealthStatus(); });
|
||||
}
|
||||
|
||||
// With kTolerance we are expecting these to execute before the next update.
|
||||
if (healthStatusToReport != -1) {
|
||||
onHealthStatus(healthListener, healthStatusToReport);
|
||||
}
|
||||
@@ -2178,6 +2129,16 @@ void IncrementalService::DataLoaderStub::onDump(int fd) {
|
||||
dprintf(fd, " targetStatus: %d\n", mTargetStatus);
|
||||
dprintf(fd, " targetStatusTs: %lldmcs\n",
|
||||
(long long)(elapsedMcs(mTargetStatusTs, Clock::now())));
|
||||
dprintf(fd, " health: {\n");
|
||||
dprintf(fd, " path: %s\n", mHealthPath.c_str());
|
||||
dprintf(fd, " base: %lldmcs (%lld)\n",
|
||||
(long long)(elapsedMcs(mHealthBase.userTs, Clock::now())),
|
||||
(long long)mHealthBase.kernelTsUs);
|
||||
dprintf(fd, " blockedTimeoutMs: %d\n", int(mHealthCheckParams.blockedTimeoutMs));
|
||||
dprintf(fd, " unhealthyTimeoutMs: %d\n", int(mHealthCheckParams.unhealthyTimeoutMs));
|
||||
dprintf(fd, " unhealthyMonitoringMs: %d\n",
|
||||
int(mHealthCheckParams.unhealthyMonitoringMs));
|
||||
dprintf(fd, " }\n");
|
||||
const auto& params = mParams;
|
||||
dprintf(fd, " dataLoaderParams: {\n");
|
||||
dprintf(fd, " type: %s\n", toString(params.type).c_str());
|
||||
|
||||
@@ -56,8 +56,6 @@ using StorageId = int;
|
||||
using FileId = incfs::FileId;
|
||||
using BlockIndex = incfs::BlockIndex;
|
||||
using RawMetadata = incfs::RawMetadata;
|
||||
using Clock = std::chrono::steady_clock;
|
||||
using TimePoint = std::chrono::time_point<Clock>;
|
||||
using Seconds = std::chrono::seconds;
|
||||
using BootClockTsUs = uint64_t;
|
||||
|
||||
@@ -338,8 +336,6 @@ private:
|
||||
bool unregisterAppOpsCallback(const std::string& packageName);
|
||||
void onAppOpChanged(const std::string& packageName);
|
||||
|
||||
using Job = std::function<void()>;
|
||||
|
||||
void runJobProcessing();
|
||||
void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry,
|
||||
const incfs::FileId& libFileId, std::string_view targetLibPath,
|
||||
@@ -347,9 +343,8 @@ private:
|
||||
|
||||
void runCmdLooper();
|
||||
|
||||
void addTimedJob(MountId id, TimePoint when, Job what);
|
||||
void addTimedJob(MountId id, Milliseconds after, Job what);
|
||||
void removeTimedJobs(MountId id);
|
||||
void runTimers();
|
||||
|
||||
private:
|
||||
const std::unique_ptr<VoldServiceWrapper> mVold;
|
||||
@@ -358,6 +353,7 @@ private:
|
||||
const std::unique_ptr<AppOpsManagerWrapper> mAppOpsManager;
|
||||
const std::unique_ptr<JniWrapper> mJni;
|
||||
const std::unique_ptr<LooperWrapper> mLooper;
|
||||
const std::unique_ptr<TimedQueueWrapper> mTimedQueue;
|
||||
const std::string mIncrementalDir;
|
||||
|
||||
mutable std::mutex mLock;
|
||||
@@ -380,19 +376,6 @@ private:
|
||||
std::thread mJobProcessor;
|
||||
|
||||
std::thread mCmdLooperThread;
|
||||
|
||||
struct TimedJob {
|
||||
MountId id;
|
||||
TimePoint when;
|
||||
Job what;
|
||||
friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
|
||||
return lhs.when < rhs.when;
|
||||
}
|
||||
};
|
||||
std::set<TimedJob> mTimedJobs;
|
||||
std::condition_variable mTimerCondition;
|
||||
std::mutex mTimerMutex;
|
||||
std::thread mTimerThread;
|
||||
};
|
||||
|
||||
} // namespace android::incremental
|
||||
|
||||
@@ -25,6 +25,8 @@
|
||||
#include <binder/AppOpsManager.h>
|
||||
#include <utils/String16.h>
|
||||
|
||||
#include <thread>
|
||||
|
||||
#include "IncrementalServiceValidation.h"
|
||||
|
||||
using namespace std::literals;
|
||||
@@ -181,6 +183,88 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
static JNIEnv* getOrAttachJniEnv(JavaVM* jvm);
|
||||
|
||||
class RealTimedQueueWrapper : public TimedQueueWrapper {
|
||||
public:
|
||||
RealTimedQueueWrapper(JavaVM* jvm) {
|
||||
mThread = std::thread([this, jvm]() {
|
||||
(void)getOrAttachJniEnv(jvm);
|
||||
runTimers();
|
||||
});
|
||||
}
|
||||
~RealTimedQueueWrapper() final {
|
||||
CHECK(!mRunning) << "call stop first";
|
||||
CHECK(!mThread.joinable()) << "call stop first";
|
||||
}
|
||||
|
||||
void addJob(MountId id, Milliseconds after, Job what) final {
|
||||
const auto now = Clock::now();
|
||||
{
|
||||
std::unique_lock lock(mMutex);
|
||||
mJobs.insert(TimedJob{id, now + after, std::move(what)});
|
||||
}
|
||||
mCondition.notify_all();
|
||||
}
|
||||
void removeJobs(MountId id) final {
|
||||
std::unique_lock lock(mMutex);
|
||||
std::erase_if(mJobs, [id](auto&& item) { return item.id == id; });
|
||||
}
|
||||
void stop() final {
|
||||
{
|
||||
std::unique_lock lock(mMutex);
|
||||
mRunning = false;
|
||||
}
|
||||
mCondition.notify_all();
|
||||
mThread.join();
|
||||
mJobs.clear();
|
||||
}
|
||||
|
||||
private:
|
||||
void runTimers() {
|
||||
static constexpr TimePoint kInfinityTs{Clock::duration::max()};
|
||||
TimePoint nextJobTs = kInfinityTs;
|
||||
std::unique_lock lock(mMutex);
|
||||
for (;;) {
|
||||
mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() {
|
||||
const auto now = Clock::now();
|
||||
const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
|
||||
return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs;
|
||||
});
|
||||
if (!mRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
const auto now = Clock::now();
|
||||
auto it = mJobs.begin();
|
||||
// Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
|
||||
for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) {
|
||||
auto job = std::move(it->what);
|
||||
mJobs.erase(it);
|
||||
|
||||
lock.unlock();
|
||||
job();
|
||||
lock.lock();
|
||||
}
|
||||
nextJobTs = it != mJobs.end() ? it->when : kInfinityTs;
|
||||
}
|
||||
}
|
||||
|
||||
struct TimedJob {
|
||||
MountId id;
|
||||
TimePoint when;
|
||||
Job what;
|
||||
friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
|
||||
return lhs.when < rhs.when;
|
||||
}
|
||||
};
|
||||
bool mRunning = true;
|
||||
std::set<TimedJob> mJobs;
|
||||
std::condition_variable mCondition;
|
||||
std::mutex mMutex;
|
||||
std::thread mThread;
|
||||
};
|
||||
|
||||
RealServiceManager::RealServiceManager(sp<IServiceManager> serviceManager, JNIEnv* env)
|
||||
: mServiceManager(std::move(serviceManager)), mJvm(RealJniWrapper::getJvm(env)) {}
|
||||
|
||||
@@ -228,6 +312,10 @@ std::unique_ptr<LooperWrapper> RealServiceManager::getLooper() {
|
||||
return std::make_unique<RealLooperWrapper>();
|
||||
}
|
||||
|
||||
std::unique_ptr<TimedQueueWrapper> RealServiceManager::getTimedQueue() {
|
||||
return std::make_unique<RealTimedQueueWrapper>(mJvm);
|
||||
}
|
||||
|
||||
static JavaVM* getJavaVm(JNIEnv* env) {
|
||||
CHECK(env);
|
||||
JavaVM* jvm = nullptr;
|
||||
|
||||
@@ -35,6 +35,11 @@
|
||||
|
||||
namespace android::incremental {
|
||||
|
||||
using Clock = std::chrono::steady_clock;
|
||||
using TimePoint = std::chrono::time_point<Clock>;
|
||||
using Milliseconds = std::chrono::milliseconds;
|
||||
using Job = std::function<void()>;
|
||||
|
||||
// --- Wrapper interfaces ---
|
||||
|
||||
using MountId = int32_t;
|
||||
@@ -121,6 +126,14 @@ public:
|
||||
virtual int pollAll(int timeoutMillis) = 0;
|
||||
};
|
||||
|
||||
class TimedQueueWrapper {
|
||||
public:
|
||||
virtual ~TimedQueueWrapper() = default;
|
||||
virtual void addJob(MountId id, Milliseconds after, Job what) = 0;
|
||||
virtual void removeJobs(MountId id) = 0;
|
||||
virtual void stop() = 0;
|
||||
};
|
||||
|
||||
class ServiceManagerWrapper {
|
||||
public:
|
||||
virtual ~ServiceManagerWrapper() = default;
|
||||
@@ -130,6 +143,7 @@ public:
|
||||
virtual std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() = 0;
|
||||
virtual std::unique_ptr<JniWrapper> getJni() = 0;
|
||||
virtual std::unique_ptr<LooperWrapper> getLooper() = 0;
|
||||
virtual std::unique_ptr<TimedQueueWrapper> getTimedQueue() = 0;
|
||||
};
|
||||
|
||||
// --- Real stuff ---
|
||||
@@ -144,6 +158,7 @@ public:
|
||||
std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final;
|
||||
std::unique_ptr<JniWrapper> getJni() final;
|
||||
std::unique_ptr<LooperWrapper> getLooper() final;
|
||||
std::unique_ptr<TimedQueueWrapper> getTimedQueue() final;
|
||||
|
||||
private:
|
||||
template <class INTERFACE>
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <utils/Log.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <future>
|
||||
|
||||
#include "IncrementalService.h"
|
||||
@@ -295,9 +296,21 @@ public:
|
||||
void openMountSuccess() {
|
||||
ON_CALL(*this, openMount(_)).WillByDefault(Invoke(this, &MockIncFs::openMountForHealth));
|
||||
}
|
||||
void waitForPendingReadsSuccess() {
|
||||
|
||||
// 1000ms
|
||||
void waitForPendingReadsSuccess(uint64_t ts = 0) {
|
||||
ON_CALL(*this, waitForPendingReads(_, _, _))
|
||||
.WillByDefault(Invoke(this, &MockIncFs::waitForPendingReadsForHealth));
|
||||
.WillByDefault(
|
||||
Invoke([ts](const Control& control, std::chrono::milliseconds timeout,
|
||||
std::vector<incfs::ReadInfo>* pendingReadsBuffer) {
|
||||
pendingReadsBuffer->push_back({.bootClockTsUs = ts});
|
||||
return android::incfs::WaitResult::HaveData;
|
||||
}));
|
||||
}
|
||||
|
||||
void waitForPendingReadsTimeout() {
|
||||
ON_CALL(*this, waitForPendingReads(_, _, _))
|
||||
.WillByDefault(Return(android::incfs::WaitResult::Timeout));
|
||||
}
|
||||
|
||||
static constexpr auto kPendingReadsFd = 42;
|
||||
@@ -305,13 +318,6 @@ public:
|
||||
return UniqueControl(IncFs_CreateControl(-1, kPendingReadsFd, -1));
|
||||
}
|
||||
|
||||
WaitResult waitForPendingReadsForHealth(
|
||||
const Control& control, std::chrono::milliseconds timeout,
|
||||
std::vector<incfs::ReadInfo>* pendingReadsBuffer) const {
|
||||
pendingReadsBuffer->push_back({.bootClockTsUs = 0});
|
||||
return android::incfs::WaitResult::HaveData;
|
||||
}
|
||||
|
||||
RawMetadata getMountInfoMetadata(const Control& control, std::string_view path) {
|
||||
metadata::Mount m;
|
||||
m.mutable_storage()->set_id(100);
|
||||
@@ -371,7 +377,7 @@ class MockJniWrapper : public JniWrapper {
|
||||
public:
|
||||
MOCK_CONST_METHOD0(initializeForCurrentThread, void());
|
||||
|
||||
MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); }
|
||||
MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); }
|
||||
};
|
||||
|
||||
class MockLooperWrapper : public LooperWrapper {
|
||||
@@ -385,7 +391,7 @@ public:
|
||||
ON_CALL(*this, addFd(_, _, _, _, _))
|
||||
.WillByDefault(Invoke(this, &MockLooperWrapper::storeCallback));
|
||||
ON_CALL(*this, removeFd(_)).WillByDefault(Invoke(this, &MockLooperWrapper::clearCallback));
|
||||
ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::sleepFor));
|
||||
ON_CALL(*this, pollAll(_)).WillByDefault(Invoke(this, &MockLooperWrapper::wait10Ms));
|
||||
}
|
||||
|
||||
int storeCallback(int, int, int, android::Looper_callbackFunc callback, void* data) {
|
||||
@@ -400,8 +406,10 @@ public:
|
||||
return 0;
|
||||
}
|
||||
|
||||
int sleepFor(int timeoutMillis) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(timeoutMillis));
|
||||
int wait10Ms(int) {
|
||||
// This is called from a loop in runCmdLooper.
|
||||
// Sleeping for 10ms only to avoid busy looping.
|
||||
std::this_thread::sleep_for(10ms);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -409,6 +417,55 @@ public:
|
||||
void* mCallbackData = nullptr;
|
||||
};
|
||||
|
||||
class MockTimedQueueWrapper : public TimedQueueWrapper {
|
||||
public:
|
||||
MOCK_METHOD3(addJob, void(MountId, Milliseconds, Job));
|
||||
MOCK_METHOD1(removeJobs, void(MountId));
|
||||
MOCK_METHOD0(stop, void());
|
||||
|
||||
MockTimedQueueWrapper() {
|
||||
ON_CALL(*this, addJob(_, _, _))
|
||||
.WillByDefault(Invoke(this, &MockTimedQueueWrapper::storeJob));
|
||||
ON_CALL(*this, removeJobs(_)).WillByDefault(Invoke(this, &MockTimedQueueWrapper::clearJob));
|
||||
}
|
||||
|
||||
void storeJob(MountId id, Milliseconds after, Job what) {
|
||||
mId = id;
|
||||
mAfter = after;
|
||||
mWhat = std::move(what);
|
||||
}
|
||||
|
||||
void clearJob(MountId id) {
|
||||
if (mId == id) {
|
||||
mAfter = {};
|
||||
mWhat = {};
|
||||
}
|
||||
}
|
||||
|
||||
MountId mId = -1;
|
||||
Milliseconds mAfter;
|
||||
Job mWhat;
|
||||
};
|
||||
|
||||
class MockStorageHealthListener : public os::incremental::BnStorageHealthListener {
|
||||
public:
|
||||
MOCK_METHOD2(onHealthStatus, binder::Status(int32_t storageId, int32_t status));
|
||||
|
||||
MockStorageHealthListener() {
|
||||
ON_CALL(*this, onHealthStatus(_, _))
|
||||
.WillByDefault(Invoke(this, &MockStorageHealthListener::storeStorageIdAndStatus));
|
||||
}
|
||||
|
||||
binder::Status storeStorageIdAndStatus(int32_t storageId, int32_t status) {
|
||||
mStorageId = storageId;
|
||||
mStatus = status;
|
||||
return binder::Status::ok();
|
||||
}
|
||||
|
||||
int32_t mStorageId = -1;
|
||||
int32_t mStatus = -1;
|
||||
};
|
||||
|
||||
class MockServiceManager : public ServiceManagerWrapper {
|
||||
public:
|
||||
MockServiceManager(std::unique_ptr<MockVoldService> vold,
|
||||
@@ -416,13 +473,15 @@ public:
|
||||
std::unique_ptr<MockIncFs> incfs,
|
||||
std::unique_ptr<MockAppOpsManager> appOpsManager,
|
||||
std::unique_ptr<MockJniWrapper> jni,
|
||||
std::unique_ptr<MockLooperWrapper> looper)
|
||||
std::unique_ptr<MockLooperWrapper> looper,
|
||||
std::unique_ptr<MockTimedQueueWrapper> timedQueue)
|
||||
: mVold(std::move(vold)),
|
||||
mDataLoaderManager(std::move(dataLoaderManager)),
|
||||
mIncFs(std::move(incfs)),
|
||||
mAppOpsManager(std::move(appOpsManager)),
|
||||
mJni(std::move(jni)),
|
||||
mLooper(std::move(looper)) {}
|
||||
mLooper(std::move(looper)),
|
||||
mTimedQueue(std::move(timedQueue)) {}
|
||||
std::unique_ptr<VoldServiceWrapper> getVoldService() final { return std::move(mVold); }
|
||||
std::unique_ptr<DataLoaderManagerWrapper> getDataLoaderManager() final {
|
||||
return std::move(mDataLoaderManager);
|
||||
@@ -431,6 +490,7 @@ public:
|
||||
std::unique_ptr<AppOpsManagerWrapper> getAppOpsManager() final { return std::move(mAppOpsManager); }
|
||||
std::unique_ptr<JniWrapper> getJni() final { return std::move(mJni); }
|
||||
std::unique_ptr<LooperWrapper> getLooper() final { return std::move(mLooper); }
|
||||
std::unique_ptr<TimedQueueWrapper> getTimedQueue() final { return std::move(mTimedQueue); }
|
||||
|
||||
private:
|
||||
std::unique_ptr<MockVoldService> mVold;
|
||||
@@ -439,6 +499,7 @@ private:
|
||||
std::unique_ptr<MockAppOpsManager> mAppOpsManager;
|
||||
std::unique_ptr<MockJniWrapper> mJni;
|
||||
std::unique_ptr<MockLooperWrapper> mLooper;
|
||||
std::unique_ptr<MockTimedQueueWrapper> mTimedQueue;
|
||||
};
|
||||
|
||||
// --- IncrementalServiceTest ---
|
||||
@@ -460,6 +521,8 @@ public:
|
||||
mJni = jni.get();
|
||||
auto looper = std::make_unique<NiceMock<MockLooperWrapper>>();
|
||||
mLooper = looper.get();
|
||||
auto timedQueue = std::make_unique<NiceMock<MockTimedQueueWrapper>>();
|
||||
mTimedQueue = timedQueue.get();
|
||||
mIncrementalService =
|
||||
std::make_unique<IncrementalService>(MockServiceManager(std::move(vold),
|
||||
std::move(
|
||||
@@ -467,7 +530,8 @@ public:
|
||||
std::move(incFs),
|
||||
std::move(appOps),
|
||||
std::move(jni),
|
||||
std::move(looper)),
|
||||
std::move(looper),
|
||||
std::move(timedQueue)),
|
||||
mRootDir.path);
|
||||
mDataLoaderParcel.packageName = "com.test";
|
||||
mDataLoaderParcel.arguments = "uri";
|
||||
@@ -503,6 +567,7 @@ protected:
|
||||
NiceMock<MockAppOpsManager>* mAppOpsManager = nullptr;
|
||||
NiceMock<MockJniWrapper>* mJni = nullptr;
|
||||
NiceMock<MockLooperWrapper>* mLooper = nullptr;
|
||||
NiceMock<MockTimedQueueWrapper>* mTimedQueue = nullptr;
|
||||
NiceMock<MockDataLoader>* mDataLoader = nullptr;
|
||||
std::unique_ptr<IncrementalService> mIncrementalService;
|
||||
TemporaryDir mRootDir;
|
||||
@@ -710,6 +775,136 @@ TEST_F(IncrementalServiceTest, testStartDataLoaderRecreateOnPendingReads) {
|
||||
mLooper->mCallback(-1, -1, mLooper->mCallbackData);
|
||||
}
|
||||
|
||||
TEST_F(IncrementalServiceTest, testStartDataLoaderUnhealthyStorage) {
|
||||
mVold->mountIncFsSuccess();
|
||||
mIncFs->makeFileSuccess();
|
||||
mIncFs->openMountSuccess();
|
||||
mVold->bindMountSuccess();
|
||||
mDataLoaderManager->bindToDataLoaderSuccess();
|
||||
mDataLoaderManager->getDataLoaderSuccess();
|
||||
EXPECT_CALL(*mDataLoaderManager, bindToDataLoader(_, _, _, _)).Times(1);
|
||||
EXPECT_CALL(*mDataLoaderManager, unbindFromDataLoader(_)).Times(1);
|
||||
EXPECT_CALL(*mDataLoader, create(_, _, _, _)).Times(1);
|
||||
EXPECT_CALL(*mDataLoader, start(_)).Times(1);
|
||||
EXPECT_CALL(*mDataLoader, destroy(_)).Times(1);
|
||||
EXPECT_CALL(*mVold, unmountIncFs(_)).Times(2);
|
||||
EXPECT_CALL(*mLooper, addFd(MockIncFs::kPendingReadsFd, _, _, _, _)).Times(2);
|
||||
EXPECT_CALL(*mLooper, removeFd(MockIncFs::kPendingReadsFd)).Times(2);
|
||||
EXPECT_CALL(*mTimedQueue, addJob(_, _, _)).Times(4);
|
||||
|
||||
sp<NiceMock<MockStorageHealthListener>> listener{new NiceMock<MockStorageHealthListener>};
|
||||
NiceMock<MockStorageHealthListener>* listenerMock = listener.get();
|
||||
EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_OK))
|
||||
.Times(2);
|
||||
EXPECT_CALL(*listenerMock,
|
||||
onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_READS_PENDING))
|
||||
.Times(1);
|
||||
EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_BLOCKED))
|
||||
.Times(1);
|
||||
EXPECT_CALL(*listenerMock, onHealthStatus(_, IStorageHealthListener::HEALTH_STATUS_UNHEALTHY))
|
||||
.Times(2);
|
||||
|
||||
StorageHealthCheckParams params;
|
||||
params.blockedTimeoutMs = 10000;
|
||||
params.unhealthyTimeoutMs = 20000;
|
||||
params.unhealthyMonitoringMs = 30000;
|
||||
|
||||
using MS = std::chrono::milliseconds;
|
||||
using MCS = std::chrono::microseconds;
|
||||
|
||||
const auto blockedTimeout = MS(params.blockedTimeoutMs);
|
||||
const auto unhealthyTimeout = MS(params.unhealthyTimeoutMs);
|
||||
const auto unhealthyMonitoring = MS(params.unhealthyMonitoringMs);
|
||||
|
||||
const uint64_t kFirstTimestampUs = 1000000000ll;
|
||||
const uint64_t kBlockedTimestampUs =
|
||||
kFirstTimestampUs - std::chrono::duration_cast<MCS>(blockedTimeout).count();
|
||||
const uint64_t kUnhealthyTimestampUs =
|
||||
kFirstTimestampUs - std::chrono::duration_cast<MCS>(unhealthyTimeout).count();
|
||||
|
||||
TemporaryDir tempDir;
|
||||
int storageId = mIncrementalService->createStorage(tempDir.path, std::move(mDataLoaderParcel),
|
||||
IncrementalService::CreateOptions::CreateNew,
|
||||
{}, std::move(params), listener);
|
||||
ASSERT_GE(storageId, 0);
|
||||
|
||||
// Healthy state, registered for pending reads.
|
||||
ASSERT_NE(nullptr, mLooper->mCallback);
|
||||
ASSERT_NE(nullptr, mLooper->mCallbackData);
|
||||
ASSERT_EQ(storageId, listener->mStorageId);
|
||||
ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus);
|
||||
|
||||
// Looper/epoll callback.
|
||||
mIncFs->waitForPendingReadsSuccess(kFirstTimestampUs);
|
||||
mLooper->mCallback(-1, -1, mLooper->mCallbackData);
|
||||
|
||||
// Unregister from pending reads and wait.
|
||||
ASSERT_EQ(nullptr, mLooper->mCallback);
|
||||
ASSERT_EQ(nullptr, mLooper->mCallbackData);
|
||||
ASSERT_EQ(storageId, listener->mStorageId);
|
||||
ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_READS_PENDING, listener->mStatus);
|
||||
// Timed callback present.
|
||||
ASSERT_EQ(storageId, mTimedQueue->mId);
|
||||
ASSERT_GE(mTimedQueue->mAfter, blockedTimeout);
|
||||
auto timedCallback = mTimedQueue->mWhat;
|
||||
mTimedQueue->clearJob(storageId);
|
||||
|
||||
// Timed job callback for blocked.
|
||||
mIncFs->waitForPendingReadsSuccess(kBlockedTimestampUs);
|
||||
timedCallback();
|
||||
|
||||
// Still not registered, and blocked.
|
||||
ASSERT_EQ(nullptr, mLooper->mCallback);
|
||||
ASSERT_EQ(nullptr, mLooper->mCallbackData);
|
||||
ASSERT_EQ(storageId, listener->mStorageId);
|
||||
ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_BLOCKED, listener->mStatus);
|
||||
// Timed callback present.
|
||||
ASSERT_EQ(storageId, mTimedQueue->mId);
|
||||
ASSERT_GE(mTimedQueue->mAfter, 1000ms);
|
||||
timedCallback = mTimedQueue->mWhat;
|
||||
mTimedQueue->clearJob(storageId);
|
||||
|
||||
// Timed job callback for unhealthy.
|
||||
mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs);
|
||||
timedCallback();
|
||||
|
||||
// Still not registered, and blocked.
|
||||
ASSERT_EQ(nullptr, mLooper->mCallback);
|
||||
ASSERT_EQ(nullptr, mLooper->mCallbackData);
|
||||
ASSERT_EQ(storageId, listener->mStorageId);
|
||||
ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus);
|
||||
// Timed callback present.
|
||||
ASSERT_EQ(storageId, mTimedQueue->mId);
|
||||
ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring);
|
||||
timedCallback = mTimedQueue->mWhat;
|
||||
mTimedQueue->clearJob(storageId);
|
||||
|
||||
// One more unhealthy.
|
||||
mIncFs->waitForPendingReadsSuccess(kUnhealthyTimestampUs);
|
||||
timedCallback();
|
||||
|
||||
// Still not registered, and blocked.
|
||||
ASSERT_EQ(nullptr, mLooper->mCallback);
|
||||
ASSERT_EQ(nullptr, mLooper->mCallbackData);
|
||||
ASSERT_EQ(storageId, listener->mStorageId);
|
||||
ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_UNHEALTHY, listener->mStatus);
|
||||
// Timed callback present.
|
||||
ASSERT_EQ(storageId, mTimedQueue->mId);
|
||||
ASSERT_GE(mTimedQueue->mAfter, unhealthyMonitoring);
|
||||
timedCallback = mTimedQueue->mWhat;
|
||||
mTimedQueue->clearJob(storageId);
|
||||
|
||||
// And now healthy.
|
||||
mIncFs->waitForPendingReadsTimeout();
|
||||
timedCallback();
|
||||
|
||||
// Healthy state, registered for pending reads.
|
||||
ASSERT_NE(nullptr, mLooper->mCallback);
|
||||
ASSERT_NE(nullptr, mLooper->mCallbackData);
|
||||
ASSERT_EQ(storageId, listener->mStorageId);
|
||||
ASSERT_EQ(IStorageHealthListener::HEALTH_STATUS_OK, listener->mStatus);
|
||||
}
|
||||
|
||||
TEST_F(IncrementalServiceTest, testSetIncFsMountOptionsSuccess) {
|
||||
mVold->mountIncFsSuccess();
|
||||
mIncFs->makeFileSuccess();
|
||||
|
||||
Reference in New Issue
Block a user