From 4a24f4aa4608df55b97678a40bd7b632633c370b Mon Sep 17 00:00:00 2001 From: Schell Carl Scivally Date: Wed, 8 Jan 2025 06:37:52 +1300 Subject: [PATCH] feature: part out slab allocator into craballoc --- Cargo.lock | 39 ++- Cargo.toml | 6 +- crates/craballoc/Cargo.toml | 18 ++ crates/craballoc/src/lib.rs | 153 +++++++++ crates/craballoc/src/range.rs | 158 +++++++++ crates/craballoc/src/runtime.rs | 163 ++++++++++ crates/craballoc/src/slab.rs | 356 ++++++++++++++++++++ crates/craballoc/src/slab/wgpu_slab.rs | 159 +++++++++ crates/craballoc/src/value.rs | 428 +++++++++++++++++++++++++ 9 files changed, 1473 insertions(+), 7 deletions(-) create mode 100644 crates/craballoc/Cargo.toml create mode 100644 crates/craballoc/src/lib.rs create mode 100644 crates/craballoc/src/range.rs create mode 100644 crates/craballoc/src/runtime.rs create mode 100644 crates/craballoc/src/slab.rs create mode 100644 crates/craballoc/src/slab/wgpu_slab.rs create mode 100644 crates/craballoc/src/value.rs diff --git a/Cargo.lock b/Cargo.lock index cbe8b919..c3f233ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "ab_glyph" @@ -628,6 +628,20 @@ dependencies = [ "libc", ] +[[package]] +name = "craballoc" +version = "0.1.0" +dependencies = [ + "async-channel 1.9.0", + "bytemuck", + "crabslab", + "log", + "rustc-hash 1.1.0", + "snafu", + "tracing", + "wgpu", +] + [[package]] name = "crabslab" version = "0.6.2" @@ -2897,19 +2911,34 @@ dependencies = [ [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +dependencies = [ + "once_cell", +] [[package]] name = "ttf-parser" diff --git a/Cargo.toml b/Cargo.toml index 95479051..c1a7db9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,12 +1,13 @@ [workspace] -members = [ +members = [ + "crates/craballoc", "crates/example", "crates/example-culling", "crates/example-wasm", "crates/loading-bytes", "crates/renderling", "crates/renderling-ui", - "crates/sandbox" + "crates/sandbox", ] exclude = ["./shaders"] @@ -34,6 +35,7 @@ serde_json = "1.0.117" send_wrapper = "0.6.0" snafu = "0.7" syn = { version = "2.0.49", features = ["full", "extra-traits", "parsing"] } +tracing = "0.1.41" wasm-bindgen = "0.2" wasm-bindgen-futures = "0.4" web-sys = "0.3" diff --git a/crates/craballoc/Cargo.toml b/crates/craballoc/Cargo.toml new file mode 100644 index 00000000..163d1279 --- /dev/null +++ b/crates/craballoc/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "craballoc" +version = "0.1.0" +edition = "2021" + +[features] +default = ["wgpu"] +wgpu = ["dep:wgpu"] + +[dependencies] 
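# Illustrative note: a downstream crate that only needs the CPU runtime could
# depend on this crate with the default `wgpu` feature disabled, e.g.
# `craballoc = { version = "0.1", default-features = false }`.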
+async-channel.workspace = true +bytemuck.workspace = true +crabslab.workspace = true +log.workspace = true +rustc-hash.workspace = true +snafu.workspace = true +tracing.workspace = true +wgpu = { workspace = true, optional = true } diff --git a/crates/craballoc/src/lib.rs b/crates/craballoc/src/lib.rs new file mode 100644 index 00000000..fdfa62bd --- /dev/null +++ b/crates/craballoc/src/lib.rs @@ -0,0 +1,153 @@ +//! GPU and CPU slab allocation. +//! +//! Re-exports [`Array`], [`Id`], [`Slab`] and [`SlabItem`] from +//! [`crabslab`](https://docs.rs/crabslab/latest/crabslab/). +//! +//! User types can automatically derive `SlabItem` in most cases. It is +//! required that your type's fields all implement `SlabItem` and `crabslab` +//! must be in scope. +//! +//! ``` +//! use renderling::slab::SlabItem; +//! +//! #[derive(Clone, Copy, SlabItem)] +//! struct UserData { +//! pos: (f32, f32, f32), +//! acc: (f32, f32, f32), +//! } +//! ``` + +pub mod range; +pub mod runtime; +pub mod slab; +pub mod value; + +pub mod prelude { + //! Easy-include prelude module. + + pub use super::runtime::*; + pub use super::slab::*; + pub use super::value::*; +} + +// #[cfg(feature = "wgpu")] +// mod wgpu_slab; +// #[cfg(feature = "wgpu")] +// pub use wgpu_slab::*; + +// #[cfg(test)] +// mod test { +// pub use crabslab::{Array, Id, Slab, SlabItem}; + +// use crate::slab::SlabAllocator; + +// #[test] +// fn mngr_updates_count_sanity() { +// let mngr = SlabAllocator::>>::default(); +// { +// let value = mngr.new_value(666u32); +// assert_eq!( +// 1, +// value.ref_count(), +// "slab should not retain a count on value" +// ); +// } +// let _ = mngr.upkeep(()); +// assert_eq!( +// 0, +// mngr.update_sources.read().unwrap().len(), +// "value should have dropped with no refs" +// ); +// { +// let values = mngr.new_array([666u32, 420u32]); +// assert_eq!( +// 1, +// values.ref_count(), +// "slab should not retain a count on array" +// ); +// } +// let _ = mngr.upkeep(()); +// assert_eq!( +// 0, +// mngr.update_sources.read().unwrap().len(), +// "array should have dropped with no refs" +// ); +// } + +// #[test] +// fn range_sanity() { +// let a = Range { +// first_index: 1, +// last_index: 2, +// }; +// let b = Range { +// first_index: 0, +// last_index: 0, +// }; +// assert!(!a.intersects(&b)); +// assert!(!b.intersects(&a)); +// } + +// #[test] +// fn slab_manager_sanity() { +// let m = SlabAllocator::>>::default(); +// log::info!("allocating 4 unused u32 slots"); +// let _ = m.allocate::(); +// let _ = m.allocate::(); +// let _ = m.allocate::(); +// let _ = m.allocate::(); + +// log::info!("creating 4 update sources"); +// let h4 = m.new_value(0u32); +// let h5 = m.new_value(0u32); +// let h6 = m.new_value(0u32); +// let h7 = m.new_value(0u32); +// log::info!("running upkeep"); +// let _ = m.upkeep(()); +// assert!(m.recycles.read().unwrap().ranges.is_empty()); +// assert_eq!(4, m.update_sources.read().unwrap().len()); +// let k = m.update_k.load(Ordering::Relaxed); +// assert_eq!(4, k); + +// log::info!("dropping 4 update sources"); +// drop(h4); +// drop(h5); +// drop(h6); +// drop(h7); +// let _ = m.upkeep(()); +// assert_eq!(1, m.recycles.read().unwrap().ranges.len()); +// assert!(m.update_sources.read().unwrap().is_empty()); + +// log::info!("creating 4 update sources, round two"); +// let h4 = m.new_value(0u32); +// let h5 = m.new_value(0u32); +// let h6 = m.new_value(0u32); +// let h7 = m.new_value(0u32); +// assert!(m.recycles.read().unwrap().ranges.is_empty()); +// assert_eq!(4, 
m.update_sources.read().unwrap().len()); +// let k = m.update_k.load(Ordering::Relaxed); +// // MAYBE_TODO: recycle "update_k"s instead of incrementing for each new source +// assert_eq!(8, k); + +// log::info!("creating one more update source, immediately dropping it and two others"); +// let h8 = m.new_value(0u32); +// drop(h8); +// drop(h4); +// drop(h6); +// let _ = m.upkeep(()); +// assert_eq!(3, m.recycles.read().unwrap().ranges.len()); +// assert_eq!(2, m.update_sources.read().unwrap().len()); +// assert_eq!(9, m.update_k.load(Ordering::Relaxed)); + +// drop(h7); +// drop(h5); +// let _ = m.upkeep(()); +// m.defrag(); +// assert_eq!( +// 1, +// m.recycles.read().unwrap().ranges.len(), +// "ranges: {:#?}", +// m.recycles.read().unwrap().ranges +// ); +// } +// } diff --git a/crates/craballoc/src/range.rs b/crates/craballoc/src/range.rs new file mode 100644 index 00000000..0d83dfcf --- /dev/null +++ b/crates/craballoc/src/range.rs @@ -0,0 +1,158 @@ +//! Managing ranges of values. + +use crabslab::{Array, Slab, SlabItem}; + +use crate::runtime::SlabUpdate; + +#[derive(Clone, Copy, PartialEq)] +pub struct Range { + pub first_index: u32, + pub last_index: u32, +} + +impl core::fmt::Debug for Range { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.write_str(&format!("{}..={}", self.first_index, self.last_index)) + } +} + +impl From> for Range { + fn from(array: Array) -> Self { + let array = array.into_u32_array(); + let first_index = array.starting_index() as u32; + Range { + first_index, + last_index: first_index + array.len() as u32 - 1, + } + } +} + +impl Range { + pub fn len(&self) -> u32 { + 1 + self.last_index - self.first_index + } + + pub fn is_empty(&self) -> bool { + self.last_index == self.first_index + } + + pub fn intersects(&self, other: &Range) -> bool { + !(self.first_index > other.last_index || self.last_index < other.first_index) + } +} + +/// Represents a block of contiguous numbers. +pub trait IsRange { + /// Returns `true` if the two ranges overlap or "touch". + fn should_merge_with(&self, other: &Self) -> bool; + + /// Returns the union of two ranges. + fn union(&mut self, other: Self); +} + +impl IsRange for Range { + fn should_merge_with(&self, other: &Self) -> bool { + debug_assert!( + !self.intersects(other), + "{self:?} intersects existing {other:?}, should never happen with Range" + ); + + self.last_index + 1 == other.first_index || self.first_index == other.last_index + 1 + } + + fn union(&mut self, other: Self) { + *self = Range { + first_index: self.first_index.min(other.first_index), + last_index: self.last_index.max(other.last_index), + }; + } +} + +impl IsRange for SlabUpdate { + fn should_merge_with(&self, other: &Self) -> bool { + self.intersects(other) + } + + fn union(&mut self, other: Self) { + if self.array == other.array { + *self = other; + return; + } + + let mut array = self.array; + array.union(&other.array); + + let mut elements = vec![0u32; array.len()]; + + let self_index = self.array.index - array.index; + elements.write_indexed_slice(&self.elements, self_index as usize); + let other_index = other.array.index - array.index; + elements.write_indexed_slice(&other.elements, other_index as usize); + + self.array = array; + self.elements = elements; + } +} + +/// Manages contiguous ranges. +pub struct RangeManager { + pub ranges: Vec, +} + +impl Default for RangeManager { + fn default() -> Self { + Self { ranges: vec![] } + } +} + +impl RangeManager { + /// Return the number of distinct ranges being managed. 
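    ///
    /// A minimal sketch (illustrative only): ranges that touch are merged by
    /// [`RangeManager::add_range`], so `len` reports the number of distinct
    /// ranges after merging:
    ///
    /// ```ignore
    /// let mut ranges = RangeManager::<Range>::default();
    /// ranges.add_range(Range { first_index: 0, last_index: 3 });
    /// // 4..=7 touches 0..=3, so the two are merged into 0..=7.
    /// ranges.add_range(Range { first_index: 4, last_index: 7 });
    /// assert_eq!(1, ranges.len());
    /// ```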
+ pub fn len(&self) -> usize { + self.ranges.len() + } + + /// Return whether this manager is managing any ranges. + pub fn is_empty(&self) -> bool { + self.ranges.is_empty() + } + + pub fn add_range(&mut self, input_range: R) { + for range in self.ranges.iter_mut() { + if range.should_merge_with(&input_range) { + range.union(input_range); + return; + } + } + self.ranges.push(input_range); + } +} + +impl RangeManager { + /// Removes a range of `count` elements, if possible. + pub fn remove(&mut self, count: u32) -> Option { + let mut remove_index = usize::MAX; + for (i, range) in self.ranges.iter_mut().enumerate() { + // This is potentially a hot path, so use the `if` even + // though clippy complains (because using match is slower) + #[allow(clippy::comparison_chain)] + if range.len() > count { + let first_index = range.first_index; + let last_index = range.first_index + count - 1; + range.first_index += count; + return Some(Range { + first_index, + last_index, + }); + } else if range.len() == count { + remove_index = i; + break; + } + } + + if remove_index == usize::MAX { + None + } else { + Some(self.ranges.swap_remove(remove_index)) + } + } +} diff --git a/crates/craballoc/src/runtime.rs b/crates/craballoc/src/runtime.rs new file mode 100644 index 00000000..f964717b --- /dev/null +++ b/crates/craballoc/src/runtime.rs @@ -0,0 +1,163 @@ +//! The CPU side fo slab allocation. + +use std::{future::Future, sync::Mutex}; + +use crabslab::Array; + +use crate::slab::SlabAllocatorError; + +/// An update to a slab. +/// +/// This is a write that can be serialized for later syncronization. +#[derive(Clone, Debug)] +pub struct SlabUpdate { + pub array: Array, + pub elements: Vec, +} + +impl SlabUpdate { + // pub fn range(&self) -> Range { + // Range { + // first_index: self.array.starting_index() as u32, + // last_index: (self.array.starting_index() + self.array.len()) as u32 - + // 1, } + // } + + pub fn intersects(&self, other: &Self) -> bool { + let here_start = self.array.index; + let there_start = other.array.index; + let here_end = self.array.index + self.array.len; + let there_end = other.array.index + other.array.len; + !(here_start >= there_end || there_start >= here_end) + } +} + +/// Represents the runtime that provides the interface to the GPU buffer. +/// +/// For example, this could be a struct that contains `wgpu::Device` and `wgpu::Queue`, +/// or it could be a struct that contains Vulkan types, etc. +pub trait IsRuntime: Clone { + /// The type of buffer this runtime engages with. + type Buffer; + + /// The type used to denote the configuration of the buffer. + type BufferUsages: Clone; + + /// Create a new buffer with the given `capacity`, where `capacity` is the number of `u32`s + /// that can be stored in the buffer. + fn buffer_create( + &self, + capacity: usize, + label: Option<&str>, + usages: Self::BufferUsages, + ) -> Self::Buffer; + + /// Copy the contents of one buffer into another at index 0. + fn buffer_copy( + &self, + source_buffer: &Self::Buffer, + destination_buffer: &Self::Buffer, + label: Option<&str>, + ); + + /// Write the updates into the given buffer. + fn buffer_write>(&self, updates: U, buffer: &Self::Buffer); + + /// Read the range from the given buffer. + /// + /// ## Note + /// This function is async. 
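    ///
    /// An illustrative sketch against the [`CpuRuntime`] reference runtime
    /// (the `pollster::block_on` executor used here is an assumption, not
    /// part of this crate):
    ///
    /// ```ignore
    /// use craballoc::runtime::{CpuRuntime, IsRuntime};
    ///
    /// let runtime = CpuRuntime;
    /// // `CpuRuntime`'s buffer usages type is `()`.
    /// let buffer = runtime.buffer_create(4, Some("example"), ());
    /// let words = pollster::block_on(runtime.buffer_read(&buffer, 4, 0..4)).unwrap();
    /// assert_eq!(vec![0u32; 4], words);
    /// ```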
+ fn buffer_read( + &self, + buffer: &Self::Buffer, + buffer_len: usize, + range: impl std::ops::RangeBounds, + ) -> impl Future, SlabAllocatorError>>; +} + +pub(crate) fn range_to_indices_and_len( + // Used in the case the range is unbounded + max_len: usize, + range: impl std::ops::RangeBounds, +) -> (usize, usize, usize) { + let start = match range.start_bound() { + core::ops::Bound::Included(start) => *start, + core::ops::Bound::Excluded(start) => *start + 1, + core::ops::Bound::Unbounded => 0, + }; + let end = match range.end_bound() { + core::ops::Bound::Included(end) => *end + 1, + core::ops::Bound::Excluded(end) => *end, + core::ops::Bound::Unbounded => max_len, + }; + let len = end - start; + (start, end, len) +} + +/// A runtime that only operates on the CPU. +/// +/// `CpuRuntime` manages [`VecSlab`]s, which are used as a reference +/// implementation, mostly for testing. +#[derive(Clone, Copy)] +pub struct CpuRuntime; + +/// A slab buffer used _only_ on the GPU. +/// +/// This is mostly for testing. +pub struct VecSlab { + inner: Mutex>, +} + +impl IsRuntime for CpuRuntime { + type Buffer = VecSlab; + type BufferUsages = (); + + fn buffer_create(&self, capacity: usize, label: Option<&str>, _usages: ()) -> VecSlab { + log::trace!( + "creating vec buffer '{}' with capacity {capacity}", + label.unwrap_or("unknown") + ); + VecSlab { + inner: Mutex::new(vec![0; capacity]), + } + } + + fn buffer_copy( + &self, + source_buffer: &VecSlab, + destination_buffer: &VecSlab, + label: Option<&str>, + ) { + log::trace!("performing copy '{}'", label.unwrap_or("unknown")); + let this = &destination_buffer; + let source = source_buffer.inner.lock().unwrap(); + let mut destination = this.inner.lock().unwrap(); + let destination_slice = &mut destination[0..source.len()]; + destination_slice.copy_from_slice(source.as_slice()); + } + + fn buffer_write>(&self, updates: U, buffer: &Self::Buffer) { + let mut guard = buffer.inner.lock().unwrap(); + log::trace!("writing to vec len:{}", guard.len()); + for SlabUpdate { array, elements } in updates { + log::trace!("array: {array:?} elements: {elements:?}"); + let slice = &mut guard[array.starting_index()..array.starting_index() + array.len()]; + slice.copy_from_slice(&elements); + } + } + + async fn buffer_read( + &self, + buffer: &Self::Buffer, + buffer_len: usize, + range: impl std::ops::RangeBounds, + ) -> Result, SlabAllocatorError> { + let v = buffer.inner.lock().unwrap(); + debug_assert_eq!(v.len(), buffer_len); + let (start, end, len) = range_to_indices_and_len(v.len(), range); + let mut output = vec![0; len]; + let slice = &v[start..end]; + output.copy_from_slice(slice); + Ok(output) + } +} diff --git a/crates/craballoc/src/slab.rs b/crates/craballoc/src/slab.rs new file mode 100644 index 00000000..0dc1be10 --- /dev/null +++ b/crates/craballoc/src/slab.rs @@ -0,0 +1,356 @@ +//! Slab allocators. +use core::sync::atomic::{AtomicUsize, Ordering}; +use crabslab::{Array, Id, SlabItem}; +use rustc_hash::{FxHashMap, FxHashSet}; +use snafu::prelude::*; +use std::sync::{atomic::AtomicBool, Arc, RwLock}; + +use crate::{ + range::{Range, RangeManager}, + runtime::{IsRuntime, SlabUpdate}, + value::{Hybrid, HybridArray, WeakGpuRef}, +}; + +#[cfg(feature = "wgpu")] +mod wgpu_slab; +#[cfg(feature = "wgpu")] +pub use wgpu_slab::*; + +#[derive(Debug, Snafu)] +pub enum SlabAllocatorError { + #[snafu(display( + "Slab has no internal buffer. Please call SlabAllocator::upkeep or \ + SlabAllocator::get_updated_buffer first." 
+ ))] + NoInternalBuffer, + + #[snafu(display("Async recv error: {source}"))] + AsyncRecv { source: async_channel::RecvError }, + + #[cfg(feature = "wgpu")] + #[snafu(display("Async error: {source}"))] + Async { source: wgpu::BufferAsyncError }, +} + +/// Manages slab allocations and updates over a parameterised buffer. +/// +/// Create a new instance using [`SlabAllocator::new`]. +/// +/// Upon creation you will need to call [`SlabAllocator::get_updated_buffer`] or +/// [`SlabAllocator::upkeep`] at least once before any data is written to the +/// internal buffer. +pub struct SlabAllocator { + pub(crate) notifier: (async_channel::Sender, async_channel::Receiver), + runtime: Runtime, + len: Arc, + capacity: Arc, + needs_expansion: Arc, + buffer: Arc>>>, + buffer_usages: Runtime::BufferUsages, + update_k: Arc, + update_sources: Arc>>, + update_queue: Arc>>, + recycles: Arc>>, +} + +impl Clone for SlabAllocator { + fn clone(&self) -> Self { + SlabAllocator { + runtime: self.runtime.clone(), + notifier: self.notifier.clone(), + len: self.len.clone(), + capacity: self.capacity.clone(), + needs_expansion: self.needs_expansion.clone(), + buffer: self.buffer.clone(), + buffer_usages: self.buffer_usages.clone(), + update_k: self.update_k.clone(), + update_sources: self.update_sources.clone(), + update_queue: self.update_queue.clone(), + recycles: self.recycles.clone(), + } + } +} + +impl SlabAllocator { + pub fn new(runtime: &R, default_buffer_usages: R::BufferUsages) -> Self { + Self { + runtime: runtime.clone(), + notifier: async_channel::unbounded(), + update_k: Default::default(), + update_sources: Default::default(), + update_queue: Default::default(), + recycles: Default::default(), + len: Default::default(), + // Start with size 1, because some of `wgpu`'s validation depends on it. + // See for more info. 
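            // A minimal construction sketch (illustrative; `CpuRuntime` is the
            // reference runtime defined in `runtime.rs`, whose buffer usages
            // type is `()`):
            //
            //     let slab = SlabAllocator::new(&CpuRuntime, ());
            //     let value = slab.new_value(1234u32);
            //     let _ = slab.upkeep();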
+ capacity: Arc::new(AtomicUsize::new(1)), + needs_expansion: Arc::new(true.into()), + buffer: Default::default(), + buffer_usages: default_buffer_usages, + } + } + + pub(crate) fn next_update_k(&self) -> usize { + self.update_k.fetch_add(1, Ordering::Relaxed) + } + + pub(crate) fn insert_update_source(&self, k: usize, source: WeakGpuRef) { + log::trace!("slab insert_update_source {k}",); + let _ = self.notifier.0.try_send(k); + // UNWRAP: panic on purpose + self.update_sources.write().unwrap().insert(k, source); + } + + fn len(&self) -> usize { + self.len.load(Ordering::Relaxed) + } + + pub(crate) fn allocate(&self) -> Id { + // UNWRAP: we want to panic + let may_range = self.recycles.write().unwrap().remove(T::SLAB_SIZE as u32); + if let Some(range) = may_range { + let id = Id::::new(range.first_index); + log::trace!( + "slab allocate {}: dequeued {range:?} to {id:?}", + std::any::type_name::() + ); + debug_assert_eq!( + range.last_index, + range.first_index + T::SLAB_SIZE as u32 - 1 + ); + id + } else { + self.maybe_expand_to_fit::(1); + let index = self.increment_len(T::SLAB_SIZE); + Id::from(index) + } + } + + pub(crate) fn allocate_array(&self, len: usize) -> Array { + if len == 0 { + return Array::default(); + } + + // UNWRAP: we want to panic + let may_range = self + .recycles + .write() + .unwrap() + .remove((T::SLAB_SIZE * len) as u32); + if let Some(range) = may_range { + let array = Array::::new(range.first_index, len as u32); + log::trace!( + "slab allocate_array {len}x{}: dequeued {range:?} to {array:?}", + std::any::type_name::() + ); + debug_assert_eq!( + range.last_index, + range.first_index + (T::SLAB_SIZE * len) as u32 - 1 + ); + array + } else { + self.maybe_expand_to_fit::(len); + let index = self.increment_len(T::SLAB_SIZE * len); + Array::new(index as u32, len as u32) + } + } + + fn capacity(&self) -> usize { + self.capacity.load(Ordering::Relaxed) + } + + fn reserve_capacity(&self, capacity: usize) { + self.capacity.store(capacity, Ordering::Relaxed); + self.needs_expansion.store(true, Ordering::Relaxed); + } + + fn increment_len(&self, n: usize) -> usize { + self.len.fetch_add(n, Ordering::Relaxed) + } + + fn maybe_expand_to_fit(&self, len: usize) { + let capacity = self.capacity(); + // log::trace!( + // "append_slice: {size} * {ts_len} + {len} ({}) >= {capacity}", + // size * ts_len + len + //); + let capacity_needed = self.len() + T::SLAB_SIZE * len; + if capacity_needed > capacity { + let mut new_capacity = capacity * 2; + while new_capacity < capacity_needed { + new_capacity = (new_capacity * 2).max(2); + } + self.reserve_capacity(new_capacity); + } + } + + /// Return the internal buffer used by this slab. + /// + /// If the buffer needs recreating due to a capacity change this function + /// will return `None`. In that case use [`Self::get_updated_buffer`]. + pub fn get_buffer(&self) -> Option> { + self.buffer.read().unwrap().as_ref().cloned() + } + + /// Return an updated buffer. + /// + /// This is the only way to guarantee access to a buffer. + /// + /// Use [`SlabAllocator::upkeep`] when you only need the buffer after a + /// change, for example to recreate bindgroups. + pub fn get_updated_buffer(&self) -> Arc { + self.get_updated_buffer_and_check().0 + } + + /// Return an updated buffer, and whether or not it is different from the + /// last one. + /// + /// This is the only way to guarantee access to a buffer. + /// + /// Use [`SlabAllocator::upkeep`] when you only need the buffer after a + /// change, for example to recreate bindgroups. 
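    ///
    /// An illustrative sketch (assuming `slab` is an already-constructed
    /// `SlabAllocator`):
    ///
    /// ```ignore
    /// let (buffer, changed) = slab.get_updated_buffer_and_check();
    /// if changed {
    ///     // The buffer was recreated, so anything that references the old
    ///     // buffer (bindgroups, for example) must be recreated as well.
    /// }
    /// ```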
+ pub fn get_updated_buffer_and_check(&self) -> (Arc, bool) { + if let Some(new_buffer) = self.upkeep() { + (new_buffer, true) + } else { + // UNWRAP: safe because we know the buffer exists at this point, + // as we've called `upkeep` above + (self.get_buffer().unwrap(), false) + } + } + + /// Recreate this buffer, writing the contents of the previous buffer (if it + /// exists) to the new one, then return the new buffer. + fn recreate_buffer(&self) -> Arc { + let new_buffer = Arc::new(self.runtime.buffer_create( + self.capacity(), + None, + self.buffer_usages.clone(), + )); + let mut guard = self.buffer.write().unwrap(); + if let Some(old_buffer) = guard.take() { + self.runtime.buffer_copy(&old_buffer, &new_buffer, None); + } + *guard = Some(new_buffer.clone()); + new_buffer + } + + /// Stage a new value that lives on the GPU _and_ CPU. + pub fn new_value(&self, value: T) -> Hybrid { + Hybrid::new(self, value) + } + + /// Stage a contiguous array of new values that live on the GPU _and_ CPU. + pub fn new_array( + &self, + values: impl IntoIterator, + ) -> HybridArray { + HybridArray::new(self, values) + } + + /// Return the ids of all sources that require updating. + pub fn get_updated_source_ids(&self) -> FxHashSet { + // UNWRAP: panic on purpose + let mut update_set = self.update_queue.write().unwrap(); + while let Ok(source_index) = self.notifier.1.try_recv() { + update_set.insert(source_index); + } + update_set.clone() + } + + /// Build the set of sources that require updates, draining the source + /// notifier and resetting the stored `update_queue`. + /// + /// This also places recycled items into the recycle bin. + fn drain_updated_sources(&self) -> RangeManager { + let update_set = self.get_updated_source_ids(); + // UNWRAP: panic on purpose + *self.update_queue.write().unwrap() = Default::default(); + // Prepare all of our GPU buffer writes + let mut writes = RangeManager::::default(); + { + // Recycle any update sources that are no longer needed, and collect the active + // sources' updates into `writes`. + let mut updates_guard = self.update_sources.write().unwrap(); + let mut recycles_guard = self.recycles.write().unwrap(); + for key in update_set { + let delete = if let Some(gpu_ref) = updates_guard.get_mut(&key) { + let count = gpu_ref.weak.strong_count(); + if count == 0 { + // recycle this allocation + let array = gpu_ref.u32_array; + log::debug!("slab drain_updated_sources: recycling {key} {array:?}"); + if array.is_null() { + log::debug!(" cannot recycle, null"); + } else if array.is_empty() { + log::debug!(" cannot recycle, empty"); + } else { + recycles_guard.add_range(gpu_ref.u32_array.into()); + } + true + } else { + gpu_ref + .get_update() + .into_iter() + .flatten() + .for_each(|u| writes.add_range(u)); + false + } + } else { + log::debug!("could not find {key}"); + false + }; + if delete { + let _ = updates_guard.remove(&key); + } + } + // Defrag the recycle ranges + let ranges = std::mem::take(&mut recycles_guard.ranges); + let num_ranges_to_defrag = ranges.len(); + for range in ranges.into_iter() { + recycles_guard.add_range(range); + } + let num_ranges = recycles_guard.ranges.len(); + if num_ranges < num_ranges_to_defrag { + log::trace!("{num_ranges_to_defrag} ranges before, {num_ranges} after"); + } + } + + writes + } + + /// Perform upkeep on the slab, commiting changes to the GPU. + /// + /// Returns the new buffer if one was created due to a capacity resize. 
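    ///
    /// A typical once-per-frame pattern (illustrative only):
    ///
    /// ```ignore
    /// if let Some(new_buffer) = slab.upkeep() {
    ///     // A larger buffer was created; rebind it before rendering.
    /// }
    /// ```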
+ #[must_use] + pub fn upkeep(&self) -> Option> { + let new_buffer = if self.needs_expansion.swap(false, Ordering::Relaxed) { + Some(self.recreate_buffer()) + } else { + None + }; + + let writes = self.drain_updated_sources(); + if !writes.is_empty() { + // UNWRAP: safe because we know the buffer exists at this point, as we may have + // recreated it above^ + let buffer = self.get_buffer().unwrap(); + self.runtime + .buffer_write(writes.ranges.into_iter(), &buffer); + } + new_buffer + } + + /// Defragments the internal "recycle" buffer. + pub fn defrag(&self) { + // UNWRAP: panic on purpose + let mut recycle_guard = self.recycles.write().unwrap(); + for range in std::mem::take(&mut recycle_guard.ranges) { + recycle_guard.add_range(range); + } + } + + pub fn runtime(&self) -> &R { + &self.runtime + } +} diff --git a/crates/craballoc/src/slab/wgpu_slab.rs b/crates/craballoc/src/slab/wgpu_slab.rs new file mode 100644 index 00000000..2102db09 --- /dev/null +++ b/crates/craballoc/src/slab/wgpu_slab.rs @@ -0,0 +1,159 @@ +//! Slab allocation of WebGPU buffers. +use std::{ops::Deref, sync::Arc}; + +use crabslab::{Array, Slab, SlabItem}; +use snafu::{OptionExt, ResultExt}; +use tracing::Instrument; + +use crate::{ + runtime::{IsRuntime, SlabUpdate}, + slab::{AsyncRecvSnafu, AsyncSnafu, NoInternalBufferSnafu}, +}; + +use super::{SlabAllocator, SlabAllocatorError}; + +/// A slab allocation runtime that creates and updates [`wgpu::Buffer`]s. +#[derive(Clone)] +pub struct WgpuRuntime { + pub device: Arc, + pub queue: Arc, +} + +impl IsRuntime for WgpuRuntime { + type Buffer = wgpu::Buffer; + type BufferUsages = wgpu::BufferUsages; + + fn buffer_write>(&self, updates: U, buffer: &Self::Buffer) { + for SlabUpdate { array, elements } in updates { + let offset = array.starting_index() as u64 * std::mem::size_of::() as u64; + self.queue + .write_buffer(buffer, offset, bytemuck::cast_slice(&elements)); + } + self.queue.submit(std::iter::empty()); + } + + fn buffer_create( + &self, + capacity: usize, + label: Option<&str>, + usages: wgpu::BufferUsages, + ) -> Self::Buffer { + let size = (capacity * std::mem::size_of::()) as u64; + self.device.create_buffer(&wgpu::BufferDescriptor { + label, + size, + usage: usages, + mapped_at_creation: false, + }) + } + + fn buffer_copy( + &self, + source_buffer: &Self::Buffer, + destination_buffer: &Self::Buffer, + label: Option<&str>, + ) { + let mut encoder = self + .device + .create_command_encoder(&wgpu::CommandEncoderDescriptor { label }); + encoder.copy_buffer_to_buffer( + source_buffer, + 0, + destination_buffer, + 0, + source_buffer.size(), + ); + self.queue.submit(std::iter::once(encoder.finish())); + } + + #[tracing::instrument(skip_all)] + async fn buffer_read( + &self, + buffer: &Self::Buffer, + buffer_len: usize, + range: impl std::ops::RangeBounds, + ) -> Result, SlabAllocatorError> { + let (start, _end, len) = crate::runtime::range_to_indices_and_len(buffer_len, range); + let byte_offset = start * std::mem::size_of::(); + let length = len * std::mem::size_of::(); + let output_buffer_size = length as u64; + let output_buffer = tracing::trace_span!("create-buffer").in_scope(|| { + self.device.create_buffer(&wgpu::BufferDescriptor { + label: None, + size: output_buffer_size, + usage: wgpu::BufferUsages::COPY_DST | wgpu::BufferUsages::MAP_READ, + mapped_at_creation: false, + }) + }); + + let submission_index = tracing::trace_span!("copy_buffer").in_scope(|| { + let mut encoder = self + .device + .create_command_encoder(&wgpu::CommandEncoderDescriptor { label: 
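                    // The requested slab region is copied into a separate MAP_READ
                    // staging buffer below; that buffer is then mapped asynchronously
                    // and the device is polled until this copy's submission completes
                    // before the mapped bytes are read back.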
None }); + log::trace!( + "copy_buffer_to_buffer byte_offset:{byte_offset}, \ + output_buffer_size:{output_buffer_size}", + ); + encoder.copy_buffer_to_buffer( + buffer, + byte_offset as u64, + &output_buffer, + 0, + output_buffer_size, + ); + self.queue.submit(std::iter::once(encoder.finish())) + }); + + let buffer_slice = output_buffer.slice(..); + let (tx, rx) = async_channel::bounded(1); + tracing::trace_span!("map_async").in_scope(|| { + buffer_slice.map_async(wgpu::MapMode::Read, move |res| tx.try_send(res).unwrap()); + }); + tracing::trace_span!("poll").in_scope(|| { + self.device.poll(wgpu::Maintain::wait_for(submission_index)); + }); + rx.recv() + .instrument(tracing::info_span!("recv")) + .await + .context(AsyncRecvSnafu)? + .context(AsyncSnafu)?; + let output = tracing::trace_span!("get_mapped").in_scope(|| { + let bytes = buffer_slice.get_mapped_range(); + bytemuck::cast_slice(bytes.deref()).to_vec() + }); + Ok(output) + } +} + +impl SlabAllocator { + #[tracing::instrument(skip_all)] + pub async fn read( + &self, + range: impl std::ops::RangeBounds, + ) -> Result, SlabAllocatorError> { + let internal_buffer = self.get_buffer().context(NoInternalBufferSnafu)?; + self.runtime + .buffer_read(&internal_buffer, self.len(), range) + .await + } + + #[tracing::instrument(skip_all)] + pub async fn read_array( + &self, + array: Array, + ) -> Result, SlabAllocatorError> { + let arr = array.into_u32_array(); + let range = array.index as usize..(arr.index + arr.len) as usize; + let data = self.read(range).await?; + let t_array = Array::new(0, array.len() as u32); + Ok(data.read_vec(t_array)) + } + + pub fn device(&self) -> &wgpu::Device { + &self.runtime.device + } + + pub fn queue(&self) -> &wgpu::Queue { + &self.runtime.queue + } +} diff --git a/crates/craballoc/src/value.rs b/crates/craballoc/src/value.rs new file mode 100644 index 00000000..ab822242 --- /dev/null +++ b/crates/craballoc/src/value.rs @@ -0,0 +1,428 @@ +//! Allocated values. + +use std::sync::{Arc, Mutex, RwLock, Weak}; + +use crabslab::{Array, Id, Slab, SlabItem}; + +use crate::{ + runtime::{IsRuntime, SlabUpdate}, + slab::SlabAllocator, +}; + +pub struct WeakGpuRef { + pub(crate) u32_array: Array, + pub(crate) weak: Weak>>, + pub(crate) takes_update: bool, +} + +impl WeakGpuRef { + /// Take any queued updates. 
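    ///
    /// Returns `None` if the underlying value has already been dropped, or if
    /// there are no pending updates to synchronize.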
+ pub fn get_update(&self) -> Option> { + let strong = self.weak.upgrade()?; + let mut guard = strong.lock().unwrap(); + let updates: Vec<_> = if self.takes_update { + std::mem::take(guard.as_mut()) + } else { + guard.clone() + }; + + if updates.is_empty() { + None + } else { + Some(updates) + } + } + + fn from_gpu(gpu: &Gpu) -> Self { + WeakGpuRef { + u32_array: Array::new(gpu.id.inner(), T::SLAB_SIZE as u32), + weak: Arc::downgrade(&gpu.update), + takes_update: true, + } + } + + fn from_gpu_array(gpu_array: &GpuArray) -> Self { + WeakGpuRef { + u32_array: gpu_array.array.into_u32_array(), + weak: Arc::downgrade(&gpu_array.updates), + takes_update: true, + } + } +} + +#[derive(Debug)] +pub struct WeakGpu { + pub(crate) id: Id, + pub(crate) notifier_index: usize, + pub(crate) notify: async_channel::Sender, + pub(crate) update: Weak>>, +} + +impl Clone for WeakGpu { + fn clone(&self) -> Self { + Self { + id: self.id, + notifier_index: self.notifier_index, + notify: self.notify.clone(), + update: self.update.clone(), + } + } +} + +impl WeakGpu { + pub fn from_gpu(gpu: &Gpu) -> Self { + Self { + id: gpu.id, + notifier_index: gpu.notifier_index, + notify: gpu.notify.clone(), + update: Arc::downgrade(&gpu.update), + } + } + + pub fn upgrade(&self) -> Option> { + Some(Gpu { + id: self.id, + notifier_index: self.notifier_index, + notify: self.notify.clone(), + update: self.update.upgrade()?, + }) + } +} + +#[derive(Debug)] +pub struct WeakHybrid { + pub(crate) weak_cpu: Weak>, + pub(crate) weak_gpu: WeakGpu, +} + +impl Clone for WeakHybrid { + fn clone(&self) -> Self { + Self { + weak_cpu: self.weak_cpu.clone(), + weak_gpu: self.weak_gpu.clone(), + } + } +} + +impl WeakHybrid { + pub fn id(&self) -> Id { + self.weak_gpu.id + } + + pub fn from_hybrid(h: &Hybrid) -> Self { + Self { + weak_cpu: Arc::downgrade(&h.cpu_value), + weak_gpu: WeakGpu::from_gpu(&h.gpu_value), + } + } + + pub fn upgrade(&self) -> Option> { + Some(Hybrid { + cpu_value: self.weak_cpu.upgrade()?, + gpu_value: self.weak_gpu.upgrade()?, + }) + } + + pub fn strong_count(&self) -> usize { + self.weak_gpu.update.strong_count() + } +} + +/// A "hybrid" type that lives on the CPU and the GPU. +/// +/// Updates are syncronized to the GPU once per frame. +/// +/// Clones of a hybrid all point to the same CPU and GPU data. +pub struct Hybrid { + pub(crate) cpu_value: Arc>, + pub(crate) gpu_value: Gpu, +} + +impl core::fmt::Debug for Hybrid { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct(&format!("Hybrid<{}>", std::any::type_name::())) + .field("id", &self.gpu_value.id) + .field("cpu_value", &self.cpu_value.read().unwrap()) + .finish() + } +} + +impl Clone for Hybrid { + fn clone(&self) -> Self { + Hybrid { + cpu_value: self.cpu_value.clone(), + gpu_value: self.gpu_value.clone(), + } + } +} + +impl Hybrid { + pub fn new(mngr: &SlabAllocator, value: T) -> Self { + let cpu_value = Arc::new(RwLock::new(value.clone())); + let gpu_value = Gpu::new(mngr, value); + Self { + cpu_value, + gpu_value, + } + } + + /// Returns the number of clones of this Hybrid on the CPU. 
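    ///
    /// An illustrative sketch (assuming `slab` is a `SlabAllocator`):
    ///
    /// ```ignore
    /// let value = slab.new_value(666u32);
    /// assert_eq!(1, value.ref_count());
    /// let clone = value.clone();
    /// assert_eq!(2, value.ref_count());
    /// ```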
+ pub fn ref_count(&self) -> usize { + Arc::strong_count(&self.gpu_value.update) + } + + pub fn id(&self) -> Id { + self.gpu_value.id() + } + + pub fn get(&self) -> T { + self.cpu_value.read().unwrap().clone() + } + + pub fn modify(&self, f: impl FnOnce(&mut T) -> A) -> A { + let mut value_guard = self.cpu_value.write().unwrap(); + let a = f(&mut value_guard); + let t = value_guard.clone(); + self.gpu_value.set(t); + a + } + + pub fn set(&self, value: T) { + self.modify(move |old| { + *old = value; + }) + } + + /// Drop the CPU portion of the hybrid value, returning a type that wraps + /// only the GPU resources. + pub fn into_gpu_only(self) -> Gpu { + self.gpu_value + } +} + +/// A type that lives on the GPU. +/// +/// Updates are synchronized to the GPU during [`SlabAllocator::upkeep`]. +pub struct Gpu { + pub(crate) id: Id, + pub(crate) notifier_index: usize, + pub(crate) notify: async_channel::Sender, + pub(crate) update: Arc>>, +} + +impl Drop for Gpu { + fn drop(&mut self) { + let _ = self.notify.try_send(self.notifier_index); + } +} + +impl Clone for Gpu { + fn clone(&self) -> Self { + Self { + id: self.id, + notifier_index: self.notifier_index, + notify: self.notify.clone(), + update: self.update.clone(), + } + } +} + +impl Gpu { + pub fn new(mngr: &SlabAllocator, value: T) -> Self { + let id = mngr.allocate::(); + let notifier_index = mngr.next_update_k(); + let s = Self { + id, + notifier_index, + notify: mngr.notifier.0.clone(), + update: Default::default(), + }; + s.set(value); + mngr.insert_update_source(notifier_index, WeakGpuRef::from_gpu(&s)); + s + } + + pub fn id(&self) -> Id { + self.id + } + + pub fn set(&self, value: T) { + // UNWRAP: panic on purpose + *self.update.lock().unwrap() = vec![SlabUpdate { + array: Array::new(self.id.inner(), T::SLAB_SIZE as u32), + elements: { + let mut es = vec![0u32; T::SLAB_SIZE]; + es.write(Id::new(0), &value); + es + }, + }]; + // UNWRAP: safe because it's unbound + self.notify.try_send(self.notifier_index).unwrap(); + } +} + +/// A array type that lives on the GPU. +/// +/// Once created, the array cannot be resized. +/// +/// Updates are syncronized to the GPU once per frame. 
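///
/// An illustrative sketch (assuming `slab` is a `SlabAllocator`):
///
/// ```ignore
/// let array = slab.new_array([1u32, 2, 3]).into_gpu_only();
/// array.set_item(0, &10);
/// let _ = slab.upkeep();
/// ```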
+#[derive(Debug)] +pub struct GpuArray { + array: Array, + notifier_index: usize, + notifier: async_channel::Sender, + updates: Arc>>, +} + +impl Drop for GpuArray { + fn drop(&mut self) { + let _ = self.notifier.try_send(self.notifier_index); + } +} + +impl Clone for GpuArray { + fn clone(&self) -> Self { + GpuArray { + notifier: self.notifier.clone(), + notifier_index: self.notifier_index, + array: self.array, + updates: self.updates.clone(), + } + } +} + +impl GpuArray { + pub fn new(mngr: &SlabAllocator, values: &[T]) -> Self { + let array = mngr.allocate_array::(values.len()); + let update = { + let mut elements = vec![0u32; T::SLAB_SIZE * array.len()]; + elements.write_indexed_slice(values, 0); + SlabUpdate { + array: array.into_u32_array(), + elements, + } + }; + let notifier_index = mngr.next_update_k(); + let g = GpuArray { + notifier_index, + notifier: mngr.notifier.0.clone(), + array, + updates: Arc::new(Mutex::new(vec![update])), + }; + mngr.insert_update_source(notifier_index, WeakGpuRef::from_gpu_array(&g)); + g + } + + pub fn len(&self) -> usize { + self.array.len() + } + + pub fn is_empty(&self) -> bool { + self.array.is_empty() + } + + pub fn array(&self) -> Array { + self.array + } + + pub fn get_id(&self, index: usize) -> Id { + self.array().at(index) + } + + pub fn set_item(&self, index: usize, value: &T) { + let id = self.array.at(index); + let array = Array::::new(id.inner(), T::SLAB_SIZE as u32); + let mut elements = vec![0u32; T::SLAB_SIZE]; + elements.write(0u32.into(), value); + self.updates + .lock() + .unwrap() + .push(SlabUpdate { array, elements }); + // UNWRAP: safe because it's unbounded + self.notifier.try_send(self.notifier_index).unwrap(); + } +} + +/// A "hybrid" array type that lives on the CPU and the GPU. +/// +/// Once created, the array cannot be resized. +/// +/// Updates are syncronized to the GPU once per frame. 
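///
/// An illustrative sketch (assuming `slab` is a `SlabAllocator`):
///
/// ```ignore
/// let values = slab.new_array([7u32, 8, 9]);
/// assert_eq!(Some(8), values.get(1));
/// let previous = values.set_item(1, 80);
/// assert_eq!(Some(8), previous);
/// let _ = slab.upkeep();
/// ```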
+pub struct HybridArray { + cpu_value: Arc>>, + gpu_value: GpuArray, +} + +impl core::fmt::Debug for HybridArray { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + f.debug_struct(&format!("HybridArray<{}>", std::any::type_name::())) + .field("array", &self.gpu_value.array) + .field("cpu_value", &self.cpu_value.read().unwrap()) + .finish() + } +} + +impl Clone for HybridArray { + fn clone(&self) -> Self { + HybridArray { + cpu_value: self.cpu_value.clone(), + gpu_value: self.gpu_value.clone(), + } + } +} + +impl HybridArray { + pub fn new(mngr: &SlabAllocator, values: impl IntoIterator) -> Self { + let values = values.into_iter().collect::>(); + let gpu_value = GpuArray::::new(mngr, &values); + let cpu_value = Arc::new(RwLock::new(values)); + HybridArray { + cpu_value, + gpu_value, + } + } + + pub fn ref_count(&self) -> usize { + Arc::strong_count(&self.gpu_value.updates) + } + + pub fn len(&self) -> usize { + self.gpu_value.array.len() + } + + pub fn is_empty(&self) -> bool { + self.gpu_value.is_empty() + } + + pub fn array(&self) -> Array { + self.gpu_value.array() + } + + pub fn get(&self, index: usize) -> Option { + self.cpu_value.read().unwrap().get(index).cloned() + } + + pub fn get_vec(&self) -> Vec { + self.cpu_value.read().unwrap().clone() + } + + pub fn get_id(&self, index: usize) -> Id { + self.gpu_value.get_id(index) + } + + pub fn modify(&self, index: usize, f: impl FnOnce(&mut T) -> S) -> Option { + let mut value_guard = self.cpu_value.write().unwrap(); + let t = value_guard.get_mut(index)?; + let output = Some(f(t)); + self.gpu_value.set_item(index, t); + output + } + + pub fn set_item(&self, index: usize, value: T) -> Option { + self.modify(index, move |t| std::mem::replace(t, value)) + } + + pub fn into_gpu_only(self) -> GpuArray { + self.gpu_value + } +}