@@ -98,6 +98,10 @@ def delete_runpath(run_path: str) -> None:
98
98
shutil .rmtree (run_path )
99
99
100
100
101
+ class UserCancelled (Exception ):
102
+ pass
103
+
104
+
101
105
class _LogAggregration (logging .Handler ):
102
106
def __init__ (self , messages : MutableSequence [str ]) -> None :
103
107
self .messages = messages
@@ -572,10 +576,11 @@ async def run_monitor(
572
576
logger .debug (
573
577
"observed evaluation cancelled event, exit drainer"
574
578
)
575
- # Allow track() to emit an EndEvent.
576
- return False
579
+ raise UserCancelled (
580
+ "Experiment cancelled by user during evaluation"
581
+ )
577
582
elif type (event ) is EETerminated :
578
- logger .debug ("got terminator event" )
583
+ logger .debug ("got terminated event" )
579
584
580
585
if not self ._end_queue .empty ():
581
586
logger .debug ("Run model canceled - during evaluation" )
@@ -584,7 +589,9 @@ async def run_monitor(
584
589
logger .debug (
585
590
"Run model canceled - during evaluation - cancel sent"
586
591
)
587
- except BaseException as e :
592
+ except UserCancelled :
593
+ raise
594
+ except Exception as e :
588
595
logger .exception (f"unexpected error: { e } " )
589
596
# We really don't know what happened... shut down
590
597
# the thread and get out of here. The monitor has
@@ -602,7 +609,8 @@ async def run_ensemble_evaluator_async(
602
609
if not self ._end_queue .empty ():
603
610
logger .debug ("Run model canceled - pre evaluation" )
604
611
self ._end_queue .get ()
605
- return []
612
+ raise UserCancelled ("Experiment cancelled by user in pre evaluation" )
613
+
606
614
ee_ensemble = self ._build_ensemble (run_args , ensemble .experiment_id )
607
615
evaluator = EnsembleEvaluator (
608
616
ee_ensemble ,
@@ -623,8 +631,14 @@ async def run_ensemble_evaluator_async(
623
631
if not self ._end_queue .empty ():
624
632
logger .debug ("Run model canceled - post evaluation" )
625
633
self ._end_queue .get ()
626
- await evaluator_task
627
- return []
634
+ try :
635
+ await evaluator_task
636
+ except Exception as e :
637
+ raise Exception (
638
+ "Exception occured during user initiatied termination of experiment"
639
+ ) from e
640
+ raise UserCancelled ("Experiment cancelled by user in post evaluation" )
641
+
628
642
await evaluator_task
629
643
ensemble .refresh_ensemble_state ()
630
644
@@ -638,10 +652,9 @@ def run_ensemble_evaluator(
638
652
ensemble : Ensemble ,
639
653
ee_config : EvaluatorServerConfig ,
640
654
) -> list [int ]:
641
- successful_realizations = asyncio .run (
655
+ return asyncio .run (
642
656
self .run_ensemble_evaluator_async (run_args , ensemble , ee_config )
643
657
)
644
- return successful_realizations
645
658
646
659
def _build_ensemble (
647
660
self ,
@@ -761,11 +774,16 @@ def _evaluate_and_postprocess(
761
774
"run_paths" : self .run_paths ,
762
775
},
763
776
)
764
- successful_realizations = self .run_ensemble_evaluator (
765
- run_args ,
766
- ensemble ,
767
- evaluator_server_config ,
768
- )
777
+ try :
778
+ successful_realizations = self .run_ensemble_evaluator (
779
+ run_args ,
780
+ ensemble ,
781
+ evaluator_server_config ,
782
+ )
783
+ except UserCancelled :
784
+ self .active_realizations = [False for _ in self .active_realizations ]
785
+ raise
786
+
769
787
starting_realizations = [real .iens for real in run_args if real .active ]
770
788
failed_realizations = list (
771
789
set (starting_realizations ) - set (successful_realizations )
0 commit comments