Skip to content

Commit 16524c5

Browse files
committed
mpmc: Document #583 and add corresponding loom test
This patch documents #583 adds failing loom tests to exercise #583 hoping that a future algorithm change can make them pass. The tests are ran in CI but their results is ignored for now as they fail. Ideally the `loom` tests will cover more usage of `mpmc::Queue` in the future and also cover `spsc`.
1 parent 5ca8839 commit 16524c5

File tree

5 files changed

+127
-1
lines changed

5 files changed

+127
-1
lines changed

.github/workflows/build.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ jobs:
8686
- name: Run cargo test
8787
run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"
8888

89+
- name: Run loom tests
90+
run: cargo test -- loom
91+
continue-on-error: true
92+
env:
93+
RUSTFLAGS: '--cfg loom'
8994
# Run cargo fmt --check
9095
style:
9196
name: style

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
1414
- Implement `TryFrom` for `Deque` from array.
1515
- Switch from `serde` to `serde_core` for enabling faster compilations.
1616
- Implement `Zeroize` trait for all data structures with the `zeroize` feature to securely clear sensitive data from memory.
17+
- `mpmc::Queue`: document non-lock free behaviour, and add loom tests
1718

1819
## [v0.9.1] - 2025-08-19
1920

Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ stable_deref_trait = { version = "1", default-features = false }
7676
critical-section = { version = "1.1", features = ["std"] }
7777
static_assertions = "1.1.0"
7878

