From 2c01f4a6718a9c4b0d807323c20cb9af3273acf6 Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Fri, 9 Aug 2024 17:26:46 -0700 Subject: [PATCH 01/12] Adding ability to run latest argo workflow template --- metaflow/plugins/aip/argo_client.py | 16 +++++- metaflow/plugins/aip/argo_utils.py | 76 +++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/aip/argo_client.py b/metaflow/plugins/aip/argo_client.py index 076a6bec3d4..69222679904 100644 --- a/metaflow/plugins/aip/argo_client.py +++ b/metaflow/plugins/aip/argo_client.py @@ -1,7 +1,7 @@ # talebz copied from https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/argo/argo_client.py import json -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, List from metaflow.exception import MetaflowException from metaflow.plugins.aws.eks.kubernetes_client import KubernetesClient @@ -83,6 +83,20 @@ def create_workflow_config_map(self, name: str, config_map: Dict[str, Any]): json.loads(e.body)["message"] if e.body is not None else e.reason ) + def list_workflow_template(self, namespace: Optional[str] = None): + client = self._client.get() + try: + return client.CustomObjectsApi().list_namespaced_custom_object( + group=self._group, + version=self._version, + namespace=namespace or self._namespace, + plural="workflowtemplates", + ) + except client.rest.ApiException as e: + raise ArgoClientException( + json.loads(e.body)["message"] if e.body is not None else e.reason + ) + def trigger_workflow_template(self, name: str, parameters: Optional[Dict] = None): client = self._client.get() body = { diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index ab54d445c65..586dcb7851b 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -14,16 +14,38 @@ def run_argo_workflow( kubernetes_namespace: str, - template_name: str, + template_name: Optional[str] = None, + project_name: Optional[str] = None, + branch_name: Optional[str] = None, + template_name_prefix: Optional[str] = None, parameters: Optional[dict] = None, wait_timeout: Union[int, float, datetime.timedelta] = 0, **kwarg, # Other parameters for wait function ) -> Tuple[str, str]: + """ + Using template_name to trigger a workflow template name with exact match. + + If no template_name is provided, the latest workflow template satisfying + the project_name, branch_name and template_name_prefix will be used. + All of these filters are optional. If not provided, the latest workflow + from the namespace will be used. + """ + client = ArgoClient(namespace=kubernetes_namespace) + + if not template_name: + template_name = get_latest_workflow( + kubernetes_namespace, + project_name=project_name, + branch_name=branch_name, + template_name_prefix=template_name_prefix, + ) + try: # TODO(talebz): add tag of origin-run-id to correlate parent flow - workflow_manifest: Dict[str, Any] = ArgoClient( - namespace=kubernetes_namespace, - ).trigger_workflow_template(template_name, parameters) + logger.info(f"Triggering workflow template: {template_name}") + workflow_manifest: Dict[str, Any] = client.trigger_workflow_template( + template_name, parameters + ) except Exception as e: raise AIPException(str(e)) @@ -41,6 +63,52 @@ def run_argo_workflow( return argo_run_id, argo_run_uid +def get_latest_workflow( + kubernetes_namespace: str, + project_name: Optional[str] = None, + branch_name: Optional[str] = None, + template_name_prefix: Optional[str] = None, +): + # TODO: + # - Add filter by project_id instead of project name - project_id is not added as a label yet. + # - Add filter by flow_name - flow_name is not added as a label yet. + client = ArgoClient(namespace=kubernetes_namespace) + + templates = client.list_workflow_template()["items"] + templates = [ + template + for template in templates + if ( + not project_name + or template["metadata"]["labels"]["gitlab.zgtools.net/project-name"] + == project_name + ) + and ( + not branch_name + or template["metadata"]["labels"]["gitlab.zgtools.net/branch-name"] + == branch_name + ) + and ( + not template_name_prefix + or template["metadata"]["name"].startswith(template_name_prefix) + ) + ] + if not templates: + raise AIPException( + f"No workflow template found with constraints " + f"project_name={project_name}, branch_name={branch_name}, template_name_prefix={template_name_prefix}" + ) + # Sort by creation timestamp to get the latest template. + templates.sort( + key=lambda template: template["metadata"]["creationTimestamp"], reverse=True + ) + template_name = templates[1]["metadata"]["name"] + logger.info( + f"Found {len(templates)} WorkflowTemplates. Using latest workflow template: {template_name}" + ) + return template_name + + def delete_argo_workflow( kubernetes_namespace: str, template_name: str, From cc80d54d214b9e1aee0de8856f5c1d72d5952164 Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Tue, 13 Aug 2024 20:42:13 -0700 Subject: [PATCH 02/12] Refactor Argo utils for clarity --- metaflow/plugins/aip/__init__.py | 10 +- metaflow/plugins/aip/aip_cli.py | 12 +- metaflow/plugins/aip/aip_exit_handler.py | 6 +- metaflow/plugins/aip/aip_udf_exit_handler.py | 4 +- metaflow/plugins/aip/argo_utils.py | 443 ++++++++++-------- .../aip/tests/flows/flow_triggering_flow.py | 25 +- 6 files changed, 276 insertions(+), 224 deletions(-) diff --git a/metaflow/plugins/aip/__init__.py b/metaflow/plugins/aip/__init__.py index 0c4c318bfef..503fa499121 100644 --- a/metaflow/plugins/aip/__init__.py +++ b/metaflow/plugins/aip/__init__.py @@ -3,12 +3,10 @@ ) from .argo_utils import ( - run_argo_workflow, - run_id_to_url, - run_id_to_metaflow_url, - wait_for_argo_run_completion, - delete_argo_workflow, - to_metaflow_run_id, + ArgoHelper, + get_argo_url, + get_metaflow_url, + get_metaflow_run_id, ) from .exit_handler_decorator import ( diff --git a/metaflow/plugins/aip/aip_cli.py b/metaflow/plugins/aip/aip_cli.py index 7e534f2aaab..11fd5389448 100644 --- a/metaflow/plugins/aip/aip_cli.py +++ b/metaflow/plugins/aip/aip_cli.py @@ -21,9 +21,9 @@ check_metadata_service_version, ) from metaflow.plugins.aip.argo_utils import ( - run_id_to_url, - run_id_to_metaflow_url, - to_metaflow_run_id, + get_argo_url, + get_metaflow_url, + get_metaflow_run_id, ) from metaflow.plugins.aip.aip_decorator import AIPException from metaflow.plugins.aip.aip_step_init import save_step_environment_variables @@ -421,9 +421,9 @@ def _echo_workflow_run( ): argo_workflow_name = workflow_manifest["metadata"]["name"] argo_workflow_uid = workflow_manifest["metadata"]["uid"] - metaflow_run_id = to_metaflow_run_id(argo_workflow_uid) - metaflow_ui_url = run_id_to_metaflow_url(flow_name, argo_workflow_uid) - argo_ui_url = run_id_to_url( + metaflow_run_id = get_metaflow_run_id(argo_workflow_uid) + metaflow_ui_url = get_metaflow_url(flow_name, argo_workflow_uid) + argo_ui_url = get_argo_url( argo_workflow_name, kubernetes_namespace, argo_workflow_uid ) obj.echo(f"Metaflow run_id=*{metaflow_run_id}*\n", fg="magenta") diff --git a/metaflow/plugins/aip/aip_exit_handler.py b/metaflow/plugins/aip/aip_exit_handler.py index a1d1c3f1131..2d35b417e10 100644 --- a/metaflow/plugins/aip/aip_exit_handler.py +++ b/metaflow/plugins/aip/aip_exit_handler.py @@ -2,7 +2,7 @@ from metaflow._vendor import click import logging -from metaflow.plugins.aip import run_id_to_url +from metaflow.plugins.aip import get_argo_url _logger = logging.getLogger(__name__) @@ -71,9 +71,7 @@ def email_notify(send_to): email_body = get_env("METAFLOW_NOTIFY_EMAIL_BODY", "") k8s_namespace = get_env("POD_NAMESPACE", "") - argo_ui_url = run_id_to_url( - argo_workflow_name, k8s_namespace, argo_workflow_uid - ) + argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid) body = ( f"status = {status}
\n" f"{argo_ui_url}
\n" diff --git a/metaflow/plugins/aip/aip_udf_exit_handler.py b/metaflow/plugins/aip/aip_udf_exit_handler.py index 9e13007e7bf..ac68ea0ac9c 100644 --- a/metaflow/plugins/aip/aip_udf_exit_handler.py +++ b/metaflow/plugins/aip/aip_udf_exit_handler.py @@ -4,7 +4,7 @@ from metaflow.decorators import flow_decorators, FlowDecorator from metaflow.graph import FlowGraph -from metaflow.plugins.aip import run_id_to_url +from metaflow.plugins.aip import get_argo_url _logger = logging.getLogger(__name__) @@ -44,7 +44,7 @@ def get_env(name, default=None) -> str: argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "") k8s_namespace = get_env("POD_NAMESPACE", "") - argo_ui_url = run_id_to_url(argo_workflow_name, k8s_namespace, argo_workflow_uid) + argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid) metaflow_configs: Dict[str, str] = json.loads(metaflow_configs_json) metaflow_configs_new: Dict[str, str] = { diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index 586dcb7851b..4eaa5fbd0d5 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -1,7 +1,7 @@ import math import time import datetime -from typing import Optional, Union, Dict, Any, Tuple +from typing import Optional, Union, Dict, Any, Tuple, Callable from metaflow.metaflow_config import ARGO_RUN_URL_PREFIX, METAFLOW_RUN_URL_PREFIX from metaflow.plugins.aip.argo_client import ArgoClient @@ -12,223 +12,282 @@ logger = _get_aip_logger() -def run_argo_workflow( - kubernetes_namespace: str, - template_name: Optional[str] = None, - project_name: Optional[str] = None, - branch_name: Optional[str] = None, - template_name_prefix: Optional[str] = None, - parameters: Optional[dict] = None, - wait_timeout: Union[int, float, datetime.timedelta] = 0, - **kwarg, # Other parameters for wait function -) -> Tuple[str, str]: - """ - Using template_name to trigger a workflow template name with exact match. - - If no template_name is provided, the latest workflow template satisfying - the project_name, branch_name and template_name_prefix will be used. - All of these filters are optional. If not provided, the latest workflow - from the namespace will be used. - """ - client = ArgoClient(namespace=kubernetes_namespace) - - if not template_name: - template_name = get_latest_workflow( - kubernetes_namespace, +class ArgoHelper: + def __init__(self, kubernetes_namespace: str): + """ + Args: + kubernetes_namespace: Namespace where Argo is running. + Required as the defaults provided in the ArgoClient is usually not what customers desire. + TODO: This namespace can be default to the current namespace if the script is ran within a cluster. + """ + self._client = ArgoClient(namespace=kubernetes_namespace) + + def template_submit( + self, + template_name: Optional[str] = None, + parameters: Optional[dict] = None, + wait_timeout: Union[int, float, datetime.timedelta] = 0, + **kwarg, + ) -> Tuple[str, str]: + """ + Args: + template_name: Name of the workflow template to trigger. + parameters: Parameters to pass to the workflow template. + wait_timeout: Time to wait for the workflow to complete. Set to 0 to skip waiting. + **kwarg: Other parameters for the watch function. See `ArgoHelper.watch`. + """ + + try: + # TODO(talebz): add tag of origin-run-id to correlate parent flow + logger.info(f"Triggering workflow template: {template_name}") + workflow_manifest: Dict[str, Any] = self._client.trigger_workflow_template( + template_name, parameters + ) + except Exception as e: + raise AIPException(str(e)) + + argo_run_id = workflow_manifest["metadata"]["name"] + argo_run_uid = workflow_manifest["metadata"]["uid"] + + if ( + wait_timeout + ): # int, float and datetime.timedelta all evaluates to False when 0 + self.watch( + run_id=argo_run_id, + wait_timeout=wait_timeout, + **kwarg, + ) + + return argo_run_id, argo_run_uid + + def template_submit_latest( + self, + template_prefix: Optional[str] = None, + project_name: Optional[str] = None, + branch_name: Optional[str] = None, + filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, + parameters: Optional[dict] = None, + wait_timeout: Union[int, float, datetime.timedelta] = 0, + **kwarg, + ) -> Tuple[str, str]: + """ + Args: + template_prefix: Prefix of the template name to match. + project_name: Project name to match. + branch_name: Branch name to match. + filter_func: Custom filter function that is passed template, and should return boolean value + indicating if the template can be used. + parameters: Parameters to pass to the workflow template. + wait_timeout: Time to wait for the workflow to complete. Set to 0 to skip waiting. + **kwarg: Other parameters for the watch function. See `ArgoHelper.watch`. + """ + if not any([template_prefix, project_name, branch_name]): + raise AIPException( + "Running argo workflow with no specified template nor filters can be dangerous. " + "Please set at least one of project_name, branch_name or template_name_prefix." + ) + + template_name: str = self.template_get_latest( project_name=project_name, branch_name=branch_name, - template_name_prefix=template_name_prefix, - ) - - try: - # TODO(talebz): add tag of origin-run-id to correlate parent flow - logger.info(f"Triggering workflow template: {template_name}") - workflow_manifest: Dict[str, Any] = client.trigger_workflow_template( - template_name, parameters + template_name_prefix=template_prefix, + filter_func=filter_func, ) - except Exception as e: - raise AIPException(str(e)) - argo_run_id = workflow_manifest["metadata"]["name"] - argo_run_uid = workflow_manifest["metadata"]["uid"] - - if wait_timeout: # int, float and datetime.timedelta all evaluates to False when 0 - wait_for_argo_run_completion( - run_id=argo_run_id, - kubernetes_namespace=kubernetes_namespace, + return self.template_submit( + template_name=template_name, + parameters=parameters, wait_timeout=wait_timeout, **kwarg, ) - return argo_run_id, argo_run_uid + def template_get_latest( + self, + template_name_prefix: Optional[str] = None, + project_name: Optional[str] = None, + branch_name: Optional[str] = None, + filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, + name_only: bool = True, + ) -> Union[str, Dict[str, Any]]: + """ + Args: + template_name_prefix: Prefix of the template name to match. + project_name: Project name to match. + branch_name: Branch name to match. + filter_func: Custom filter function that is passed template, and should return boolean value + indicating if the template can be used. + name_only: Whether to return only the name of the template or the full manifest. Defaults to True. + + Returns: + The name of the latest workflow template, or the full manifest if name_only is set to False. + """ + # TODO: + # - Add filter by project_id instead of project name - project_id is not added as a label yet. + # - Add filter by flow_name - flow_name is not added as a label yet. -def get_latest_workflow( - kubernetes_namespace: str, - project_name: Optional[str] = None, - branch_name: Optional[str] = None, - template_name_prefix: Optional[str] = None, -): - # TODO: - # - Add filter by project_id instead of project name - project_id is not added as a label yet. - # - Add filter by flow_name - flow_name is not added as a label yet. - client = ArgoClient(namespace=kubernetes_namespace) - - templates = client.list_workflow_template()["items"] - templates = [ - template - for template in templates - if ( - not project_name - or template["metadata"]["labels"]["gitlab.zgtools.net/project-name"] - == project_name - ) - and ( - not branch_name - or template["metadata"]["labels"]["gitlab.zgtools.net/branch-name"] - == branch_name - ) - and ( - not template_name_prefix - or template["metadata"]["name"].startswith(template_name_prefix) + templates = self._client.list_workflow_template()["items"] + + templates = [ + template + for template in templates + if ( + not project_name + or template["metadata"]["labels"]["gitlab.zgtools.net/project-name"] + == project_name + ) + and ( + not branch_name + or template["metadata"]["labels"]["gitlab.zgtools.net/branch-name"] + == branch_name + ) + and ( + not template_name_prefix + or template["metadata"]["name"].startswith(template_name_prefix) + ) + and (not filter_func or filter_func(template)) + ] + if not templates: + raise AIPException( + f"No workflow template found with constraints " + f"{project_name=}, {branch_name=}, {template_name_prefix=}, {filter_func=}" + ) + + # Sort by creation timestamp to get the latest template. + templates.sort( + key=lambda template: template["metadata"]["creationTimestamp"], reverse=True ) - ] - if not templates: - raise AIPException( - f"No workflow template found with constraints " - f"project_name={project_name}, branch_name={branch_name}, template_name_prefix={template_name_prefix}" + latest_template = templates[0] + latest_template_name = latest_template["metadata"]["name"] + logger.info( + f"Found {len(templates)} WorkflowTemplates. Using latest workflow template: {latest_template_name}" ) - # Sort by creation timestamp to get the latest template. - templates.sort( - key=lambda template: template["metadata"]["creationTimestamp"], reverse=True - ) - template_name = templates[1]["metadata"]["name"] - logger.info( - f"Found {len(templates)} WorkflowTemplates. Using latest workflow template: {template_name}" - ) - return template_name + if name_only: + return latest_template_name + else: + return latest_template + + def template_delete( + self, + template_name: str, + ): + try: + self._client.delete_workflow_template(template_name) + except Exception as e: + raise AIPException(str(e)) + + def watch( + self, + run_id: str, + wait_timeout: Union[int, float, datetime.timedelta] = 0, + min_check_delay: int = 10, + max_check_delay: int = 30, + assert_success: bool = True, + ) -> str: + """Check for Argo run status. Returns status as a string. + + If timeout (in second) is positive this function waits for flow to complete. + Raise timeout if run is not finished after seconds + - finished or not (bool) + - success or not (bool) + - run info (kfp_server_api.ApiRun) + + Status check frequency will be close to min_check_delay for the first 11 minutes, + and gradually approaches max_check_delay after 23 minutes. + + A close mimic to async is to use _get_kfp_run above. + Implementation for async is not prioritized until specifically requested. + + TODO(yunw)(AIP-5671): Async version + """ -def delete_argo_workflow( - kubernetes_namespace: str, - template_name: str, -): - try: - ArgoClient(namespace=kubernetes_namespace).delete_workflow_template( - template_name - ) - except Exception as e: - raise AIPException(str(e)) + def get_delay(secs_since_start, min_delay, max_delay): + """ + this sigmoid function reaches + - 0.1 after 11 minutes + - 0.5 after 15 minutes + - 1.0 after 23 minutes + in other words, the delay is close to min_delay during the first 10 minutes + """ + sigmoid = 1.0 / (1.0 + math.exp(-0.01 * secs_since_start + 9.0)) + return min_delay + sigmoid * (max_delay - min_delay) + run_status: str = self._client.get_workflow_run_status(run_id) -def to_metaflow_run_id(argo_run_uid: str): + if not self._is_finished_run(run_status) and wait_timeout: + if isinstance(wait_timeout, datetime.timedelta): + wait_timeout = wait_timeout.total_seconds() + + # A mimic of aip.Client.wait_for_run_completion with customized logging + logger.info( + f"Waiting for workflow {run_id} to finish. Timeout: {wait_timeout} second(s)" + ) + start_time = datetime.datetime.now() + while not self._is_finished_run(run_status): + elapsed_time = (datetime.datetime.now() - start_time).total_seconds() + logger.info( + f"Waiting for the workflow {run_id} to complete... {elapsed_time:.2f}s / {wait_timeout}s" + ) + if elapsed_time > wait_timeout: + raise TimeoutError( + f"Timeout while waiting for workflow {run_id} to finish." + ) + time.sleep( + get_delay( + elapsed_time, + min_delay=min_check_delay, + max_delay=max_check_delay, + ) + ) + run_status = self._client.get_workflow_run_status(run_id) + + if assert_success: + self._assert_run_success(run_id, run_status) + + return run_status + + @staticmethod + def _is_finished_run(run_status: Optional[str]): + return run_status and run_status.lower() in [ + "succeeded", + "failed", + "skipped", + "error", + ] + + @staticmethod + def _assert_run_success(run_id: str, runs_status: str): + if runs_status is None: + # None status usually occurs when run is recently started and has not been scheduled + # Raise different error, allowing user to catch them. + raise ValueError( + f"Run {run_id=} status not available. Run might not have been scheduled." + ) + else: + assert ( + runs_status.lower() == "succeeded" + ), f"Run {run_id=} finished with non-successful state {runs_status}." + + +def get_metaflow_run_id(argo_run_uid: str): """Return metaflow run id useful for querying metadata, datastore, log etc.""" return ( f"argo-{argo_run_uid}" if not argo_run_uid.startswith("argo-") else argo_run_uid ) -def run_id_to_url(argo_run_id: str, kubernetes_namespace: str, argo_workflow_uid: str): +def get_argo_url( + argo_run_id: str, + kubernetes_namespace: str, + argo_workflow_uid: str, +): argo_ui_url = f"{ARGO_RUN_URL_PREFIX}/argo-ui/workflows/{kubernetes_namespace}/{argo_run_id}?uid={argo_workflow_uid}" return argo_ui_url -def run_id_to_metaflow_url(flow_name: str, argo_run_id: str): +def get_metaflow_url(flow_name: str, argo_run_id: str): metaflow_ui_url = ( - f"{METAFLOW_RUN_URL_PREFIX}/{flow_name}/{to_metaflow_run_id(argo_run_id)}" + f"{METAFLOW_RUN_URL_PREFIX}/{flow_name}/{get_metaflow_run_id(argo_run_id)}" ) return metaflow_ui_url - - -def _is_finished_run(run_status: Optional[str]): - return run_status and run_status.lower() in [ - "succeeded", - "failed", - "skipped", - "error", - ] - - -def _assert_run_success(run_id: str, runs_status: str): - if runs_status is None: - # None status usually occurs when run is recently started and has not been scheduled - # Raise different error, allowing user to catch them. - raise ValueError( - f"Run {run_id=} status not available. Run might not have been scheduled." - ) - else: - assert ( - runs_status.lower() == "succeeded" - ), f"Run {run_id=} finished with non-successful state {runs_status}." - - -def wait_for_argo_run_completion( - run_id: str, - kubernetes_namespace: str, - wait_timeout: Union[int, float, datetime.timedelta] = 0, - min_check_delay: int = 10, - max_check_delay: int = 30, - assert_success: bool = True, -) -> str: - """Check for Argo run status. Returns status as a string. - - If timeout (in second) is positive this function waits for flow to complete. - Raise timeout if run is not finished after seconds - - finished or not (bool) - - success or not (bool) - - run info (kfp_server_api.ApiRun) - - Status check frequency will be close to min_check_delay for the first 11 minutes, - and gradually approaches max_check_delay after 23 minutes. - - A close mimic to async is to use _get_kfp_run above. - Implementation for async is not prioritized until specifically requested. - - TODO(yunw)(AIP-5671): Async version - """ - - def get_delay(secs_since_start, min_delay, max_delay): - """ - this sigmoid function reaches - - 0.1 after 11 minutes - - 0.5 after 15 minutes - - 1.0 after 23 minutes - in other words, the delay is close to min_delay during the first 10 minutes - """ - sigmoid = 1.0 / (1.0 + math.exp(-0.01 * secs_since_start + 9.0)) - return min_delay + sigmoid * (max_delay - min_delay) - - client = ArgoClient( - namespace=kubernetes_namespace, - ) - run_status: str = client.get_workflow_run_status(run_id) - - if not _is_finished_run(run_status) and wait_timeout: - if isinstance(wait_timeout, datetime.timedelta): - wait_timeout = wait_timeout.total_seconds() - - # A mimic of aip.Client.wait_for_run_completion with customized logging - logger.info( - f"Waiting for workflow {run_id} to finish. Timeout: {wait_timeout} second(s)" - ) - start_time = datetime.datetime.now() - while not _is_finished_run(run_status): - elapsed_time = (datetime.datetime.now() - start_time).total_seconds() - logger.info( - f"Waiting for the workflow {run_id} to complete... {elapsed_time:.2f}s / {wait_timeout}s" - ) - if elapsed_time > wait_timeout: - raise TimeoutError( - f"Timeout while waiting for workflow {run_id} to finish." - ) - time.sleep( - get_delay( - elapsed_time, min_delay=min_check_delay, max_delay=max_check_delay - ) - ) - run_status = client.get_workflow_run_status(run_id) - - if assert_success: - _assert_run_success(run_id, run_status) - - return run_status diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 662fbffd332..209c6bb4155 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -9,11 +9,9 @@ from metaflow import FlowSpec, Parameter, Step, current, step from metaflow.metaflow_config import KUBERNETES_NAMESPACE from metaflow.plugins.aip import ( - run_id_to_url, - run_argo_workflow, - wait_for_argo_run_completion, - delete_argo_workflow, - to_metaflow_run_id, + ArgoHelper, + get_argo_url, + get_metaflow_run_id, ) from metaflow.plugins.aip import logger @@ -104,7 +102,8 @@ def end(self): """Trigger downstream pipeline and test triggering behaviors""" if self.trigger_enabled: logger.info("\nTesting run_kubeflow_pipeline") - run_id, run_uid = run_argo_workflow( + argo_helper = ArgoHelper(KUBERNETES_NAMESPACE) + run_id, run_uid = argo_helper.template_submit( KUBERNETES_NAMESPACE, self.template_name, parameters={ @@ -113,13 +112,11 @@ def end(self): }, ) logger.info(f"{run_id=}, {run_uid=}") - logger.info(f"{run_id_to_url(run_id, KUBERNETES_NAMESPACE, run_uid)=}") + logger.info(f"{get_argo_url(run_id, KUBERNETES_NAMESPACE, run_uid)=}") logger.info("Testing timeout exception for wait_for_kfp_run_completion") try: - wait_for_argo_run_completion( - run_id, KUBERNETES_NAMESPACE, wait_timeout=0.1 - ) + argo_helper.watch(run_id, wait_timeout=0.1) except TimeoutError: logger.error( "Timeout before flow ends throws timeout exception correctly" @@ -128,20 +125,20 @@ def end(self): raise AssertionError("Timeout error not thrown as expected.") logger.info("Test wait_for_kfp_run_completion without triggering timeout") - status: str = wait_for_argo_run_completion( - run_id, KUBERNETES_NAMESPACE, wait_timeout=datetime.timedelta(minutes=3) + status: str = argo_helper.watch( + run_id, wait_timeout=datetime.timedelta(minutes=3) ) logger.info(f"Run Status of {run_id}: {status=}") - metaflow_run_id: str = to_metaflow_run_id(run_uid) + metaflow_run_id: str = get_metaflow_run_id(run_uid) logger.info(f"Test triggered_by is passed correctly") metaflow_path = f"{current.flow_name}/{metaflow_run_id}/start" _retry_sleep(self.assert_task_triggered_by, metaflow_path=metaflow_path) logger.info(f"Deleting {self.template_name=}") - delete_argo_workflow(KUBERNETES_NAMESPACE, self.template_name) + argo_helper.template_delete(self.template_name) else: logger.info(f"{self.trigger_enabled=}") From d659f5280d159fc5e83334d48ea4564bfc9617f0 Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Tue, 13 Aug 2024 20:55:31 -0700 Subject: [PATCH 03/12] bugfix: labels might not exists --- metaflow/plugins/aip/argo_utils.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index 4eaa5fbd0d5..49bc3f36571 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -30,6 +30,9 @@ def template_submit( **kwarg, ) -> Tuple[str, str]: """ + The template that matches the exact template_name will be triggered. + This function optionally waits for the workflow to complete. + Args: template_name: Name of the workflow template to trigger. parameters: Parameters to pass to the workflow template. @@ -71,6 +74,9 @@ def template_submit_latest( **kwarg, ) -> Tuple[str, str]: """ + This function triggers the latest workflow template that matches the specified filters. + This function optionally waits for the workflow to complete. + Args: template_prefix: Prefix of the template name to match. project_name: Project name to match. @@ -133,12 +139,12 @@ def template_get_latest( for template in templates if ( not project_name - or template["metadata"]["labels"]["gitlab.zgtools.net/project-name"] + or template["metadata"]["labels"].get("gitlab.zgtools.net/project-name") == project_name ) and ( not branch_name - or template["metadata"]["labels"]["gitlab.zgtools.net/branch-name"] + or template["metadata"]["labels"].get("gitlab.zgtools.net/branch-name") == branch_name ) and ( From 446606ef27eb766cb28b452679cbc4cf5f68c1cc Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Tue, 13 Aug 2024 21:01:48 -0700 Subject: [PATCH 04/12] More doc str update --- metaflow/plugins/aip/argo_utils.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index 49bc3f36571..4526f54d13d 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -202,15 +202,17 @@ def watch( Status check frequency will be close to min_check_delay for the first 11 minutes, and gradually approaches max_check_delay after 23 minutes. - A close mimic to async is to use _get_kfp_run above. - Implementation for async is not prioritized until specifically requested. - - TODO(yunw)(AIP-5671): Async version + Args: + run_id: Argo workflow run id. + wait_timeout: Wait timeout in seconds or datetime.timedelta. + min_check_delay: Minimum check delay in seconds. + max_check_delay: Maximum check delay in seconds. + assert_success: Whether to throw exception if run is not successful. """ def get_delay(secs_since_start, min_delay, max_delay): """ - this sigmoid function reaches + This sigmoid function reaches - 0.1 after 11 minutes - 0.5 after 15 minutes - 1.0 after 23 minutes From e2bd8be82f9bdddf0196da71ad20a0500fd7cfcf Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Tue, 13 Aug 2024 21:23:51 -0700 Subject: [PATCH 05/12] Fix integration test --- metaflow/plugins/aip/tests/flows/flow_triggering_flow.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 209c6bb4155..26b16cdf84b 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -104,15 +104,14 @@ def end(self): logger.info("\nTesting run_kubeflow_pipeline") argo_helper = ArgoHelper(KUBERNETES_NAMESPACE) run_id, run_uid = argo_helper.template_submit( - KUBERNETES_NAMESPACE, - self.template_name, + template_name=self.template_name, parameters={ "trigger_enabled": False, "triggered_by": current.run_id, }, ) logger.info(f"{run_id=}, {run_uid=}") - logger.info(f"{get_argo_url(run_id, KUBERNETES_NAMESPACE, run_uid)=}") + logger.info(f"{get_argo_url(run_id, run_uid)=}") logger.info("Testing timeout exception for wait_for_kfp_run_completion") try: From a783eb5c94e6b55e228a579ce28c9f1279e53d1a Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Wed, 14 Aug 2024 10:13:42 -0700 Subject: [PATCH 06/12] rename template trigger functions --- metaflow/plugins/aip/argo_utils.py | 14 +++++++------- .../aip/tests/flows/flow_triggering_flow.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index 4526f54d13d..1808a080ffb 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -22,7 +22,7 @@ def __init__(self, kubernetes_namespace: str): """ self._client = ArgoClient(namespace=kubernetes_namespace) - def template_submit( + def trigger( self, template_name: Optional[str] = None, parameters: Optional[dict] = None, @@ -30,8 +30,8 @@ def template_submit( **kwarg, ) -> Tuple[str, str]: """ - The template that matches the exact template_name will be triggered. - This function optionally waits for the workflow to complete. + Trigger an existing workflow template. + Optionally this function can wait (blocking) for the workflow to complete. Args: template_name: Name of the workflow template to trigger. @@ -63,7 +63,7 @@ def template_submit( return argo_run_id, argo_run_uid - def template_submit_latest( + def trigger_latest( self, template_prefix: Optional[str] = None, project_name: Optional[str] = None, @@ -74,8 +74,8 @@ def template_submit_latest( **kwarg, ) -> Tuple[str, str]: """ - This function triggers the latest workflow template that matches the specified filters. - This function optionally waits for the workflow to complete. + Trigger the latest workflow template that matches the specified filters. + Optionally this function can wait (blocking) for the workflow to complete. Args: template_prefix: Prefix of the template name to match. @@ -100,7 +100,7 @@ def template_submit_latest( filter_func=filter_func, ) - return self.template_submit( + return self.trigger( template_name=template_name, parameters=parameters, wait_timeout=wait_timeout, diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 26b16cdf84b..6208e8ae03f 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -103,7 +103,7 @@ def end(self): if self.trigger_enabled: logger.info("\nTesting run_kubeflow_pipeline") argo_helper = ArgoHelper(KUBERNETES_NAMESPACE) - run_id, run_uid = argo_helper.template_submit( + run_id, run_uid = argo_helper.trigger( template_name=self.template_name, parameters={ "trigger_enabled": False, From 6c5e60f6f034576decefe9e8ecd70f26f8f2edb8 Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Wed, 14 Aug 2024 14:48:11 -0700 Subject: [PATCH 07/12] Add unit tests & bug fixes --- metaflow/metaflow_config.py | 4 +- metaflow/plugins/aip/aip.py | 5 +- metaflow/plugins/aip/argo_utils.py | 45 ++++-- .../aip/tests/flows/flow_triggering_flow.py | 140 +++++++++++++----- 4 files changed, 132 insertions(+), 62 deletions(-) diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index e12877ce453..6d335505ee3 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -151,8 +151,8 @@ def from_conf(name, default=None): # Note: `ARGO_RUN_URL_PREFIX` is the URL prefix for ARGO runs on your ARGO cluster. The prefix includes # all parts of the URL except the run_id at the end which we append once the run is created. # For eg, this would look like: "https:///argo-ui/workflows/ -ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "") -METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "") +ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "").rstrip("/") +METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "").rstrip("/") AIP_MAX_PARALLELISM = int(from_conf("AIP_MAX_PARALLELISM", 10)) AIP_MAX_RUN_CONCURRENCY = int(from_conf("AIP_MAX_RUN_CONCURRENCY", 10)) AIP_SHOW_METAFLOW_UI_URL = bool(from_conf("AIP_SHOW_METAFLOW_UI_URL", False)) diff --git a/metaflow/plugins/aip/aip.py b/metaflow/plugins/aip/aip.py index 1d81a697e65..04b7af03bb5 100644 --- a/metaflow/plugins/aip/aip.py +++ b/metaflow/plugins/aip/aip.py @@ -291,10 +291,9 @@ def _create_workflow_yaml( # Note the name has to follow k8s format. # self.name is typically CamelCase as it's python class name. # generateName contains a sanitized version of self.name from aip.compiler + default_workflow_name = workflow["metadata"].pop("generateName").rstrip("-") workflow["metadata"]["name"] = ( - sanitize_k8s_name(name) - if name - else workflow["metadata"].pop("generateName").rstrip("-") + sanitize_k8s_name(name) if name else default_workflow_name ) # Service account is added through webhooks. diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index 1808a080ffb..44c8e23520a 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -3,7 +3,11 @@ import datetime from typing import Optional, Union, Dict, Any, Tuple, Callable -from metaflow.metaflow_config import ARGO_RUN_URL_PREFIX, METAFLOW_RUN_URL_PREFIX +from metaflow.metaflow_config import ( + ARGO_RUN_URL_PREFIX, + METAFLOW_RUN_URL_PREFIX, + KUBERNETES_NAMESPACE, +) from metaflow.plugins.aip.argo_client import ArgoClient from metaflow.plugins.aip.aip_decorator import AIPException from metaflow.plugins.aip.aip_utils import _get_aip_logger @@ -13,7 +17,7 @@ class ArgoHelper: - def __init__(self, kubernetes_namespace: str): + def __init__(self, kubernetes_namespace: str = KUBERNETES_NAMESPACE): """ Args: kubernetes_namespace: Namespace where Argo is running. @@ -68,6 +72,7 @@ def trigger_latest( template_prefix: Optional[str] = None, project_name: Optional[str] = None, branch_name: Optional[str] = None, + flow_name: Optional[str] = None, filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, parameters: Optional[dict] = None, wait_timeout: Union[int, float, datetime.timedelta] = 0, @@ -87,16 +92,11 @@ def trigger_latest( wait_timeout: Time to wait for the workflow to complete. Set to 0 to skip waiting. **kwarg: Other parameters for the watch function. See `ArgoHelper.watch`. """ - if not any([template_prefix, project_name, branch_name]): - raise AIPException( - "Running argo workflow with no specified template nor filters can be dangerous. " - "Please set at least one of project_name, branch_name or template_name_prefix." - ) - template_name: str = self.template_get_latest( project_name=project_name, branch_name=branch_name, - template_name_prefix=template_prefix, + template_prefix=template_prefix, + flow_name=flow_name, filter_func=filter_func, ) @@ -109,17 +109,19 @@ def trigger_latest( def template_get_latest( self, - template_name_prefix: Optional[str] = None, + template_prefix: Optional[str] = None, project_name: Optional[str] = None, branch_name: Optional[str] = None, + flow_name: Optional[str] = None, filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, name_only: bool = True, ) -> Union[str, Dict[str, Any]]: """ Args: - template_name_prefix: Prefix of the template name to match. + template_prefix: Prefix of the template name to match. project_name: Project name to match. branch_name: Branch name to match. + flow_name: Flow name to match. filter_func: Custom filter function that is passed template, and should return boolean value indicating if the template can be used. name_only: Whether to return only the name of the template or the full manifest. Defaults to True. @@ -127,10 +129,16 @@ def template_get_latest( Returns: The name of the latest workflow template, or the full manifest if name_only is set to False. """ + if not any( + [template_prefix, project_name, branch_name, flow_name, filter_func] + ): + raise AIPException( + "Finding latest argo workflow with no specified filters risks picking up unexpected templates. " + "Please set at least one of project_name, branch_name, template_prefix, flow_name or filter_func." + ) # TODO: # - Add filter by project_id instead of project name - project_id is not added as a label yet. - # - Add filter by flow_name - flow_name is not added as a label yet. templates = self._client.list_workflow_template()["items"] @@ -139,7 +147,7 @@ def template_get_latest( for template in templates if ( not project_name - or template["metadata"]["labels"].get("gitlab.zgtools.net/project-name") + or template["metadata"]["labels"].get("metaflow.org/tag_project-name") == project_name ) and ( @@ -148,15 +156,20 @@ def template_get_latest( == branch_name ) and ( - not template_name_prefix - or template["metadata"]["name"].startswith(template_name_prefix) + not flow_name + or template["metadata"]["labels"].get("metaflow.org/flow_name") + == flow_name + ) + and ( + not template_prefix + or template["metadata"]["name"].startswith(template_prefix) ) and (not filter_func or filter_func(template)) ] if not templates: raise AIPException( f"No workflow template found with constraints " - f"{project_name=}, {branch_name=}, {template_name_prefix=}, {filter_func=}" + f"{project_name=}, {branch_name=}, {template_prefix=}, {filter_func=}" ) # Sort by creation timestamp to get the latest template. diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 6208e8ae03f..b2c7497c376 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -59,41 +59,35 @@ def start(self): # and template_name == 'wfdsk-ftf-test-he5d4--6rhai0z0wiysuew' # where aip _create_workflow_yaml() calls sanitize_k8s_name() which returns # 'wfdsk-ftf-test-he5d4-6rhai0z0wiysuew' without the double -- - self.template_name = sanitize_k8s_name( - f"{TEST_TEMPLATE_NAME}-{generate_base64_uuid()}".lower() - ) - logger.info(f"Creating workflow: {self.template_name}") - subprocess.run( - [ - "python", - __file__, - "aip", - "create", - "--name", - self.template_name, - "--yaml-only", - "--pipeline-path", - "/tmp/ftf.yaml", - "--kind", - "WorkflowTemplate", - "--max-run-concurrency", - "0", - ], - check=True, - ) - subprocess.run(["cat", "/tmp/ftf.yaml"]) + print(f"{KUBERNETES_NAMESPACE=}") - subprocess.run( - [ - "argo", - "template", - "-n", - KUBERNETES_NAMESPACE, - "create", - "/tmp/ftf.yaml", - ], - check=True, - ) + + self.workflow_template_names = [ + sanitize_k8s_name( + f"{TEST_TEMPLATE_NAME}-{generate_base64_uuid()}".lower() + ) + for _ in range(3) + ] + self.triggered_by_tag = "triggerred-by" + self.index_tag = "template-index" + + for template_name, template_index in enumerate( + self.workflow_template_names + ): + path = f"/tmp/{template_name}.yaml" + logger.info(f"Creating workflow: {template_name}") + self.comiple_workflow( + template_name, + path, + extra_args=[ + "--tag", + f"{self.triggered_by_tag}:{current.run_id}" "--tag", + f"{self.index}:{template_index}", + ], + ) + subprocess.run(["cat", path]) + self.submit_template(path) + time.sleep(1) # Spacing workflow template submission time. self.next(self.end) @@ -101,23 +95,48 @@ def start(self): def end(self): """Trigger downstream pipeline and test triggering behaviors""" if self.trigger_enabled: - logger.info("\nTesting run_kubeflow_pipeline") - argo_helper = ArgoHelper(KUBERNETES_NAMESPACE) + argo_helper = ArgoHelper() + + # ====== Test template filtering ====== + # Test latest template is returned with prefix filter + assert self.workflow_template_names[-1] == argo_helper.template_get_latest( + template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()), + flow_name=current.flow_name, + filter_func=lambda template: template["metadata"]["labels"][ + f"metaflow.org/tag_{self.triggered_by_tag}" + ] + == current.run_id, + ) + + # Test filter func correctly filters + assert self.workflow_template_names[1] == argo_helper.template_get_latest( + template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()), + flow_name=current.flow_name, + filter_func=lambda template: template["metadata"]["labels"][ + f"metaflow.org/tag_{self.triggered_by_tag}" + ] + == current.run_id + and template["metadata"]["labels"][f"metaflow.org/tag_{self.index_tag}"] + == str(1), + ) + + # ====== Test template triggering ====== + logger.info("\n Testing ArgoHelper.trigger") run_id, run_uid = argo_helper.trigger( - template_name=self.template_name, + template_name=self.workflow_template_names[0], parameters={ "trigger_enabled": False, "triggered_by": current.run_id, }, ) logger.info(f"{run_id=}, {run_uid=}") - logger.info(f"{get_argo_url(run_id, run_uid)=}") + logger.info(f"{get_argo_url(run_id, KUBERNETES_NAMESPACE, run_uid)=}") logger.info("Testing timeout exception for wait_for_kfp_run_completion") try: argo_helper.watch(run_id, wait_timeout=0.1) except TimeoutError: - logger.error( + logger.info( "Timeout before flow ends throws timeout exception correctly" ) else: @@ -136,8 +155,10 @@ def end(self): _retry_sleep(self.assert_task_triggered_by, metaflow_path=metaflow_path) - logger.info(f"Deleting {self.template_name=}") - argo_helper.template_delete(self.template_name) + # ====== Clean up test templates ====== + for template_name in self.workflow_template_names: + logger.info(f"Deleting {template_name}") + argo_helper.template_delete(template_name) else: logger.info(f"{self.trigger_enabled=}") @@ -148,6 +169,43 @@ def assert_task_triggered_by(metaflow_path: str): logger.info(f"assert {start_step.task.data.triggered_by=} == {current.run_id=}") assert start_step.task.data.triggered_by == current.run_id + @staticmethod + def comiple_workflow(template_name, path, extra_args=None): + extra_args = extra_args or [] + subprocess.run( + [ + "python", + __file__, + "aip", + "create", + "--name", + template_name, + "--yaml-only", + "--pipeline-path", + path, + "--kind", + "WorkflowTemplate", + "--max-run-concurrency", + "0", + *extra_args, + ], + check=True, + ) + + @staticmethod + def submit_template(path): + subprocess.run( + [ + "argo", + "template", + "-n", + KUBERNETES_NAMESPACE, + "create", + path, + ], + check=True, + ) + if __name__ == "__main__": FlowTriggeringFlow() From cc5c5971750ec65f300f69b1e672fd6f4234101d Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Wed, 14 Aug 2024 15:42:20 -0700 Subject: [PATCH 08/12] bugfix --- .../aip/tests/flows/flow_triggering_flow.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index b2c7497c376..457329748f1 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -71,7 +71,7 @@ def start(self): self.triggered_by_tag = "triggerred-by" self.index_tag = "template-index" - for template_name, template_index in enumerate( + for template_index, template_name in enumerate( self.workflow_template_names ): path = f"/tmp/{template_name}.yaml" @@ -112,12 +112,16 @@ def end(self): assert self.workflow_template_names[1] == argo_helper.template_get_latest( template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()), flow_name=current.flow_name, - filter_func=lambda template: template["metadata"]["labels"][ - f"metaflow.org/tag_{self.triggered_by_tag}" - ] - == current.run_id - and template["metadata"]["labels"][f"metaflow.org/tag_{self.index_tag}"] - == str(1), + filter_func=lambda template: ( + template["metadata"]["labels"][ + f"metaflow.org/tag_{self.triggered_by_tag}" + ] + == current.run_id + and template["metadata"]["labels"][ + f"metaflow.org/tag_{self.index_tag}" + ] + == str(1) + ), ) # ====== Test template triggering ====== From ee923175c80e366888fefc272bce2bead5e9558a Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Mon, 19 Aug 2024 12:18:29 -0700 Subject: [PATCH 09/12] Update function name; revert bugfix --- metaflow/plugins/aip/aip.py | 5 +++-- metaflow/plugins/aip/argo_utils.py | 4 ++-- metaflow/plugins/aip/tests/flows/flow_triggering_flow.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/metaflow/plugins/aip/aip.py b/metaflow/plugins/aip/aip.py index 04b7af03bb5..1d81a697e65 100644 --- a/metaflow/plugins/aip/aip.py +++ b/metaflow/plugins/aip/aip.py @@ -291,9 +291,10 @@ def _create_workflow_yaml( # Note the name has to follow k8s format. # self.name is typically CamelCase as it's python class name. # generateName contains a sanitized version of self.name from aip.compiler - default_workflow_name = workflow["metadata"].pop("generateName").rstrip("-") workflow["metadata"]["name"] = ( - sanitize_k8s_name(name) if name else default_workflow_name + sanitize_k8s_name(name) + if name + else workflow["metadata"].pop("generateName").rstrip("-") ) # Service account is added through webhooks. diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index 44c8e23520a..e5321da6d36 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -26,7 +26,7 @@ def __init__(self, kubernetes_namespace: str = KUBERNETES_NAMESPACE): """ self._client = ArgoClient(namespace=kubernetes_namespace) - def trigger( + def trigger_exact( self, template_name: Optional[str] = None, parameters: Optional[dict] = None, @@ -100,7 +100,7 @@ def trigger_latest( filter_func=filter_func, ) - return self.trigger( + return self.trigger_exact( template_name=template_name, parameters=parameters, wait_timeout=wait_timeout, diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 457329748f1..b6fc454c48f 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -126,7 +126,7 @@ def end(self): # ====== Test template triggering ====== logger.info("\n Testing ArgoHelper.trigger") - run_id, run_uid = argo_helper.trigger( + run_id, run_uid = argo_helper.trigger_exact( template_name=self.workflow_template_names[0], parameters={ "trigger_enabled": False, From 68c4a7d9da41ce938afccf9aba1e14ee31ed715c Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Mon, 19 Aug 2024 12:23:23 -0700 Subject: [PATCH 10/12] Fix integration test --- metaflow/plugins/aip/argo_utils.py | 3 +- .../aip/tests/flows/flow_triggering_flow.py | 49 +++++++++++-------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/metaflow/plugins/aip/argo_utils.py b/metaflow/plugins/aip/argo_utils.py index e5321da6d36..639cb2f10b0 100644 --- a/metaflow/plugins/aip/argo_utils.py +++ b/metaflow/plugins/aip/argo_utils.py @@ -123,7 +123,8 @@ def template_get_latest( branch_name: Branch name to match. flow_name: Flow name to match. filter_func: Custom filter function that is passed template, and should return boolean value - indicating if the template can be used. + indicating if the template can be used. When writing the filter function, please be mindful that + the labels or annotations may not be present in all the template. name_only: Whether to return only the name of the template or the full manifest. Defaults to True. Returns: diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index b6fc454c48f..55891056df2 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -46,13 +46,17 @@ def _retry_sleep(func: Callable, count=3, seconds=1, **kwargs): class FlowTriggeringFlow(FlowSpec): # Avoid infinite self trigger trigger_enabled: bool = Parameter("trigger_enabled", default=True) - triggered_by: str = Parameter(name="triggered_by", default=None) + + # Function as a test ID for checks and resource cleanups. + parent_workflow: str = Parameter(name="parent_workflow", default=None) @step def start(self): """Upload a downstream pipeline to be triggered""" - if self.triggered_by: - logger.info(f"This flow is triggered by run {self.triggered_by}") + if self.parent_workflow: + logger.info( + f"This workflow is triggered by workflow {self.parent_workflow}" + ) if self.trigger_enabled: # Upload pipeline # for the case where generate_base64_uuid returns a string starting with '-' @@ -68,7 +72,7 @@ def start(self): ) for _ in range(3) ] - self.triggered_by_tag = "triggerred-by" + self.parent_tag = "parent-workflow" self.index_tag = "template-index" for template_index, template_name in enumerate( @@ -81,7 +85,8 @@ def start(self): path, extra_args=[ "--tag", - f"{self.triggered_by_tag}:{current.run_id}" "--tag", + f"{self.parent_tag}:{current.run_id}", + "--tag", f"{self.index}:{template_index}", ], ) @@ -97,29 +102,30 @@ def end(self): if self.trigger_enabled: argo_helper = ArgoHelper() + template_prefix = sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()) # ====== Test template filtering ====== # Test latest template is returned with prefix filter assert self.workflow_template_names[-1] == argo_helper.template_get_latest( - template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()), + template_prefix=template_prefix, flow_name=current.flow_name, - filter_func=lambda template: template["metadata"]["labels"][ - f"metaflow.org/tag_{self.triggered_by_tag}" - ] + filter_func=lambda template: template["metadata"]["labels"].get( + f"metaflow.org/tag_{self.parent_tag}" + ) == current.run_id, ) # Test filter func correctly filters assert self.workflow_template_names[1] == argo_helper.template_get_latest( - template_prefix=sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()), + template_prefix=template_prefix, flow_name=current.flow_name, filter_func=lambda template: ( - template["metadata"]["labels"][ - f"metaflow.org/tag_{self.triggered_by_tag}" - ] + template["metadata"]["labels"].get( + f"metaflow.org/tag_{self.parent_tag}" + ) == current.run_id - and template["metadata"]["labels"][ + and template["metadata"]["labels"].get( f"metaflow.org/tag_{self.index_tag}" - ] + ) == str(1) ), ) @@ -130,7 +136,7 @@ def end(self): template_name=self.workflow_template_names[0], parameters={ "trigger_enabled": False, - "triggered_by": current.run_id, + "parent_workflow": current.run_id, }, ) logger.info(f"{run_id=}, {run_uid=}") @@ -154,10 +160,10 @@ def end(self): logger.info(f"Run Status of {run_id}: {status=}") metaflow_run_id: str = get_metaflow_run_id(run_uid) - logger.info(f"Test triggered_by is passed correctly") + logger.info(f"Test parent_workflow flow is passed correctly") metaflow_path = f"{current.flow_name}/{metaflow_run_id}/start" - _retry_sleep(self.assert_task_triggered_by, metaflow_path=metaflow_path) + _retry_sleep(self.assert_parent_workflow, metaflow_path=metaflow_path) # ====== Clean up test templates ====== for template_name in self.workflow_template_names: @@ -167,11 +173,12 @@ def end(self): logger.info(f"{self.trigger_enabled=}") @staticmethod - def assert_task_triggered_by(metaflow_path: str): + def assert_parent_workflow(metaflow_path: str): logger.info(f"fetching start step {metaflow_path}") start_step = Step(metaflow_path) - logger.info(f"assert {start_step.task.data.triggered_by=} == {current.run_id=}") - assert start_step.task.data.triggered_by == current.run_id + parent_workflow = start_step.task.data.parent_workflow + logger.info(f"assert {parent_workflow=} == {current.run_id=}") + assert parent_workflow == current.run_id @staticmethod def comiple_workflow(template_name, path, extra_args=None): From d334e42bee818d2855e45e395bd102a9cbc9c558 Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Mon, 19 Aug 2024 15:05:22 -0700 Subject: [PATCH 11/12] More fix to the tests --- metaflow/plugins/aip/tests/flows/flow_triggering_flow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index 55891056df2..b1113ef101b 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -104,7 +104,7 @@ def end(self): template_prefix = sanitize_k8s_name(TEST_TEMPLATE_NAME.lower()) # ====== Test template filtering ====== - # Test latest template is returned with prefix filter + logger.info("\n Testing ArgoHelper.template_get_latest") assert self.workflow_template_names[-1] == argo_helper.template_get_latest( template_prefix=template_prefix, flow_name=current.flow_name, @@ -114,7 +114,7 @@ def end(self): == current.run_id, ) - # Test filter func correctly filters + logger.info("\n Test filter func correctly filters") assert self.workflow_template_names[1] == argo_helper.template_get_latest( template_prefix=template_prefix, flow_name=current.flow_name, @@ -126,7 +126,7 @@ def end(self): and template["metadata"]["labels"].get( f"metaflow.org/tag_{self.index_tag}" ) - == str(1) + == 1 ), ) From 98a29775d044aa699f20e0ce416c7c943645ac03 Mon Sep 17 00:00:00 2001 From: Yun Wu Date: Mon, 19 Aug 2024 16:06:58 -0700 Subject: [PATCH 12/12] Final fixes --- .../aip/tests/flows/flow_triggering_flow.py | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py index b1113ef101b..5c433e7b4b7 100644 --- a/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py +++ b/metaflow/plugins/aip/tests/flows/flow_triggering_flow.py @@ -59,18 +59,13 @@ def start(self): ) if self.trigger_enabled: # Upload pipeline - # for the case where generate_base64_uuid returns a string starting with '-' - # and template_name == 'wfdsk-ftf-test-he5d4--6rhai0z0wiysuew' - # where aip _create_workflow_yaml() calls sanitize_k8s_name() which returns - # 'wfdsk-ftf-test-he5d4-6rhai0z0wiysuew' without the double -- - - print(f"{KUBERNETES_NAMESPACE=}") + logger.info(f"{KUBERNETES_NAMESPACE=}") self.workflow_template_names = [ sanitize_k8s_name( - f"{TEST_TEMPLATE_NAME}-{generate_base64_uuid()}".lower() + f"{TEST_TEMPLATE_NAME}-{current.run_id}-{index}".lower() ) - for _ in range(3) + for index in range(3) ] self.parent_tag = "parent-workflow" self.index_tag = "template-index" @@ -87,7 +82,7 @@ def start(self): "--tag", f"{self.parent_tag}:{current.run_id}", "--tag", - f"{self.index}:{template_index}", + f"{self.index_tag}:{template_index}", ], ) subprocess.run(["cat", path]) @@ -126,7 +121,7 @@ def end(self): and template["metadata"]["labels"].get( f"metaflow.org/tag_{self.index_tag}" ) - == 1 + == str(1) # All tags value are strings ), ) @@ -164,11 +159,6 @@ def end(self): metaflow_path = f"{current.flow_name}/{metaflow_run_id}/start" _retry_sleep(self.assert_parent_workflow, metaflow_path=metaflow_path) - - # ====== Clean up test templates ====== - for template_name in self.workflow_template_names: - logger.info(f"Deleting {template_name}") - argo_helper.template_delete(template_name) else: logger.info(f"{self.trigger_enabled=}")