[incfs] Allow multiple timed jobs at the same time point
Old code had a tiny chance of ignoring a job if it happens to be scheduled to the exactly the same time as one already in the queue. Not that it will ever happen, but better to fix it. + make the worker thread code slightly easier to reason about Bug: 183243150 Test: atest IncrementalServiceTest Change-Id: Ia3126d30e19edfd17f7c8da368e9763ca5501e84
This commit is contained in:
@@ -255,7 +255,7 @@ public:
|
||||
|
||||
static JNIEnv* getOrAttachJniEnv(JavaVM* jvm);
|
||||
|
||||
class RealTimedQueueWrapper : public TimedQueueWrapper {
|
||||
class RealTimedQueueWrapper final : public TimedQueueWrapper {
|
||||
public:
|
||||
RealTimedQueueWrapper(JavaVM* jvm) {
|
||||
mThread = std::thread([this, jvm]() {
|
||||
@@ -268,11 +268,11 @@ public:
|
||||
CHECK(!mThread.joinable()) << "call stop first";
|
||||
}
|
||||
|
||||
void addJob(MountId id, Milliseconds after, Job what) final {
|
||||
void addJob(MountId id, Milliseconds timeout, Job what) final {
|
||||
const auto now = Clock::now();
|
||||
{
|
||||
std::unique_lock lock(mMutex);
|
||||
mJobs.insert(TimedJob{id, now + after, std::move(what)});
|
||||
mJobs.insert(TimedJob{id, now + timeout, std::move(what)});
|
||||
}
|
||||
mCondition.notify_all();
|
||||
}
|
||||
@@ -293,29 +293,28 @@ public:
|
||||
private:
|
||||
void runTimers() {
|
||||
static constexpr TimePoint kInfinityTs{Clock::duration::max()};
|
||||
TimePoint nextJobTs = kInfinityTs;
|
||||
std::unique_lock lock(mMutex);
|
||||
for (;;) {
|
||||
mCondition.wait_until(lock, nextJobTs, [this, nextJobTs]() {
|
||||
const TimePoint nextJobTs = mJobs.empty() ? kInfinityTs : mJobs.begin()->when;
|
||||
mCondition.wait_until(lock, nextJobTs, [this, oldNextJobTs = nextJobTs]() {
|
||||
const auto now = Clock::now();
|
||||
const auto firstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
|
||||
return !mRunning || firstJobTs <= now || firstJobTs < nextJobTs;
|
||||
const auto newFirstJobTs = !mJobs.empty() ? mJobs.begin()->when : kInfinityTs;
|
||||
return newFirstJobTs <= now || newFirstJobTs < oldNextJobTs || !mRunning;
|
||||
});
|
||||
if (!mRunning) {
|
||||
return;
|
||||
}
|
||||
|
||||
const auto now = Clock::now();
|
||||
auto it = mJobs.begin();
|
||||
// Always acquire begin(). We can't use it after unlock as mTimedJobs can change.
|
||||
for (; it != mJobs.end() && it->when <= now; it = mJobs.begin()) {
|
||||
// Always re-acquire begin(). We can't use it after unlock as mTimedJobs can change.
|
||||
for (auto it = mJobs.begin(); it != mJobs.end() && it->when <= now;
|
||||
it = mJobs.begin()) {
|
||||
auto jobNode = mJobs.extract(it);
|
||||
|
||||
lock.unlock();
|
||||
jobNode.value().what();
|
||||
lock.lock();
|
||||
}
|
||||
nextJobTs = it != mJobs.end() ? it->when : kInfinityTs;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -328,7 +327,7 @@ private:
|
||||
}
|
||||
};
|
||||
bool mRunning = true;
|
||||
std::set<TimedJob> mJobs;
|
||||
std::multiset<TimedJob> mJobs;
|
||||
std::condition_variable mCondition;
|
||||
std::mutex mMutex;
|
||||
std::thread mThread;
|
||||
|
||||
Reference in New Issue
Block a user