@@ -94,6 +94,10 @@ def delete_runpath(run_path: str) -> None:
94
94
shutil .rmtree (run_path )
95
95
96
96
97
# Exception used to signal that the user cancelled the experiment.
# Every raise site visible in this diff passes a message of the form
# "Experiment cancelled by user ..." (during, before, or after evaluation),
# and the visible handler deactivates all realizations before re-raising.
+ class UserCancelled (Exception ):
98
+ pass
99
+
100
+
97
101
class _LogAggregration (logging .Handler ):
98
102
def __init__ (self , messages : MutableSequence [str ]) -> None :
99
103
self .messages = messages
@@ -568,10 +572,11 @@ async def run_monitor(
568
572
logger .debug (
569
573
"observed evaluation cancelled event, exit drainer"
570
574
)
571
- # Allow track() to emit an EndEvent.
572
- return False
575
+ raise UserCancelled (
576
+ "Experiment cancelled by user during evaluation"
577
+ )
573
578
elif type (event ) is EETerminated :
574
- logger .debug ("got terminator event" )
579
+ logger .debug ("got terminated event" )
575
580
576
581
if not self ._end_queue .empty ():
577
582
logger .debug ("Run model canceled - during evaluation" )
@@ -580,7 +585,9 @@ async def run_monitor(
580
585
logger .debug (
581
586
"Run model canceled - during evaluation - cancel sent"
582
587
)
583
- except BaseException as e :
588
+ except UserCancelled :
589
+ raise
590
+ except Exception as e :
584
591
logger .exception (f"unexpected error: { e } " )
585
592
# We really don't know what happened... shut down
586
593
# the thread and get out of here. The monitor has
@@ -598,7 +605,8 @@ async def run_ensemble_evaluator_async(
598
605
if not self ._end_queue .empty ():
599
606
logger .debug ("Run model canceled - pre evaluation" )
600
607
self ._end_queue .get ()
601
- return []
608
+ raise UserCancelled ("Experiment cancelled by user in pre evaluation" )
609
+
602
610
ee_ensemble = self ._build_ensemble (run_args , ensemble .experiment_id )
603
611
evaluator = EnsembleEvaluator (
604
612
ee_ensemble ,
@@ -619,8 +627,14 @@ async def run_ensemble_evaluator_async(
619
627
if not self ._end_queue .empty ():
620
628
logger .debug ("Run model canceled - post evaluation" )
621
629
self ._end_queue .get ()
622
- await evaluator_task
623
- return []
630
+ try :
631
+ await evaluator_task
632
+ except BaseException as e :
633
+ raise Exception (
634
+ "Exception occured during user initiatied termination of experiment"
635
+ ) from e
636
+ raise UserCancelled ("Experiment cancelled by user in post evaluation" )
637
+
624
638
await evaluator_task
625
639
ensemble .refresh_ensemble_state ()
626
640
@@ -634,10 +648,9 @@ def run_ensemble_evaluator(
634
648
ensemble : Ensemble ,
635
649
ee_config : EvaluatorServerConfig ,
636
650
) -> list [int ]:
637
- successful_realizations = asyncio .run (
651
+ return asyncio .run (
638
652
self .run_ensemble_evaluator_async (run_args , ensemble , ee_config )
639
653
)
640
- return successful_realizations
641
654
642
655
def _build_ensemble (
643
656
self ,
@@ -757,11 +770,16 @@ def _evaluate_and_postprocess(
757
770
"run_paths" : self .run_paths ,
758
771
},
759
772
)
760
- successful_realizations = self .run_ensemble_evaluator (
761
- run_args ,
762
- ensemble ,
763
- evaluator_server_config ,
764
- )
773
+ try :
774
+ successful_realizations = self .run_ensemble_evaluator (
775
+ run_args ,
776
+ ensemble ,
777
+ evaluator_server_config ,
778
+ )
779
+ except UserCancelled :
780
+ self .active_realizations = [False for _ in self .active_realizations ]
781
+ raise
782
+
765
783
starting_realizations = [real .iens for real in run_args if real .active ]
766
784
failed_realizations = list (
767
785
set (starting_realizations ) - set (successful_realizations )
0 commit comments