Skip to content

Commit eb7f67d

Browse files
authored
Merge pull request #8358 from fwyzard/IB/CMSSW_13_0_X/master_fix_TBB_weak_memory
[TBB] Fix concurrent_[bounded_]queue correctness on weak memory models [13.0.x]
2 parents 7222c29 + 867a8b5 commit eb7f67d

2 files changed

Lines changed: 322 additions & 0 deletions

File tree

tbb-782.patch

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
From 2f1b2a2055da106b98859b60a785c0c45829dc79 Mon Sep 17 00:00:00 2001
2+
From: Alexei Katranov <alexei.katranov@intel.com>
3+
Date: Mon, 21 Feb 2022 15:03:10 +0300
4+
Subject: [PATCH 1/6] Fix cq & cbq correctness on ARM
5+
6+
Signed-off-by: Alexei Katranov <alexei.katranov@intel.com>
7+
---
8+
include/oneapi/tbb/concurrent_queue.h | 6 +++--
9+
.../tbb/detail/_concurrent_queue_base.h | 14 +++++------
10+
test/tbb/test_concurrent_queue_whitebox.cpp | 23 +++++++++----------
11+
3 files changed, 22 insertions(+), 21 deletions(-)
12+
13+
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
14+
index 18f5bc80cf..8b5f289104 100644
15+
--- a/include/oneapi/tbb/concurrent_queue.h
16+
+++ b/include/oneapi/tbb/concurrent_queue.h
17+
@@ -183,7 +183,8 @@ class concurrent_queue {
18+
bool internal_try_pop( void* dst ) {
19+
ticket_type k;
20+
do {
21+
- k = my_queue_representation->head_counter.load(std::memory_order_relaxed);
22+
+ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
23+
+ k = my_queue_representation->head_counter.load(std::memory_order_acquire);
24+
do {
25+
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
26+
// Queue is empty
27+
@@ -514,7 +515,8 @@ class concurrent_bounded_queue {
28+
bool internal_pop_if_present( void* dst ) {
29+
ticket_type ticket;
30+
do {
31+
- ticket = my_queue_representation->head_counter.load(std::memory_order_relaxed);
32+
+ // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
33+
+ ticket = my_queue_representation->head_counter.load(std::memory_order_acquire);
34+
do {
35+
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
36+
// Queue is empty
37+
diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h
38+
index 8bdf213230..b51b2e419b 100644
39+
--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h
40+
+++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h
41+
@@ -123,7 +123,7 @@ class micro_queue {
42+
page_allocator_traits::construct(page_allocator, p);
43+
}
44+
45+
- if (tail_counter.load(std::memory_order_relaxed) != k) spin_wait_until_my_turn(tail_counter, k, base);
46+
+ spin_wait_until_my_turn(tail_counter, k, base);
47+
d1::call_itt_notify(d1::acquired, &tail_counter);
48+
49+
if (p) {
50+
@@ -134,9 +134,9 @@ class micro_queue {
51+
} else {
52+
head_page.store(p, std::memory_order_relaxed);
53+
}
54+
- tail_page.store(p, std::memory_order_release);
55+
+ tail_page.store(p, std::memory_order_relaxed);
56+
} else {
57+
- p = tail_page.load(std::memory_order_acquire); // TODO may be relaxed ?
58+
+ p = tail_page.load(std::memory_order_relaxed);
59+
}
60+
return index;
61+
}
62+
@@ -179,7 +179,7 @@ class micro_queue {
63+
d1::call_itt_notify(d1::acquired, &head_counter);
64+
spin_wait_while_eq(tail_counter, k);
65+
d1::call_itt_notify(d1::acquired, &tail_counter);
66+
- padded_page *p = head_page.load(std::memory_order_acquire);
67+
+ padded_page *p = head_page.load(std::memory_order_relaxed);
68+
__TBB_ASSERT( p, nullptr );
69+
size_type index = modulo_power_of_two( k/queue_rep_type::n_queue, items_per_page );
70+
bool success = false;
71+
@@ -337,7 +337,7 @@ class micro_queue {
72+
73+
void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
74+
for (atomic_backoff b(true);; b.pause()) {
75+
- ticket_type c = counter;
76+
+ ticket_type c = counter.load(std::memory_order_acquire);
77+
if (c == k) return;
78+
else if (c & 1) {
79+
++rb.n_invalid_entries;
80+
@@ -378,9 +378,9 @@ class micro_queue_pop_finalizer {
81+
if( is_valid_page(p) ) {
82+
spin_mutex::scoped_lock lock( my_queue.page_mutex );
83+
padded_page* q = p->next;
84+
- my_queue.head_page.store(q, std::memory_order_release);
85+
+ my_queue.head_page.store(q, std::memory_order_relaxed);
86+
if( !is_valid_page(q) ) {
87+
- my_queue.tail_page.store(nullptr, std::memory_order_release);
88+
+ my_queue.tail_page.store(nullptr, std::memory_order_relaxed);
89+
}
90+
}
91+
my_queue.head_counter.store(my_ticket_type, std::memory_order_release);
92+
diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp
93+
index 18da8def25..59ce7f54cb 100644
94+
--- a/test/tbb/test_concurrent_queue_whitebox.cpp
95+
+++ b/test/tbb/test_concurrent_queue_whitebox.cpp
96+
@@ -51,7 +51,8 @@ class FloggerBody {
97+
value_type elem = value_type(thread_id);
98+
for (std::size_t i = 0; i < elem_num; ++i) {
99+
q.push(elem);
100+
- q.try_pop(elem);
101+
+ bool res = q.try_pop(elem);
102+
+ CHECK_FAST(res);
103+
}
104+
}
105+
106+
@@ -83,20 +84,18 @@ void test_flogger_help( Q& q, std::size_t items_per_page ) {
107+
REQUIRE_MESSAGE(q.my_queue_representation->head_counter < hack_val, "Failed wraparound test");
108+
}
109+
110+
-template <typename T>
111+
-void test_flogger() {
112+
- {
113+
- tbb::concurrent_queue<T> q;
114+
- test_flogger_help(q, q.my_queue_representation->items_per_page);
115+
- }
116+
- {
117+
- tbb::concurrent_bounded_queue<T> q;
118+
+//! \brief \ref error_guessing
119+
+TEST_CASE("Test CQ Wrapparound") {
120+
+ for (int i = 0; i < 1000; ++i) {
121+
+ tbb::concurrent_queue<int> q;
122+
test_flogger_help(q, q.my_queue_representation->items_per_page);
123+
}
124+
}
125+
126+
//! \brief \ref error_guessing
127+
-TEST_CASE("Test Wrapparound") {
128+
- test_flogger<int>();
129+
- // TODO: add test with unsigned char
130+
+TEST_CASE("Test CBQ Wrapparound") {
131+
+ for (int i = 0; i < 1000; ++i) {
132+
+ tbb::concurrent_bounded_queue<int> q;
133+
+ test_flogger_help(q, q.my_queue_representation->items_per_page);
134+
+ }
135+
}
136+
137+
From 65a157d0a625383280fe6bddd046414cd0d980f3 Mon Sep 17 00:00:00 2001
138+
From: Alex <alexei.katranov@intel.com>
139+
Date: Mon, 21 Feb 2022 15:25:18 +0300
140+
Subject: [PATCH 2/6] Fix spelling
141+
142+
---
143+
include/oneapi/tbb/concurrent_queue.h | 4 ++--
144+
1 file changed, 2 insertions(+), 2 deletions(-)
145+
146+
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
147+
index 8b5f289104..8083b5a2db 100644
148+
--- a/include/oneapi/tbb/concurrent_queue.h
149+
+++ b/include/oneapi/tbb/concurrent_queue.h
150+
@@ -183,7 +183,7 @@ class concurrent_queue {
151+
bool internal_try_pop( void* dst ) {
152+
ticket_type k;
153+
do {
154+
- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
155+
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
156+
k = my_queue_representation->head_counter.load(std::memory_order_acquire);
157+
do {
158+
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
159+
@@ -515,7 +515,7 @@ class concurrent_bounded_queue {
160+
bool internal_pop_if_present( void* dst ) {
161+
ticket_type ticket;
162+
do {
163+
- // Bassicaly, we need to read `head` before `tail`. To achive it we build happens-before on `head`
164+
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
165+
ticket = my_queue_representation->head_counter.load(std::memory_order_acquire);
166+
do {
167+
if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
168+
169+
From 8895e287d44120b1ebfc4eab367fc505456c5100 Mon Sep 17 00:00:00 2001
170+
From: Alexei Katranov <alexei.katranov@intel.com>
171+
Date: Tue, 1 Mar 2022 15:00:08 +0300
172+
Subject: [PATCH 3/6] Factor out try_pop
173+
174+
Signed-off-by: Alexei Katranov <alexei.katranov@intel.com>
175+
---
176+
include/oneapi/tbb/concurrent_queue.h | 56 +++++++++++++--------------
177+
1 file changed, 26 insertions(+), 30 deletions(-)
178+
179+
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
180+
index 8083b5a2db..45c8cf9024 100644
181+
--- a/include/oneapi/tbb/concurrent_queue.h
182+
+++ b/include/oneapi/tbb/concurrent_queue.h
183+
@@ -28,6 +28,24 @@ namespace tbb {
184+
namespace detail {
185+
namespace d2 {
186+
187+
+template <typename QueueRep, typename Allocator>
188+
+std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
189+
+ ticket_type ticket{};
190+
+ do {
191+
+ // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
192+
+ ticket = queue.head_counter.load(std::memory_order_acquire);
193+
+ do {
194+
+ if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
195+
+ // Queue is empty
196+
+ return { false, ticket };
197+
+ }
198+
+ // Queue had item with ticket k when we looked. Attempt to get that item.
199+
+ // Another thread snatched the item, retry.
200+
+ } while (!queue.head_counter.compare_exchange_strong(ticket, ticket + 1));
201+
+ } while (!queue.choose(ticket).pop(dst, ticket, queue, alloc));
202+
+ return { true, ticket };
203+
+}
204+
+
205+
// A high-performance thread-safe non-blocking concurrent queue.
206+
// Multiple threads may each push and pop concurrently.
207+
// Assignment construction is not allowed.
208+
@@ -181,21 +199,7 @@ class concurrent_queue {
209+
}
210+
211+
bool internal_try_pop( void* dst ) {
212+
- ticket_type k;
213+
- do {
214+
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
215+
- k = my_queue_representation->head_counter.load(std::memory_order_acquire);
216+
- do {
217+
- if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - k) <= 0) {
218+
- // Queue is empty
219+
- return false;
220+
- }
221+
-
222+
- // Queue had item with ticket k when we looked. Attempt to get that item.
223+
- // Another thread snatched the item, retry.
224+
- } while (!my_queue_representation->head_counter.compare_exchange_strong(k, k + 1));
225+
- } while (!my_queue_representation->choose(k).pop(dst, k, *my_queue_representation, my_allocator));
226+
- return true;
227+
+ return internal_try_pop_impl(dst, *my_queue_representation, my_allocator).first;
228+
}
229+
230+
template <typename Container, typename Value, typename A>
231+
@@ -513,22 +517,14 @@ class concurrent_bounded_queue {
232+
}
233+
234+
bool internal_pop_if_present( void* dst ) {
235+
- ticket_type ticket;
236+
- do {
237+
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
238+
- ticket = my_queue_representation->head_counter.load(std::memory_order_acquire);
239+
- do {
240+
- if (static_cast<std::ptrdiff_t>(my_queue_representation->tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
241+
- // Queue is empty
242+
- return false;
243+
- }
244+
- // Queue had item with ticket k when we looked. Attempt to get that item.
245+
- // Another thread snatched the item, retry.
246+
- } while (!my_queue_representation->head_counter.compare_exchange_strong(ticket, ticket + 1));
247+
- } while (!my_queue_representation->choose(ticket).pop(dst, ticket, *my_queue_representation, my_allocator));
248+
+ bool present{};
249+
+ ticket_type ticket{};
250+
+ std::tie(present, ticket) = internal_try_pop_impl(dst, *my_queue_representation, my_allocator);
251+
252+
- r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
253+
- return true;
254+
+ if (present) {
255+
+ r1::notify_bounded_queue_monitor(my_monitors, cbq_slots_avail_tag, ticket);
256+
+ }
257+
+ return present;
258+
}
259+
260+
void internal_abort() {
261+
262+
From 14532b2b7551f9a20653b7f4f692f8e52b35adbb Mon Sep 17 00:00:00 2001
263+
From: Alex <alexei.katranov@intel.com>
264+
Date: Tue, 1 Mar 2022 15:09:42 +0300
265+
Subject: [PATCH 4/6] Update test_concurrent_queue_whitebox.cpp
266+
267+
Fix copyright year
268+
---
269+
test/tbb/test_concurrent_queue_whitebox.cpp | 2 +-
270+
1 file changed, 1 insertion(+), 1 deletion(-)
271+
272+
diff --git a/test/tbb/test_concurrent_queue_whitebox.cpp b/test/tbb/test_concurrent_queue_whitebox.cpp
273+
index 59ce7f54cb..1ba1530e70 100644
274+
--- a/test/tbb/test_concurrent_queue_whitebox.cpp
275+
+++ b/test/tbb/test_concurrent_queue_whitebox.cpp
276+
@@ -1,5 +1,5 @@
277+
/*
278+
- Copyright (c) 2005-2021 Intel Corporation
279+
+ Copyright (c) 2005-2022 Intel Corporation
280+
281+
Licensed under the Apache License, Version 2.0 (the "License");
282+
you may not use this file except in compliance with the License.
283+
284+
From 073cac5d77fc7045aad2f96d92b6030d71617cf4 Mon Sep 17 00:00:00 2001
285+
From: pavelkumbrasev <pavel.kumbrasev@intel.com>
286+
Date: Fri, 4 Nov 2022 08:52:17 +0000
287+
Subject: [PATCH 6/6] Apply review suggestions
288+
289+
Signed-off-by: pavelkumbrasev <pavel.kumbrasev@intel.com>
290+
---
291+
include/oneapi/tbb/concurrent_queue.h | 2 +-
292+
include/oneapi/tbb/detail/_concurrent_queue_base.h | 2 +-
293+
2 files changed, 2 insertions(+), 2 deletions(-)
294+
295+
diff --git a/include/oneapi/tbb/concurrent_queue.h b/include/oneapi/tbb/concurrent_queue.h
296+
index 0a821efa2a..d694bd2b47 100644
297+
--- a/include/oneapi/tbb/concurrent_queue.h
298+
+++ b/include/oneapi/tbb/concurrent_queue.h
299+
@@ -32,7 +32,7 @@ template <typename QueueRep, typename Allocator>
300+
std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, Allocator& alloc ) {
301+
ticket_type ticket{};
302+
do {
303+
- // Basically, we need to read `head` before `tail`. To achieve it we build happens-before on `head`
304+
+ // Basically, we need to read `head_counter` before `tail_counter`. To achieve it we build happens-before on `head_counter`
305+
ticket = queue.head_counter.load(std::memory_order_acquire);
306+
do {
307+
if (static_cast<std::ptrdiff_t>(queue.tail_counter.load(std::memory_order_relaxed) - ticket) <= 0) { // queue is empty
308+
diff --git a/include/oneapi/tbb/detail/_concurrent_queue_base.h b/include/oneapi/tbb/detail/_concurrent_queue_base.h
309+
index b51b2e419b..b868797985 100644
310+
--- a/include/oneapi/tbb/detail/_concurrent_queue_base.h
311+
+++ b/include/oneapi/tbb/detail/_concurrent_queue_base.h
312+
@@ -336,7 +336,7 @@ class micro_queue {
313+
}
314+
315+
void spin_wait_until_my_turn( std::atomic<ticket_type>& counter, ticket_type k, queue_rep_type& rb ) const {
316+
- for (atomic_backoff b(true);; b.pause()) {
317+
+ for (atomic_backoff b{};; b.pause()) {
318+
ticket_type c = counter.load(std::memory_order_acquire);
319+
if (c == k) return;
320+
else if (c & 1) {

tbb.spec

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
%define github_user oneapi-src
66
%define github_repo oneTBB
77
Source: git+https://github.com/%{github_user}/%{github_repo}.git?obj=%{branch}/%{tag}&export=%{n}-%{realversion}&output=/%{n}-%{branch}-%{tag}.tgz
8+
Patch0: tbb-782
89
Requires: hwloc
910
BuildRequires: cmake
1011

1112
%prep
1213
%setup -n %{n}-%{realversion}
14+
%patch0 -p1
1315

1416
%build
1517
rm -rf %{_builddir}/build

0 commit comments

Comments
 (0)