From ac7f9b03422bdf65765763f61ab29bc1557354dc Mon Sep 17 00:00:00 2001 From: andriyDev Date: Fri, 24 Oct 2025 10:38:56 -0700 Subject: [PATCH 1/7] Completely skip all the hot-reloading polling if hot-reloading is disabled. --- crates/bevy_asset/src/server/mod.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 3ae2ee1af6971..303bff96870b7 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -1775,6 +1775,12 @@ pub fn handle_internal_asset_events(world: &mut World) { world.write_message_batch(untyped_failures); } + // The following code all deals with hot-reloading, which we can skip if the server isn't + // watching for changes. + if !infos.watching_for_changes { + return; + } + fn queue_ancestors( asset_path: &AssetPath, infos: &AssetInfos, From cf717e3abb47b0ab646738442878bb36829e17a4 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Fri, 31 Oct 2025 23:27:30 -0700 Subject: [PATCH 2/7] Separate all the processing state stuff into its own `ProcessingState` struct. --- crates/bevy_asset/src/io/processor_gated.rs | 37 ++--- crates/bevy_asset/src/processor/mod.rs | 144 ++++++++++++++------ 2 files changed, 125 insertions(+), 56 deletions(-) diff --git a/crates/bevy_asset/src/io/processor_gated.rs b/crates/bevy_asset/src/io/processor_gated.rs index da439f56f5e18..2dcd0dacf1748 100644 --- a/crates/bevy_asset/src/io/processor_gated.rs +++ b/crates/bevy_asset/src/io/processor_gated.rs @@ -35,19 +35,6 @@ impl ProcessorGatedReader { reader, } } - - /// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur - /// while it is held. - async fn get_transaction_lock( - &self, - path: &AssetPath<'static>, - ) -> Result, AssetReaderError> { - let infos = self.processor_data.asset_infos.read().await; - let info = infos - .get(path) - .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?; - Ok(info.file_transaction_lock.read_arc().await) - } } impl AssetReader for ProcessorGatedReader { @@ -56,6 +43,7 @@ impl AssetReader for ProcessorGatedReader { trace!("Waiting for processing to finish before reading {asset_path}"); let process_result = self .processor_data + .processing_state .wait_until_processed(asset_path.clone()) .await; match process_result { @@ -65,7 +53,11 @@ impl AssetReader for ProcessorGatedReader { } } trace!("Processing finished with {asset_path}, reading {process_result:?}",); - let lock = self.get_transaction_lock(&asset_path).await?; + let lock = self + .processor_data + .processing_state + .get_transaction_lock(&asset_path) + .await?; let asset_reader = self.reader.read(path).await?; let reader = TransactionLockedReader::new(asset_reader, lock); Ok(reader) @@ -76,6 +68,7 @@ impl AssetReader for ProcessorGatedReader { trace!("Waiting for processing to finish before reading meta for {asset_path}",); let process_result = self .processor_data + .processing_state .wait_until_processed(asset_path.clone()) .await; match process_result { @@ -85,7 +78,11 @@ impl AssetReader for ProcessorGatedReader { } } trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",); - let lock = self.get_transaction_lock(&asset_path).await?; + let lock = self + .processor_data + .processing_state + .get_transaction_lock(&asset_path) + .await?; let meta_reader = self.reader.read_meta(path).await?; let reader = TransactionLockedReader::new(meta_reader, lock); Ok(reader) @@ -99,7 +96,10 @@ impl AssetReader for ProcessorGatedReader { "Waiting for processing to finish before reading directory {:?}", path ); - self.processor_data.wait_until_finished().await; + self.processor_data + .processing_state + .wait_until_finished() + .await; trace!("Processing finished, reading directory {:?}", path); let result = self.reader.read_directory(path).await?; Ok(result) @@ -110,7 +110,10 @@ impl AssetReader for ProcessorGatedReader { "Waiting for processing to finish before reading directory {:?}", path ); - self.processor_data.wait_until_finished().await; + self.processor_data + .processing_state + .wait_until_finished() + .await; trace!("Processing finished, getting directory status {:?}", path); let result = self.reader.is_directory(path).await?; Ok(result) diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index efb904a864f4b..e1134dc607daa 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -40,6 +40,7 @@ mod log; mod process; +use async_lock::RwLockReadGuardArc; pub use log::*; pub use process::*; @@ -103,7 +104,8 @@ pub struct AssetProcessor { /// Internal data stored inside an [`AssetProcessor`]. pub struct AssetProcessorData { - pub(crate) asset_infos: async_lock::RwLock, + /// The state of processing. + pub(crate) processing_state: ProcessingState, /// The factory that creates the transaction log. /// /// Note: we use a regular Mutex instead of an async mutex since we expect users to only set @@ -114,12 +116,21 @@ pub struct AssetProcessorData { processors: RwLock>>, /// Default processors for file extensions default_processors: RwLock, &'static str>>, - state: async_lock::RwLock, sources: AssetSources, +} + +/// The current state of processing, including the overall state and the state of all assets. +pub(crate) struct ProcessingState { + /// The overall state of processing. + state: async_lock::RwLock, + /// The channel to broadcast when the processor has completed initialization. initialized_sender: async_broadcast::Sender<()>, initialized_receiver: async_broadcast::Receiver<()>, + /// The channel to broadcast when the processor has completed processing. finished_sender: async_broadcast::Sender<()>, finished_receiver: async_broadcast::Receiver<()>, + /// The current state of the assets. + asset_infos: async_lock::RwLock, } impl AssetProcessor { @@ -150,20 +161,9 @@ impl AssetProcessor { &self.server } - async fn set_state(&self, state: ProcessorState) { - let mut state_guard = self.data.state.write().await; - let last_state = *state_guard; - *state_guard = state; - if last_state != ProcessorState::Finished && state == ProcessorState::Finished { - self.data.finished_sender.broadcast(()).await.unwrap(); - } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing { - self.data.initialized_sender.broadcast(()).await.unwrap(); - } - } - /// Retrieves the current [`ProcessorState`] pub async fn get_state(&self) -> ProcessorState { - *self.data.state.read().await + self.data.processing_state.get_state().await } /// Retrieves the [`AssetSource`] for this processor @@ -325,7 +325,10 @@ impl AssetProcessor { // to the finished state (otherwise we'd be sitting around stuck in the `Initialized` // state). if new_task_receiver.is_empty() { - self.set_state(ProcessorState::Finished).await; + self.data + .processing_state + .set_state(ProcessorState::Finished) + .await; } enum ProcessorTaskEvent { Start(AssetSourceId<'static>, PathBuf), @@ -371,14 +374,20 @@ impl AssetProcessor { let _ = task_finished_sender.send(()).await; }) .detach(); - self.set_state(ProcessorState::Processing).await; + self.data + .processing_state + .set_state(ProcessorState::Processing) + .await; } ProcessorTaskEvent::Finished => { pending_tasks -= 1; if pending_tasks == 0 { // clean up metadata in asset server self.server.write_infos().consume_handle_drop_events(); - self.set_state(ProcessorState::Finished).await; + self.data + .processing_state + .set_state(ProcessorState::Finished) + .await; } } } @@ -628,7 +637,7 @@ impl AssetProcessor { async fn handle_removed_asset(&self, source: &AssetSource, path: PathBuf) { let asset_path = AssetPath::from(path).with_source(source.id()); debug!("Removing processed {asset_path} because source was removed"); - let mut infos = self.data.asset_infos.write().await; + let mut infos = self.data.processing_state.asset_infos.write().await; if let Some(info) = infos.get(&asset_path) { // we must wait for uncontested write access to the asset source to ensure existing readers / writers // can finish their operations @@ -648,7 +657,7 @@ impl AssetProcessor { new: PathBuf, new_task_sender: &async_channel::Sender<(AssetSourceId<'static>, PathBuf)>, ) { - let mut infos = self.data.asset_infos.write().await; + let mut infos = self.data.processing_state.asset_infos.write().await; let old = AssetPath::from(old).with_source(source.id()); let new = AssetPath::from(new).with_source(source.id()); let processed_writer = source.processed_writer().unwrap(); @@ -740,7 +749,7 @@ impl AssetProcessor { /// This will validate transactions and recover failed transactions when necessary. async fn initialize(&self) -> Result<(), InitializeError> { self.validate_transaction_log_and_recover().await; - let mut asset_infos = self.data.asset_infos.write().await; + let mut asset_infos = self.data.processing_state.asset_infos.write().await; /// Retrieves asset paths recursively. If `clean_empty_folders_writer` is Some, it will be used to clean up empty /// folders when they are discovered. @@ -855,7 +864,10 @@ impl AssetProcessor { } } - self.set_state(ProcessorState::Processing).await; + self.data + .processing_state + .set_state(ProcessorState::Processing) + .await; Ok(()) } @@ -913,7 +925,7 @@ impl AssetProcessor { ) { let asset_path = AssetPath::from(path).with_source(source.id()); let result = self.process_asset_internal(source, &asset_path).await; - let mut infos = self.data.asset_infos.write().await; + let mut infos = self.data.processing_state.asset_infos.write().await; infos .finish_processing(asset_path, result, processor_task_event) .await; @@ -1016,7 +1028,7 @@ impl AssetProcessor { }; { - let infos = self.data.asset_infos.read().await; + let infos = self.data.processing_state.asset_infos.read().await; if let Some(current_processed_info) = infos .get(asset_path) .and_then(|i| i.processed_info.as_ref()) @@ -1042,7 +1054,7 @@ impl AssetProcessor { // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed) // See ProcessedAssetInfo::file_transaction_lock docs for more info let _transaction_lock = { - let mut infos = self.data.asset_infos.write().await; + let mut infos = self.data.processing_state.asset_infos.write().await; let info = infos.get_or_insert(asset_path.clone()); info.file_transaction_lock.write_arc().await }; @@ -1199,24 +1211,12 @@ impl AssetProcessor { impl AssetProcessorData { /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`]. pub fn new(source: AssetSources) -> Self { - let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1); - let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1); - // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to - // not block if there was older state present. - finished_sender.set_overflow(true); - initialized_sender.set_overflow(true); - AssetProcessorData { + processing_state: ProcessingState::new(), sources: source, - finished_sender, - finished_receiver, - initialized_sender, - initialized_receiver, - state: async_lock::RwLock::new(ProcessorState::Initializing), log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))), log: Default::default(), processors: Default::default(), - asset_infos: Default::default(), default_processors: Default::default(), } } @@ -1245,6 +1245,72 @@ impl AssetProcessorData { /// Returns a future that will not finish until the path has been processed. pub async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus { + self.processing_state.wait_until_processed(path).await + } + + /// Returns a future that will not finish until the processor has been initialized. + pub async fn wait_until_initialized(&self) { + self.processing_state.wait_until_initialized().await; + } + + /// Returns a future that will not finish until processing has finished. + pub async fn wait_until_finished(&self) { + self.processing_state.wait_until_finished().await; + } +} + +impl ProcessingState { + /// Creates a new empty processing state. + fn new() -> Self { + let (mut initialized_sender, initialized_receiver) = async_broadcast::broadcast(1); + let (mut finished_sender, finished_receiver) = async_broadcast::broadcast(1); + // allow overflow on these "one slot" channels to allow receivers to retrieve the "latest" state, and to allow senders to + // not block if there was older state present. + initialized_sender.set_overflow(true); + finished_sender.set_overflow(true); + + Self { + state: async_lock::RwLock::new(ProcessorState::Initializing), + initialized_sender, + initialized_receiver, + finished_sender, + finished_receiver, + asset_infos: Default::default(), + } + } + + /// Sets the overall state of processing and broadcasts appropriate events. + async fn set_state(&self, state: ProcessorState) { + let mut state_guard = self.state.write().await; + let last_state = *state_guard; + *state_guard = state; + if last_state != ProcessorState::Finished && state == ProcessorState::Finished { + self.finished_sender.broadcast(()).await.unwrap(); + } else if last_state != ProcessorState::Processing && state == ProcessorState::Processing { + self.initialized_sender.broadcast(()).await.unwrap(); + } + } + + /// Retrieves the current [`ProcessorState`] + pub(crate) async fn get_state(&self) -> ProcessorState { + *self.state.read().await + } + + /// Gets a "transaction lock" that can be used to ensure no writes to asset or asset meta occur + /// while it is held. + pub(crate) async fn get_transaction_lock( + &self, + path: &AssetPath<'static>, + ) -> Result, AssetReaderError> { + let infos = self.asset_infos.read().await; + let info = infos + .get(path) + .ok_or_else(|| AssetReaderError::NotFound(path.path().to_owned()))?; + Ok(info.file_transaction_lock.read_arc().await) + } + + /// Returns a future that will not finish until the path has been processed. + pub(crate) async fn wait_until_processed(&self, path: AssetPath<'static>) -> ProcessStatus { self.wait_until_initialized().await; let mut receiver = { let infos = self.asset_infos.write().await; @@ -1262,7 +1328,7 @@ impl AssetProcessorData { } /// Returns a future that will not finish until the processor has been initialized. - pub async fn wait_until_initialized(&self) { + pub(crate) async fn wait_until_initialized(&self) { let receiver = { let state = self.state.read().await; match *state { @@ -1280,7 +1346,7 @@ impl AssetProcessorData { } /// Returns a future that will not finish until processing has finished. - pub async fn wait_until_finished(&self) { + pub(crate) async fn wait_until_finished(&self) { let receiver = { let state = self.state.read().await; match *state { From 9218845fb9b528d8f5b5ae7f417366946ea2938b Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 1 Nov 2025 09:36:06 -0700 Subject: [PATCH 3/7] Store the processing state as an Arc. --- crates/bevy_asset/src/io/processor_gated.rs | 26 +++++++-------------- crates/bevy_asset/src/io/source.rs | 10 ++++---- crates/bevy_asset/src/lib.rs | 2 +- crates/bevy_asset/src/processor/mod.rs | 6 ++--- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/crates/bevy_asset/src/io/processor_gated.rs b/crates/bevy_asset/src/io/processor_gated.rs index 2dcd0dacf1748..691f8c8325e6b 100644 --- a/crates/bevy_asset/src/io/processor_gated.rs +++ b/crates/bevy_asset/src/io/processor_gated.rs @@ -1,6 +1,6 @@ use crate::{ io::{AssetReader, AssetReaderError, AssetSourceId, PathStream, Reader}, - processor::{AssetProcessorData, ProcessStatus}, + processor::{ProcessStatus, ProcessingState}, AssetPath, }; use alloc::{borrow::ToOwned, boxed::Box, sync::Arc, vec::Vec}; @@ -16,23 +16,23 @@ use super::{AsyncSeekForward, ErasedAssetReader}; /// given path until that path has been processed by [`AssetProcessor`]. /// /// [`AssetProcessor`]: crate::processor::AssetProcessor -pub struct ProcessorGatedReader { +pub(crate) struct ProcessorGatedReader { reader: Box, source: AssetSourceId<'static>, - processor_data: Arc, + processing_state: Arc, } impl ProcessorGatedReader { /// Creates a new [`ProcessorGatedReader`]. - pub fn new( + pub(crate) fn new( source: AssetSourceId<'static>, reader: Box, - processor_data: Arc, + processing_state: Arc, ) -> Self { Self { source, - processor_data, reader, + processing_state, } } } @@ -42,7 +42,6 @@ impl AssetReader for ProcessorGatedReader { let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone()); trace!("Waiting for processing to finish before reading {asset_path}"); let process_result = self - .processor_data .processing_state .wait_until_processed(asset_path.clone()) .await; @@ -54,7 +53,6 @@ impl AssetReader for ProcessorGatedReader { } trace!("Processing finished with {asset_path}, reading {process_result:?}",); let lock = self - .processor_data .processing_state .get_transaction_lock(&asset_path) .await?; @@ -67,7 +65,6 @@ impl AssetReader for ProcessorGatedReader { let asset_path = AssetPath::from(path.to_path_buf()).with_source(self.source.clone()); trace!("Waiting for processing to finish before reading meta for {asset_path}",); let process_result = self - .processor_data .processing_state .wait_until_processed(asset_path.clone()) .await; @@ -79,7 +76,6 @@ impl AssetReader for ProcessorGatedReader { } trace!("Processing finished with {process_result:?}, reading meta for {asset_path}",); let lock = self - .processor_data .processing_state .get_transaction_lock(&asset_path) .await?; @@ -96,10 +92,7 @@ impl AssetReader for ProcessorGatedReader { "Waiting for processing to finish before reading directory {:?}", path ); - self.processor_data - .processing_state - .wait_until_finished() - .await; + self.processing_state.wait_until_finished().await; trace!("Processing finished, reading directory {:?}", path); let result = self.reader.read_directory(path).await?; Ok(result) @@ -110,10 +103,7 @@ impl AssetReader for ProcessorGatedReader { "Waiting for processing to finish before reading directory {:?}", path ); - self.processor_data - .processing_state - .wait_until_finished() - .await; + self.processing_state.wait_until_finished().await; trace!("Processing finished, getting directory status {:?}", path); let result = self.reader.is_directory(path).await?; Ok(result) diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index d48871ca3f85f..ec86f2e155685 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -1,6 +1,6 @@ use crate::{ io::{processor_gated::ProcessorGatedReader, AssetSourceEvent, AssetWatcher}, - processor::AssetProcessorData, + processor::ProcessingState, }; use alloc::{ boxed::Box, @@ -560,12 +560,12 @@ impl AssetSource { /// This will cause processed [`AssetReader`](crate::io::AssetReader) futures (such as [`AssetReader::read`](crate::io::AssetReader::read)) to wait until /// the [`AssetProcessor`](crate::AssetProcessor) has finished processing the requested asset. - pub fn gate_on_processor(&mut self, processor_data: Arc) { + pub(crate) fn gate_on_processor(&mut self, processing_state: Arc) { if let Some(reader) = self.processed_reader.take() { self.processed_reader = Some(Box::new(ProcessorGatedReader::new( self.id(), reader, - processor_data, + processing_state, ))); } } @@ -622,9 +622,9 @@ impl AssetSources { /// This will cause processed [`AssetReader`](crate::io::AssetReader) futures (such as [`AssetReader::read`](crate::io::AssetReader::read)) to wait until /// the [`AssetProcessor`](crate::AssetProcessor) has finished processing the requested asset. - pub fn gate_on_processor(&mut self, processor_data: Arc) { + pub(crate) fn gate_on_processor(&mut self, processing_state: Arc) { for source in self.iter_processed_mut() { - source.gate_on_processor(processor_data.clone()); + source.gate_on_processor(processing_state.clone()); } } } diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index add1b42c35553..2eaa205738737 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -390,7 +390,7 @@ impl Plugin for AssetPlugin { let mut builders = app.world_mut().resource_mut::(); let processor = AssetProcessor::new(&mut builders); let mut sources = builders.build_sources(false, watch); - sources.gate_on_processor(processor.data.clone()); + sources.gate_on_processor(processor.data.processing_state.clone()); // the main asset server shares loaders with the processor asset server app.insert_resource(AssetServer::new_with_loaders( sources, diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index e1134dc607daa..9341bcfdcec80 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -105,7 +105,7 @@ pub struct AssetProcessor { /// Internal data stored inside an [`AssetProcessor`]. pub struct AssetProcessorData { /// The state of processing. - pub(crate) processing_state: ProcessingState, + pub(crate) processing_state: Arc, /// The factory that creates the transaction log. /// /// Note: we use a regular Mutex instead of an async mutex since we expect users to only set @@ -139,7 +139,7 @@ impl AssetProcessor { let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false))); // The asset processor uses its own asset server with its own id space let mut sources = source.build_sources(false, false); - sources.gate_on_processor(data.clone()); + sources.gate_on_processor(data.processing_state.clone()); let server = AssetServer::new_with_meta_check( sources, AssetServerMode::Processed, @@ -1212,7 +1212,7 @@ impl AssetProcessorData { /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`]. pub fn new(source: AssetSources) -> Self { AssetProcessorData { - processing_state: ProcessingState::new(), + processing_state: Arc::new(ProcessingState::new()), sources: source, log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))), log: Default::default(), From 2157b7a6de4b792525b2cce33984e826c3adb537 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 1 Nov 2025 10:33:14 -0700 Subject: [PATCH 4/7] Separate the gated and ungated processed readers. --- crates/bevy_asset/src/io/processor_gated.rs | 4 ++-- crates/bevy_asset/src/io/source.rs | 13 ++++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/bevy_asset/src/io/processor_gated.rs b/crates/bevy_asset/src/io/processor_gated.rs index 691f8c8325e6b..0c5e147a6a3c7 100644 --- a/crates/bevy_asset/src/io/processor_gated.rs +++ b/crates/bevy_asset/src/io/processor_gated.rs @@ -17,7 +17,7 @@ use super::{AsyncSeekForward, ErasedAssetReader}; /// /// [`AssetProcessor`]: crate::processor::AssetProcessor pub(crate) struct ProcessorGatedReader { - reader: Box, + reader: Arc, source: AssetSourceId<'static>, processing_state: Arc, } @@ -26,7 +26,7 @@ impl ProcessorGatedReader { /// Creates a new [`ProcessorGatedReader`]. pub(crate) fn new( source: AssetSourceId<'static>, - reader: Box, + reader: Arc, processing_state: Arc, ) -> Self { Self { diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index ec86f2e155685..34bee2a0cb63d 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -180,7 +180,12 @@ impl AssetSourceBuilder { id: id.clone(), reader, writer, - processed_reader: self.processed_reader.as_mut().map(|r| r()), + processed_reader: self + .processed_reader + .as_mut() + .map(|r| r()) + .map(Into::>::into), + ungated_processed_reader: None, processed_writer, event_receiver: None, watcher: None, @@ -386,7 +391,8 @@ pub struct AssetSource { id: AssetSourceId<'static>, reader: Box, writer: Option>, - processed_reader: Option>, + processed_reader: Option>, + ungated_processed_reader: Option>, processed_writer: Option>, watcher: Option>, processed_watcher: Option>, @@ -562,7 +568,8 @@ impl AssetSource { /// the [`AssetProcessor`](crate::AssetProcessor) has finished processing the requested asset. pub(crate) fn gate_on_processor(&mut self, processing_state: Arc) { if let Some(reader) = self.processed_reader.take() { - self.processed_reader = Some(Box::new(ProcessorGatedReader::new( + self.ungated_processed_reader = Some(reader.clone()); + self.processed_reader = Some(Arc::new(ProcessorGatedReader::new( self.id(), reader, processing_state, From a5f1ca9d6ce4c1bc4b0c7736bbf859610fc71699 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sat, 1 Nov 2025 10:00:12 -0700 Subject: [PATCH 5/7] Arc AssetSources and share them between the processor, processor's asset server, and the regular asset server. --- crates/bevy_asset/src/io/source.rs | 7 ++++++ crates/bevy_asset/src/lib.rs | 8 +++---- crates/bevy_asset/src/processor/mod.rs | 32 +++++++++++++++----------- crates/bevy_asset/src/server/mod.rs | 8 +++---- 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index 34bee2a0cb63d..9524f5cfc1726 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -431,6 +431,13 @@ impl AssetSource { .ok_or_else(|| MissingProcessedAssetReaderError(self.id.clone_owned())) } + /// Return's this source's ungated processed [`AssetReader`](crate::io::AssetReader), if it + /// exists. + #[inline] + pub(crate) fn ungated_processed_reader(&self) -> Option<&dyn ErasedAssetReader> { + self.ungated_processed_reader.as_deref() + } + /// Return's this source's processed [`AssetWriter`](crate::io::AssetWriter), if it exists. #[inline] pub fn processed_writer( diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index 2eaa205738737..df47416193387 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -375,7 +375,7 @@ impl Plugin for AssetPlugin { let sources = builders.build_sources(watch, false); app.insert_resource(AssetServer::new_with_meta_check( - sources, + Arc::new(sources), AssetServerMode::Unprocessed, self.meta_check.clone(), watch, @@ -388,9 +388,7 @@ impl Plugin for AssetPlugin { .unwrap_or(cfg!(feature = "asset_processor")); if use_asset_processor { let mut builders = app.world_mut().resource_mut::(); - let processor = AssetProcessor::new(&mut builders); - let mut sources = builders.build_sources(false, watch); - sources.gate_on_processor(processor.data.processing_state.clone()); + let (processor, sources) = AssetProcessor::new(&mut builders, watch); // the main asset server shares loaders with the processor asset server app.insert_resource(AssetServer::new_with_loaders( sources, @@ -406,7 +404,7 @@ impl Plugin for AssetPlugin { let mut builders = app.world_mut().resource_mut::(); let sources = builders.build_sources(false, watch); app.insert_resource(AssetServer::new_with_meta_check( - sources, + Arc::new(sources), AssetServerMode::Processed, AssetMetaCheck::Always, watch, diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index 9341bcfdcec80..7070bcd2eff65 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -116,7 +116,7 @@ pub struct AssetProcessorData { processors: RwLock>>, /// Default processors for file extensions default_processors: RwLock, &'static str>>, - sources: AssetSources, + sources: Arc, } /// The current state of processing, including the overall state and the state of all assets. @@ -135,19 +135,25 @@ pub(crate) struct ProcessingState { impl AssetProcessor { /// Creates a new [`AssetProcessor`] instance. - pub fn new(source: &mut AssetSourceBuilders) -> Self { - let data = Arc::new(AssetProcessorData::new(source.build_sources(true, false))); + pub fn new( + sources: &mut AssetSourceBuilders, + watch_processed: bool, + ) -> (Self, Arc) { + let state = Arc::new(ProcessingState::new()); + let mut sources = sources.build_sources(true, watch_processed); + sources.gate_on_processor(state.clone()); + let sources = Arc::new(sources); + + let data = Arc::new(AssetProcessorData::new(sources.clone(), state)); // The asset processor uses its own asset server with its own id space - let mut sources = source.build_sources(false, false); - sources.gate_on_processor(data.processing_state.clone()); let server = AssetServer::new_with_meta_check( - sources, + sources.clone(), AssetServerMode::Processed, AssetMetaCheck::Always, false, UnapprovedPathMode::default(), ); - Self { server, data } + (Self { server, data }, sources) } /// Gets a reference to the [`Arc`] containing the [`AssetProcessorData`]. @@ -513,7 +519,7 @@ impl AssetProcessor { } } AssetSourceEvent::RemovedUnknown { path, is_meta } => { - let processed_reader = source.processed_reader().unwrap(); + let processed_reader = source.ungated_processed_reader().unwrap(); match processed_reader.is_directory(&path).await { Ok(is_directory) => { if is_directory { @@ -590,7 +596,7 @@ impl AssetProcessor { "Removing folder {} because source was removed", path.display() ); - let processed_reader = source.processed_reader().unwrap(); + let processed_reader = source.ungated_processed_reader().unwrap(); match processed_reader.read_directory(path).await { Ok(mut path_stream) => { while let Some(child_path) = path_stream.next().await { @@ -787,7 +793,7 @@ impl AssetProcessor { } for source in self.sources().iter_processed() { - let Ok(processed_reader) = source.processed_reader() else { + let Some(processed_reader) = source.ungated_processed_reader() else { continue; }; let Ok(processed_writer) = source.processed_writer() else { @@ -1210,10 +1216,10 @@ impl AssetProcessor { impl AssetProcessorData { /// Initializes a new [`AssetProcessorData`] using the given [`AssetSources`]. - pub fn new(source: AssetSources) -> Self { + pub(crate) fn new(sources: Arc, processing_state: Arc) -> Self { AssetProcessorData { - processing_state: Arc::new(ProcessingState::new()), - sources: source, + processing_state, + sources, log_factory: Mutex::new(Some(Box::new(FileTransactionLogFactory::default()))), log: Default::default(), processors: Default::default(), diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 303bff96870b7..9f7d594e5d3bb 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -69,7 +69,7 @@ pub(crate) struct AssetServerData { pub(crate) loaders: Arc>, asset_event_sender: Sender, asset_event_receiver: Receiver, - sources: AssetSources, + sources: Arc, mode: AssetServerMode, meta_check: AssetMetaCheck, unapproved_path_mode: UnapprovedPathMode, @@ -91,7 +91,7 @@ impl AssetServer { /// Create a new instance of [`AssetServer`]. If `watch_for_changes` is true, the [`AssetReader`](crate::io::AssetReader) storage will watch for changes to /// asset sources and hot-reload them. pub fn new( - sources: AssetSources, + sources: Arc, mode: AssetServerMode, watching_for_changes: bool, unapproved_path_mode: UnapprovedPathMode, @@ -109,7 +109,7 @@ impl AssetServer { /// Create a new instance of [`AssetServer`]. If `watch_for_changes` is true, the [`AssetReader`](crate::io::AssetReader) storage will watch for changes to /// asset sources and hot-reload them. pub fn new_with_meta_check( - sources: AssetSources, + sources: Arc, mode: AssetServerMode, meta_check: AssetMetaCheck, watching_for_changes: bool, @@ -126,7 +126,7 @@ impl AssetServer { } pub(crate) fn new_with_loaders( - sources: AssetSources, + sources: Arc, loaders: Arc>, mode: AssetServerMode, meta_check: AssetMetaCheck, From 32d016d03993ebd9ed8a0f0633f1d130dd0fffdb Mon Sep 17 00:00:00 2001 From: andriyDev Date: Thu, 6 Nov 2025 17:23:40 -0800 Subject: [PATCH 6/7] Write a migration guide for how to update `AssetServer` and `AssetProcessor` init. --- .../changed_asset_server_init.md | 62 +++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 release-content/migration-guides/changed_asset_server_init.md diff --git a/release-content/migration-guides/changed_asset_server_init.md b/release-content/migration-guides/changed_asset_server_init.md new file mode 100644 index 0000000000000..ae0735c6c4d15 --- /dev/null +++ b/release-content/migration-guides/changed_asset_server_init.md @@ -0,0 +1,62 @@ +--- +title: Changes to `AssetServer` and `AssetProcessor` creation. +pull_requests: [21763] +--- + +Previously `AssetServer`s `new` methods would take `AssetSources`. Now, these methods take +`Arc`. So if you previously had: + +```rust +AssetServer::new( + sources, + mode, + watching_for_changes, + unapproved_path_mode, +) + +// OR: +AssetServer::new_with_meta_check( + sources, + mode, + meta_check, + watching_for_changes, + unapproved_path_mode, +) +``` + +Now you need to do: + +```rust +AssetServer::new( + Arc::new(sources), + mode, + watching_for_changes, + unapproved_path_mode, +) + +// OR: +AssetServer::new_with_meta_check( + Arc::new(sources), + mode, + meta_check, + watching_for_changes, + unapproved_path_mode, +) +``` + +`AssetProcessor::new` has also changed. It now returns to you the `Arc` which can (and +should) be shared with the `AssetServer`. So if you previously had: + +```rust +let processor = AssetProcessor::new(sources); +``` + +Now you need: + +```rust +let (processor, sources_arc) = AssetProcessor::new( + sources, + // A bool whether the returned sources should listen for changes as asset processing completes. + false, +); +``` From d2ff00bb1ee69978f42126defb78c7b164694ed8 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Sun, 9 Nov 2025 11:21:25 -0800 Subject: [PATCH 7/7] Add a doc comment explaining why the ungated reader exists. --- crates/bevy_asset/src/io/source.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index 9524f5cfc1726..4b04198d15aeb 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -392,6 +392,10 @@ pub struct AssetSource { reader: Box, writer: Option>, processed_reader: Option>, + /// The ungated version of `processed_reader`. + /// + /// This allows the processor to read all the processed assets to initialize itself without + /// being gated on itself (causing a deadlock). ungated_processed_reader: Option>, processed_writer: Option>, watcher: Option>,