Skip to content

Commit 418ae90

Browse files
committed
Random dequeue queue
1 parent 9340aae commit 418ae90

4 files changed

Lines changed: 228 additions & 1 deletion

File tree

relaxed_concurrent_fifo/benchmark.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,11 +320,17 @@ class benchmark_provider_generic : public benchmark_provider<BENCHMARK> {
320320
};
321321

322322
#ifdef __GNUC__
323+
template <typename BENCHMARK>
324+
using benchmark_provider_ws_kfifo = benchmark_provider_generic<ws_k_fifo<uint64_t>, BENCHMARK, size_t>;
325+
323326
template <typename BENCHMARK>
324327
using benchmark_provider_ss_kfifo = benchmark_provider_generic<ss_k_fifo<uint64_t>, BENCHMARK, size_t>;
325328

326329
template <typename BENCHMARK>
327-
using benchmark_provider_ws_kfifo = benchmark_provider_generic<ws_k_fifo<uint64_t>, BENCHMARK, size_t>;
330+
using benchmark_provider_ws_rdq = benchmark_provider_generic<ws_random_dequeue_queue<uint64_t>, BENCHMARK, size_t, size_t>;
331+
332+
template <typename BENCHMARK>
333+
using benchmark_provider_ss_rdq = benchmark_provider_generic<ss_random_dequeue_queue<uint64_t>, BENCHMARK, size_t, size_t>;
328334
#endif // __GNUC__
329335

