diff --git a/metaflow/plugins/argo/argo_workflows.py b/metaflow/plugins/argo/argo_workflows.py index d5b38715ba0..ac4ab03f486 100644 --- a/metaflow/plugins/argo/argo_workflows.py +++ b/metaflow/plugins/argo/argo_workflows.py @@ -255,6 +255,7 @@ def terminate(cls, flow_name, name): flow_name=flow_name, run_id=name ) ) + return True @staticmethod def get_workflow_status(flow_name, name): diff --git a/metaflow/plugins/argo/argo_workflows_cli.py b/metaflow/plugins/argo/argo_workflows_cli.py index fdae0b8a248..300a526f345 100644 --- a/metaflow/plugins/argo/argo_workflows_cli.py +++ b/metaflow/plugins/argo/argo_workflows_cli.py @@ -769,7 +769,22 @@ def _convert_value(param): if kwargs.get(param.name.replace("-", "_").lower()) is not None } - response = ArgoWorkflows.trigger(obj.workflow_name, params) + workflow_name_to_deploy = obj.workflow_name + # For users that upgraded the client but did not redeploy their flow, + # we fallback to old workflow names in case of a conflict. + if obj.workflow_name != obj._v1_workflow_name: + # use the old name only if there exists a deployment. + if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name): + obj.echo("Warning! ", bold=True, nl=False) + obj.echo( + "Found a deployment of this flow with an old style name, defaulted to triggering *%s*. \nDue to new naming restrictions on Argo Workflows, " + "this flow will have a shorter name with newer\nversions of Metaflow (>=2.13) " + "which will allow it to be triggered through Argo UI as well. " + % obj._v1_workflow_name + ) + obj.echo("re-deploy your flow in order to get rid of this message.") + workflow_name_to_deploy = obj._v1_workflow_name + response = ArgoWorkflows.trigger(workflow_name_to_deploy, params) run_id = "argo-" + response["metadata"]["name"] if run_id_file: @@ -780,7 +795,7 @@ def _convert_value(param): with open(deployer_attribute_file, "w") as f: json.dump( { - "name": obj.workflow_name, + "name": workflow_name_to_deploy, "metadata": get_metadata(), "pathspec": "/".join((obj.flow.name, run_id)), }, @@ -789,7 +804,7 @@ def _convert_value(param): obj.echo( "Workflow *{name}* triggered on Argo Workflows " - "(run-id *{run_id}*).".format(name=obj.workflow_name, run_id=run_id), + "(run-id *{run_id}*).".format(name=workflow_name_to_deploy, run_id=run_id), bold=True, ) @@ -831,26 +846,56 @@ def _token_instructions(flow_name, prev_user): "about production tokens." ) - validate_token(obj.workflow_name, obj.token_prefix, authorize, _token_instructions) - obj.echo("Deleting workflow *{name}*...".format(name=obj.workflow_name), bold=True) + # Cases and expected behaviours: + # old name exists, new name does not exist -> delete old and do not fail on missing new + # old name exists, new name exists -> delete both + # old name does not exist, new name exists -> only try to delete new + # old name does not exist, new name does not exist -> keep previous behaviour where missing deployment raises error for the new name. + def _delete(workflow_name): + validate_token(workflow_name, obj.token_prefix, authorize, _token_instructions) + obj.echo("Deleting workflow *{name}*...".format(name=workflow_name), bold=True) + + schedule_deleted, sensor_deleted, workflow_deleted = ArgoWorkflows.delete( + workflow_name + ) - schedule_deleted, sensor_deleted, workflow_deleted = ArgoWorkflows.delete( - obj.workflow_name - ) + if schedule_deleted: + obj.echo( + "Deleting cronworkflow *{name}*...".format(name=workflow_name), + bold=True, + ) - if schedule_deleted: - obj.echo( - "Deleting cronworkflow *{name}*...".format(name=obj.workflow_name), - bold=True, - ) + if sensor_deleted: + obj.echo( + "Deleting sensor *{name}*...".format(name=workflow_name), + bold=True, + ) + return workflow_deleted + + workflows_deleted = False + cleanup_old_name = False + if obj.workflow_name != obj._v1_workflow_name: + # Only add the old name if there exists a deployment with such name. + # This is due to the way validate_token is tied to an existing deployment. + if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None: + cleanup_old_name = True + obj.echo( + "This flow has been deployed with another name in the past due to a limitation with Argo Workflows.\n" + "Will also delete the older deployment." + ) + _delete(obj._v1_workflow_name) + workflows_deleted = True - if sensor_deleted: - obj.echo( - "Deleting sensor *{name}*...".format(name=obj.workflow_name), - bold=True, - ) + # Always try to delete the current name. + # Do not raise exception if we deleted old name before this. + try: + _delete(obj.workflow_name) + workflows_deleted = True + except ArgoWorkflowsException: + if not cleanup_old_name: + raise - if workflow_deleted: + if workflows_deleted: obj.echo( "Deleting Kubernetes resources may take a while. " "Deploying the flow again to Argo Workflows while the delete is in-flight will fail." @@ -889,17 +934,26 @@ def _token_instructions(flow_name, prev_user): "about production tokens." ) - validate_run_id( - obj.workflow_name, obj.token_prefix, authorize, run_id, _token_instructions - ) + workflows = [obj.workflow_name] + if obj.workflow_name != obj._v1_workflow_name: + # Only add the old name if there exists a deployment with such name. + # This is due to the way validate_token is tied to an existing deployment. + if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None: + workflows.append(obj._v1_workflow_name) - # Trim prefix from run_id - name = run_id[5:] + for workflow_name in workflows: + validate_run_id( + workflow_name, obj.token_prefix, authorize, run_id, _token_instructions + ) + + # Trim prefix from run_id + name = run_id[5:] - workflow_suspended = ArgoWorkflows.suspend(name) + workflow_suspended = ArgoWorkflows.suspend(name) - if workflow_suspended: - obj.echo("Suspended execution of *%s*" % run_id) + if workflow_suspended: + obj.echo("Suspended execution of *%s*" % run_id) + break # no need to try out all workflow_names if we found the running one. @argo_workflows.command(help="Unsuspend flow execution on Argo Workflows.") @@ -933,17 +987,26 @@ def _token_instructions(flow_name, prev_user): "about production tokens." ) - validate_run_id( - obj.workflow_name, obj.token_prefix, authorize, run_id, _token_instructions - ) + workflows = [obj.workflow_name] + if obj.workflow_name != obj._v1_workflow_name: + # Only add the old name if there exists a deployment with such name. + # This is due to the way validate_token is tied to an existing deployment. + if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None: + workflows.append(obj._v1_workflow_name) - # Trim prefix from run_id - name = run_id[5:] + for workflow_name in workflows: + validate_run_id( + workflow_name, obj.token_prefix, authorize, run_id, _token_instructions + ) - workflow_suspended = ArgoWorkflows.unsuspend(name) + # Trim prefix from run_id + name = run_id[5:] - if workflow_suspended: - obj.echo("Unsuspended execution of *%s*" % run_id) + workflow_suspended = ArgoWorkflows.unsuspend(name) + + if workflow_suspended: + obj.echo("Unsuspended execution of *%s*" % run_id) + break # no need to try all workflow_names if we found one. def validate_token(name, token_prefix, authorize, instructions_fn=None): @@ -1051,22 +1114,31 @@ def _token_instructions(flow_name, prev_user): "about production tokens." ) - validate_run_id( - obj.workflow_name, obj.token_prefix, authorize, run_id, _token_instructions - ) + workflows = [obj.workflow_name] + if obj.workflow_name != obj._v1_workflow_name: + # Only add the old name if there exists a deployment with such name. + # This is due to the way validate_token is tied to an existing deployment. + if ArgoWorkflows.get_existing_deployment(obj._v1_workflow_name) is not None: + workflows.append(obj._v1_workflow_name) - # Trim prefix from run_id - name = run_id[5:] - obj.echo( - "Terminating run *{run_id}* for {flow_name} ...".format( - run_id=run_id, flow_name=obj.flow.name - ), - bold=True, - ) + for workflow_name in workflows: + validate_run_id( + workflow_name, obj.token_prefix, authorize, run_id, _token_instructions + ) + + # Trim prefix from run_id + name = run_id[5:] + obj.echo( + "Terminating run *{run_id}* for {flow_name} ...".format( + run_id=run_id, flow_name=obj.flow.name + ), + bold=True, + ) - terminated = ArgoWorkflows.terminate(obj.flow.name, name) - if terminated: - obj.echo("\nRun terminated.") + terminated = ArgoWorkflows.terminate(obj.flow.name, name) + if terminated: + obj.echo("\nRun terminated.") + break # no need to try all workflow_names if we found the running one. @argo_workflows.command(help="List Argo Workflow templates for the flow.")