79+
[target.'cfg(loom)'.dependencies]
80+
loom = "0.7.2"
81+
7982
[package.metadata.docs.rs]
8083
features = [
8184
"bytes",
@@ -89,3 +92,6 @@ features = [
8992
# for the pool module
9093
targets = ["i686-unknown-linux-gnu"]
9194
rustdoc-args = ["--cfg", "docsrs"]
95+
96+
[lints.rust]
97+
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }

src/mpmc.rs

Lines changed: 114 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,16 @@
6161
//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
6262
//! and `enqueue` returning `Ok`.
6363
//!
64+
//!
65+
//! <div class="warning">
66+
//!
67+
//! This implementation is not fully lock-free. If a thread or task gets preempted during
68+
//! a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
69+
//! it's scheduled again to finish its operation.
70+
//!
71+
//! See <https://github.com/rust-embedded/heapless/issues/583> for more details.
72+
//!
73+
//! </div>
6474
//! # References
6575
//!
6676
//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
@@ -70,7 +80,10 @@
7080
7181
use core::{cell::UnsafeCell, mem::MaybeUninit};
7282

73-
#[cfg(not(feature = "portable-atomic"))]
83+
#[cfg(loom)]
84+
use loom::sync::atomic;
85+
86+
#[cfg(not(any(feature = "portable-atomic", loom)))]
7487
use core::sync::atomic;
7588
#[cfg(feature = "portable-atomic")]
7689
use portable_atomic as atomic;
@@ -113,6 +126,16 @@ pub struct QueueInner<T, S: Storage> {
113126
/// </div>
114127
///
115128
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
129+
///
130+
/// <div class="warning">
131+
///
132+
/// This implementation is not fully lock-free. If a thread or task gets preempted during
133+
/// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
134+
/// it's scheduled again to finish its operation.
135+
///
136+
/// See <https://github.com/rust-embedded/heapless/issues/583> for more details.
137+
///
138+
/// </div>
116139
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
117140

118141
/// A [`Queue`] with dynamic capacity.
@@ -121,6 +144,7 @@ pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
121144
pub type QueueView<T> = QueueInner<T, ViewStorage>;
122145

123146
impl<T, const N: usize> Queue<T, N> {
147+
#[cfg(not(loom))]
124148
/// Creates an empty queue.
125149
pub const fn new() -> Self {
126150
const {
@@ -144,6 +168,26 @@ impl<T, const N: usize> Queue<T, N> {
144168
}
145169
}
146170

171+
/// Creates an empty queue.
172+
#[cfg(loom)]
173+
pub fn new() -> Self {
174+
use core::array;
175+
176+
const {
177+
assert!(N > 1);
178+
assert!(N.is_power_of_two());
179+
assert!(N < UintSize::MAX as usize);
180+
}
181+
182+
let result_cells: [Cell<T>; N] = array::from_fn(|idx| Cell::new(idx));
183+
184+
Self {
185+
buffer: UnsafeCell::new(result_cells),
186+
dequeue_pos: AtomicTargetSize::new(0),
187+
enqueue_pos: AtomicTargetSize::new(0),
188+
}
189+
}
190+
147191
/// Used in `Storage` implementation.
148192
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
149193
self
@@ -247,12 +291,20 @@ struct Cell<T> {
247291
}
248292

249293
impl<T> Cell<T> {
294+
#[cfg(not(loom))]
250295
const fn new(seq: usize) -> Self {
251296
Self {
252297
data: MaybeUninit::uninit(),
253298
sequence: AtomicTargetSize::new(seq as UintSize),
254299
}
255300
}
301+
#[cfg(loom)]
302+
fn new(seq: usize) -> Self {
303+
Self {
304+
data: MaybeUninit::uninit(),
305+
sequence: AtomicTargetSize::new(seq as UintSize),
306+
}
307+
}
256308
}
257309

258310
unsafe fn dequeue<T>(
@@ -342,6 +394,7 @@ unsafe fn enqueue<T>(
342394
Ok(())
343395
}
344396

397+
#[cfg(not(loom))]
345398
#[cfg(test)]
346399
mod tests {
347400
use static_assertions::assert_not_impl_any;
@@ -420,3 +473,63 @@ mod tests {
420473
q.enqueue(0x55).unwrap_err();
421474
}
422475
}
476+
#[cfg(all(loom, test))]
477+
mod tests_loom {
478+
use super::*;
479+
use std::sync::Arc;
480+
const N: usize = 4;
481+
482+
#[test]
483+
#[cfg(loom)]
484+
fn loom_issue_583_enqueue() {
485+
loom::model(|| {
486+
let q0 = Arc::new(Queue::<u8, N>::new());
487+
q0.enqueue(0).unwrap();
488+
q0.enqueue(1).unwrap();
489+
q0.enqueue(2).unwrap();
490+
q0.enqueue(3).unwrap();
491+
let model_thread = || {
492+
let q0 = q0.clone();
493+
move || {
494+
for k in 0..10 {
495+
let Some(i) = q0.dequeue() else {
496+
panic!("{k}");
497+
};
498+
if q0.enqueue(k as u8).is_err() {
499+
panic!("{i}");
500+
}
501+
}
502+
}
503+
};
504+
505+
let h1 = loom::thread::spawn(model_thread());
506+
let h2 = loom::thread::spawn(model_thread());
507+
h1.join().unwrap();
508+
h2.join().unwrap();
509+
});
510+
}
511+
512+
#[test]
513+
#[cfg(loom)]
514+
fn loom_issue_583_dequeue() {
515+
loom::model(|| {
516+
let q0 = Arc::new(Queue::<u8, N>::new());
517+
let model_thread = || {
518+
let q0 = q0.clone();
519+
move || {
520+
for k in 0..10 {
521+
q0.enqueue(k as u8).unwrap();
522+
if q0.dequeue().is_none() {
523+
panic!("{k}");
524+
}
525+
}
526+
}
527+
};
528+
529+
let h1 = loom::thread::spawn(model_thread());
530+
let h2 = loom::thread::spawn(model_thread());
531+
h1.join().unwrap();
532+
h2.join().unwrap();
533+
});
534+
}
535+
}

tests/tsan.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#![deny(rust_2018_compatibility)]
22
#![deny(rust_2018_idioms)]
3+
#![cfg(not(loom))]
34

45
use std::{ptr::addr_of_mut, thread};
56

0 commit comments

Comments
 (0)