From ee87619865426fcb0bd159ca50bcf9642c58205b Mon Sep 17 00:00:00 2001 From: Taleb Zeghmi Date: Thu, 30 Nov 2023 15:17:05 -0800 Subject: [PATCH] AIP-7511 user defined @exit_handler --- metaflow/plugins/__init__.py | 2 + metaflow/plugins/aip/__init__.py | 5 + metaflow/plugins/aip/aip.py | 274 +++++++++++++----- metaflow/plugins/aip/aip_cli.py | 35 +++ metaflow/plugins/aip/aip_constants.py | 3 +- metaflow/plugins/aip/aip_udf_exit_handler.py | 77 +++++ .../plugins/aip/exit_handler_decorator.py | 141 +++++++++ .../aip/tests/flows/raise_error_flow.py | 22 +- .../aip/tests/run_integration_tests.py | 28 ++ 9 files changed, 506 insertions(+), 81 deletions(-) create mode 100644 metaflow/plugins/aip/aip_udf_exit_handler.py create mode 100644 metaflow/plugins/aip/exit_handler_decorator.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 7c47d3fd92d..2a3923e92f2 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -159,12 +159,14 @@ def get_plugin_cli(): from .aws.step_functions.schedule_decorator import ScheduleDecorator from .project_decorator import ProjectDecorator from .aip.s3_sensor_decorator import S3SensorDecorator +from .aip.exit_handler_decorator import ExitHandlerDecorator FLOW_DECORATORS = [ CondaFlowDecorator, ScheduleDecorator, ProjectDecorator, S3SensorDecorator, + ExitHandlerDecorator, ] _merge_lists(FLOW_DECORATORS, _ext_plugins["FLOW_DECORATORS"], "name") diff --git a/metaflow/plugins/aip/__init__.py b/metaflow/plugins/aip/__init__.py index 1646dc64b5a..0c4c318bfef 100644 --- a/metaflow/plugins/aip/__init__.py +++ b/metaflow/plugins/aip/__init__.py @@ -10,3 +10,8 @@ delete_argo_workflow, to_metaflow_run_id, ) + +from .exit_handler_decorator import ( + exit_handler_resources, + exit_handler_retry, +) diff --git a/metaflow/plugins/aip/aip.py b/metaflow/plugins/aip/aip.py index 957d6ad7aff..d1864f0492a 100644 --- a/metaflow/plugins/aip/aip.py +++ b/metaflow/plugins/aip/aip.py @@ -35,7 +35,7 @@ V1Volume, ) -from metaflow.decorators import FlowDecorator +from metaflow.decorators import FlowDecorator, flow_decorators from metaflow.metaflow_config import ( DATASTORE_SYSROOT_S3, AIP_TTL_SECONDS_AFTER_FINISHED, @@ -52,6 +52,7 @@ PVC_CREATE_RETRY_COUNT, EXIT_HANDLER_RETRY_COUNT, BACKOFF_DURATION, + BACKOFF_DURATION_INT, RETRY_BACKOFF_FACTOR, ) from metaflow.plugins.aip.aip_decorator import AIPException @@ -203,6 +204,7 @@ def __init__( self.sqs_url_on_error = sqs_url_on_error self.sqs_role_arn_on_error = sqs_role_arn_on_error self._client = None + self._exit_handler_created = False @classmethod def trigger(cls, kubernetes_namespace: str, name: str, parameters=None): @@ -294,32 +296,40 @@ def _create_workflow_yaml( KubeflowPipelines._add_archive_section_to_cards_artifacts(workflow) - if "onExit" in workflow["spec"]: + if self._exit_handler_created: # replace entrypoint content with the exit handler handler content """ # What it looks like beforehand... entrypoint: helloflow templates: - - name: exit-handler-1 + - name: helloflow dag: tasks: - name: end template: end dependencies: [start] - {name: start, template: start} + """ + + """ + # What it looks like afterwards... 
+ entrypoint: helloflow + onExit: exit-handler + templates: - name: helloflow dag: tasks: - - {name: exit-handler-1, template: exit-handler-1} - - {name: sqs-exit-handler, template: sqs-exit-handler} + - name: end + template: end + dependencies: [start] + - {name: start, template: start} + - name: exit-handler + dag: + tasks: + - {name: exit-handler-1, template: exit-handler-1} + - {name: sqs-exit-handler, template: sqs-exit-handler} + - {name: user-defined-exit-handler, template: user-defined-exit-handler} """ - # find the exit-handler-1 template - exit_handler_template: dict = [ - template - for template in workflow["spec"]["templates"] - if template["name"] == "exit-handler-1" - ][0] - # find the entrypoint template entrypoint_template: dict = [ template @@ -327,15 +337,18 @@ def _create_workflow_yaml( if template["name"] == workflow["spec"]["entrypoint"] ][0] - # replace the entrypoint template with the exit handler template - entrypoint_template["dag"] = exit_handler_template["dag"] + # remove exit handlers from the entrypoint template + entrypoint_template["dag"]["tasks"] = [ + task + for task in entrypoint_template["dag"]["tasks"] + if "exit-handler" not in task["name"] + ] - # rename exit-handler-1 to exit-handler - exit_handler_template["name"] = "exit-handler" + # initialize the exit-handler template + exit_handler_template: dict = {"name": "exit-handler", "dag": {"tasks": []}} + workflow["spec"]["templates"].append(exit_handler_template) workflow["spec"]["onExit"] = "exit-handler" - # initialize - exit_handler_template["dag"] = {"tasks": []} if self.sqs_url_on_error: exit_handler_template["dag"]["tasks"].append( { @@ -360,6 +373,27 @@ def _create_workflow_yaml( exit_handler_template["dag"]["tasks"].append(notify_task) + udf_handler: Optional[FlowDecorator] = next( + (d for d in flow_decorators() if d.name == "exit_handler"), None + ) + if udf_handler: + udf_task = { + "name": "user-defined-exit-handler", + "template": "user-defined-exit-handler", + } + + if udf_handler.attributes.get( + "on_success", True + ) and udf_handler.attributes.get("on_failure", True): + # always run + pass + elif udf_handler.attributes.get("on_success", True): + udf_task["when"] = "{{workflow.status}} == 'Succeeded'" + else: + udf_task["when"] = "{{workflow.status}} != 'Succeeded'" + + exit_handler_template["dag"]["tasks"].append(udf_task) + return workflow @staticmethod @@ -521,6 +555,15 @@ def _get_retry_backoff_factor(node: DAGNode) -> Optional[float]: return int(val) return None + @staticmethod + def _to_k8s_resource_format(resource: str, value: Union[int, float, str]) -> str: + value = str(value) + + # Defaults memory unit to megabyte + if resource in ["memory", "volume"] and value.isnumeric(): + value = f"{value}M" + return value + @staticmethod def _get_resource_requirements(node: DAGNode) -> Dict[str, str]: """ @@ -541,22 +584,6 @@ def _get_resource_requirements(node: DAGNode) -> Dict[str, str]: @step def my_aip_step(): ... 
""" - - def to_k8s_resource_format(resource: str, value: Union[int, float, str]) -> str: - value = str(value) - - # Defaults memory unit to megabyte - if ( - resource - in [ - "memory", - "volume", - ] - and value.isnumeric() - ): - value = f"{value}M" - return value - resource_requirements = {} for deco in node.decorators: if isinstance(deco, ResourcesDecorator): @@ -568,7 +595,9 @@ def to_k8s_resource_format(resource: str, value: Union[int, float, str]) -> str: for attr_key, attr_value in deco.attributes.items(): if attr_value is not None: - resource_requirements[attr_key] = to_k8s_resource_format( + resource_requirements[ + attr_key + ] = KubeflowPipelines._to_k8s_resource_format( attr_key, attr_value ) @@ -1304,49 +1333,31 @@ def call_build_kfp_dag(workflow_uid_op: ContainerOp): ), ) - if self.notify or self.sqs_url_on_error: - op = ( - self._create_notify_exit_handler_op( - flow_variables.package_commands, flow_parameters - ) - if self.notify - else None - ) + # The following exit handlers get created and added as a ContainerOp + # and also as a parallel task to the Flow dag + # We remove them and introduce a new dag invoked by Argo onExit + notify_op: ContainerOp = self._create_notify_exit_handler_op( + flow_variables.package_commands, flow_parameters + ) + sqs_op: Optional[ContainerOp] = self._create_sqs_exit_handler_op( + flow_variables.package_commands, flow_parameters + ) + udf_op: Optional[ContainerOp] = self._create_user_defined_exit_handler_op( + flow_variables.package_commands, flow_parameters + ) + self._exit_handler_created: bool = ( + notify_op or sqs_op or udf_op + ) is not None - # The following exit handler gets created and added as a ContainerOp - # and also as a parallel task to the Argo template "exit-handler-1" - # (the hardcoded kfp compiler name of the exit handler) - # We replace, and rename, this parallel task dag with dag of steps in _create_workflow_yaml(). - op2 = ( - self._create_sqs_exit_handler_op( - flow_variables.package_commands, flow_parameters - ) - if self.sqs_url_on_error - else None - ) - with dsl.ExitHandler(op if op else op2): - s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op( - flow_variables, - ) - workflow_uid_op: Optional[ - ContainerOp - ] = self._create_workflow_uid_op( - s3_sensor_op.output if s3_sensor_op else "", - step_name_to_aip_component, - flow_variables.package_commands, - ) - call_build_kfp_dag(workflow_uid_op) - else: - # TODO: can this and above duplicated code be in a function? - s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op( - flow_variables, - ) - workflow_uid_op: Optional[ContainerOp] = self._create_workflow_uid_op( - s3_sensor_op.output if s3_sensor_op else "", - step_name_to_aip_component, - flow_variables.package_commands, - ) - call_build_kfp_dag(workflow_uid_op) + s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op( + flow_variables, + ) + workflow_uid_op: Optional[ContainerOp] = self._create_workflow_uid_op( + s3_sensor_op.output if s3_sensor_op else "", + step_name_to_aip_component, + flow_variables.package_commands, + ) + call_build_kfp_dag(workflow_uid_op) # Instruct KFP of the DAG order by iterating over the Metaflow # graph nodes. 
Each Metaflow graph node has in_funcs (nodes that @@ -1636,7 +1647,10 @@ def _create_sqs_exit_handler_op( self, package_commands: str, flow_parameters: Dict, - ) -> ContainerOp: + ) -> Optional[ContainerOp]: + if not self.sqs_url_on_error: + return None + env_variables: dict = { key: from_conf(key) for key in [ @@ -1660,7 +1674,10 @@ def _create_notify_exit_handler_op( self, package_commands: str, flow_parameters: Dict, - ) -> ContainerOp: + ) -> Optional[ContainerOp]: + if not self.notify: + return None + env_variables: dict = { key: from_conf(key) for key in [ @@ -1687,6 +1704,43 @@ def _create_notify_exit_handler_op( flag="--run_email_notify", ) + def _create_user_defined_exit_handler_op( + self, + package_commands: str, + flow_parameters: Dict, + ) -> Optional[ContainerOp]: + udf_handler: Optional[FlowDecorator] = next( + (d for d in flow_decorators() if d.name == "exit_handler"), None + ) + if not udf_handler: + return None + + env_variables: dict = { + key: from_conf(key) + for key in [ + "METAFLOW_NOTIFY_EMAIL_FROM", + "METAFLOW_NOTIFY_EMAIL_SMTP_HOST", + "METAFLOW_NOTIFY_EMAIL_SMTP_PORT", + "METAFLOW_NOTIFY_EMAIL_BODY", + "ARGO_RUN_URL_PREFIX", + ] + if from_conf(key) + } + + if self.notify_on_error: + env_variables["METAFLOW_NOTIFY_ON_ERROR"] = self.notify_on_error + + if self.notify_on_success: + env_variables["METAFLOW_NOTIFY_ON_SUCCESS"] = self.notify_on_success + + return self._get_user_defined_exit_handler_op( + udf_handler, + flow_parameters, + env_variables, + package_commands, + name="user-defined-exit-handler", + ) + def _get_aip_exit_handler_op( self, flow_parameters: Dict, @@ -1727,3 +1781,65 @@ def _get_aip_exit_handler_op( backoff_factor=RETRY_BACKOFF_FACTOR, ) ) + + def _get_user_defined_exit_handler_op( + self, + udf_handler: FlowDecorator, + flow_parameters: Dict, + env_variables: Dict, + package_commands: str, + name: str, + ) -> ContainerOp: + # when there are no flow parameters argo complains + # that {{workflow.parameters}} failed to resolve + # see https://github.com/argoproj/argo-workflows/issues/6036 + flow_parameters_json = f"'{FLOW_PARAMETERS_JSON}'" + + top_level: str = "--quiet --no-pylint" + + # capture metaflow configs from client to be used at runtime + # client configs have the highest precedence + metaflow_configs = dict( + METAFLOW_DATASTORE_SYSROOT_S3=DATASTORE_SYSROOT_S3, + METAFLOW_USER=METAFLOW_USER, + ) + + exit_handler_command = [ + "bash", + "-ec", + ( + f"{package_commands}" + f" && METAFLOW_USER=aip-user python {os.path.basename(sys.argv[0])} {top_level} aip user-defined-exit-handler" + f" --flow_name {self.name}" + " --run_id {{workflow.name}}" + f" --env_variables_json {json.dumps(json.dumps(env_variables))}" + f" --flow_parameters_json {flow_parameters_json if flow_parameters else '{}'}" + " --status {{workflow.status}}" + f" --metaflow_configs_json {json.dumps(json.dumps(metaflow_configs))}" + " --retries {{retries}}" + ), + ] + + container_op = dsl.ContainerOp( + name=name, + image=self.base_image, + command=exit_handler_command, + ).set_display_name(name) + + func = udf_handler.attributes["func"] + if hasattr(func, "memory"): + mem = KubeflowPipelines._to_k8s_resource_format("memory", func.memory) + container_op.container.set_memory_request(mem) + container_op.container.set_memory_limit(mem) + if hasattr(func, "cpu"): + container_op.container.set_cpu_request(func.cpu) + container_op.container.set_cpu_limit(func.cpu) + + container_op.set_retry( + getattr(func, "retries", EXIT_HANDLER_RETRY_COUNT), + policy="Always", + 
backoff_duration=f"{getattr(func, 'minutes_between_retries', BACKOFF_DURATION_INT)}m", + backoff_factor=getattr(func, "retry_backoff_factor", RETRY_BACKOFF_FACTOR), + ) + + return container_op diff --git a/metaflow/plugins/aip/aip_cli.py b/metaflow/plugins/aip/aip_cli.py index c3615f9e924..d9a88ea51e9 100644 --- a/metaflow/plugins/aip/aip_cli.py +++ b/metaflow/plugins/aip/aip_cli.py @@ -16,6 +16,7 @@ AIP_MAX_RUN_CONCURRENCY, ) from metaflow.package import MetaflowPackage +from metaflow.plugins.aip.aip_udf_exit_handler import invoke_user_defined_exit_handler from metaflow.plugins.aws.step_functions.step_functions_cli import ( check_metadata_service_version, ) @@ -44,6 +45,40 @@ def kubeflow_pipelines(obj): pass +@kubeflow_pipelines.command( + help="Internal step command to invoke user defined exit handler" +) +@click.option("--flow_name") +@click.option("--status") +@click.option("--run_id") +@click.option("--env_variables_json") +@click.option("--flow_parameters_json") +@click.option("--metaflow_configs_json") +@click.option("--retries") +@click.pass_obj +def user_defined_exit_handler( + obj, + flow_name: str, + status: str, + run_id: str, + env_variables_json: str, + flow_parameters_json: str, + metaflow_configs_json: str, + retries: int, +): + # call user defined exit handler + invoke_user_defined_exit_handler( + obj.graph, + flow_name, + status, + run_id, + env_variables_json, + flow_parameters_json, + metaflow_configs_json, + retries, + ) + + @kubeflow_pipelines.command(help="Internal step command to initialize parent taskIds") @click.option("--run-id") @click.option("--step_name") diff --git a/metaflow/plugins/aip/aip_constants.py b/metaflow/plugins/aip/aip_constants.py index bea3e395af0..49e578b0db0 100644 --- a/metaflow/plugins/aip/aip_constants.py +++ b/metaflow/plugins/aip/aip_constants.py @@ -14,7 +14,8 @@ S3_SENSOR_RETRY_COUNT = 7 PVC_CREATE_RETRY_COUNT = 7 EXIT_HANDLER_RETRY_COUNT = 7 -BACKOFF_DURATION = "2m" # 2 minute +BACKOFF_DURATION_INT = "2" +BACKOFF_DURATION = f"{BACKOFF_DURATION_INT}m" RETRY_BACKOFF_FACTOR = 3 # 2 * 3 * 3 * 3 = 54 minutes STEP_ENVIRONMENT_VARIABLES = "/tmp/step-environment-variables.sh" diff --git a/metaflow/plugins/aip/aip_udf_exit_handler.py b/metaflow/plugins/aip/aip_udf_exit_handler.py new file mode 100644 index 00000000000..d21ce2e36f4 --- /dev/null +++ b/metaflow/plugins/aip/aip_udf_exit_handler.py @@ -0,0 +1,77 @@ +from typing import Dict +from metaflow._vendor import click +import logging + +from metaflow.decorators import flow_decorators, FlowDecorator +from metaflow.graph import FlowGraph + +_logger = logging.getLogger(__name__) + + +def invoke_user_defined_exit_handler( + graph: FlowGraph, + flow_name: str, + status: str, + run_id: str, + env_variables_json: str, + flow_parameters_json: str, + metaflow_configs_json: str, + retries: int, +): + """ + The environment variables that this depends on: + METAFLOW_NOTIFY_ON_SUCCESS + METAFLOW_NOTIFY_ON_ERROR + METAFLOW_NOTIFY_EMAIL_SMTP_HOST + METAFLOW_NOTIFY_EMAIL_SMTP_PORT + METAFLOW_NOTIFY_EMAIL_FROM + METAFLOW_SQS_URL_ON_ERROR + METAFLOW_SQS_ROLE_ARN_ON_ERROR + K8S_CLUSTER_ENV + POD_NAMESPACE + MF_ARGO_WORKFLOW_NAME + METAFLOW_NOTIFY_EMAIL_BODY + """ + import json + import os + + env_variables: Dict[str, str] = json.loads(env_variables_json) + + def get_env(name, default=None) -> str: + return env_variables.get(name, os.environ.get(name, default=default)) + + cluster_env = get_env("K8S_CLUSTER_ENV", "") + argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "") + argo_url_prefix = 
get_env("ARGO_RUN_URL_PREFIX", "") + k8s_namespace = get_env("POD_NAMESPACE", "") + argo_ui_url = f"{argo_url_prefix}/argo-ui/workflows/{k8s_namespace}/{run_id}" + + metaflow_configs: Dict[str, str] = json.loads(metaflow_configs_json) + metaflow_configs_new: Dict[str, str] = { + name: value for name, value in metaflow_configs.items() if value + } + if ( + not "METAFLOW_USER" in metaflow_configs_new + or metaflow_configs_new["METAFLOW_USER"] is None + ): + metaflow_configs_new["METAFLOW_USER"] = "aip-user" + + # update os.environ if the value is not None + # from metaflow_configs_new + for name, value in metaflow_configs_new.items(): + if value is not None and os.environ.get(name, None) is None: + os.environ[name] = value + + print(f"Flow completed with status={status}") + + udf_exit_handler: FlowDecorator = next( + d for d in flow_decorators() if d.name == "exit_handler" + ) + udf_exit_handler.attributes["func"]( + status=status, + flow_parameters=json.loads(flow_parameters_json), + argo_workflow_run_name=argo_workflow_name, + metaflow_run_id=run_id, + argo_ui_url=argo_ui_url, + retries=int(retries), + ) diff --git a/metaflow/plugins/aip/exit_handler_decorator.py b/metaflow/plugins/aip/exit_handler_decorator.py new file mode 100644 index 00000000000..0c4734f1304 --- /dev/null +++ b/metaflow/plugins/aip/exit_handler_decorator.py @@ -0,0 +1,141 @@ +import functools +from typing import Dict + +from metaflow.decorators import FlowDecorator +from metaflow.plugins.aip.aip_constants import ( + EXIT_HANDLER_RETRY_COUNT, + RETRY_BACKOFF_FACTOR, + BACKOFF_DURATION_INT, +) +from metaflow.plugins.aip.aip_decorator import AIPException + + +def exit_handler_resources(cpu=None, memory=None): + """ + Args: + cpu : Union[int, float, str] + AIP: Number of CPUs required for this step. Defaults to None - use cluster setting. + Accept int, float, or str. + Support millicpu requests using float or string ending in 'm'. + Requests with decimal points, like 0.1, are converted to 100m by aip + Precision finer than 1m is not allowed. + memory : Union[int, str] + AIP: Memory required for this step. Default to None - use cluster setting. + See notes above for more units. + """ + + def inner_decorator(f): + @functools.wraps(f) + def wrapped(*args, **kwargs): + response = f(*args, **kwargs) + return response + + wrapped.cpu = cpu + wrapped.memory = memory + return wrapped + + return inner_decorator + + +def exit_handler_retry( + times: int = EXIT_HANDLER_RETRY_COUNT, + minutes_between_retries: int = BACKOFF_DURATION_INT, + retry_backoff_factor: int = RETRY_BACKOFF_FACTOR, +): + """ + Args: + times : int + Number of times to retry this step. Defaults to 3 + minutes_between_retries : int + Number of minutes between retries + retry_backoff_factor : int + Exponential backoff factor. If set to 3, the time between retries will triple each time. + Defaults to 3. + """ + # validate that minutes_between_retries is an integer + if not isinstance(minutes_between_retries, int): + raise AIPException("minutes_between_retries must be an integer") + + def inner_decorator(f): + @functools.wraps(f) + def wrapped(*args, **kwargs): + response = f(*args, **kwargs) + return response + + wrapped.retries = times + wrapped.minutes_between_retries = minutes_between_retries + wrapped.retry_backoff_factor = retry_backoff_factor + return wrapped + + return inner_decorator + + +class ExitHandlerDecorator(FlowDecorator): + """ + Exit handler function that is run after the Flow run has completed, + irrespective of the run success or failure. 
+    can be retried, it may run more than once and should therefore be idempotent.
+
+    Note:
+        You can only specify this decorator once in a Flow, else a
+        DuplicateFlowDecoratorException is raised.
+
+    Parameters
+    ----------
+    func (Callable): The user defined exit handler to invoke.
+    on_failure (bool): Whether to invoke the exit handler on Flow failure. The default is True.
+    on_success (bool): Whether to invoke the exit handler on Flow success. The default is False.
+    Returns
+    -------
+    None: This function does not return anything.
+
+    >>> from metaflow import FlowSpec, step, exit_handler
+
+    >>> @exit_handler_resources(memory="2G")
+    >>> def my_exit_handler(
+    >>>     status: str,
+    >>>     flow_parameters: Dict[str, str],
+    >>>     argo_workflow_run_name: str,
+    >>>     metaflow_run_id: str,
+    >>>     argo_ui_url: str,
+    >>>     retries: int,
+    >>> ) -> None:
+    >>>     '''
+    >>>     Args:
+    >>>         status (str): The success/failure status of the Argo run.
+    >>>         flow_parameters (dict): The parameters passed to the Flow.
+    >>>         argo_workflow_run_name (str): The name of the Argo workflow run.
+    >>>         metaflow_run_id (str): The Metaflow run ID, it may not exist.
+    >>>         argo_ui_url (str): The URL of the Argo UI.
+    >>>         retries (int): The retry count of this exit handler invocation, 0 on the first attempt.
+    >>>     Returns:
+    >>>         None: This function does not return anything.
+    >>>     '''
+    >>>     pass
+    >>>
+    >>> @exit_handler(func=my_exit_handler)
+    >>> class MyFlow(FlowSpec):
+    >>>     pass
+
+    """
+
+    name = "exit_handler"
+    defaults = {
+        "func": None,
+        "on_failure": True,
+        "on_success": False,
+    }
+
+    def flow_init(
+        self, flow, graph, environment, flow_datastore, metadata, logger, echo, options
+    ):
+        func = self.attributes.get("func")
+        if func is None:
+            raise AIPException("@exit_handler requires a 'func' argument.")
+
+        if not callable(func):
+            raise AIPException("func must be callable")
+
+        # validate that either on_failure or on_success is True
+        if not self.attributes["on_failure"] and not self.attributes["on_success"]:
+            raise AIPException("Either on_failure or on_success must be True")
diff --git a/metaflow/plugins/aip/tests/flows/raise_error_flow.py b/metaflow/plugins/aip/tests/flows/raise_error_flow.py
index 61094f74e4e..9cb96bc84b8 100644
--- a/metaflow/plugins/aip/tests/flows/raise_error_flow.py
+++ b/metaflow/plugins/aip/tests/flows/raise_error_flow.py
@@ -1,6 +1,26 @@
-from metaflow import FlowSpec, step
+from metaflow import FlowSpec, step, exit_handler
+from metaflow.plugins.aip import exit_handler_retry
 
 
+@exit_handler_retry(times=1, minutes_between_retries=0)
+def my_exit_handler(
+    status: str,
+    flow_parameters: dict,
+    argo_workflow_run_name: str,
+    metaflow_run_id: str,
+    argo_ui_url: str,
+    retries: int,
+) -> None:
+    if status == "Succeeded":
+        print("Congratulations! The flow succeeded.")
+    else:
+        print("Oh no! The flow failed.")
+
+    if retries == 0:
+        raise Exception("oopsie")
+
+
+@exit_handler(func=my_exit_handler)
 class RaiseErrorFlow(FlowSpec):
     """
     This flow is intended to "test" the Metaflow integration testing framework.
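
Because the handler above is a plain function, it can also be exercised locally before compiling the flow. A minimal sketch, with illustrative argument values (at runtime the user-defined-exit-handler Argo step supplies them):

    # Local smoke test for my_exit_handler from raise_error_flow.py.
    # Assumes this is run from metaflow/plugins/aip/tests/flows so the import resolves.
    from raise_error_flow import my_exit_handler

    my_exit_handler(
        status="Failed",  # Argo reports e.g. "Succeeded" or "Failed"
        flow_parameters={},
        argo_workflow_run_name="raiseerrorflow-x7k2p",  # illustrative value
        metaflow_run_id="argo-raiseerrorflow-x7k2p",  # illustrative value
        argo_ui_url="https://argo.example.com/argo-ui/workflows/ns/raiseerrorflow-x7k2p",
        retries=1,  # retries=0 would trigger the intentional "oopsie" exception above
    )
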
diff --git a/metaflow/plugins/aip/tests/run_integration_tests.py b/metaflow/plugins/aip/tests/run_integration_tests.py
index 65033900a0c..85910830278 100644
--- a/metaflow/plugins/aip/tests/run_integration_tests.py
+++ b/metaflow/plugins/aip/tests/run_integration_tests.py
@@ -336,6 +336,34 @@ def test_kfp_pod_default(pytestconfig) -> None:
     )
 
 
+def test_user_defined_exit_handler(pytestconfig) -> None:
+    with tempfile.TemporaryDirectory() as yaml_tmp_dir:
+        yaml_file_path: str = os.path.join(yaml_tmp_dir, "raise_error_flow.yaml")
+
+        compile_to_yaml_cmd: str = (
+            f" {_python()} flows/raise_error_flow.py --no-pylint --datastore s3 aip run"
+            f" --no-s3-code-package --yaml-only --notify --pipeline-path {yaml_file_path} "
+            f"--tag {pytestconfig.getoption('pipeline_tag')} "
+        )
+        flow_yaml = get_compiled_yaml(compile_to_yaml_cmd, yaml_file_path)
+
+        # find the exit-handler spec template
+        exit_handler_template = next(
+            template
+            for template in flow_yaml["spec"]["templates"]
+            if template["name"] == "exit-handler"
+        )
+
+        # find user-defined-exit-handler in the exit-handler dag
+        user_defined_exit_handler = next(
+            task
+            for task in exit_handler_template["dag"]["tasks"]
+            if task["name"] == "user-defined-exit-handler"
+        )
+
+        assert user_defined_exit_handler
+
+
 def test_kubernetes_service_account_compile_only(pytestconfig) -> None:
     service_account = "test-service-account"
     with tempfile.TemporaryDirectory() as yaml_tmp_dir:
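
For reference, a fuller usage sketch that stacks the helper decorators introduced above (exit_handler_resources, exit_handler_retry) on a single handler and attaches it with @exit_handler. The flow name, handler name, resource values, and retry settings are illustrative only, not part of the patch:

    from typing import Dict

    from metaflow import FlowSpec, step, exit_handler
    from metaflow.plugins.aip import exit_handler_resources, exit_handler_retry


    @exit_handler_resources(cpu="500m", memory="2G")
    @exit_handler_retry(times=2, minutes_between_retries=1)
    def notify_on_completion(
        status: str,
        flow_parameters: Dict[str, str],
        argo_workflow_run_name: str,
        metaflow_run_id: str,
        argo_ui_url: str,
        retries: int,
    ) -> None:
        # Runs in its own pod after the flow finishes; keep it idempotent since
        # exit_handler_retry may invoke it more than once.
        print(f"{metaflow_run_id} finished with status={status}: {argo_ui_url}")


    # on_success=True and on_failure=True means the handler runs for every outcome.
    @exit_handler(func=notify_on_completion, on_success=True, on_failure=True)
    class HelloFlow(FlowSpec):
        @step
        def start(self):
            self.next(self.end)

        @step
        def end(self):
            pass


    if __name__ == "__main__":
        HelloFlow()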