Skip to content

Commit 8414100

Browse files
committed
AIP-7511 exit_handler DAG
- Split AIP email notify & SQS DLQ into separate Argo steps within the exit_handler DAG. - Clean up the Argo entrypoint so that it is the Argo flow DAG, not the exit handler.
1 parent db524bb commit 8414100

File tree

2 files changed

+152
-52
lines changed

2 files changed

+152
-52
lines changed

metaflow/plugins/aip/aip.py

+113-21
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,59 @@ def _create_workflow_yaml(
294294

295295
KubeflowPipelines._add_archive_section_to_cards_artifacts(workflow)
296296

297+
if "onExit" in workflow["spec"]:
298+
# replace the entrypoint content with the exit handler content
299+
"""
300+
# What it looks like beforehand...
301+
entrypoint: helloflow
302+
templates:
303+
- name: exit-handler-1
304+
dag:
305+
tasks:
306+
- name: end
307+
template: end
308+
dependencies: [start]
309+
- {name: start, template: start}
310+
- name: helloflow
311+
dag:
312+
tasks:
313+
- {name: exit-handler-1, template: exit-handler-1}
314+
- {name: sqs-exit-handler, template: sqs-exit-handler}
315+
"""
316+
# find the exit-handler-1 template
317+
exit_handler_template: dict = [
318+
template
319+
for template in workflow["spec"]["templates"]
320+
if template["name"] == "exit-handler-1"
321+
][0]
322+
323+
# find the entrypoint template
324+
entrypoint_template: dict = [
325+
template
326+
for template in workflow["spec"]["templates"]
327+
if template["name"] == workflow["spec"]["entrypoint"]
328+
][0]
329+
330+
# replace the entrypoint template's DAG with the exit-handler-1 template's DAG
331+
entrypoint_template["dag"] = exit_handler_template["dag"]
332+
333+
# rename exit-handler-1 to exit-handler
334+
exit_handler_template["name"] = "exit-handler"
335+
workflow["spec"]["onExit"] = "exit-handler"
336+
exit_handler_template["dag"] = {
337+
"tasks": [
338+
{
339+
"name": "sqs-exit-handler",
340+
"template": "sqs-exit-handler",
341+
"dependencies": ["notify-email-exit-handler"],
342+
},
343+
{
344+
"name": "notify-email-exit-handler",
345+
"template": "notify-email-exit-handler",
346+
},
347+
]
348+
}
349+
297350
return workflow
298351

299352
@staticmethod
@@ -1239,11 +1292,18 @@ def call_build_kfp_dag(workflow_uid_op: ContainerOp):
12391292
)
12401293

12411294
if self.notify or self.sqs_url_on_error:
1242-
with dsl.ExitHandler(
1243-
self._create_exit_handler_op(
1244-
flow_variables.package_commands, flow_parameters
1245-
)
1246-
):
1295+
op = self._create_notify_exit_handler_op(
1296+
flow_variables.package_commands, flow_parameters
1297+
)
1298+
1299+
# The following exit handler gets created and added as a ContainerOp
1300+
# and also as a parallel task to the Argo template "exit-handler-1"
1301+
# (the hardcoded kfp compiler name of the exit handler)
1302+
# We rename this parallel-task DAG and replace it with a DAG of dependent steps in _create_workflow_yaml().
1303+
self._create_sqs_exit_handler_op(
1304+
flow_variables.package_commands, flow_parameters
1305+
)
1306+
with dsl.ExitHandler(op):
12471307
s3_sensor_op: Optional[ContainerOp] = self.create_s3_sensor_op(
12481308
flow_variables,
12491309
)
@@ -1551,12 +1611,36 @@ def _create_s3_sensor_op(
15511611
)
15521612
return s3_sensor_op
15531613

