Merge "Lifecycle: detecting blocked and unhealthy, part 2." into rvc-dev am: 825ad11167 am: 03c17047f0
Change-Id: I5e162388392ae8dfe6b507e111590408c124465a
This commit is contained in:
@@ -294,6 +294,10 @@ IncrementalService::IncrementalService(ServiceManagerWrapper&& sm, std::string_v
|
||||
mJni->initializeForCurrentThread();
|
||||
runCmdLooper();
|
||||
});
|
||||
mTimerThread = std::thread([this]() {
|
||||
mJni->initializeForCurrentThread();
|
||||
runTimers();
|
||||
});
|
||||
|
||||
const auto mountedRootNames = adoptMountedInstances();
|
||||
mountExistingImages(mountedRootNames);
|
||||
@@ -306,7 +310,13 @@ IncrementalService::~IncrementalService() {
|
||||
}
|
||||
mJobCondition.notify_all();
|
||||
mJobProcessor.join();
|
||||
mTimerCondition.notify_all();
|
||||
mTimerThread.join();
|
||||
mCmdLooperThread.join();
|
||||
mTimedJobs.clear();
|
||||
// Ensure that mounts are destroyed while the service is still valid.
|
||||
mBindsByPath.clear();
|
||||
mMounts.clear();
|
||||
}
|
||||
|
||||
static const char* toString(IncrementalService::BindKind kind) {
|
||||
@@ -1700,6 +1710,55 @@ void IncrementalService::onAppOpChanged(const std::string& packageName) {
|
||||
}
|
||||
}
|
||||
|
||||
// Schedules `what` to be executed by the timer thread at (or after) `when` on behalf of mount
// `id`. Requests for kInvalidStorageId are ignored. The timer condition is signalled after the
// job has been queued.
void IncrementalService::addTimedJob(MountId id, TimePoint when, Job what) {
    const bool hasValidOwner = id != kInvalidStorageId;
    if (hasValidOwner) {
        // Build the entry before taking the lock; only the insertion needs mTimerMutex.
        TimedJob entry{id, when, std::move(what)};
        {
            std::unique_lock guard(mTimerMutex);
            mTimedJobs.insert(std::move(entry));
        }
        // Notify with the mutex released.
        mTimerCondition.notify_all();
    }
}
|
||||
|
||||
// Drops every queued timed job that was scheduled for mount `id`.
// Requests for kInvalidStorageId are ignored.
void IncrementalService::removeTimedJobs(MountId id) {
    if (id != kInvalidStorageId) {
        const auto belongsToMount = [id](auto&& queued) { return queued.id == id; };
        std::unique_lock guard(mTimerMutex);
        std::erase_if(mTimedJobs, belongsToMount);
    }
}
|
||||
|
||||
void IncrementalService::runTimers() {
|
||||
static constexpr TimePoint kInfinityTs{Clock::duration::max()};
|
||||
TimePoint nextTaskTs = kInfinityTs;
|
||||
for (;;) {
|
||||
std::unique_lock lock(mTimerMutex);
|
||||
mTimerCondition.wait_until(lock, nextTaskTs, [this]() {
|
||||
auto now = Clock::now();
|
||||
return !mRunning || (!mTimedJobs.empty() && mTimedJobs.begin()->when <= now);
|
||||
});
|
||||
if (!mRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto now = Clock::now();
|
||||
auto it = mTimedJobs.begin();
|
||||
// Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
|
||||
for (; it != mTimedJobs.end() && it->when <= now; it = mTimedJobs.begin()) {
|
||||
auto job = it->what;
|
||||
mTimedJobs.erase(it);
|
||||
|
||||
lock.unlock();
|
||||
job();
|
||||
lock.lock();
|
||||
}
|
||||
nextTaskTs = it != mTimedJobs.end() ? it->when : kInfinityTs;
|
||||
}
|
||||
}
|
||||
|
||||
IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service, MountId id,
|
||||
DataLoaderParamsParcel&& params,
|
||||
FileSystemControlParcel&& control,
|
||||
@@ -1713,10 +1772,17 @@ IncrementalService::DataLoaderStub::DataLoaderStub(IncrementalService& service,
|
||||
mControl(std::move(control)),
|
||||
mStatusListener(statusListener ? *statusListener : DataLoaderStatusListener()),
|
||||
mHealthListener(healthListener ? *healthListener : StorageHealthListener()),
|
||||
mHealthPath(std::move(healthPath)) {
|
||||
// TODO(b/153874006): enable external health listener.
|
||||
mHealthListener = {};
|
||||
healthStatusOk();
|
||||
mHealthPath(std::move(healthPath)),
|
||||
mHealthCheckParams(std::move(healthCheckParams)) {
|
||||
if (mHealthListener) {
|
||||
if (!isHealthParamsValid()) {
|
||||
mHealthListener = {};
|
||||
}
|
||||
} else {
|
||||
// Disable advanced health check statuses.
|
||||
mHealthCheckParams.blockedTimeoutMs = -1;
|
||||
}
|
||||
updateHealthStatus();
|
||||
}
|
||||
|
||||
IncrementalService::DataLoaderStub::~DataLoaderStub() {
|
||||
@@ -1726,21 +1792,29 @@ IncrementalService::DataLoaderStub::~DataLoaderStub() {
|
||||
}
|
||||
|
||||
void IncrementalService::DataLoaderStub::cleanupResources() {
|
||||
auto now = Clock::now();
|
||||
{
|
||||
std::unique_lock lock(mMutex);
|
||||
mHealthPath.clear();
|
||||
unregisterFromPendingReads();
|
||||
resetHealthControl();
|
||||
mService.removeTimedJobs(mId);
|
||||
}
|
||||
|
||||
requestDestroy();
|
||||
|
||||
auto now = Clock::now();
|
||||
std::unique_lock lock(mMutex);
|
||||
|
||||
unregisterFromPendingReads();
|
||||
|
||||
mParams = {};
|
||||
mControl = {};
|
||||
mStatusCondition.wait_until(lock, now + 60s, [this] {
|
||||
return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
|
||||
});
|
||||
mStatusListener = {};
|
||||
mHealthListener = {};
|
||||
mId = kInvalidStorageId;
|
||||
{
|
||||
std::unique_lock lock(mMutex);
|
||||
mParams = {};
|
||||
mControl = {};
|
||||
mHealthControl = {};
|
||||
mHealthListener = {};
|
||||
mStatusCondition.wait_until(lock, now + 60s, [this] {
|
||||
return mCurrentStatus == IDataLoaderStatusListener::DATA_LOADER_DESTROYED;
|
||||
});
|
||||
mStatusListener = {};
|
||||
mId = kInvalidStorageId;
|
||||
}
|
||||
}
|
||||
|
||||
sp<content::pm::IDataLoader> IncrementalService::DataLoaderStub::getDataLoader() {
|
||||
@@ -1838,7 +1912,7 @@ bool IncrementalService::DataLoaderStub::fsmStep() {
|
||||
targetStatus = mTargetStatus;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "fsmStep: " << mId << ": " << currentStatus << " -> " << targetStatus;
|
||||
LOG(DEBUG) << "fsmStep: " << id() << ": " << currentStatus << " -> " << targetStatus;
|
||||
|
||||
if (currentStatus == targetStatus) {
|
||||
return true;
|
||||
@@ -1920,42 +1994,167 @@ binder::Status IncrementalService::DataLoaderStub::onStatusChanged(MountId mount
|
||||
return binder::Status::ok();
|
||||
}
|
||||
|
||||
void IncrementalService::DataLoaderStub::healthStatusOk() {
|
||||
LOG(DEBUG) << "healthStatusOk: " << mId;
|
||||
std::unique_lock lock(mMutex);
|
||||
registerForPendingReads();
|
||||
bool IncrementalService::DataLoaderStub::isHealthParamsValid() const {
|
||||
return mHealthCheckParams.blockedTimeoutMs > 0 &&
|
||||
mHealthCheckParams.blockedTimeoutMs < mHealthCheckParams.unhealthyTimeoutMs;
|
||||
}
|
||||
|
||||
void IncrementalService::DataLoaderStub::healthStatusReadsPending() {
|
||||
LOG(DEBUG) << "healthStatusReadsPending: " << mId;
|
||||
requestStart();
|
||||
|
||||
std::unique_lock lock(mMutex);
|
||||
unregisterFromPendingReads();
|
||||
// Logs `healthStatus` and forwards it, together with this stub's id, to `healthListener`
// when a listener is set.
void IncrementalService::DataLoaderStub::onHealthStatus(StorageHealthListener healthListener,
                                                        int healthStatus) {
    LOG(DEBUG) << id() << ": healthStatus: " << healthStatus;
    if (!healthListener) {
        // Nobody to notify.
        return;
    }
    healthListener->onHealthStatus(id(), healthStatus);
}
|
||||
|
||||
void IncrementalService::DataLoaderStub::healthStatusBlocked() {}
|
||||
void IncrementalService::DataLoaderStub::updateHealthStatus(bool baseline) {
|
||||
LOG(DEBUG) << id() << ": updateHealthStatus" << (baseline ? " (baseline)" : "");
|
||||
|
||||
void IncrementalService::DataLoaderStub::healthStatusUnhealthy() {}
|
||||
int healthStatusToReport = -1;
|
||||
StorageHealthListener healthListener;
|
||||
|
||||
void IncrementalService::DataLoaderStub::registerForPendingReads() {
|
||||
auto pendingReadsFd = mHealthControl.pendingReads();
|
||||
if (pendingReadsFd < 0) {
|
||||
mHealthControl = mService.mIncFs->openMount(mHealthPath);
|
||||
pendingReadsFd = mHealthControl.pendingReads();
|
||||
if (pendingReadsFd < 0) {
|
||||
LOG(ERROR) << "Failed to open health control for: " << mId << ", path: " << mHealthPath
|
||||
<< "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":"
|
||||
<< mHealthControl.logs() << ")";
|
||||
{
|
||||
std::unique_lock lock(mMutex);
|
||||
unregisterFromPendingReads();
|
||||
|
||||
healthListener = mHealthListener;
|
||||
|
||||
// Healthcheck depends on timestamp of the oldest pending read.
|
||||
// To get it, we need to re-open a pendingReads FD to get a full list of reads.
|
||||
// Additionally we need to re-register for epoll with fresh FDs in case there are no reads.
|
||||
const auto now = Clock::now();
|
||||
const auto kernelTsUs = getOldestPendingReadTs();
|
||||
if (baseline) {
|
||||
// Updating baseline only on looper/epoll callback, i.e. on new set of pending reads.
|
||||
mHealthBase = {now, kernelTsUs};
|
||||
}
|
||||
|
||||
if (kernelTsUs == kMaxBootClockTsUs || mHealthBase.userTs > now ||
|
||||
mHealthBase.kernelTsUs > kernelTsUs) {
|
||||
LOG(DEBUG) << id() << ": No pending reads or invalid base, report Ok and wait.";
|
||||
registerForPendingReads();
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_OK;
|
||||
lock.unlock();
|
||||
onHealthStatus(healthListener, healthStatusToReport);
|
||||
return;
|
||||
}
|
||||
|
||||
resetHealthControl();
|
||||
|
||||
// Always make sure the data loader is started.
|
||||
setTargetStatusLocked(IDataLoaderStatusListener::DATA_LOADER_STARTED);
|
||||
|
||||
// Skip any further processing if health check params are invalid.
|
||||
if (!isHealthParamsValid()) {
|
||||
LOG(DEBUG) << id()
|
||||
<< ": Skip any further processing if health check params are invalid.";
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
|
||||
lock.unlock();
|
||||
onHealthStatus(healthListener, healthStatusToReport);
|
||||
// Triggering data loader start. This is a one-time action.
|
||||
fsmStep();
|
||||
return;
|
||||
}
|
||||
|
||||
const auto blockedTimeout = std::chrono::milliseconds(mHealthCheckParams.blockedTimeoutMs);
|
||||
const auto unhealthyTimeout =
|
||||
std::chrono::milliseconds(mHealthCheckParams.unhealthyTimeoutMs);
|
||||
const auto unhealthyMonitoring =
|
||||
std::max(1000ms,
|
||||
std::chrono::milliseconds(mHealthCheckParams.unhealthyMonitoringMs));
|
||||
|
||||
const auto kernelDeltaUs = kernelTsUs - mHealthBase.kernelTsUs;
|
||||
const auto userTs = mHealthBase.userTs + std::chrono::microseconds(kernelDeltaUs);
|
||||
const auto delta = now - userTs;
|
||||
|
||||
TimePoint whenToCheckBack;
|
||||
if (delta < blockedTimeout) {
|
||||
LOG(DEBUG) << id() << ": Report reads pending and wait for blocked status.";
|
||||
whenToCheckBack = userTs + blockedTimeout;
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_READS_PENDING;
|
||||
} else if (delta < unhealthyTimeout) {
|
||||
LOG(DEBUG) << id() << ": Report blocked and wait for unhealthy.";
|
||||
whenToCheckBack = userTs + unhealthyTimeout;
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_BLOCKED;
|
||||
} else {
|
||||
LOG(DEBUG) << id() << ": Report unhealthy and continue monitoring.";
|
||||
whenToCheckBack = now + unhealthyMonitoring;
|
||||
healthStatusToReport = IStorageHealthListener::HEALTH_STATUS_UNHEALTHY;
|
||||
}
|
||||
LOG(DEBUG) << id() << ": updateHealthStatus in "
|
||||
<< double(std::chrono::duration_cast<std::chrono::milliseconds>(whenToCheckBack -
|
||||
now)
|
||||
.count()) /
|
||||
1000.0
|
||||
<< "secs";
|
||||
mService.addTimedJob(id(), whenToCheckBack, [this]() { updateHealthStatus(); });
|
||||
}
|
||||
|
||||
if (healthStatusToReport != -1) {
|
||||
onHealthStatus(healthListener, healthStatusToReport);
|
||||
}
|
||||
|
||||
fsmStep();
|
||||
}
|
||||
|
||||
// Returns mHealthControl, opening it from mHealthPath first when its pendingReads FD is not
// valid. With an empty mHealthPath the control is reset instead. A failure to obtain a valid
// pendingReads FD is logged; the (invalid) control is still returned.
const incfs::UniqueControl& IncrementalService::DataLoaderStub::initializeHealthControl() {
    if (mHealthPath.empty()) {
        resetHealthControl();
    } else {
        if (mHealthControl.pendingReads() < 0) {
            mHealthControl = mService.mIncFs->openMount(mHealthPath);
        }
        const bool stillInvalid = mHealthControl.pendingReads() < 0;
        if (stillInvalid) {
            LOG(ERROR) << "Failed to open health control for: " << id() << ", path: " << mHealthPath
                       << "(" << mHealthControl.cmd() << ":" << mHealthControl.pendingReads() << ":"
                       << mHealthControl.logs() << ")";
        }
    }
    return mHealthControl;
}
|
||||
|
||||
void IncrementalService::DataLoaderStub::resetHealthControl() {
|
||||
mHealthControl = {};
|
||||
}
|
||||
|
||||
// Returns the smallest bootClockTsUs among the currently pending reads, or kMaxBootClockTsUs
// when the health control cannot be initialized or no pending reads are reported.
BootClockTsUs IncrementalService::DataLoaderStub::getOldestPendingReadTs() {
    auto oldestTs = kMaxBootClockTsUs;

    const auto& healthControl = initializeHealthControl();
    if (healthControl.pendingReads() < 0) {
        return oldestTs;
    }

    // Poll with a zero timeout: only reads that are already pending are of interest.
    std::vector<incfs::ReadInfo> reads;
    const auto waitResult = mService.mIncFs->waitForPendingReads(healthControl, 0ms, &reads);
    if (waitResult != android::incfs::WaitResult::HaveData) {
        return oldestTs;
    }
    if (reads.empty()) {
        return oldestTs;
    }

    LOG(DEBUG) << id() << ": pendingReads: " << healthControl.pendingReads() << ", "
               << reads.size() << ": " << reads.front().bootClockTsUs;

    for (const auto& read : reads) {
        if (read.bootClockTsUs < oldestTs) {
            oldestTs = read.bootClockTsUs;
        }
    }
    return oldestTs;
}
|
||||
|
||||
void IncrementalService::DataLoaderStub::registerForPendingReads() {
|
||||
const auto pendingReadsFd = mHealthControl.pendingReads();
|
||||
if (pendingReadsFd < 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << id() << ": addFd(pendingReadsFd): " << pendingReadsFd;
|
||||
|
||||
mService.mLooper->addFd(
|
||||
pendingReadsFd, android::Looper::POLL_CALLBACK, android::Looper::EVENT_INPUT,
|
||||
[](int, int, void* data) -> int {
|
||||
auto&& self = (DataLoaderStub*)data;
|
||||
return self->onPendingReads();
|
||||
self->updateHealthStatus(/*baseline=*/true);
|
||||
return 0;
|
||||
},
|
||||
this);
|
||||
mService.mLooper->wake();
|
||||
@@ -1967,19 +2166,10 @@ void IncrementalService::DataLoaderStub::unregisterFromPendingReads() {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(DEBUG) << id() << ": removeFd(pendingReadsFd): " << pendingReadsFd;
|
||||
|
||||
mService.mLooper->removeFd(pendingReadsFd);
|
||||
mService.mLooper->wake();
|
||||
|
||||
mHealthControl = {};
|
||||
}
|
||||
|
||||
int IncrementalService::DataLoaderStub::onPendingReads() {
|
||||
if (!mService.mRunning.load(std::memory_order_relaxed)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
healthStatusReadsPending();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void IncrementalService::DataLoaderStub::onDump(int fd) {
|
||||
|
||||
@@ -35,6 +35,7 @@
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <span>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
@@ -186,17 +187,12 @@ private:
|
||||
|
||||
void onDump(int fd);
|
||||
|
||||
MountId id() const { return mId; }
|
||||
MountId id() const { return mId.load(std::memory_order_relaxed); }
|
||||
const content::pm::DataLoaderParamsParcel& params() const { return mParams; }
|
||||
|
||||
private:
|
||||
binder::Status onStatusChanged(MountId mount, int newStatus) final;
|
||||
|
||||
void registerForPendingReads();
|
||||
void unregisterFromPendingReads();
|
||||
int onPendingReads();
|
||||
|
||||
bool isValid() const { return mId != kInvalidStorageId; }
|
||||
sp<content::pm::IDataLoader> getDataLoader();
|
||||
|
||||
bool bind();
|
||||
@@ -208,21 +204,27 @@ private:
|
||||
void setTargetStatusLocked(int status);
|
||||
|
||||
bool fsmStep();
|
||||
bool fsmStep(int currentStatus, int targetStatus);
|
||||
|
||||
// Watching for pending reads.
|
||||
void healthStatusOk();
|
||||
// Pending reads detected, waiting for Xsecs to confirm blocked state.
|
||||
void healthStatusReadsPending();
|
||||
// There are reads pending for X+secs, waiting for additional Ysecs to confirm unhealthy
|
||||
// state.
|
||||
void healthStatusBlocked();
|
||||
// There are reads pending for X+Ysecs, marking storage as unhealthy.
|
||||
void healthStatusUnhealthy();
|
||||
void onHealthStatus(StorageHealthListener healthListener, int healthStatus);
|
||||
void updateHealthStatus(bool baseline = false);
|
||||
|
||||
bool isValid() const { return id() != kInvalidStorageId; }
|
||||
|
||||
bool isHealthParamsValid() const;
|
||||
|
||||
const incfs::UniqueControl& initializeHealthControl();
|
||||
void resetHealthControl();
|
||||
|
||||
BootClockTsUs getOldestPendingReadTs();
|
||||
|
||||
void registerForPendingReads();
|
||||
void unregisterFromPendingReads();
|
||||
|
||||
IncrementalService& mService;
|
||||
|
||||
std::mutex mMutex;
|
||||
MountId mId = kInvalidStorageId;
|
||||
std::atomic<MountId> mId = kInvalidStorageId;
|
||||
content::pm::DataLoaderParamsParcel mParams;
|
||||
content::pm::FileSystemControlParcel mControl;
|
||||
DataLoaderStatusListener mStatusListener;
|
||||
@@ -235,6 +237,11 @@ private:
|
||||
|
||||
std::string mHealthPath;
|
||||
incfs::UniqueControl mHealthControl;
|
||||
struct {
|
||||
TimePoint userTs;
|
||||
BootClockTsUs kernelTsUs;
|
||||
} mHealthBase = {TimePoint::max(), kMaxBootClockTsUs};
|
||||
StorageHealthCheckParams mHealthCheckParams;
|
||||
};
|
||||
using DataLoaderStubPtr = sp<DataLoaderStub>;
|
||||
|
||||
@@ -331,6 +338,8 @@ private:
|
||||
bool unregisterAppOpsCallback(const std::string& packageName);
|
||||
void onAppOpChanged(const std::string& packageName);
|
||||
|
||||
using Job = std::function<void()>;
|
||||
|
||||
void runJobProcessing();
|
||||
void extractZipFile(const IfsMountPtr& ifs, ZipArchiveHandle zipFile, ZipEntry& entry,
|
||||
const incfs::FileId& libFileId, std::string_view targetLibPath,
|
||||
@@ -338,6 +347,10 @@ private:
|
||||
|
||||
void runCmdLooper();
|
||||
|
||||
void addTimedJob(MountId id, TimePoint when, Job what);
|
||||
void removeTimedJobs(MountId id);
|
||||
void runTimers();
|
||||
|
||||
private:
|
||||
const std::unique_ptr<VoldServiceWrapper> mVold;
|
||||
const std::unique_ptr<DataLoaderManagerWrapper> mDataLoaderManager;
|
||||
@@ -360,7 +373,6 @@ private:
|
||||
|
||||
std::atomic_bool mRunning{true};
|
||||
|
||||
using Job = std::function<void()>;
|
||||
std::unordered_map<MountId, std::vector<Job>> mJobQueue;
|
||||
MountId mPendingJobsMount = kInvalidStorageId;
|
||||
std::condition_variable mJobCondition;
|
||||
@@ -368,6 +380,19 @@ private:
|
||||
std::thread mJobProcessor;
|
||||
|
||||
std::thread mCmdLooperThread;
|
||||
|
||||
struct TimedJob {
|
||||
MountId id;
|
||||
TimePoint when;
|
||||
Job what;
|
||||
friend bool operator<(const TimedJob& lhs, const TimedJob& rhs) {
|
||||
return lhs.when < rhs.when;
|
||||
}
|
||||
};
|
||||
std::set<TimedJob> mTimedJobs;
|
||||
std::condition_variable mTimerCondition;
|
||||
std::mutex mTimerMutex;
|
||||
std::thread mTimerThread;
|
||||
};
|
||||
|
||||
} // namespace android::incremental
|
||||
|
||||
@@ -371,7 +371,7 @@ class MockJniWrapper : public JniWrapper {
|
||||
public:
|
||||
MOCK_CONST_METHOD0(initializeForCurrentThread, void());
|
||||
|
||||
MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(2); }
|
||||
MockJniWrapper() { EXPECT_CALL(*this, initializeForCurrentThread()).Times(3); }
|
||||
};
|
||||
|
||||
class MockLooperWrapper : public LooperWrapper {
|
||||
|
||||
Reference in New Issue
Block a user