From e36e924280ecf6935b34c26acf7a01415abdf3a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sosth=C3=A8ne=20Gu=C3=A9don?= Date: Wed, 8 Oct 2025 21:40:19 +0200 Subject: [PATCH] mpmc: Document #583 and add corresponding loom test This patch documents #583 adds failing loom tests to exercise #583 hoping that a future algorithm change can make them pass. The tests are ran in CI but their results is ignored for now as they fail. Ideally the `loom` tests will cover more usage of `mpmc::Queue` in the future and also cover `spsc`. --- .github/workflows/build.yml | 5 ++ CHANGELOG.md | 1 + Cargo.toml | 6 ++ src/mpmc.rs | 115 +++++++++++++++++++++++++++++++++++- tests/tsan.rs | 1 + 5 files changed, 127 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index dbb731068f..39538d145e 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -86,6 +86,11 @@ jobs: - name: Run cargo test run: cargo test --features="alloc,defmt,mpmc_large,portable-atomic-critical-section,serde,ufmt,bytes,zeroize" + - name: Run loom tests + run: cargo test -- loom + continue-on-error: true + env: + RUSTFLAGS: '--cfg loom' # Run cargo fmt --check style: name: style diff --git a/CHANGELOG.md b/CHANGELOG.md index 5dea8c1c44..1419d89a79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Implement `TryFrom` for `Deque` from array. - Switch from `serde` to `serde_core` for enabling faster compilations. - Implement `Zeroize` trait for all data structures with the `zeroize` feature to securely clear sensitive data from memory. +- `mpmc::Queue`: document non-lock free behaviour, and add loom tests ## [v0.9.1] - 2025-08-19 diff --git a/Cargo.toml b/Cargo.toml index 7390c86bc9..db049734ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,9 @@ stable_deref_trait = { version = "1", default-features = false } critical-section = { version = "1.1", features = ["std"] } static_assertions = "1.1.0" +[target.'cfg(loom)'.dependencies] +loom = "0.7.2" + [package.metadata.docs.rs] features = [ "bytes", @@ -89,3 +92,6 @@ features = [ # for the pool module targets = ["i686-unknown-linux-gnu"] rustdoc-args = ["--cfg", "docsrs"] + +[lints.rust] +unexpected_cfgs = { level = "warn", check-cfg = ['cfg(loom)'] } diff --git a/src/mpmc.rs b/src/mpmc.rs index f676c7c93a..9961d60f49 100644 --- a/src/mpmc.rs +++ b/src/mpmc.rs @@ -61,6 +61,16 @@ //! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some` //! and `enqueue` returning `Ok`. //! +//! +//!
+//! +//! This implementation is not fully lock-free. If a thread or task gets preempted during +//! a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until +//! it's scheduled again to finish its operation. +//! +//! See for more details. +//! +//!
//! # References //! //! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the @@ -70,7 +80,10 @@ use core::{cell::UnsafeCell, mem::MaybeUninit}; -#[cfg(not(feature = "portable-atomic"))] +#[cfg(loom)] +use loom::sync::atomic; + +#[cfg(not(any(feature = "portable-atomic", loom)))] use core::sync::atomic; #[cfg(feature = "portable-atomic")] use portable_atomic as atomic; @@ -113,6 +126,16 @@ pub struct QueueInner { /// /// /// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled. +/// +///
+/// +/// This implementation is not fully lock-free. If a thread or task gets preempted during +/// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until +/// it's scheduled again to finish its operation. +/// +/// See for more details. +/// +///
pub type Queue = QueueInner>; /// A [`Queue`] with dynamic capacity. @@ -121,6 +144,7 @@ pub type Queue = QueueInner>; pub type QueueView = QueueInner; impl Queue { + #[cfg(not(loom))] /// Creates an empty queue. pub const fn new() -> Self { const { @@ -144,6 +168,26 @@ impl Queue { } } + /// Creates an empty queue. + #[cfg(loom)] + pub fn new() -> Self { + use core::array; + + const { + assert!(N > 1); + assert!(N.is_power_of_two()); + assert!(N < UintSize::MAX as usize); + } + + let result_cells: [Cell; N] = array::from_fn(|idx| Cell::new(idx)); + + Self { + buffer: UnsafeCell::new(result_cells), + dequeue_pos: AtomicTargetSize::new(0), + enqueue_pos: AtomicTargetSize::new(0), + } + } + /// Used in `Storage` implementation. pub(crate) fn as_view_private(&self) -> &QueueView { self @@ -247,12 +291,20 @@ struct Cell { } impl Cell { + #[cfg(not(loom))] const fn new(seq: usize) -> Self { Self { data: MaybeUninit::uninit(), sequence: AtomicTargetSize::new(seq as UintSize), } } + #[cfg(loom)] + fn new(seq: usize) -> Self { + Self { + data: MaybeUninit::uninit(), + sequence: AtomicTargetSize::new(seq as UintSize), + } + } } unsafe fn dequeue( @@ -342,6 +394,7 @@ unsafe fn enqueue( Ok(()) } +#[cfg(not(loom))] #[cfg(test)] mod tests { use static_assertions::assert_not_impl_any; @@ -420,3 +473,63 @@ mod tests { q.enqueue(0x55).unwrap_err(); } } +#[cfg(all(loom, test))] +mod tests_loom { + use super::*; + use std::sync::Arc; + const N: usize = 4; + + #[test] + #[cfg(loom)] + fn loom_issue_583_enqueue() { + loom::model(|| { + let q0 = Arc::new(Queue::::new()); + q0.enqueue(0).unwrap(); + q0.enqueue(1).unwrap(); + q0.enqueue(2).unwrap(); + q0.enqueue(3).unwrap(); + let model_thread = || { + let q0 = q0.clone(); + move || { + for k in 0..10 { + let Some(i) = q0.dequeue() else { + panic!("{k}"); + }; + if q0.enqueue(k as u8).is_err() { + panic!("{i}"); + } + } + } + }; + + let h1 = loom::thread::spawn(model_thread()); + let h2 = loom::thread::spawn(model_thread()); + h1.join().unwrap(); + h2.join().unwrap(); + }); + } + + #[test] + #[cfg(loom)] + fn loom_issue_583_dequeue() { + loom::model(|| { + let q0 = Arc::new(Queue::::new()); + let model_thread = || { + let q0 = q0.clone(); + move || { + for k in 0..10 { + q0.enqueue(k as u8).unwrap(); + if q0.dequeue().is_none() { + panic!("{k}"); + } + } + } + }; + + let h1 = loom::thread::spawn(model_thread()); + let h2 = loom::thread::spawn(model_thread()); + h1.join().unwrap(); + h2.join().unwrap(); + }); + } +} diff --git a/tests/tsan.rs b/tests/tsan.rs index 14391e2435..00bf9d3361 100644 --- a/tests/tsan.rs +++ b/tests/tsan.rs @@ -1,5 +1,6 @@ #![deny(rust_2018_compatibility)] #![deny(rust_2018_idioms)] +#![cfg(not(loom))] use std::{ptr::addr_of_mut, thread};