330336
template <typename BENCHMARK, size_t BLOCK_MULTIPLIER, size_t CELLS_PER_BLOCK, typename BITSET_TYPE = uint8_t>
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// Copyright (c) 2012-2013, the Scal Project Authors. All rights reserved.
2+
// Please see the AUTHORS file for details. Use of this source code is governed
3+
// by a BSD license that can be found in the LICENSE file.
4+
5+
// Implementing the "Random Dequeued Queue" from:
6+
//
7+
// Y. Afek, G. Korland, and E. Yanovsky. Quasi-linearizability: relaxed
8+
// consistency for improved concurrency. In Proceedings of the 14th
9+
// international conference on Principles of distributed systems, OPODIS’10,
10+
// pages 395–410, Berlin, Heidelberg, 2010. Springer-Verlag.
11+
//
12+
// The more detailed tech report:
13+
//
14+
// Yehuda Afek, Guy Korland and Eitan Yanovsky, Quasi-Linearizability: relaxed
15+
// consistency for improved concurrency, Technical report, TAU '10.
16+
//
17+
// Available at (last accessed: 2013/01/08):
18+
// https://docs.google.com/file/d/1dED19mzUmCozvl_PVufxux3vsWtUTTf3KFcanAyrH3io47nllSWGS9gTVait/edit?hl=en
19+
20+
#ifndef SCAL_DATASTRUCTURES_RANDOM_DEQUEUE_QUEUE_H_
21+
#define SCAL_DATASTRUCTURES_RANDOM_DEQUEUE_QUEUE_H_
22+
23+
#include <assert.h>
24+
#include <inttypes.h>
25+
#include <stdio.h>
26+
27+
#include "datastructures/queue.h"
28+
#include "util/allocation.h"
29+
#include "util/atomic_value_new.h"
30+
#include "util/platform.h"
31+
#include "util/random.h"
32+
33+
namespace scal {
34+
35+
namespace rdq_detail {
36+
37+
template<typename T>
38+
struct Node : ThreadLocalMemory<kCachePrefetch> {
39+
explicit Node(T item) : next(TaggedValue<Node<T>*>()),
40+
value(item),
41+
deleted(false) { }
42+
43+
AtomicTaggedValue<Node<T>*, 64, 64> next;
44+
T value;
45+
bool deleted;
46+
};
47+
48+
} // namespace rdq_detail
49+
50+
51+
template<typename T>
52+
class RandomDequeueQueue : public Queue<T> {
53+
public:
54+
RandomDequeueQueue(uint64_t quasi_factor, uint64_t max_retries);
55+
bool enqueue(T item);
56+
bool dequeue(T *item);
57+
58+
private:
59+
typedef rdq_detail::Node<T> Node;
60+
typedef TaggedValue<Node*> NodePtr;
61+
typedef AtomicTaggedValue<Node*, 64, 64> AtomicNodePtr;
62+
63+
uint64_t quasi_factor_;
64+
uint64_t max_retries_;
65+
66+
AtomicNodePtr* head_;
67+
AtomicNodePtr* tail_;
68+
};
69+
70+
template<typename T>
71+
RandomDequeueQueue<T>::RandomDequeueQueue(uint64_t quasi_factor,
72+
uint64_t max_retries)
73+
: quasi_factor_(quasi_factor),
74+
max_retries_(max_retries),
75+
head_(new AtomicNodePtr()),
76+
tail_(new AtomicNodePtr()) {
77+
Node* n = new Node((T)NULL);
78+
head_->store(NodePtr(n, 0));
79+
tail_->store(NodePtr(n, 0));
80+
}
81+
82+
83+
template<typename T> bool
84+
RandomDequeueQueue<T>::enqueue(T item) {
85+
assert(item != (T)NULL);
86+
Node* n = new Node(item);
87+
NodePtr tail_old;
88+
NodePtr next;
89+
while (true) {
90+
tail_old = tail_->load();
91+
next = tail_old.value()->next.load();
92+
if (tail_old == tail_->load()) {
93+
if (next.value() == NULL) {
94+
NodePtr next_new(n, next.tag() + 1);
95+
if (tail_old.value()->next.swap(next, next_new)) {
96+
break;
97+
}
98+
} else {
99+
tail_->swap(tail_old, NodePtr(next.value(), tail_old.tag() + 1));
100+
}
101+
}
102+
}
103+
tail_->swap(tail_old, NodePtr(n, tail_old.tag() + 1));
104+
return true;
105+
}
106+
107+
108+
template<typename T>
109+
bool RandomDequeueQueue<T>::dequeue(T *item) {
110+
NodePtr tail_old;
111+
NodePtr head_old;
112+
NodePtr next;
113+
uint64_t retries = 0;
114+
uint64_t random_index;
115+
while (true) {
116+
TOP_WHILE:
117+
head_old = head_->load();
118+
tail_old = tail_->load();
119+
next = head_old.value()->next.load();
120+
if (head_->load() == head_old) {
121+
if (head_old == tail_old) {
122+
if (next.value() == NULL) {
123+
return false;
124+
}
125+
tail_->swap(tail_old, NodePtr(next.value(), tail_old.tag() + 1));
126+
} else {
127+
if (retries >= max_retries_) {
128+
random_index = 0;
129+
} else {
130+
random_index = pseudorand() % quasi_factor_;
131+
}
132+
retries++;
133+
134+
Node* node = next.value();
135+
if (random_index == 0) {
136+
while (node != NULL && node->deleted == true) {
137+
NodePtr head_new(node, head_old.tag() + 1);
138+
if (!head_->swap(head_old, head_new) || node == tail_old.value()) {
139+
goto TOP_WHILE;
140+
}
141+
head_old = head_new;
142+
next = head_old.value()->next.load();
143+
node = next.value();
144+
}
145+
if (node == NULL) {
146+
return false;
147+
}
148+
if (node->deleted == false &&
149+
__sync_bool_compare_and_swap(&(node->deleted), false, true)) {
150+
*item = node->value;
151+
return true;
152+
}
153+
} else {
154+
uint64_t i;
155+
for (i = 0;
156+
(i < random_index) && (node->next.load().value() != NULL);
157+
++i) {
158+
node = node->next.load().value();
159+
}
160+
if (node->deleted == false &&
161+
__sync_bool_compare_and_swap(&(node->deleted), false, true)) {
162+
*item = node->value;
163+
return true;
164+
}
165+
}
166+
}
167+
}
168+
}
169+
return true;
170+
}
171+
172+
} // namespace scal
173+
174+
#endif // SCAL_DATASTRUCTURES_RANDOM_DEQUEUE_QUEUE_H_

relaxed_concurrent_fifo/contenders/scal/scal_wrapper.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include "boundedsize_kfifo.h"
55
#include "rts_queue.h"
66
#include "segment_queue.h"
7+
#include "random_dequeue_queue.h"
78

