[incfs] Fix the mount state callbacks processing

- use a never-existing storage ID as a job key
- order the jobs in the map to not skip them on changes, or,
  worse, never hang in a loop
- clear the local callbacks vector before moving to the next
  storage ID
- try to resume from the closest place on the next processing
  iteration

Bug: 183435580
Test: atest service.incremental_test
Change-Id: I36cd5d30c656bed62c20bd7a7f84fb58046a0933
This commit is contained in:
Yurii Zubrytskyi
2021-03-24 00:48:24 -07:00
committed by Alex Buynytskyy
parent fef3257d39
commit 9acc9acea5
3 changed files with 44 additions and 36 deletions

View File

@@ -2272,7 +2272,7 @@ void IncrementalService::addIfsStateCallback(StorageId storageId, IfsStateCallba
mIfsStateCallbacks[storageId].emplace_back(std::move(callback));
}
if (wasEmpty) {
addTimedJob(*mTimedQueue, kMaxStorageId, Constants::progressUpdateInterval,
addTimedJob(*mTimedQueue, kAllStoragesId, Constants::progressUpdateInterval,
[this]() { processIfsStateCallbacks(); });
}
}
@@ -2288,29 +2288,36 @@ void IncrementalService::processIfsStateCallbacks() {
}
IfsStateCallbacks::iterator it;
if (storageId == kInvalidStorageId) {
// First entry, initialize the it.
// First entry, initialize the |it|.
it = mIfsStateCallbacks.begin();
} else {
// Subsequent entries, update the storageId, and shift to the new one.
it = mIfsStateCallbacks.find(storageId);
// Subsequent entries, update the |storageId|, and shift to the new one (not that
// it guarantees much about updated items, but at least the loop will finish).
it = mIfsStateCallbacks.lower_bound(storageId);
if (it == mIfsStateCallbacks.end()) {
// Was removed while processing, too bad.
// Nothing else left, too bad.
break;
}
auto& callbacks = it->second;
if (callbacks.empty()) {
std::swap(callbacks, local);
if (it->first != storageId) {
local.clear(); // Was removed during processing, forget the old callbacks.
} else {
callbacks.insert(callbacks.end(), local.begin(), local.end());
}
if (callbacks.empty()) {
it = mIfsStateCallbacks.erase(it);
if (mIfsStateCallbacks.empty()) {
return;
// Put the 'surviving' callbacks back into the map and advance the position.
auto& callbacks = it->second;
if (callbacks.empty()) {
std::swap(callbacks, local);
} else {
callbacks.insert(callbacks.end(), std::move_iterator(local.begin()),
std::move_iterator(local.end()));
local.clear();
}
if (callbacks.empty()) {
it = mIfsStateCallbacks.erase(it);
if (mIfsStateCallbacks.empty()) {
return;
}
} else {
++it;
}
} else {
++it;
}
}
@@ -2330,7 +2337,7 @@ void IncrementalService::processIfsStateCallbacks() {
processIfsStateCallbacks(storageId, local);
}
addTimedJob(*mTimedQueue, kMaxStorageId, Constants::progressUpdateInterval,
addTimedJob(*mTimedQueue, kAllStoragesId, Constants::progressUpdateInterval,
[this]() { processIfsStateCallbacks(); });
}

View File

@@ -95,7 +95,8 @@ public:
#pragma GCC diagnostic pop
static constexpr StorageId kInvalidStorageId = -1;
static constexpr StorageId kMaxStorageId = std::numeric_limits<int>::max();
static constexpr StorageId kMaxStorageId = std::numeric_limits<int>::max() - 1;
static constexpr StorageId kAllStoragesId = kMaxStorageId + 1;
static constexpr BootClockTsUs kMaxBootClockTsUs = std::numeric_limits<BootClockTsUs>::max();
@@ -472,7 +473,7 @@ private:
std::mutex mCallbacksLock;
std::unordered_map<std::string, sp<AppOpsListener>> mCallbackRegistered;
using IfsStateCallbacks = std::unordered_map<StorageId, std::vector<IfsStateCallback>>;
using IfsStateCallbacks = std::map<StorageId, std::vector<IfsStateCallback>>;
std::mutex mIfsStateCallbacksLock;
IfsStateCallbacks mIfsStateCallbacks;

View File

@@ -1735,10 +1735,10 @@ TEST_F(IncrementalServiceTest, testStartDataLoaderUnbindOnAllDone) {
ASSERT_EQ(mDataLoader->status(), IDataLoaderStatusListener::DATA_LOADER_STARTED);
// IfsState callback present.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_EQ(mTimedQueue->mAfter, stateUpdateInterval);
auto callback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Not loaded yet.
EXPECT_CALL(*mIncFs, isEverythingFullyLoaded(_))
@@ -1751,10 +1751,10 @@ TEST_F(IncrementalServiceTest, testStartDataLoaderUnbindOnAllDone) {
ASSERT_EQ(mDataLoader->status(), IDataLoaderStatusListener::DATA_LOADER_STARTED);
// Still present.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_EQ(mTimedQueue->mAfter, stateUpdateInterval);
callback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Fully loaded.
EXPECT_CALL(*mIncFs, isEverythingFullyLoaded(_)).WillOnce(Return(incfs::LoadingState::Full));
@@ -1797,10 +1797,10 @@ TEST_F(IncrementalServiceTest, testStartDataLoaderUnbindOnAllDoneWithReadlogs) {
ASSERT_EQ(mDataLoader->status(), IDataLoaderStatusListener::DATA_LOADER_STARTED);
// IfsState callback present.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_EQ(mTimedQueue->mAfter, stateUpdateInterval);
auto callback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Not loaded yet.
EXPECT_CALL(*mIncFs, isEverythingFullyLoaded(_))
@@ -1813,10 +1813,10 @@ TEST_F(IncrementalServiceTest, testStartDataLoaderUnbindOnAllDoneWithReadlogs) {
ASSERT_EQ(mDataLoader->status(), IDataLoaderStatusListener::DATA_LOADER_STARTED);
// Still present.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_EQ(mTimedQueue->mAfter, stateUpdateInterval);
callback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Fully loaded.
EXPECT_CALL(*mIncFs, isEverythingFullyLoaded(_))
@@ -1832,10 +1832,10 @@ TEST_F(IncrementalServiceTest, testStartDataLoaderUnbindOnAllDoneWithReadlogs) {
ASSERT_EQ(mDataLoader->status(), IDataLoaderStatusListener::DATA_LOADER_STARTED);
// Still present.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_EQ(mTimedQueue->mAfter, stateUpdateInterval);
callback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Disable readlogs and expect the unbind.
EXPECT_CALL(*mDataLoaderManager, unbindFromDataLoader(_)).Times(1);
@@ -2007,10 +2007,10 @@ TEST_F(IncrementalServiceTest, testPerUidTimeoutsSuccess) {
{
// Timed callback present -> 0 progress.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_GE(mTimedQueue->mAfter, std::chrono::seconds(1));
const auto timedCallback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Call it again.
timedCallback();
@@ -2018,10 +2018,10 @@ TEST_F(IncrementalServiceTest, testPerUidTimeoutsSuccess) {
{
// Still present -> some progress.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_GE(mTimedQueue->mAfter, std::chrono::seconds(1));
const auto timedCallback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Fully loaded but readlogs collection enabled.
ASSERT_GE(mDataLoader->setStorageParams(true), 0);
@@ -2032,10 +2032,10 @@ TEST_F(IncrementalServiceTest, testPerUidTimeoutsSuccess) {
{
// Still present -> fully loaded + readlogs.
ASSERT_EQ(IncrementalService::kMaxStorageId, mTimedQueue->mId);
ASSERT_EQ(IncrementalService::kAllStoragesId, mTimedQueue->mId);
ASSERT_GE(mTimedQueue->mAfter, std::chrono::seconds(1));
const auto timedCallback = mTimedQueue->mWhat;
mTimedQueue->clearJob(IncrementalService::kMaxStorageId);
mTimedQueue->clearJob(IncrementalService::kAllStoragesId);
// Now disable readlogs.
ASSERT_GE(mDataLoader->setStorageParams(false), 0);