diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index dbb731068f..39538d145e 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -86,6 +86,11 @@ jobs:
- name: Run cargo test
run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize"
+ - name: Run loom tests
+ run: cargo test -- loom
+ continue-on-error: true
+ env:
+ RUSTFLAGS: '--cfg loom'
# Run cargo fmt --check
style:
name: style
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5dea8c1c44..1419d89a79 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Implement `TryFrom` for `Deque` from array.
- Switch from `serde` to `serde_core` for enabling faster compilations.
- Implement `Zeroize` trait for all data structures with the `zeroize` feature to securely clear sensitive data from memory.
+- `mpmc::Queue`: document non-lock free behaviour, and add loom tests
## [v0.9.1] - 2025-08-19
diff --git a/Cargo.toml b/Cargo.toml
index 7390c86bc9..db049734ef 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -76,6 +76,9 @@ stable_deref_trait = { version = "1", default-features = false }
critical-section = { version = "1.1", features = ["std"] }
static_assertions = "1.1.0"
+[target.'cfg(loom)'.dependencies]
+loom = "0.7.2"
+
[package.metadata.docs.rs]
features = [
"bytes",
@@ -89,3 +92,6 @@ features = [
# for the pool module
targets = ["i686-unknown-linux-gnu"]
rustdoc-args = ["--cfg", "docsrs"]
+
+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] }
diff --git a/src/mpmc.rs b/src/mpmc.rs
index f676c7c93a..9961d60f49 100644
--- a/src/mpmc.rs
+++ b/src/mpmc.rs
@@ -61,6 +61,16 @@
//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
//! and `enqueue` returning `Ok`.
//!
+//!
+//!
+//!
+//! This implementation is not fully lock-free. If a thread or task gets preempted during
+//! a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
+//! it's scheduled again to finish its operation.
+//!
+//! See for more details.
+//!
+//!
//! # References
//!
//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
@@ -70,7 +80,10 @@
use core::{cell::UnsafeCell, mem::MaybeUninit};
-#[cfg(not(feature = "portable-atomic"))]
+#[cfg(loom)]
+use loom::sync::atomic;
+
+#[cfg(not(any(feature = "portable-atomic", loom)))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;
@@ -113,6 +126,16 @@ pub struct QueueInner {
///
///
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
+///
+///
+///
+/// This implementation is not fully lock-free. If a thread or task gets preempted during
+/// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
+/// it's scheduled again to finish its operation.
+///
+/// See for more details.
+///
+///
pub type Queue = QueueInner>;
/// A [`Queue`] with dynamic capacity.
@@ -121,6 +144,7 @@ pub type Queue = QueueInner>;
pub type QueueView = QueueInner;
impl Queue {
+ #[cfg(not(loom))]
/// Creates an empty queue.
pub const fn new() -> Self {
const {
@@ -144,6 +168,26 @@ impl Queue {
}
}
+ /// Creates an empty queue.
+ #[cfg(loom)]
+ pub fn new() -> Self {
+ use core::array;
+
+ const {
+ assert!(N > 1);
+ assert!(N.is_power_of_two());
+ assert!(N < UintSize::MAX as usize);
+ }
+
+ let result_cells: [Cell; N] = array::from_fn(|idx| Cell::new(idx));
+
+ Self {
+ buffer: UnsafeCell::new(result_cells),
+ dequeue_pos: AtomicTargetSize::new(0),
+ enqueue_pos: AtomicTargetSize::new(0),
+ }
+ }
+
/// Used in `Storage` implementation.
pub(crate) fn as_view_private(&self) -> &QueueView {
self
@@ -247,12 +291,20 @@ struct Cell {
}
impl Cell {
+ #[cfg(not(loom))]
const fn new(seq: usize) -> Self {
Self {
data: MaybeUninit::uninit(),
sequence: AtomicTargetSize::new(seq as UintSize),
}
}
+ #[cfg(loom)]
+ fn new(seq: usize) -> Self {
+ Self {
+ data: MaybeUninit::uninit(),
+ sequence: AtomicTargetSize::new(seq as UintSize),
+ }
+ }
}
unsafe fn dequeue(
@@ -342,6 +394,7 @@ unsafe fn enqueue(
Ok(())
}
+#[cfg(not(loom))]
#[cfg(test)]
mod tests {
use static_assertions::assert_not_impl_any;
@@ -420,3 +473,63 @@ mod tests {
q.enqueue(0x55).unwrap_err();
}
}
+#[cfg(all(loom, test))]
+mod tests_loom {
+ use super::*;
+ use std::sync::Arc;
+ const N: usize = 4;
+
+ #[test]
+ #[cfg(loom)]
+ fn loom_issue_583_enqueue() {
+ loom::model(|| {
+ let q0 = Arc::new(Queue::::new());
+ q0.enqueue(0).unwrap();
+ q0.enqueue(1).unwrap();
+ q0.enqueue(2).unwrap();
+ q0.enqueue(3).unwrap();
+ let model_thread = || {
+ let q0 = q0.clone();
+ move || {
+ for k in 0..10 {
+ let Some(i) = q0.dequeue() else {
+ panic!("{k}");
+ };
+ if q0.enqueue(k as u8).is_err() {
+ panic!("{i}");
+ }
+ }
+ }
+ };
+
+ let h1 = loom::thread::spawn(model_thread());
+ let h2 = loom::thread::spawn(model_thread());
+ h1.join().unwrap();
+ h2.join().unwrap();
+ });
+ }
+
+ #[test]
+ #[cfg(loom)]
+ fn loom_issue_583_dequeue() {
+ loom::model(|| {
+ let q0 = Arc::new(Queue::::new());
+ let model_thread = || {
+ let q0 = q0.clone();
+ move || {
+ for k in 0..10 {
+ q0.enqueue(k as u8).unwrap();
+ if q0.dequeue().is_none() {
+ panic!("{k}");
+ }
+ }
+ }
+ };
+
+ let h1 = loom::thread::spawn(model_thread());
+ let h2 = loom::thread::spawn(model_thread());
+ h1.join().unwrap();
+ h2.join().unwrap();
+ });
+ }
+}
diff --git a/tests/tsan.rs b/tests/tsan.rs
index 14391e2435..00bf9d3361 100644
--- a/tests/tsan.rs
+++ b/tests/tsan.rs
@@ -1,5 +1,6 @@
#![deny(rust_2018_compatibility)]
#![deny(rust_2018_idioms)]
+#![cfg(not(loom))]
use std::{ptr::addr_of_mut, thread};