1554-
def _create_exit_handler_op(
1614+
def _create_sqs_exit_handler_op(
    self,
    package_commands: str,
    flow_parameters: Dict,
) -> ContainerOp:
    """Create the exit-handler ContainerOp that runs the SQS-on-error step.

    Collects the variables the SQS step needs and delegates to
    ``_get_aip_exit_handler_op`` with the ``sqs-exit-handler`` name and
    the ``--run_sqs_on_error`` flag.

    Args:
        package_commands: Shell commands passed through unchanged to
            ``_get_aip_exit_handler_op``.
        flow_parameters: Flow parameters passed through unchanged to
            ``_get_aip_exit_handler_op``.

    Returns:
        The ContainerOp returned by ``_get_aip_exit_handler_op``.
    """
    env_variables: dict = {
        key: from_conf(key)
        for key in [
            "ARGO_RUN_URL_PREFIX",
        ]
        if from_conf(key)
    }

    # aip_exit_handler looks up METAFLOW_SQS_URL_ON_ERROR and reports
    # "SQS is not configured!" when it is missing, so the queue URL must be
    # forwarded explicitly, as the former combined exit handler op did.
    if self.sqs_url_on_error:
        env_variables["METAFLOW_SQS_URL_ON_ERROR"] = self.sqs_url_on_error

    if self.sqs_role_arn_on_error:
        env_variables["METAFLOW_SQS_ROLE_ARN_ON_ERROR"] = self.sqs_role_arn_on_error

    return self._get_aip_exit_handler_op(
        flow_parameters,
        env_variables,
        package_commands,
        name="sqs-exit-handler",
        flag="--run_sqs_on_error",
    )
1637+
1638+
def _create_notify_exit_handler_op(
15551639
self,
15561640
package_commands: str,
15571641
flow_parameters: Dict,
15581642
) -> ContainerOp:
1559-
notify_variables: dict = {
1643+
env_variables: dict = {
15601644
key: from_conf(key)
15611645
for key in [
15621646
"METAFLOW_NOTIFY_EMAIL_FROM",
@@ -1569,19 +1653,27 @@ def _create_exit_handler_op(
15691653
}
15701654

15711655
if self.notify_on_error:
1572-
notify_variables["METAFLOW_NOTIFY_ON_ERROR"] = self.notify_on_error
1656+
env_variables["METAFLOW_NOTIFY_ON_ERROR"] = self.notify_on_error
15731657

15741658
if self.notify_on_success:
1575-
notify_variables["METAFLOW_NOTIFY_ON_SUCCESS"] = self.notify_on_success
1576-
1577-
if self.sqs_url_on_error:
1578-
notify_variables["METAFLOW_SQS_URL_ON_ERROR"] = self.sqs_url_on_error
1579-
1580-
if self.sqs_role_arn_on_error:
1581-
notify_variables[
1582-
"METAFLOW_SQS_ROLE_ARN_ON_ERROR"
1583-
] = self.sqs_role_arn_on_error
1659+
env_variables["METAFLOW_NOTIFY_ON_SUCCESS"] = self.notify_on_success
1660+
1661+
return self._get_aip_exit_handler_op(
1662+
flow_parameters,
1663+
env_variables,
1664+
package_commands,
1665+
name="notify-email-exit-handler",
1666+
flag="--run_email_notify",
1667+
)
15841668

1669+
def _get_aip_exit_handler_op(
1670+
self,
1671+
flow_parameters: Dict,
1672+
env_variables: Dict,
1673+
package_commands: str,
1674+
name: str,
1675+
flag: str = "",
1676+
) -> ContainerOp:
15851677
# when there are no flow parameters argo complains
15861678
# that {{workflow.parameters}} failed to resolve
15871679
# see https://github.com/argoproj/argo-workflows/issues/6036
@@ -1594,19 +1686,19 @@ def _create_exit_handler_op(
15941686
" && python -m metaflow.plugins.aip.aip_exit_handler"
15951687
f" --flow_name {self.name}"
15961688
" --run_id {{workflow.name}}"
1597-
f" --notify_variables_json {json.dumps(json.dumps(notify_variables))}"
1689+
f" --env_variables_json {json.dumps(json.dumps(env_variables))}"
15981690
f" --flow_parameters_json {flow_parameters_json if flow_parameters else '{}'}"
15991691
" --status {{workflow.status}}"
1692+
f" {flag}"
16001693
),
16011694
]
1602-
16031695
return (
16041696
dsl.ContainerOp(
1605-
name="exit_handler",
1697+
name=name,
16061698
image=self.base_image,
16071699
command=exit_handler_command,
16081700
)
1609-
.set_display_name("exit_handler")
1701+
.set_display_name(name)
16101702
.set_retry(
16111703
EXIT_HANDLER_RETRY_COUNT,
16121704
policy="Always",

metaflow/plugins/aip/aip_exit_handler.py

+39-31
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,18 @@
1010
@click.option("--flow_name")
1111
@click.option("--status")
1212
@click.option("--run_id")
13-
@click.option("--notify_variables_json")
13+
@click.option("--env_variables_json")
1414
@click.option("--flow_parameters_json")
15+
@click.option("--run_email_notify", is_flag=True)
16+
@click.option("--run_sqs_on_error", is_flag=True)
1517
def exit_handler(
1618
flow_name: str,
1719
status: str,
1820
run_id: str,
19-
notify_variables_json: str,
21+
env_variables_json: str,
2022
flow_parameters_json: str,
23+
run_email_notify: bool = False,
24+
run_sqs_on_error: bool = False,
2125
):
2226
"""
2327
The environment variables that this depends on:
@@ -38,10 +42,10 @@ def exit_handler(
3842
import boto3
3943
from botocore.session import Session
4044

41-
notify_variables: Dict[str, str] = json.loads(notify_variables_json)
45+
env_variables: Dict[str, str] = json.loads(env_variables_json)
4246

4347
def get_env(name, default=None) -> str:
44-
return notify_variables.get(name, os.environ.get(name, default=default))
48+
return env_variables.get(name, os.environ.get(name, default=default))
4549

4650
def email_notify(send_to):
4751
import posixpath
@@ -133,34 +137,38 @@ def send_sqs_message(queue_url: str, message_body: str, *, role_arn: str = None)
133137
_logger.error(err)
134138
raise err
135139

136-
notify_on_error = get_env("METAFLOW_NOTIFY_ON_ERROR")
137-
notify_on_success = get_env("METAFLOW_NOTIFY_ON_SUCCESS")
138-
139-
print(f"Flow completed with status={status}")
140-
if notify_on_error and status == "Failed":
141-
email_notify(notify_on_error)
142-
elif notify_on_success and status == "Succeeded":
143-
email_notify(notify_on_success)
144-
else:
145-
print("No notification is necessary!")
146-
147-
# Send message to SQS if 'METAFLOW_SQS_URL_ON_ERROR' is set
148-
metaflow_sqs_url_on_error = get_env("METAFLOW_SQS_URL_ON_ERROR")
149-
150-
if metaflow_sqs_url_on_error:
151-
if status == "Failed":
152-
message_body = flow_parameters_json
153-
metaflow_sqs_role_arn_on_error = get_env("METAFLOW_SQS_ROLE_ARN_ON_ERROR")
154-
send_sqs_message(
155-
metaflow_sqs_url_on_error,
156-
message_body,
157-
role_arn=metaflow_sqs_role_arn_on_error,
158-
)
159-
print(f"message was sent to: {metaflow_sqs_url_on_error} successfully")
140+
if run_email_notify:
141+
notify_on_error = get_env("METAFLOW_NOTIFY_ON_ERROR")
142+
notify_on_success = get_env("METAFLOW_NOTIFY_ON_SUCCESS")
143+
144+
print(f"Flow completed with status={status}")
145+
if notify_on_error and status == "Failed":
146+
email_notify(notify_on_error)
147+
elif notify_on_success and status == "Succeeded":
148+
email_notify(notify_on_success)
149+
else:
150+
print("No notification is necessary!")
151+
152+
if run_sqs_on_error:
153+
# Send message to SQS if 'METAFLOW_SQS_URL_ON_ERROR' is set
154+
metaflow_sqs_url_on_error = get_env("METAFLOW_SQS_URL_ON_ERROR")
155+
156+
if metaflow_sqs_url_on_error:
157+
if status == "Failed":
158+
message_body = flow_parameters_json
159+
metaflow_sqs_role_arn_on_error = get_env(
160+
"METAFLOW_SQS_ROLE_ARN_ON_ERROR"
161+
)
162+
send_sqs_message(
163+
metaflow_sqs_url_on_error,
164+
message_body,
165+
role_arn=metaflow_sqs_role_arn_on_error,
166+
)
167+
print(f"message was sent to: {metaflow_sqs_url_on_error} successfully")
168+
else:
169+
print("Workflow succeeded, thus no SQS message is sent to SQS!")
160170
else:
161-
print("Workflow succeeded, thus no SQS message is sent to SQS!")
162-
else:
163-
print("SQS is not configured!")
171+
print("SQS is not configured!")
164172

165173

166174
if __name__ == "__main__":

0 commit comments

Comments
 (0)