Skip to content

Commit

Permalink
Merge pull request #303 from zillow/yunw/AIP-8187-run-latest-wftemplate
Browse files Browse the repository at this point in the history
Refactor & add sdk to trigger latest workflow template in the cluster
  • Loading branch information
cloudw authored Aug 20, 2024
2 parents 898b0c0 + 98a2977 commit 97ab1e6
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 221 deletions.
4 changes: 2 additions & 2 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,8 @@ def from_conf(name, default=None):
# Note: `ARGO_RUN_URL_PREFIX` is the URL prefix for ARGO runs on your ARGO cluster. The prefix includes
# all parts of the URL except the run_id at the end which we append once the run is created.
# For eg, this would look like: "https://<your-kf-cluster-url>/argo-ui/workflows/
ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "")
METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "")
ARGO_RUN_URL_PREFIX = from_conf("ARGO_RUN_URL_PREFIX", "").rstrip("/")
METAFLOW_RUN_URL_PREFIX = from_conf("METAFLOW_RUN_URL_PREFIX", "").rstrip("/")
AIP_MAX_PARALLELISM = int(from_conf("AIP_MAX_PARALLELISM", 10))
AIP_MAX_RUN_CONCURRENCY = int(from_conf("AIP_MAX_RUN_CONCURRENCY", 10))
AIP_SHOW_METAFLOW_UI_URL = bool(from_conf("AIP_SHOW_METAFLOW_UI_URL", False))
Expand Down
10 changes: 4 additions & 6 deletions metaflow/plugins/aip/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
)

from .argo_utils import (
run_argo_workflow,
run_id_to_url,
run_id_to_metaflow_url,
wait_for_argo_run_completion,
delete_argo_workflow,
to_metaflow_run_id,
ArgoHelper,
get_argo_url,
get_metaflow_url,
get_metaflow_run_id,
)

from .exit_handler_decorator import (
Expand Down
12 changes: 6 additions & 6 deletions metaflow/plugins/aip/aip_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
check_metadata_service_version,
)
from metaflow.plugins.aip.argo_utils import (
run_id_to_url,
run_id_to_metaflow_url,
to_metaflow_run_id,
get_argo_url,
get_metaflow_url,
get_metaflow_run_id,
)
from metaflow.plugins.aip.aip_decorator import AIPException
from metaflow.plugins.aip.aip_step_init import save_step_environment_variables
Expand Down Expand Up @@ -421,9 +421,9 @@ def _echo_workflow_run(
):
argo_workflow_name = workflow_manifest["metadata"]["name"]
argo_workflow_uid = workflow_manifest["metadata"]["uid"]
metaflow_run_id = to_metaflow_run_id(argo_workflow_uid)
metaflow_ui_url = run_id_to_metaflow_url(flow_name, argo_workflow_uid)
argo_ui_url = run_id_to_url(
metaflow_run_id = get_metaflow_run_id(argo_workflow_uid)
metaflow_ui_url = get_metaflow_url(flow_name, argo_workflow_uid)
argo_ui_url = get_argo_url(
argo_workflow_name, kubernetes_namespace, argo_workflow_uid
)
obj.echo(f"Metaflow run_id=*{metaflow_run_id}*\n", fg="magenta")
Expand Down
6 changes: 2 additions & 4 deletions metaflow/plugins/aip/aip_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from metaflow._vendor import click
import logging

from metaflow.plugins.aip import run_id_to_url
from metaflow.plugins.aip import get_argo_url

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,9 +71,7 @@ def email_notify(send_to):
email_body = get_env("METAFLOW_NOTIFY_EMAIL_BODY", "")
k8s_namespace = get_env("POD_NAMESPACE", "")

argo_ui_url = run_id_to_url(
argo_workflow_name, k8s_namespace, argo_workflow_uid
)
argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)
body = (
f"status = {status} <br/>\n"
f"{argo_ui_url} <br/>\n"
Expand Down
4 changes: 2 additions & 2 deletions metaflow/plugins/aip/aip_udf_exit_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from metaflow.decorators import flow_decorators, FlowDecorator
from metaflow.graph import FlowGraph
from metaflow.plugins.aip import run_id_to_url
from metaflow.plugins.aip import get_argo_url

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -44,7 +44,7 @@ def get_env(name, default=None) -> str:

argo_workflow_name = get_env("MF_ARGO_WORKFLOW_NAME", "")
k8s_namespace = get_env("POD_NAMESPACE", "")
argo_ui_url = run_id_to_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)
argo_ui_url = get_argo_url(argo_workflow_name, k8s_namespace, argo_workflow_uid)

metaflow_configs: Dict[str, str] = json.loads(metaflow_configs_json)
metaflow_configs_new: Dict[str, str] = {
Expand Down
16 changes: 15 additions & 1 deletion metaflow/plugins/aip/argo_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# talebz copied from https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/argo/argo_client.py

import json
from typing import Any, Dict, Optional
from typing import Any, Dict, Optional, List

from metaflow.exception import MetaflowException
from metaflow.plugins.aws.eks.kubernetes_client import KubernetesClient
Expand Down Expand Up @@ -83,6 +83,20 @@ def create_workflow_config_map(self, name: str, config_map: Dict[str, Any]):
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def list_workflow_template(self, namespace: Optional[str] = None):
client = self._client.get()
try:
return client.CustomObjectsApi().list_namespaced_custom_object(
group=self._group,
version=self._version,
namespace=namespace or self._namespace,
plural="workflowtemplates",
)
except client.rest.ApiException as e:
raise ArgoClientException(
json.loads(e.body)["message"] if e.body is not None else e.reason
)

def trigger_workflow_template(self, name: str, parameters: Optional[Dict] = None):
client = self._client.get()
body = {
Expand Down
Loading

0 comments on commit 97ab1e6

Please sign in to comment.