89
template <typename T, template <typename> typename FIFO>
910
struct scal_wrapper_base {
@@ -70,4 +71,14 @@ struct ss_segment_queue : scal_wrapper_base<T, scal::SegmentQueue> {
7071
ss_segment_queue([[maybe_unused]] size_t thread_count, [[maybe_unused]] size_t size, size_t s) : scal_wrapper_base<T, scal::SegmentQueue>{ s } {}
7172
};
7273

74+
template <typename T>
75+
struct ws_random_dequeue_queue : scal_wrapper_base<T, scal::RandomDequeueQueue> {
76+
ws_random_dequeue_queue([[maybe_unused]] size_t thread_count, [[maybe_unused]] size_t size, size_t quasi_factor, size_t max_retries) : scal_wrapper_base<T, scal::RandomDequeueQueue>{ thread_count * quasi_factor, max_retries } {}
77+
};
78+
79+
template <typename T>
80+
struct ss_random_dequeue_queue : scal_wrapper_base<T, scal::RandomDequeueQueue> {
81+
ss_random_dequeue_queue([[maybe_unused]] size_t thread_count, [[maybe_unused]] size_t size, size_t quasi_factor, size_t max_retries) : scal_wrapper_base<T, scal::RandomDequeueQueue>{ quasi_factor, max_retries } {}
82+
};
83+
7384
#endif // BSKFIFO_WRAPPER_H_INCLUDED

relaxed_concurrent_fifo/main.cpp

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,42 @@ void add_all_parameter_tuning(std::vector<std::unique_ptr<benchmark_provider<BEN
233233
instances.push_back(std::make_unique<benchmark_provider_ss_kfifo<BENCHMARK>>("2048,kfifo", 2048));
234234
instances.push_back(std::make_unique<benchmark_provider_ss_kfifo<BENCHMARK>>("4096,kfifo", 4096));
235235
instances.push_back(std::make_unique<benchmark_provider_ss_kfifo<BENCHMARK>>("8192,kfifo", 8192));
236+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("1,8,rdq", 1, 8));
237+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("3,8,rdq", 3, 8));
238+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("7,8,rdq", 7, 8));
239+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("15,8,rdq", 15, 8));
240+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("31,8,rdq", 31, 8));
241+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("63,8,rdq", 63, 8));
242+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("127,8,rdq", 127, 8));
243+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("255,8,rdq", 255, 8));
244+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("511,8,rdq", 511, 8));
245+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("1023,8,rdq", 1023, 8));
246+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("2047,8,rdq", 2047, 8));
247+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("4095,8,rdq", 4095, 8));
248+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("1,128,rdq", 1, 128));
249+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("3,128,rdq", 3, 128));
250+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("7,128,rdq", 7, 128));
251+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("15,128,rdq", 15, 128));
252+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("31,128,rdq", 31, 128));
253+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("63,128,rdq", 63, 128));
254+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("127,128,rdq", 127, 128));
255+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("255,128,rdq", 255, 128));
256+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("511,128,rdq", 511, 128));
257+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("1023,128,rdq", 1023, 128));
258+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("2047,128,rdq", 2047, 128));
259+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("4095,128,rdq", 4095, 128));
260+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("1,1024,rdq", 1, 1024));
261+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("3,1024,rdq", 3, 1024));
262+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("7,1024,rdq", 7, 1024));
263+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("15,1024,rdq", 15, 1024));
264+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("31,1024,rdq", 31, 1024));
265+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("63,1024,rdq", 63, 1024));
266+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("127,1024,rdq", 127, 1024));
267+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("255,1024,rdq", 255, 1024));
268+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("511,1024,rdq", 511, 1024));
269+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("1023,1024,rdq", 1023, 1024));
270+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("2047,1024,rdq", 2047, 1024));
271+
instances.push_back(std::make_unique<benchmark_provider_ss_rdq<BENCHMARK>>("4095,1024,rdq", 4095, 1024));
236272
instances.push_back(std::make_unique<benchmark_provider_generic<rts_queue<uint64_t>, BENCHMARK>>("rts-queue"));
237273
instances.push_back(std::make_unique<benchmark_provider_generic<adapter<uint64_t, LCRQWrapped>, BENCHMARK>>("lcrq"));
238274
#endif // __GNUC__

0 commit comments

Comments
 (0)