@@ -4,17 +4,21 @@ use bevy_ecs::prelude::*;
4
4
use bevy_ecs:: system:: { BoxedSystem , SystemId } ;
5
5
use std:: any:: Any ;
6
6
use std:: marker:: PhantomData ;
7
+ use std:: sync:: Arc ;
7
8
8
9
type BoxedAnySend = Box < dyn Any + Send > ;
9
10
type SystemIdWithIO = SystemId < BoxedAnySend , BoxedAnySend > ;
10
11
type BoxedSystemWithIO = BoxedSystem < BoxedAnySend , BoxedAnySend > ;
11
12
12
13
/// Represents a registered `System` that can be run asynchronously.
13
14
///
15
+ /// Dropping an `AsyncSystem` will not unregister it. Use `AsyncSystem::unregister()`
16
+ /// to clean up an `AsyncSystem` from the main bevy `World`.
17
+ ///
14
18
/// The easiest way to get an `AsyncSystem` is with `AsyncWorld::register_system()`.
15
19
#[ derive( Debug , Clone ) ]
16
20
pub struct AsyncSystem {
17
- id : SystemId ,
21
+ id : Arc < SystemId > ,
18
22
world : AsyncWorld ,
19
23
}
20
24
@@ -28,35 +32,55 @@ impl AsyncSystem {
28
32
} )
29
33
. await ;
30
34
let id = recv_and_yield ( id_rx) . await ;
35
+ let id = Arc :: new ( id) ;
31
36
Self { id, world }
32
37
}
33
38
34
39
/// Run the system.
35
40
pub async fn run ( & self ) {
36
- let id = self . id ;
41
+ let id = * self . id ;
37
42
self . world
38
43
. apply ( move |world : & mut World | {
39
44
world. run_system ( id) . unwrap_or_else ( die) ;
40
45
} )
41
46
. await ;
42
47
}
48
+
49
+ /// Unregister the system.
50
+ ///
51
+ /// If multiple clones of the AsyncSystem exist, a reference counter will be
52
+ /// decremented instead. The system will be unregistered when the counter
53
+ /// decrements to zero.
54
+ pub async fn unregister ( self ) {
55
+ let Self { id, world } = self ;
56
+ if let Some ( id) = Arc :: into_inner ( id) {
57
+ world
58
+ . apply ( move |world : & mut World | {
59
+ world. remove_system ( id) . unwrap_or_else ( die) ;
60
+ } )
61
+ . await ;
62
+ }
63
+ }
43
64
}
44
65
45
66
/// Represents a registered `System` that accepts input and returns output, and can be run
46
67
/// asynchronously.
47
68
///
69
+ /// Dropping an `AsyncIOSystem` will not unregister it. Use `AsyncSystemIO::unregister()`
70
+ /// to clean up an `AsyncSystemIO` from the main bevy `World`.
71
+ ///
48
72
/// The easiest way to get an `AsyncIOSystem` is with `AsyncWorld::register_io_system()`.
49
73
#[ derive( Debug ) ]
50
74
pub struct AsyncIOSystem < I : Send , O : Send > {
51
- id : SystemIdWithIO ,
75
+ id : Arc < SystemIdWithIO > ,
52
76
world : AsyncWorld ,
53
77
_pd : PhantomData < fn ( I ) -> O > ,
54
78
}
55
79
56
80
impl < I : Send , O : Send > Clone for AsyncIOSystem < I , O > {
57
81
fn clone ( & self ) -> Self {
58
82
Self {
59
- id : self . id ,
83
+ id : Arc :: clone ( & self . id ) ,
60
84
world : self . world . clone ( ) ,
61
85
_pd : PhantomData ,
62
86
}
@@ -85,6 +109,7 @@ impl<I: Send + 'static, O: Send + 'static> AsyncIOSystem<I, O> {
85
109
. await ;
86
110
87
111
let id = recv_and_yield ( id_rx) . await ;
112
+ let id = Arc :: new ( id) ;
88
113
89
114
Self {
90
115
id,
@@ -101,7 +126,7 @@ impl<I: Send + 'static, O: Send + 'static> AsyncIOSystem<I, O> {
101
126
let input: BoxedAnySend = Box :: new ( input) ;
102
127
input_tx. send ( input) . await . unwrap_or_else ( die) ;
103
128
104
- let id = self . id ;
129
+ let id = * self . id ;
105
130
self . world
106
131
. apply ( move |world : & mut World | {
107
132
let input = input_rx. try_recv ( ) . unwrap_or_else ( die) ;
@@ -114,12 +139,29 @@ impl<I: Send + 'static, O: Send + 'static> AsyncIOSystem<I, O> {
114
139
let concrete = boxed. downcast ( ) . unwrap_or_else ( die) ;
115
140
* concrete
116
141
}
142
+
143
+ /// Unregister the system.
144
+ ///
145
+ /// If multiple clones of the AsyncIOSystem exist, a reference counter will be
146
+ /// decremented instead. The system will be unregistered when the counter
147
+ /// decrements to zero.
148
+ pub async fn unregister ( self ) {
149
+ let Self { id, world, _pd } = self ;
150
+ if let Some ( id) = Arc :: into_inner ( id) {
151
+ world
152
+ . apply ( move |world : & mut World | {
153
+ world. remove_system ( id) . unwrap_or_else ( die) ;
154
+ } )
155
+ . await
156
+ }
157
+ }
117
158
}
118
159
119
160
#[ cfg( test) ]
120
161
mod tests {
121
162
use crate :: world:: AsyncWorld ;
122
163
use crate :: AsyncEcsPlugin ;
164
+ use bevy:: ecs:: system:: RegisteredSystemError ;
123
165
use bevy:: prelude:: * ;
124
166
use bevy:: tasks:: AsyncComputeTaskPool ;
125
167
@@ -182,6 +224,46 @@ mod tests {
182
224
assert_counter ! ( id, 1 , & app. world) ;
183
225
}
184
226
227
+ #[ test]
228
+ fn normal_unregister ( ) {
229
+ let mut app = App :: new ( ) ;
230
+ app. add_plugins ( ( MinimalPlugins , AsyncEcsPlugin ) ) ;
231
+ let id = app. world . spawn ( Counter ( 0 ) ) . id ( ) ;
232
+ assert_counter ! ( id, 0 , & app. world) ;
233
+
234
+ let ( sender, receiver) = async_channel:: bounded ( 1 ) ;
235
+ let async_world = AsyncWorld :: from_world ( & mut app. world ) ;
236
+
237
+ AsyncComputeTaskPool :: get ( )
238
+ . spawn ( async move {
239
+ let increase_counter_all = async_world. register_system ( increase_counter_all) . await ;
240
+ let ica2 = increase_counter_all. clone ( ) ;
241
+ increase_counter_all. unregister ( ) . await ;
242
+
243
+ ica2. run ( ) . await ;
244
+
245
+ let id = * ica2. id ;
246
+ ica2. unregister ( ) . await ;
247
+ sender. send ( id) . await . unwrap ( ) ;
248
+ } )
249
+ . detach ( ) ;
250
+
251
+ let system_id = loop {
252
+ match receiver. try_recv ( ) {
253
+ Ok ( id) => break id,
254
+ Err ( _) => app. update ( ) ,
255
+ }
256
+ } ;
257
+ app. update ( ) ;
258
+
259
+ let err = app. world . remove_system ( system_id) ;
260
+ assert_counter ! ( id, 1 , & app. world) ;
261
+ assert ! ( matches!(
262
+ err,
263
+ Err ( RegisteredSystemError :: SystemIdNotRegistered ( _) )
264
+ ) ) ;
265
+ }
266
+
185
267
#[ test]
186
268
fn io ( ) {
187
269
let mut app = App :: new ( ) ;
@@ -218,4 +300,50 @@ mod tests {
218
300
assert_eq ! ( 5 , value) ;
219
301
assert_counter ! ( id, 5 , & app. world) ;
220
302
}
303
+
304
+ #[ test]
305
+ fn io_unregister ( ) {
306
+ let mut app = App :: new ( ) ;
307
+ app. add_plugins ( ( MinimalPlugins , AsyncEcsPlugin ) ) ;
308
+ let id = app. world . spawn ( Counter ( 4 ) ) . id ( ) ;
309
+ assert_counter ! ( id, 4 , & app. world) ;
310
+
311
+ let ( sender, receiver) = async_channel:: bounded ( 1 ) ;
312
+ let async_world = AsyncWorld :: from_world ( & mut app. world ) ;
313
+
314
+ AsyncComputeTaskPool :: get ( )
315
+ . spawn ( async move {
316
+ let increase_counter = async_world
317
+ . register_io_system :: < Entity , ( ) , _ > ( increase_counter)
318
+ . await ;
319
+ let get_counter_value = async_world
320
+ . register_io_system :: < Entity , u8 , _ > ( get_counter_value)
321
+ . await ;
322
+
323
+ let gcv2 = get_counter_value. clone ( ) ;
324
+ get_counter_value. unregister ( ) . await ;
325
+
326
+ increase_counter. run ( id) . await ;
327
+ let value = gcv2. run ( id) . await ;
328
+ sender. send ( ( value, * gcv2. id ) ) . await . unwrap ( ) ;
329
+ gcv2. unregister ( ) . await ;
330
+ } )
331
+ . detach ( ) ;
332
+
333
+ let ( value, system_id) = loop {
334
+ match receiver. try_recv ( ) {
335
+ Ok ( value) => break value,
336
+ Err ( _) => app. update ( ) ,
337
+ }
338
+ } ;
339
+ app. update ( ) ;
340
+
341
+ let err = app. world . remove_system ( system_id) ;
342
+ assert_eq ! ( 5 , value) ;
343
+ assert_counter ! ( id, 5 , & app. world) ;
344
+ assert ! ( matches!(
345
+ err,
346
+ Err ( RegisteredSystemError :: SystemIdNotRegistered ( _) )
347
+ ) ) ;
348
+ }
221
349
}
0 commit comments