Skip to content

Commit 2c01f4a

Browse files
committed
Adding ability to run latest argo workflow template
1 parent 0a913d1 commit 2c01f4a

File tree

2 files changed

+87
-5
lines changed

2 files changed

+87
-5
lines changed

Diff for: metaflow/plugins/aip/argo_client.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# talebz copied from https://github.com/Netflix/metaflow/blob/master/metaflow/plugins/argo/argo_client.py
22

33
import json
4-
from typing import Any, Dict, Optional
4+
from typing import Any, Dict, Optional, List
55

66
from metaflow.exception import MetaflowException
77
from metaflow.plugins.aws.eks.kubernetes_client import KubernetesClient
@@ -83,6 +83,20 @@ def create_workflow_config_map(self, name: str, config_map: Dict[str, Any]):
8383
json.loads(e.body)["message"] if e.body is not None else e.reason
8484
)
8585

86+
def list_workflow_template(self, namespace: Optional[str] = None):
87+
client = self._client.get()
88+
try:
89+
return client.CustomObjectsApi().list_namespaced_custom_object(
90+
group=self._group,
91+
version=self._version,
92+
namespace=namespace or self._namespace,
93+
plural="workflowtemplates",
94+
)
95+
except client.rest.ApiException as e:
96+
raise ArgoClientException(
97+
json.loads(e.body)["message"] if e.body is not None else e.reason
98+
)
99+
86100
def trigger_workflow_template(self, name: str, parameters: Optional[Dict] = None):
87101
client = self._client.get()
88102
body = {

Diff for: metaflow/plugins/aip/argo_utils.py

+72-4
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,38 @@
1414

1515
def run_argo_workflow(
1616
kubernetes_namespace: str,
17-
template_name: str,
17+
template_name: Optional[str] = None,
18+
project_name: Optional[str] = None,
19+
branch_name: Optional[str] = None,
20+
template_name_prefix: Optional[str] = None,
1821
parameters: Optional[dict] = None,
1922
wait_timeout: Union[int, float, datetime.timedelta] = 0,
2023
**kwarg, # Other parameters for wait function
2124
) -> Tuple[str, str]:
25+
"""
26+
Using template_name to trigger a workflow template name with exact match.
27+
28+
If no template_name is provided, the latest workflow template satisfying
29+
the project_name, branch_name and template_name_prefix will be used.
30+
All of these filters are optional. If not provided, the latest workflow
31+
from the namespace will be used.
32+
"""
33+
client = ArgoClient(namespace=kubernetes_namespace)
34+
35+
if not template_name:
36+
template_name = get_latest_workflow(
37+
kubernetes_namespace,
38+
project_name=project_name,
39+
branch_name=branch_name,
40+
template_name_prefix=template_name_prefix,
41+
)
42+
2243
try:
2344
# TODO(talebz): add tag of origin-run-id to correlate parent flow
24-
workflow_manifest: Dict[str, Any] = ArgoClient(
25-
namespace=kubernetes_namespace,
26-
).trigger_workflow_template(template_name, parameters)
45+
logger.info(f"Triggering workflow template: {template_name}")
46+
workflow_manifest: Dict[str, Any] = client.trigger_workflow_template(
47+
template_name, parameters
48+
)
2749
except Exception as e:
2850
raise AIPException(str(e))
2951

@@ -41,6 +63,52 @@ def run_argo_workflow(
4163
return argo_run_id, argo_run_uid
4264

4365

66+
def get_latest_workflow(
67+
kubernetes_namespace: str,
68+
project_name: Optional[str] = None,
69+
branch_name: Optional[str] = None,
70+
template_name_prefix: Optional[str] = None,
71+
):
72+
# TODO:
73+
# - Add filter by project_id instead of project name - project_id is not added as a label yet.
74+
# - Add filter by flow_name - flow_name is not added as a label yet.
75+
client = ArgoClient(namespace=kubernetes_namespace)
76+
77+
templates = client.list_workflow_template()["items"]
78+
templates = [
79+
template
80+
for template in templates
81+
if (
82+
not project_name
83+
or template["metadata"]["labels"]["gitlab.zgtools.net/project-name"]
84+
== project_name
85+
)
86+
and (
87+
not branch_name
88+
or template["metadata"]["labels"]["gitlab.zgtools.net/branch-name"]
89+
== branch_name
90+
)
91+
and (
92+
not template_name_prefix
93+
or template["metadata"]["name"].startswith(template_name_prefix)
94+
)
95+
]
96+
if not templates:
97+
raise AIPException(
98+
f"No workflow template found with constraints "
99+
f"project_name={project_name}, branch_name={branch_name}, template_name_prefix={template_name_prefix}"
100+
)
101+
# Sort by creation timestamp to get the latest template.
102+
templates.sort(
103+
key=lambda template: template["metadata"]["creationTimestamp"], reverse=True
104+
)
105+
template_name = templates[1]["metadata"]["name"]
106+
logger.info(
107+
f"Found {len(templates)} WorkflowTemplates. Using latest workflow template: {template_name}"
108+
)
109+
return template_name
110+
111+
44112
def delete_argo_workflow(
45113
kubernetes_namespace: str,
46114
template_name: str,

0 commit comments

Comments
 (0)