Skip to content

Commit 7b8590f

Browse files
Nurtau Toganbayfacebook-github-bot
authored andcommitted
Correctly join thread in TaskDispatchThread (#53943)
Summary: Pull Request resolved: #53943 Changelog: [Internal] Thread join should happen only during TaskDispatchThread dtor, otherwise if quit is called on the same thread (thread_), then thread_ will get detached, which can lead to use of already deleted fields (queue_, running_) of TaskDispatchThread. But we also need to make sure that on quit() call, loop is getting stopped, otherwise in loop, already running task will use already deleted fields of captured object. If quit() is called as task on thread_, then there is no need to wait for future because loop will organically end since running_ is false. Reviewed By: rshest Differential Revision: D83044012 fbshipit-source-id: a2da97a89093c47c64b693e3c9e5d6f7297c038e
1 parent 4e595b7 commit 7b8590f

File tree

3 files changed

+57
-30
lines changed

3 files changed

+57
-30
lines changed

packages/react-native/ReactCxxPlatform/react/threading/TaskDispatchThread.cpp

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ TaskDispatchThread::TaskDispatchThread(
5050

5151
TaskDispatchThread::~TaskDispatchThread() noexcept {
5252
quit();
53+
54+
if (thread_.joinable()) {
55+
thread_.join();
56+
}
5357
}
5458

5559
bool TaskDispatchThread::isOnThread() noexcept {
@@ -87,17 +91,14 @@ void TaskDispatchThread::runSync(TaskFn&& task) noexcept {
8791
}
8892

8993
void TaskDispatchThread::quit() noexcept {
90-
if (!running_) {
94+
bool expected = true;
95+
if (!running_.compare_exchange_strong(expected, false)) {
9196
return;
9297
}
93-
running_ = false;
98+
9499
loopCv_.notify_one();
95-
if (thread_.joinable()) {
96-
if (!isOnThread()) {
97-
thread_.join();
98-
} else {
99-
thread_.detach();
100-
}
100+
if (!isOnThread()) {
101+
loopStoppedPromise_.get_future().wait();
101102
}
102103
}
103104

@@ -108,6 +109,7 @@ void TaskDispatchThread::loop() noexcept {
108109
while (running_) {
109110
std::unique_lock<std::mutex> lock(queueLock_);
110111
loopCv_.wait(lock, [&]() { return !running_ || !queue_.empty(); });
112+
111113
while (!queue_.empty()) {
112114
auto task = queue_.top();
113115
auto now = std::chrono::system_clock::now();
@@ -120,11 +122,11 @@ void TaskDispatchThread::loop() noexcept {
120122
} else {
121123
// Shutting down, skip all the remaining tasks
122124
queue_ = {};
123-
return;
125+
break;
124126
}
125127

126-
// We should check whether the task thread is still running at this point
127-
// (which may not anymore be the case since the previous check)
128+
// We should check whether the task thread is still running at this
129+
// point (which may not anymore be the case since the previous check)
128130
if (running_) {
129131
lock.unlock();
130132
task.fn();
@@ -133,6 +135,8 @@ void TaskDispatchThread::loop() noexcept {
133135
queue_.pop();
134136
}
135137
}
138+
139+
loopStoppedPromise_.set_value();
136140
}
137141

138142
} // namespace facebook::react

packages/react-native/ReactCxxPlatform/react/threading/TaskDispatchThread.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <chrono>
1111
#include <condition_variable>
1212
#include <functional>
13+
#include <future>
1314
#include <mutex>
1415
#include <queue>
1516
#include <thread>
@@ -71,6 +72,8 @@ class TaskDispatchThread {
7172
std::atomic<bool> running_{true};
7273
std::string threadName_;
7374
std::thread thread_;
75+
76+
std::promise<void> loopStoppedPromise_;
7477
};
7578

7679
} // namespace facebook::react

packages/react-native/ReactCxxPlatform/react/threading/tests/TaskDispatchThreadTests.cpp

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,28 @@ namespace facebook::react {
1515

1616
class TaskDispatchThreadTest : public ::testing::Test {
1717
protected:
18-
TaskDispatchThread dispatcher;
18+
std::unique_ptr<TaskDispatchThread> dispatcher{
19+
std::make_unique<TaskDispatchThread>()};
1920
};
2021

2122
// Test: isOnThread returns true inside the looper thread
2223
TEST_F(TaskDispatchThreadTest, IsOnThreadReturnsTrueInLooper) {
2324
bool result = false;
24-
dispatcher.runSync([&] { result = dispatcher.isOnThread(); });
25+
dispatcher->runSync([&] { result = dispatcher->isOnThread(); });
2526
EXPECT_TRUE(result);
2627
}
2728

2829
// Test: isRunning returns true before quit, false after
2930
TEST_F(TaskDispatchThreadTest, IsRunningFlag) {
30-
EXPECT_TRUE(dispatcher.isRunning());
31-
dispatcher.quit();
32-
EXPECT_FALSE(dispatcher.isRunning());
31+
EXPECT_TRUE(dispatcher->isRunning());
32+
dispatcher->quit();
33+
EXPECT_FALSE(dispatcher->isRunning());
3334
}
3435

3536
// Test: runAsync executes the task
3637
TEST_F(TaskDispatchThreadTest, RunAsyncExecutesTask) {
3738
std::atomic<int> counter{0};
38-
dispatcher.runAsync([&] { counter++; });
39+
dispatcher->runAsync([&] { counter++; });
3940
// Wait for task to complete
4041
std::this_thread::sleep_for(std::chrono::milliseconds(50));
4142
EXPECT_EQ(counter.load(), 1);
@@ -44,14 +45,14 @@ TEST_F(TaskDispatchThreadTest, RunAsyncExecutesTask) {
4445
// Test: runSync executes the task and blocks until done
4546
TEST_F(TaskDispatchThreadTest, RunSyncExecutesTask) {
4647
std::atomic<int> counter{0};
47-
dispatcher.runSync([&] { counter++; });
48+
dispatcher->runSync([&] { counter++; });
4849
EXPECT_EQ(counter.load(), 1);
4950
}
5051

5152
// Test: runAsync with delay
5253
TEST_F(TaskDispatchThreadTest, RunAsyncWithDelay) {
5354
std::atomic<int> counter{0};
54-
dispatcher.runAsync([&] { counter++; }, std::chrono::milliseconds(100));
55+
dispatcher->runAsync([&] { counter++; }, std::chrono::milliseconds(100));
5556
std::this_thread::sleep_for(std::chrono::milliseconds(50));
5657
EXPECT_EQ(counter.load(), 0); // Not yet executed
5758
std::this_thread::sleep_for(std::chrono::milliseconds(70));
@@ -61,9 +62,9 @@ TEST_F(TaskDispatchThreadTest, RunAsyncWithDelay) {
6162
// Test: Multiple delayed tasks execute in order
6263
TEST_F(TaskDispatchThreadTest, MultipleDelayedTasksOrder) {
6364
std::vector<int> results;
64-
dispatcher.runAsync(
65+
dispatcher->runAsync(
6566
[&] { results.push_back(1); }, std::chrono::milliseconds(50));
66-
dispatcher.runAsync(
67+
dispatcher->runAsync(
6768
[&] { results.push_back(2); }, std::chrono::milliseconds(100));
6869
std::this_thread::sleep_for(std::chrono::milliseconds(120));
6970
ASSERT_EQ(results.size(), 2);
@@ -75,48 +76,48 @@ TEST_F(TaskDispatchThreadTest, MultipleDelayedTasksOrder) {
7576
TEST_F(TaskDispatchThreadTest, RunSyncBlocksUntilDone) {
7677
std::atomic<bool> started{false};
7778
std::atomic<bool> finished{false};
78-
dispatcher.runAsync([&] {
79+
dispatcher->runAsync([&] {
7980
started = true;
8081
std::this_thread::sleep_for(std::chrono::milliseconds(50));
8182
finished = true;
8283
});
83-
dispatcher.runSync([&] {
84+
dispatcher->runSync([&] {
8485
EXPECT_TRUE(started);
8586
EXPECT_TRUE(finished);
8687
});
8788
}
8889

8990
// Test: quit prevents further tasks from running
9091
TEST_F(TaskDispatchThreadTest, QuitPreventsFurtherTasks) {
91-
dispatcher.quit();
92+
dispatcher->quit();
9293
std::atomic<int> counter{0};
93-
dispatcher.runAsync([&] { counter++; });
94+
dispatcher->runAsync([&] { counter++; });
9495
std::this_thread::sleep_for(std::chrono::milliseconds(50));
9596
EXPECT_EQ(counter.load(), 0);
9697
}
9798

9899
// Test: Multiple runSync tasks execute serially
99100
TEST_F(TaskDispatchThreadTest, MultipleRunSyncSerialExecution) {
100101
std::vector<int> results;
101-
dispatcher.runSync([&] { results.push_back(1); });
102-
dispatcher.runSync([&] { results.push_back(2); });
102+
dispatcher->runSync([&] { results.push_back(1); });
103+
dispatcher->runSync([&] { results.push_back(2); });
103104
EXPECT_EQ(results.size(), 2);
104105
EXPECT_EQ(results[0], 1);
105106
EXPECT_EQ(results[1], 2);
106107
}
107108

108109
// Test: Edge case - runSync after quit should not execute
109110
TEST_F(TaskDispatchThreadTest, RunSyncAfterQuitDoesNotExecute) {
110-
dispatcher.quit();
111+
dispatcher->quit();
111112
std::atomic<int> counter{0};
112-
dispatcher.runSync([&] { counter++; });
113+
dispatcher->runSync([&] { counter++; });
113114
EXPECT_EQ(counter.load(), 0);
114115
}
115116

116117
// Test: Thread safety - runAsync from multiple threads
117118
TEST_F(TaskDispatchThreadTest, RunAsyncFromMultipleThreads) {
118119
std::atomic<int> counter{0};
119-
auto task = [&] { dispatcher.runAsync([&] { counter++; }); };
120+
auto task = [&] { dispatcher->runAsync([&] { counter++; }); };
120121
std::thread t1(task);
121122
std::thread t2(task);
122123
std::thread t3(task);
@@ -127,4 +128,23 @@ TEST_F(TaskDispatchThreadTest, RunAsyncFromMultipleThreads) {
127128
EXPECT_EQ(counter.load(), 3);
128129
}
129130

131+
TEST_F(TaskDispatchThreadTest, QuitInTaskShouldntBeBlockedForever) {
132+
dispatcher->runSync([&] { dispatcher->quit(); });
133+
}
134+
135+
TEST_F(TaskDispatchThreadTest, QuitShouldWaitAlreadyRunningTask) {
136+
{
137+
std::unique_ptr<int> counter = std::make_unique<int>(0);
138+
dispatcher->runAsync([&] {
139+
std::this_thread::sleep_for(std::chrono::milliseconds(300));
140+
*counter = 1;
141+
});
142+
std::this_thread::sleep_for(std::chrono::milliseconds(50));
143+
// if quit doesn't wait for running task, then *counter will access already
144+
// deleted object
145+
dispatcher->quit();
146+
}
147+
// forcing dispatcher to join thread
148+
dispatcher.reset();
149+
}
130150
} // namespace facebook::react

0 commit comments

Comments
 (0)