From e8c46d80e1d250feeddcd2ae381f42bd823c5bd1 Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Sun, 12 Jan 2025 21:34:05 +0530 Subject: [PATCH 1/8] metadata for spot termination --- metaflow/plugins/__init__.py | 5 + .../kubernetes/kubernetes_decorator.py | 5 + metaflow/plugins/kubernetes/spot_monitor.py | 112 ++++++++++++++++++ metaflow/plugins/metadata_cli.py | 66 +++++++++++ 4 files changed, 188 insertions(+) create mode 100644 metaflow/plugins/kubernetes/spot_monitor.py create mode 100644 metaflow/plugins/metadata_cli.py diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 4409fab1a35..75ab7353c9f 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -16,6 +16,7 @@ ("argo-workflows", ".argo.argo_workflows_cli.cli"), ("card", ".cards.card_cli.cli"), ("tag", ".tag_cli.cli"), + ("metadata", ".metadata_cli.cli"), ("logs", ".logs_cli.cli"), ] @@ -104,6 +105,10 @@ "save_logs_periodically", "..mflog.save_logs_periodically.SaveLogsPeriodicallySidecar", ), + ( + "spot_termination_monitor", + ".kubernetes.spot_monitor.SpotTerminationMonitorSidecar", + ), ("heartbeat", "metaflow.metadata_provider.heartbeat.MetadataHeartBeat"), ] diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 0142db6f4f2..5a98e67138b 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -547,6 +547,10 @@ def task_pre_step( self._save_logs_sidecar = Sidecar("save_logs_periodically") self._save_logs_sidecar.start() + # Start spot termination monitor sidecar. + self._spot_monitor_sidecar = Sidecar("spot_termination_monitor") + self._spot_monitor_sidecar.start() + num_parallel = None if hasattr(flow, "_parallel_ubf_iter"): num_parallel = flow._parallel_ubf_iter.num_parallel @@ -605,6 +609,7 @@ def task_finished( try: self._save_logs_sidecar.terminate() + self._spot_monitor_sidecar.terminate() except: # Best effort kill pass diff --git a/metaflow/plugins/kubernetes/spot_monitor.py b/metaflow/plugins/kubernetes/spot_monitor.py new file mode 100644 index 00000000000..39f6c27c28b --- /dev/null +++ b/metaflow/plugins/kubernetes/spot_monitor.py @@ -0,0 +1,112 @@ +import os +import sys +import time +import signal +import datetime +import requests +import subprocess +from multiprocessing import Process +from metaflow.sidecar import MessageTypes +from metaflow.metaflow_current import current + + +class SpotTerminationMonitorSidecar(object): + METADATA_URL = "http://169.254.169.254/latest/meta-data/spot/termination-time" + TOKEN_URL = "http://169.254.169.254/latest/api/token" + POLL_INTERVAL = 5 # seconds + + def __init__(self): + self.is_alive = True + self._process = None + self._token = None + self._token_expiry = 0 + + if self._is_aws_spot_instance(): + self._process = Process(target=self._monitor_loop) + self._process.start() + + def process_message(self, msg): + if msg.msg_type == MessageTypes.SHUTDOWN: + self.is_alive = False + if self._process: + self._process.terminate() + + @classmethod + def get_worker(cls): + return cls + + def _get_imds_token(self): + current_time = time.time() + if current_time >= self._token_expiry - 60: # Refresh 60s before expiry + try: + response = requests.put( + url=self.TOKEN_URL, + headers={"X-aws-ec2-metadata-token-ttl-seconds": "300"}, + timeout=1, + ) + if response.status_code == 200: + self._token = response.text + self._token_expiry = current_time + 240 # Slightly less than TTL + except requests.exceptions.RequestException: + pass + return self._token + + def _make_ec2_request(self, url, timeout): + token = self._get_imds_token() + headers = {"X-aws-ec2-metadata-token": token} if token else {} + response = requests.get(url=url, headers=headers, timeout=timeout) + return response + + def _is_aws_spot_instance(self): + try: + response = self._make_ec2_request(url=self.METADATA_URL, timeout=1) + # A 404 means we're on EC2 but not a spot instance + # A timeout/connection error means we're not on EC2 at all + return response.status_code != 404 + except (requests.exceptions.RequestException, requests.exceptions.Timeout): + return False + + def _monitor_loop(self): + while self.is_alive: + try: + response = self._make_ec2_request(url=self.METADATA_URL, timeout=1) + if response.status_code == 200: + termination_time = response.text + self._emit_termination_metadata(termination_time) + os.kill(os.getppid(), signal.SIGTERM) + break + except (requests.exceptions.RequestException, requests.exceptions.Timeout): + pass + time.sleep(self.POLL_INTERVAL) + + def _emit_termination_metadata(self, termination_time): + command = [ + sys.executable, + current.flow_name, + "record", + "--run-id", + current.run_id, + "--step-name", + current.step_name, + "--task_id", + current.task_id, + "--field", + "spot-termination-notice", + str(True), + "--field", + "spot-termination-time", + termination_time, + "--field", + "spot-termination-received-at", + datetime.now(datetime.timezone.utc).isoformat(), + "--tag", + "attempt_id:{}".format(current.attempt), + ] + + result = subprocess.run(command, capture_output=True, text=True) + + if result.returncode != 0: + print( + f"Failed to record spot termination metadata: {result.stderr}", + file=sys.stderr, + ) diff --git a/metaflow/plugins/metadata_cli.py b/metaflow/plugins/metadata_cli.py new file mode 100644 index 00000000000..7e6d453c810 --- /dev/null +++ b/metaflow/plugins/metadata_cli.py @@ -0,0 +1,66 @@ +from metaflow._vendor import click +from metaflow.tagging_util import validate_tags +from metaflow.metadata_provider import MetaDatum + + +@click.group() +def cli(): + pass + + +@cli.group(help="Commands related to metadata.") +def metadata(): + pass + + +@metadata.command(help="Record metadata for a task.") +@click.option( + "--run-id", + required=True, + help="Run ID for which metadata is to be recorded.", +) +@click.option( + "--step-name", + required=True, + help="Step Name for which metadata is to be recorded.", +) +@click.option( + "--task-id", + required=True, + help="Task ID for which metadata is to be recorded.", +) +@click.option( + "--field", + multiple=True, + required=True, + type=(str, str), + help="Metadata key value pairs.", +) +@click.option( + "--tag", + "tags", + multiple=True, + required=False, + default=None, + help="List of tags.", +) +@click.pass_obj +def record(obj, run_id, step_name, task_id, field, tags=None): + validate_tags(tags) + + tag_list = list(tags) if tags else [] + + entries = [] + for k, v in dict(field).items(): + entries.append( + MetaDatum( + field=k, + value=v, + type=k, + tags=tag_list, + ) + ) + + obj.metadata.register_metadata( + run_id=run_id, step_name=step_name, task_id=task_id, metadata=entries + ) From bc91dfab9f576ed70b9eb0cccf23e34e893385b3 Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Wed, 15 Jan 2025 01:04:19 +0530 Subject: [PATCH 2/8] fixes --- metaflow/plugins/kubernetes/kubernetes_cli.py | 10 +++++++ metaflow/plugins/kubernetes/spot_monitor.py | 29 +++++++++---------- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index 54a6f4206da..358745ca1f6 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -275,6 +275,16 @@ def _sync_metadata(): ), ) + env.update( + { + "FLOW_FILE_PATH": os.path.basename(sys.argv[0]), + "RUN_ID": kwargs["run_id"], + "STEP_NAME": step_name, + "TASK_ID": task_id, + "RETRY_COUNT": retry_count, + } + ) + try: kubernetes = Kubernetes( datastore=ctx.obj.flow_datastore, diff --git a/metaflow/plugins/kubernetes/spot_monitor.py b/metaflow/plugins/kubernetes/spot_monitor.py index 39f6c27c28b..a3784f8e1f2 100644 --- a/metaflow/plugins/kubernetes/spot_monitor.py +++ b/metaflow/plugins/kubernetes/spot_monitor.py @@ -2,15 +2,16 @@ import sys import time import signal -import datetime import requests import subprocess from multiprocessing import Process +from datetime import datetime, timezone from metaflow.sidecar import MessageTypes from metaflow.metaflow_current import current class SpotTerminationMonitorSidecar(object): + EC2_TYPE_URL = "http://169.254.169.254/latest/meta-data/instance-life-cycle" METADATA_URL = "http://169.254.169.254/latest/meta-data/spot/termination-time" TOKEN_URL = "http://169.254.169.254/latest/api/token" POLL_INTERVAL = 5 # seconds @@ -59,10 +60,8 @@ def _make_ec2_request(self, url, timeout): def _is_aws_spot_instance(self): try: - response = self._make_ec2_request(url=self.METADATA_URL, timeout=1) - # A 404 means we're on EC2 but not a spot instance - # A timeout/connection error means we're not on EC2 at all - return response.status_code != 404 + response = self._make_ec2_request(url=self.EC2_TYPE_URL, timeout=1) + return response.status_code == 200 and response.text == "spot" except (requests.exceptions.RequestException, requests.exceptions.Timeout): return False @@ -82,14 +81,15 @@ def _monitor_loop(self): def _emit_termination_metadata(self, termination_time): command = [ sys.executable, - current.flow_name, + f"/metaflow/{os.getenv('FLOW_FILE_PATH')}", + "metadata", "record", "--run-id", - current.run_id, + os.getenv("RUN_ID"), "--step-name", - current.step_name, - "--task_id", - current.task_id, + os.getenv("STEP_NAME"), + "--task-id", + os.getenv("TASK_ID"), "--field", "spot-termination-notice", str(True), @@ -98,15 +98,12 @@ def _emit_termination_metadata(self, termination_time): termination_time, "--field", "spot-termination-received-at", - datetime.now(datetime.timezone.utc).isoformat(), + datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "--tag", - "attempt_id:{}".format(current.attempt), + "attempt_id:{}".format(os.getenv("RETRY_COUNT")), ] result = subprocess.run(command, capture_output=True, text=True) if result.returncode != 0: - print( - f"Failed to record spot termination metadata: {result.stderr}", - file=sys.stderr, - ) + print(f"Failed to record spot termination metadata: {result.stderr}") From 71d063e81c0344cb9fbf23d68cfc646eb54c4c82 Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Wed, 15 Jan 2025 01:24:09 +0530 Subject: [PATCH 3/8] change dir structure and renaming --- metaflow/plugins/__init__.py | 4 ++-- .../{metadata_cli.py => kubernetes/spot_metadata_cli.py} | 6 +++--- .../kubernetes/{spot_monitor.py => spot_monitor_sidecar.py} | 3 +-- 3 files changed, 6 insertions(+), 7 deletions(-) rename metaflow/plugins/{metadata_cli.py => kubernetes/spot_metadata_cli.py} (90%) rename metaflow/plugins/kubernetes/{spot_monitor.py => spot_monitor_sidecar.py} (98%) diff --git a/metaflow/plugins/__init__.py b/metaflow/plugins/__init__.py index 75ab7353c9f..c10886b1708 100644 --- a/metaflow/plugins/__init__.py +++ b/metaflow/plugins/__init__.py @@ -16,7 +16,7 @@ ("argo-workflows", ".argo.argo_workflows_cli.cli"), ("card", ".cards.card_cli.cli"), ("tag", ".tag_cli.cli"), - ("metadata", ".metadata_cli.cli"), + ("spot-metadata", ".kubernetes.spot_metadata_cli.cli"), ("logs", ".logs_cli.cli"), ] @@ -107,7 +107,7 @@ ), ( "spot_termination_monitor", - ".kubernetes.spot_monitor.SpotTerminationMonitorSidecar", + ".kubernetes.spot_monitor_sidecar.SpotTerminationMonitorSidecar", ), ("heartbeat", "metaflow.metadata_provider.heartbeat.MetadataHeartBeat"), ] diff --git a/metaflow/plugins/metadata_cli.py b/metaflow/plugins/kubernetes/spot_metadata_cli.py similarity index 90% rename from metaflow/plugins/metadata_cli.py rename to metaflow/plugins/kubernetes/spot_metadata_cli.py index 7e6d453c810..78eb6667cc1 100644 --- a/metaflow/plugins/metadata_cli.py +++ b/metaflow/plugins/kubernetes/spot_metadata_cli.py @@ -8,12 +8,12 @@ def cli(): pass -@cli.group(help="Commands related to metadata.") -def metadata(): +@cli.group(help="Commands related to spot metadata.") +def spot_metadata(): pass -@metadata.command(help="Record metadata for a task.") +@spot_metadata.command(help="Record spot metadata for a task.") @click.option( "--run-id", required=True, diff --git a/metaflow/plugins/kubernetes/spot_monitor.py b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py similarity index 98% rename from metaflow/plugins/kubernetes/spot_monitor.py rename to metaflow/plugins/kubernetes/spot_monitor_sidecar.py index a3784f8e1f2..8db7835920f 100644 --- a/metaflow/plugins/kubernetes/spot_monitor.py +++ b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py @@ -7,7 +7,6 @@ from multiprocessing import Process from datetime import datetime, timezone from metaflow.sidecar import MessageTypes -from metaflow.metaflow_current import current class SpotTerminationMonitorSidecar(object): @@ -82,7 +81,7 @@ def _emit_termination_metadata(self, termination_time): command = [ sys.executable, f"/metaflow/{os.getenv('FLOW_FILE_PATH')}", - "metadata", + "spot-metadata", "record", "--run-id", os.getenv("RUN_ID"), From 85afe0ffd68841d05e7a482b7f1d40cdb3d49b46 Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Wed, 15 Jan 2025 01:26:56 +0530 Subject: [PATCH 4/8] remove spot-termination-notice --- metaflow/plugins/kubernetes/spot_monitor_sidecar.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py index 8db7835920f..257c7dd4c07 100644 --- a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py +++ b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py @@ -90,9 +90,6 @@ def _emit_termination_metadata(self, termination_time): "--task-id", os.getenv("TASK_ID"), "--field", - "spot-termination-notice", - str(True), - "--field", "spot-termination-time", termination_time, "--field", From 259965411a01b8f5d2cf533cb997567768411d7e Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Thu, 16 Jan 2025 11:35:29 +0400 Subject: [PATCH 5/8] re-use existing vars --- metaflow/plugins/kubernetes/kubernetes_cli.py | 12 +----------- .../plugins/kubernetes/spot_monitor_sidecar.py | 14 +++++++++----- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index 358745ca1f6..178bc48dbd9 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -190,7 +190,7 @@ def echo(msg, stream="stderr", job_id=None, **kwargs): executable = ctx.obj.environment.executable(step_name, executable) # Set environment - env = {} + env = {"MF_FLOW_FILENAME": os.path.basename(sys.argv[0])} env_deco = [deco for deco in node.decorators if deco.name == "environment"] if env_deco: env = env_deco[0].attributes["vars"] @@ -275,16 +275,6 @@ def _sync_metadata(): ), ) - env.update( - { - "FLOW_FILE_PATH": os.path.basename(sys.argv[0]), - "RUN_ID": kwargs["run_id"], - "STEP_NAME": step_name, - "TASK_ID": task_id, - "RETRY_COUNT": retry_count, - } - ) - try: kubernetes = Kubernetes( datastore=ctx.obj.flow_datastore, diff --git a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py index 257c7dd4c07..52dec87d5bf 100644 --- a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py +++ b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py @@ -78,17 +78,21 @@ def _monitor_loop(self): time.sleep(self.POLL_INTERVAL) def _emit_termination_metadata(self, termination_time): + flow_filename = os.getenv("MF_FLOW_FILENAME") + pathspec = os.getenv("MF_PATHSPEC") + _, run_id, step_name, task_id = pathspec.split("/") + retry_count = os.getenv("MF_ATTEMPT") command = [ sys.executable, - f"/metaflow/{os.getenv('FLOW_FILE_PATH')}", + f"/metaflow/{flow_filename}", "spot-metadata", "record", "--run-id", - os.getenv("RUN_ID"), + run_id, "--step-name", - os.getenv("STEP_NAME"), + step_name, "--task-id", - os.getenv("TASK_ID"), + task_id, "--field", "spot-termination-time", termination_time, @@ -96,7 +100,7 @@ def _emit_termination_metadata(self, termination_time): "spot-termination-received-at", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "--tag", - "attempt_id:{}".format(os.getenv("RETRY_COUNT")), + "attempt_id:{}".format(retry_count), ] result = subprocess.run(command, capture_output=True, text=True) From ed0b6390a8078db94be3ebef0932389692f7211b Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Fri, 17 Jan 2025 01:22:22 +0400 Subject: [PATCH 6/8] write to file as well --- metaflow/plugins/argo/argo_workflows.py | 1 + metaflow/plugins/kubernetes/kubernetes_decorator.py | 3 +++ metaflow/plugins/kubernetes/spot_monitor_sidecar.py | 4 ++++ 3 files changed, 8 insertions(+) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index 2f1a1d57f6a..a3ddc526691 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -1705,6 +1705,7 @@ def _container_templates(self): }, **{ # Some optional values for bookkeeping + "MF_FLOW_FILENAME": os.path.basename(sys.argv[0]), "METAFLOW_FLOW_NAME": self.flow.name, "METAFLOW_STEP_NAME": node.name, "METAFLOW_RUN_ID": run_id, diff --git a/metaflow/plugins/kubernetes/kubernetes_decorator.py b/metaflow/plugins/kubernetes/kubernetes_decorator.py index 5a98e67138b..551feecd1ee 100644 --- a/metaflow/plugins/kubernetes/kubernetes_decorator.py +++ b/metaflow/plugins/kubernetes/kubernetes_decorator.py @@ -548,6 +548,9 @@ def task_pre_step( self._save_logs_sidecar.start() # Start spot termination monitor sidecar. + current._update_env( + {"spot_termination_notice": "/tmp/spot_termination_notice"} + ) self._spot_monitor_sidecar = Sidecar("spot_termination_monitor") self._spot_monitor_sidecar.start() diff --git a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py index 52dec87d5bf..a1c96587f4d 100644 --- a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py +++ b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py @@ -82,6 +82,10 @@ def _emit_termination_metadata(self, termination_time): pathspec = os.getenv("MF_PATHSPEC") _, run_id, step_name, task_id = pathspec.split("/") retry_count = os.getenv("MF_ATTEMPT") + + with open("/tmp/spot_termination_notice", "w") as fp: + fp.write(termination_time) + command = [ sys.executable, f"/metaflow/{flow_filename}", From c402e03fa1a3fa498399eb7379ec118a779054a9 Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Fri, 17 Jan 2025 01:43:02 +0400 Subject: [PATCH 7/8] address nitpick --- metaflow/plugins/argo/argo_workflows.py | 2 +- metaflow/plugins/kubernetes/kubernetes_cli.py | 2 +- metaflow/plugins/kubernetes/spot_monitor_sidecar.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index a3ddc526691..6fc1b57c807 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -1705,7 +1705,7 @@ def _container_templates(self): }, **{ # Some optional values for bookkeeping - "MF_FLOW_FILENAME": os.path.basename(sys.argv[0]), + "METAFLOW_FLOW_FILENAME": os.path.basename(sys.argv[0]), "METAFLOW_FLOW_NAME": self.flow.name, "METAFLOW_STEP_NAME": node.name, "METAFLOW_RUN_ID": run_id, diff --git a/metaflow/plugins/kubernetes/kubernetes_cli.py b/metaflow/plugins/kubernetes/kubernetes_cli.py index 178bc48dbd9..7bc82313bba 100644 --- a/metaflow/plugins/kubernetes/kubernetes_cli.py +++ b/metaflow/plugins/kubernetes/kubernetes_cli.py @@ -190,7 +190,7 @@ def echo(msg, stream="stderr", job_id=None, **kwargs): executable = ctx.obj.environment.executable(step_name, executable) # Set environment - env = {"MF_FLOW_FILENAME": os.path.basename(sys.argv[0])} + env = {"METAFLOW_FLOW_FILENAME": os.path.basename(sys.argv[0])} env_deco = [deco for deco in node.decorators if deco.name == "environment"] if env_deco: env = env_deco[0].attributes["vars"] diff --git a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py index a1c96587f4d..b8a0402b08d 100644 --- a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py +++ b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py @@ -78,7 +78,7 @@ def _monitor_loop(self): time.sleep(self.POLL_INTERVAL) def _emit_termination_metadata(self, termination_time): - flow_filename = os.getenv("MF_FLOW_FILENAME") + flow_filename = os.getenv("METAFLOW_FLOW_FILENAME") pathspec = os.getenv("MF_PATHSPEC") _, run_id, step_name, task_id = pathspec.split("/") retry_count = os.getenv("MF_ATTEMPT") From 69f0d1f5ae9cdfc0331ffa25c39ca69b1e8e35a9 Mon Sep 17 00:00:00 2001 From: Madhur Tandon Date: Fri, 17 Jan 2025 02:32:28 +0400 Subject: [PATCH 8/8] less generic --- .../plugins/kubernetes/spot_metadata_cli.py | 35 ++++++++++--------- .../kubernetes/spot_monitor_sidecar.py | 6 +--- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/metaflow/plugins/kubernetes/spot_metadata_cli.py b/metaflow/plugins/kubernetes/spot_metadata_cli.py index 78eb6667cc1..7b333dad3c8 100644 --- a/metaflow/plugins/kubernetes/spot_metadata_cli.py +++ b/metaflow/plugins/kubernetes/spot_metadata_cli.py @@ -1,4 +1,5 @@ from metaflow._vendor import click +from datetime import datetime, timezone from metaflow.tagging_util import validate_tags from metaflow.metadata_provider import MetaDatum @@ -13,7 +14,7 @@ def spot_metadata(): pass -@spot_metadata.command(help="Record spot metadata for a task.") +@spot_metadata.command(help="Record spot termination metadata for a task.") @click.option( "--run-id", required=True, @@ -30,11 +31,9 @@ def spot_metadata(): help="Task ID for which metadata is to be recorded.", ) @click.option( - "--field", - multiple=True, + "--termination-notice-time", required=True, - type=(str, str), - help="Metadata key value pairs.", + help="Spot termination notice time.", ) @click.option( "--tag", @@ -45,21 +44,25 @@ def spot_metadata(): help="List of tags.", ) @click.pass_obj -def record(obj, run_id, step_name, task_id, field, tags=None): +def record(obj, run_id, step_name, task_id, termination_notice_time, tags=None): validate_tags(tags) tag_list = list(tags) if tags else [] - entries = [] - for k, v in dict(field).items(): - entries.append( - MetaDatum( - field=k, - value=v, - type=k, - tags=tag_list, - ) - ) + entries = [ + MetaDatum( + field="spot-termination-received-at", + value=datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), + type="spot-termination-received-at", + tags=tag_list, + ), + MetaDatum( + field="spot-termination-time", + value=termination_notice_time, + type="spot-termination-time", + tags=tag_list, + ), + ] obj.metadata.register_metadata( run_id=run_id, step_name=step_name, task_id=task_id, metadata=entries diff --git a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py index b8a0402b08d..59f821f885e 100644 --- a/metaflow/plugins/kubernetes/spot_monitor_sidecar.py +++ b/metaflow/plugins/kubernetes/spot_monitor_sidecar.py @@ -97,12 +97,8 @@ def _emit_termination_metadata(self, termination_time): step_name, "--task-id", task_id, - "--field", - "spot-termination-time", + "--termination-notice-time", termination_time, - "--field", - "spot-termination-received-at", - datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "--tag", "attempt_id:{}".format(retry_count), ]