88
99from eos .configuration .configuration_manager import ConfigurationManager
1010from eos .configuration .constants import EOS_COMPUTER_NAME
11+ from eos .allocation .entities .device_allocation import DeviceAllocationModel
1112from eos .devices .entities .device import Device , DeviceStatus , DeviceModel
1213from eos .devices .exceptions import EosDeviceStateError , EosDeviceInitializationError
1314from eos .logging .batch_error_logger import batch_error , raise_batched_errors
@@ -125,15 +126,14 @@ async def reload_devices(self, db: AsyncDbSession, lab_name: str, device_names:
125126 raise
126127
127128 # Cleanup the specific device actors
128- reload_tasks = []
129-
130- for device_name in device_names :
131- actor_name = f"{ lab_name } .{ device_name } "
132- if actor_name in self ._device_actor_handles :
133- reload_tasks .append (self ._cleanup_single_device (actor_name ))
129+ actors_to_cleanup = [
130+ f"{ lab_name } .{ device_name } "
131+ for device_name in device_names
132+ if f"{ lab_name } .{ device_name } " in self ._device_actor_handles
133+ ]
134134
135- if reload_tasks :
136- await asyncio . gather ( * reload_tasks )
135+ if actors_to_cleanup :
136+ await self . _cleanup_device_actors_with_timeout ( actors_to_cleanup )
137137
138138 # Remove device records from database
139139 await db .execute (
@@ -183,69 +183,96 @@ async def cleanup_device_actors(self, db: AsyncDbSession, lab_names: list[str] |
183183 if not actor_names :
184184 return
185185
186- cleanup_tasks = [
187- self ._cleanup_single_device (actor_name )
188- for actor_name in actor_names
189- if actor_name in self ._device_actor_handles
190- ]
191-
192- if cleanup_tasks :
193- await asyncio .gather (* cleanup_tasks )
186+ actors_to_cleanup = [name for name in actor_names if name in self ._device_actor_handles ]
187+ if actors_to_cleanup :
188+ await self ._cleanup_device_actors_with_timeout (actors_to_cleanup )
194189
195190 await self .cleanup_devices (db , lab_names )
196191
197- async def _get_actor_names_to_cleanup (self , db : AsyncDbSession , lab_names : list [str ] | None ) -> list [str ]:
198- """Get actor names that need to be cleaned up."""
199- if not lab_names :
200- return list (self ._device_actor_handles .keys ())
async def _cleanup_device_actors_with_timeout(self, actor_names: list[str], cleanup_timeout: float = 30.0) -> None:
    """Clean up multiple device actors concurrently under one shared timeout.

    ``cleanup`` is started remotely on every actor that still has a handle in
    ``self._device_actor_handles``; the resulting object refs are then drained
    as they complete. Any actor whose cleanup cannot be started, raises, or is
    still pending when the timeout expires is forcefully killed. In every case
    the actor's references are removed from the internal tracking structures.

    :param actor_names: Names of the actors to clean up. Names without a
        registered handle are skipped.
    :param cleanup_timeout: Total time budget in seconds shared by all actors
        (not a per-actor timeout).
    """
    # Kick off cleanup on all actors first so they run concurrently.
    cleanup_refs: dict[ray.ObjectRef, str] = {}
    for actor_name in actor_names:
        actor_handle = self._device_actor_handles.get(actor_name)
        if actor_handle is None:
            continue

        try:
            log.info(f"Cleaning up device '{ actor_name } '...")
            cleanup_refs[actor_handle.cleanup.remote()] = actor_name
        except Exception as e:
            log.error(f"Failed to start cleanup for device '{ actor_name } ': { e } ")
            self._forcefully_kill_actor(actor_name)
            # Drop the handle as well; otherwise a killed actor would stay
            # registered, unlike every other exit path of this method.
            self._remove_device_references(actor_name)

    if not cleanup_refs:
        return

    # A single deadline keeps the timeout global across all pending cleanups.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + cleanup_timeout
    pending_refs = set(cleanup_refs)

    while pending_refs:
        remaining_timeout = deadline - loop.time()
        if remaining_timeout <= 0:
            break

        # NOTE(review): ray.wait is a synchronous call, so the event loop is
        # blocked for up to `remaining_timeout` seconds here - confirm this is
        # acceptable for callers, or move the wait off the loop.
        ready_refs, _ = ray.wait(
            list(pending_refs),
            num_returns=1,
            timeout=remaining_timeout,
        )

        if not ready_refs:
            # Timeout reached with no further completions.
            break

        for ref in ready_refs:
            pending_refs.discard(ref)
            actor_name = cleanup_refs[ref]
            try:
                ray.get(ref)  # Surfaces any exception raised by the remote cleanup
                log.info(f"Cleaned up device '{ actor_name } '")
            except Exception as e:
                log.error(f"Cleanup failed for device '{ actor_name } ': { e } ")
                self._forcefully_kill_actor(actor_name)
            finally:
                self._remove_device_references(actor_name)

    # Whatever is still pending exceeded the time budget: kill and forget it.
    if pending_refs:
        timed_out_actors = [cleanup_refs[ref] for ref in pending_refs]
        log.warning(
            f"Timed out cleaning up { len (timed_out_actors )} device(s) after { cleanup_timeout } seconds: "
            f"{ ', ' .join (timed_out_actors )} "
        )
        for actor_name in timed_out_actors:
            self._forcefully_kill_actor(actor_name)
            self._remove_device_references(actor_name)
256+
def _forcefully_kill_actor(self, actor_name: str) -> None:
    """Kill the named device actor via ``ray.kill``, if it is still registered.

    Does nothing when ``actor_name`` has no entry in
    ``self._device_actor_handles``. Errors raised while killing are logged
    and swallowed, so this method itself never raises for a failed kill.

    :param actor_name: Name of the actor to kill.
    """
    if actor_name in self._device_actor_handles:
        try:
            log.warning(f"Forcefully killing device '{ actor_name } '")
            handle = self._device_actor_handles[actor_name]
            ray.kill(handle)
        except Exception as e:
            log.error(f"Error killing device '{ actor_name } ': { e } ")
267+
async def _get_actor_names_to_cleanup(self, db: AsyncDbSession, lab_names: list[str] | None) -> list[str]:
    """Resolve the actor names that a cleanup request should cover.

    With one or more lab names, the device records of those labs are loaded
    from the database and each record's actor name is returned. Without lab
    names (``None`` or empty), every actor name currently tracked in
    ``self._device_actor_handles`` is returned.

    :param db: Database session used to query the device records.
    :param lab_names: Labs to restrict the cleanup to, or ``None``/empty for all.
    :return: The actor names to clean up.
    """
    if lab_names:
        query = select(DeviceModel).where(DeviceModel.lab_name.in_(lab_names))
        rows = await db.execute(query)
        # Validate every record before deriving any actor name.
        validated = [Device.model_validate(row) for row in rows.scalars()]
        return [entry.get_actor_name() for entry in validated]

    return [*self._device_actor_handles.keys()]
249276
250277 def _remove_device_references (self , actor_name : str ) -> None :
251278 """Remove device references from internal tracking dictionaries."""
@@ -255,9 +282,11 @@ def _remove_device_references(self, actor_name: str) -> None:
async def cleanup_devices(self, db: AsyncDbSession, lab_names: list[str] | None = None) -> None:
    """Delete device allocation and device records from the database.

    When ``lab_names`` is given, only rows whose ``lab_name`` is in that list
    are deleted; otherwise all rows of both tables are deleted. Allocation
    rows are always deleted before device rows.

    :param db: Database session the delete statements are executed on.
    :param lab_names: Labs to restrict the deletion to, or ``None``/empty for all.
    """
    restrict_to_labs = bool(lab_names)

    # Order matters: allocations first, then the devices themselves.
    # NOTE(review): presumably allocations reference devices - confirm against the schema.
    for model in (DeviceAllocationModel, DeviceModel):
        statement = delete(model)
        if restrict_to_labs:
            statement = statement.where(model.lab_name.in_(lab_names))
        await db.execute(statement)

    if restrict_to_labs:
        log.debug(f"Cleaned up devices for lab(s): { ', ' .join (lab_names )} ")
    else:
        log.debug("Cleaned up all devices")
263292
0 commit comments