Merge "Fix ShellSubscriber concurrency issues" into rvc-dev am: ca7fc3be0f am: 0a60d240e5 am: f283d69da8
Change-Id: Idd66ecb29489169eaff4e4cc0d6d4ed9f19ba67f
This commit is contained in:
@@ -19,6 +19,7 @@
|
|||||||
#include "ShellSubscriber.h"
|
#include "ShellSubscriber.h"
|
||||||
|
|
||||||
#include <android-base/file.h>
|
#include <android-base/file.h>
|
||||||
|
|
||||||
#include "matchers/matcher_util.h"
|
#include "matchers/matcher_util.h"
|
||||||
#include "stats_log_util.h"
|
#include "stats_log_util.h"
|
||||||
|
|
||||||
@@ -32,41 +33,52 @@ const static int FIELD_ID_ATOM = 1;
|
|||||||
|
|
||||||
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
|
void ShellSubscriber::startNewSubscription(int in, int out, int timeoutSec) {
|
||||||
int myToken = claimToken();
|
int myToken = claimToken();
|
||||||
|
VLOG("ShellSubscriber: new subscription %d has come in", myToken);
|
||||||
mSubscriptionShouldEnd.notify_one();
|
mSubscriptionShouldEnd.notify_one();
|
||||||
|
|
||||||
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
|
shared_ptr<SubscriptionInfo> mySubscriptionInfo = make_shared<SubscriptionInfo>(in, out);
|
||||||
if (!readConfig(mySubscriptionInfo)) {
|
if (!readConfig(mySubscriptionInfo)) return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// critical-section
|
{
|
||||||
std::unique_lock<std::mutex> lock(mMutex);
|
std::unique_lock<std::mutex> lock(mMutex);
|
||||||
if (myToken != mToken) {
|
if (myToken != mToken) {
|
||||||
// Some other subscription has already come in. Stop.
|
// Some other subscription has already come in. Stop.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
mSubscriptionInfo = mySubscriptionInfo;
|
mSubscriptionInfo = mySubscriptionInfo;
|
||||||
|
|
||||||
if (mySubscriptionInfo->mPulledInfo.size() > 0 && mySubscriptionInfo->mPullIntervalMin > 0) {
|
spawnHelperThreadsLocked(mySubscriptionInfo, myToken);
|
||||||
// This thread terminates after it detects that mToken has changed.
|
waitForSubscriptionToEndLocked(mySubscriptionInfo, myToken, lock, timeoutSec);
|
||||||
|
|
||||||
|
if (mSubscriptionInfo == mySubscriptionInfo) {
|
||||||
|
mSubscriptionInfo = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ShellSubscriber::spawnHelperThreadsLocked(shared_ptr<SubscriptionInfo> myInfo, int myToken) {
|
||||||
|
if (!myInfo->mPulledInfo.empty() && myInfo->mPullIntervalMin > 0) {
|
||||||
std::thread puller([this, myToken] { startPull(myToken); });
|
std::thread puller([this, myToken] { startPull(myToken); });
|
||||||
puller.detach();
|
puller.detach();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Block until subscription has ended.
|
std::thread heartbeatSender([this, myToken] { sendHeartbeats(myToken); });
|
||||||
if (timeoutSec > 0) {
|
heartbeatSender.detach();
|
||||||
mSubscriptionShouldEnd.wait_for(
|
}
|
||||||
lock, timeoutSec * 1s, [this, myToken, &mySubscriptionInfo] {
|
|
||||||
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
mSubscriptionShouldEnd.wait(lock, [this, myToken, &mySubscriptionInfo] {
|
|
||||||
return mToken != myToken || !mySubscriptionInfo->mClientAlive;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mSubscriptionInfo == mySubscriptionInfo) {
|
void ShellSubscriber::waitForSubscriptionToEndLocked(shared_ptr<SubscriptionInfo> myInfo,
|
||||||
mSubscriptionInfo = nullptr;
|
int myToken,
|
||||||
|
std::unique_lock<std::mutex>& lock,
|
||||||
|
int timeoutSec) {
|
||||||
|
if (timeoutSec > 0) {
|
||||||
|
mSubscriptionShouldEnd.wait_for(lock, timeoutSec * 1s, [this, myToken, &myInfo] {
|
||||||
|
return mToken != myToken || !myInfo->mClientAlive;
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
mSubscriptionShouldEnd.wait(lock, [this, myToken, &myInfo] {
|
||||||
|
return mToken != myToken || !myInfo->mClientAlive;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -129,51 +141,55 @@ bool ShellSubscriber::readConfig(shared_ptr<SubscriptionInfo> subscriptionInfo)
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ShellSubscriber::startPull(int64_t myToken) {
|
void ShellSubscriber::startPull(int myToken) {
|
||||||
|
VLOG("ShellSubscriber: pull thread %d starting", myToken);
|
||||||
while (true) {
|
while (true) {
|
||||||
std::lock_guard<std::mutex> lock(mMutex);
|
{
|
||||||
if (!mSubscriptionInfo || mToken != myToken) {
|
std::lock_guard<std::mutex> lock(mMutex);
|
||||||
VLOG("Pulling thread %lld done!", (long long)myToken);
|
if (!mSubscriptionInfo || mToken != myToken) {
|
||||||
return;
|
VLOG("ShellSubscriber: pulling thread %d done!", myToken);
|
||||||
}
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t nowMillis = getElapsedRealtimeMillis();
|
||||||
|
for (PullInfo& pullInfo : mSubscriptionInfo->mPulledInfo) {
|
||||||
|
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval >= nowMillis) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t nowMillis = getElapsedRealtimeMillis();
|
|
||||||
for (auto& pullInfo : mSubscriptionInfo->mPulledInfo) {
|
|
||||||
if (pullInfo.mPrevPullElapsedRealtimeMs + pullInfo.mInterval < nowMillis) {
|
|
||||||
vector<std::shared_ptr<LogEvent>> data;
|
|
||||||
vector<int32_t> uids;
|
vector<int32_t> uids;
|
||||||
uids.insert(uids.end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
|
getUidsForPullAtom(&uids, pullInfo);
|
||||||
// This is slow. Consider storing the uids per app and listening to uidmap updates.
|
|
||||||
for (const string& pkg : pullInfo.mPullPackages) {
|
vector<std::shared_ptr<LogEvent>> data;
|
||||||
set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
|
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
|
||||||
uids.insert(uids.end(), uidsForPkg.begin(), uidsForPkg.end());
|
VLOG("Pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
|
||||||
}
|
writePulledAtomsLocked(data, pullInfo.mPullerMatcher);
|
||||||
uids.push_back(DEFAULT_PULL_UID);
|
|
||||||
mPullerMgr->Pull(pullInfo.mPullerMatcher.atom_id(), uids, &data);
|
|
||||||
VLOG("pulled %zu atoms with id %d", data.size(), pullInfo.mPullerMatcher.atom_id());
|
|
||||||
|
|
||||||
if (!writePulledAtomsLocked(data, pullInfo.mPullerMatcher)) {
|
|
||||||
mSubscriptionInfo->mClientAlive = false;
|
|
||||||
mSubscriptionShouldEnd.notify_one();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
|
pullInfo.mPrevPullElapsedRealtimeMs = nowMillis;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
VLOG("Pulling thread %lld sleep....", (long long)myToken);
|
VLOG("ShellSubscriber: pulling thread %d sleeping for %d ms", myToken,
|
||||||
|
mSubscriptionInfo->mPullIntervalMin);
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
|
std::this_thread::sleep_for(std::chrono::milliseconds(mSubscriptionInfo->mPullIntervalMin));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// \return boolean indicating if writes were successful (will return false if
|
void ShellSubscriber::getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo) {
|
||||||
// client dies)
|
uids->insert(uids->end(), pullInfo.mPullUids.begin(), pullInfo.mPullUids.end());
|
||||||
bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
// This is slow. Consider storing the uids per app and listening to uidmap updates.
|
||||||
|
for (const string& pkg : pullInfo.mPullPackages) {
|
||||||
|
set<int32_t> uidsForPkg = mUidMap->getAppUid(pkg);
|
||||||
|
uids->insert(uids->end(), uidsForPkg.begin(), uidsForPkg.end());
|
||||||
|
}
|
||||||
|
uids->push_back(DEFAULT_PULL_UID);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
||||||
const SimpleAtomMatcher& matcher) {
|
const SimpleAtomMatcher& matcher) {
|
||||||
mProto.clear();
|
mProto.clear();
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (const auto& event : data) {
|
for (const auto& event : data) {
|
||||||
VLOG("%s", event->ToString().c_str());
|
|
||||||
if (matchesSimple(*mUidMap, matcher, *event)) {
|
if (matchesSimple(*mUidMap, matcher, *event)) {
|
||||||
count++;
|
count++;
|
||||||
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
|
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
|
||||||
@@ -183,55 +199,67 @@ bool ShellSubscriber::writePulledAtomsLocked(const vector<std::shared_ptr<LogEve
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (count > 0) {
|
if (count > 0) attemptWriteToSocketLocked(mProto.size());
|
||||||
// First write the payload size.
|
|
||||||
size_t bufferSize = mProto.size();
|
|
||||||
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
|
|
||||||
sizeof(bufferSize))) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
VLOG("%d atoms, proto size: %zu", count, bufferSize);
|
|
||||||
// Then write the payload.
|
|
||||||
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ShellSubscriber::onLogEvent(const LogEvent& event) {
|
void ShellSubscriber::onLogEvent(const LogEvent& event) {
|
||||||
std::lock_guard<std::mutex> lock(mMutex);
|
std::lock_guard<std::mutex> lock(mMutex);
|
||||||
if (!mSubscriptionInfo) {
|
if (!mSubscriptionInfo) return;
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
mProto.clear();
|
mProto.clear();
|
||||||
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
|
for (const auto& matcher : mSubscriptionInfo->mPushedMatchers) {
|
||||||
if (matchesSimple(*mUidMap, matcher, event)) {
|
if (matchesSimple(*mUidMap, matcher, event)) {
|
||||||
VLOG("%s", event.ToString().c_str());
|
|
||||||
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
|
uint64_t atomToken = mProto.start(util::FIELD_TYPE_MESSAGE |
|
||||||
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
|
util::FIELD_COUNT_REPEATED | FIELD_ID_ATOM);
|
||||||
event.ToProto(mProto);
|
event.ToProto(mProto);
|
||||||
mProto.end(atomToken);
|
mProto.end(atomToken);
|
||||||
|
attemptWriteToSocketLocked(mProto.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// First write the payload size.
|
// Tries to write the atom encoded in mProto to the socket. If the write fails
|
||||||
size_t bufferSize = mProto.size();
|
// because the read end of the pipe has closed, signals to other threads that
|
||||||
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &bufferSize,
|
// the subscription should end.
|
||||||
sizeof(bufferSize))) {
|
void ShellSubscriber::attemptWriteToSocketLocked(size_t dataSize) {
|
||||||
mSubscriptionInfo->mClientAlive = false;
|
// First write the payload size.
|
||||||
mSubscriptionShouldEnd.notify_one();
|
if (!android::base::WriteFully(mSubscriptionInfo->mOutputFd, &dataSize, sizeof(dataSize))) {
|
||||||
|
mSubscriptionInfo->mClientAlive = false;
|
||||||
|
mSubscriptionShouldEnd.notify_one();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dataSize == 0) return;
|
||||||
|
|
||||||
|
// Then, write the payload.
|
||||||
|
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
|
||||||
|
mSubscriptionInfo->mClientAlive = false;
|
||||||
|
mSubscriptionShouldEnd.notify_one();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
mLastWriteMs = getElapsedRealtimeMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a heartbeat, consisting solely of a data size of 0, if perfd has not
|
||||||
|
// recently received any writes from statsd. When it receives the data size of
|
||||||
|
// 0, perfd will not expect any data and recheck whether the shell command is
|
||||||
|
// still running.
|
||||||
|
void ShellSubscriber::sendHeartbeats(int myToken) {
|
||||||
|
while (true) {
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(mMutex);
|
||||||
|
if (!mSubscriptionInfo || myToken != mToken) {
|
||||||
|
VLOG("ShellSubscriber: heartbeat thread %d done!", myToken);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then write the payload.
|
if (getElapsedRealtimeMillis() - mLastWriteMs > kMsBetweenHeartbeats) {
|
||||||
if (!mProto.flush(mSubscriptionInfo->mOutputFd)) {
|
VLOG("ShellSubscriber: sending a heartbeat to perfd");
|
||||||
mSubscriptionInfo->mClientAlive = false;
|
attemptWriteToSocketLocked(/*dataSize=*/0);
|
||||||
mSubscriptionShouldEnd.notify_one();
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(kMsBetweenHeartbeats));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -38,11 +38,11 @@ namespace statsd {
|
|||||||
*
|
*
|
||||||
* A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
|
* A shell subscription lasts *until shell exits*. Unlike config based clients, a shell client
|
||||||
* communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
|
* communicates with statsd via file descriptors. They can subscribe pushed and pulled atoms.
|
||||||
* The atoms are sent back to the client in real time, as opposed to
|
* The atoms are sent back to the client in real time, as opposed to keeping the data in memory.
|
||||||
* keeping the data in memory. Shell clients do not subscribe aggregated metrics, as they are
|
* Shell clients do not subscribe aggregated metrics, as they are responsible for doing the
|
||||||
* responsible for doing the aggregation after receiving the atom events.
|
* aggregation after receiving the atom events.
|
||||||
*
|
*
|
||||||
* Shell client pass ShellSubscription in the proto binary format. Client can update the
|
* Shell clients pass ShellSubscription in the proto binary format. Clients can update the
|
||||||
* subscription by sending a new subscription. The new subscription would replace the old one.
|
* subscription by sending a new subscription. The new subscription would replace the old one.
|
||||||
* Input data stream format is:
|
* Input data stream format is:
|
||||||
*
|
*
|
||||||
@@ -54,7 +54,7 @@ namespace statsd {
|
|||||||
* The stream would be in the following format:
|
* The stream would be in the following format:
|
||||||
* |size_t|shellData proto|size_t|shellData proto|....
|
* |size_t|shellData proto|size_t|shellData proto|....
|
||||||
*
|
*
|
||||||
* Only one shell subscriber allowed at a time, because each shell subscriber blocks one thread
|
* Only one shell subscriber is allowed at a time because each shell subscriber blocks one thread
|
||||||
* until it exits.
|
* until it exits.
|
||||||
*/
|
*/
|
||||||
class ShellSubscriber : public virtual RefBase {
|
class ShellSubscriber : public virtual RefBase {
|
||||||
@@ -100,11 +100,28 @@ private:
|
|||||||
|
|
||||||
bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
|
bool readConfig(std::shared_ptr<SubscriptionInfo> subscriptionInfo);
|
||||||
|
|
||||||
void startPull(int64_t myToken);
|
void spawnHelperThreadsLocked(std::shared_ptr<SubscriptionInfo> myInfo, int myToken);
|
||||||
|
|
||||||
bool writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
void waitForSubscriptionToEndLocked(std::shared_ptr<SubscriptionInfo> myInfo,
|
||||||
|
int myToken,
|
||||||
|
std::unique_lock<std::mutex>& lock,
|
||||||
|
int timeoutSec);
|
||||||
|
|
||||||
|
void startPull(int myToken);
|
||||||
|
|
||||||
|
void writePulledAtomsLocked(const vector<std::shared_ptr<LogEvent>>& data,
|
||||||
const SimpleAtomMatcher& matcher);
|
const SimpleAtomMatcher& matcher);
|
||||||
|
|
||||||
|
void getUidsForPullAtom(vector<int32_t>* uids, const PullInfo& pullInfo);
|
||||||
|
|
||||||
|
void attemptWriteToSocketLocked(size_t dataSize);
|
||||||
|
|
||||||
|
// Send ocassional heartbeats for two reasons: (a) for statsd to detect when
|
||||||
|
// the read end of the pipe has closed and (b) for perfd to escape a
|
||||||
|
// blocking read call and recheck if the user has terminated the
|
||||||
|
// subscription.
|
||||||
|
void sendHeartbeats(int myToken);
|
||||||
|
|
||||||
sp<UidMap> mUidMap;
|
sp<UidMap> mUidMap;
|
||||||
|
|
||||||
sp<StatsPullerManager> mPullerMgr;
|
sp<StatsPullerManager> mPullerMgr;
|
||||||
@@ -120,6 +137,11 @@ private:
|
|||||||
int mToken = 0;
|
int mToken = 0;
|
||||||
|
|
||||||
const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
|
const int32_t DEFAULT_PULL_UID = AID_SYSTEM;
|
||||||
|
|
||||||
|
// Tracks when we last send data to perfd. We need that time to determine
|
||||||
|
// when next to send a heartbeat.
|
||||||
|
int64_t mLastWriteMs = 0;
|
||||||
|
const int64_t kMsBetweenHeartbeats = 1000;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace statsd
|
} // namespace statsd
|
||||||
|
|||||||
@@ -86,28 +86,34 @@ void runShellTest(ShellSubscription config, sp<MockUidMap> uidMap,
|
|||||||
// wait for the data to be written.
|
// wait for the data to be written.
|
||||||
std::this_thread::sleep_for(100ms);
|
std::this_thread::sleep_for(100ms);
|
||||||
|
|
||||||
int expected_data_size = expectedData.ByteSize();
|
// Because we might receive heartbeats from statsd, consisting of data sizes
|
||||||
|
// of 0, encapsulate reads within a while loop.
|
||||||
|
bool readAtom = false;
|
||||||
|
while (!readAtom) {
|
||||||
|
// Read the atom size.
|
||||||
|
size_t dataSize = 0;
|
||||||
|
read(fds_data[0], &dataSize, sizeof(dataSize));
|
||||||
|
if (dataSize == 0) continue;
|
||||||
|
EXPECT_EQ(expectedData.ByteSize(), int(dataSize));
|
||||||
|
|
||||||
// now read from the pipe. firstly read the atom size.
|
// Read that much data in proto binary format.
|
||||||
size_t dataSize = 0;
|
vector<uint8_t> dataBuffer(dataSize);
|
||||||
EXPECT_EQ((int)sizeof(dataSize), read(fds_data[0], &dataSize, sizeof(dataSize)));
|
EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
|
||||||
|
|
||||||
EXPECT_EQ(expected_data_size, (int)dataSize);
|
// Make sure the received bytes can be parsed to an atom.
|
||||||
|
ShellData receivedAtom;
|
||||||
|
EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
|
||||||
|
|
||||||
// then read that much data which is the atom in proto binary format
|
// Serialize the expected atom to byte array and compare to make sure
|
||||||
vector<uint8_t> dataBuffer(dataSize);
|
// they are the same.
|
||||||
EXPECT_EQ((int)dataSize, read(fds_data[0], dataBuffer.data(), dataSize));
|
vector<uint8_t> expectedAtomBuffer(expectedData.ByteSize());
|
||||||
|
expectedData.SerializeToArray(expectedAtomBuffer.data(), expectedData.ByteSize());
|
||||||
|
EXPECT_EQ(expectedAtomBuffer, dataBuffer);
|
||||||
|
|
||||||
// make sure the received bytes can be parsed to an atom
|
readAtom = true;
|
||||||
ShellData receivedAtom;
|
}
|
||||||
EXPECT_TRUE(receivedAtom.ParseFromArray(dataBuffer.data(), dataSize) != 0);
|
|
||||||
|
|
||||||
// serialze the expected atom to bytes. and compare. to make sure they are the same.
|
|
||||||
vector<uint8_t> atomBuffer(expected_data_size);
|
|
||||||
expectedData.SerializeToArray(&atomBuffer[0], expected_data_size);
|
|
||||||
EXPECT_EQ(atomBuffer, dataBuffer);
|
|
||||||
close(fds_data[0]);
|
close(fds_data[0]);
|
||||||
|
|
||||||
if (reader.joinable()) {
|
if (reader.joinable()) {
|
||||||
reader.join();
|
reader.join();
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user