Skip to content

Commit

Permalink
SpinRuntime kind of works
Browse files Browse the repository at this point in the history
  • Loading branch information
talsperre committed Oct 18, 2024
1 parent 60eba72 commit 517203d
Show file tree
Hide file tree
Showing 2 changed files with 281 additions and 46 deletions.
218 changes: 203 additions & 15 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,6 @@ def cli(ctx):
pass


# @cli.group(help="Commands related to Spin step")
# def spin():
# pass


@cli.command(help="Check that the flow is valid (default).")
@click.option(
"--warnings/--no-warnings",
Expand Down Expand Up @@ -368,9 +363,9 @@ def dump(obj, input_path, private=None, max_value_size=None, include=None, file=
help="A python module that contains the artifacts in a dictionary called `mf_artifacts`.",
)
@click.option(
"--skip-decorators",
"--skip-decorators/--no-skip-decorators",
is_flag=True,
default=False,
default=True,
show_default=True,
help="Skip decorators attached to the step.",
)
Expand Down Expand Up @@ -405,7 +400,6 @@ def spin(
runner_attribute_file=None,
**kwargs,
):
# TODO: skip_decorators is not used for now, but we might need it in the future
from .spin_utils import SpinParserValidator
from metaflow.datastore.spin_datastore.step_datastore import SpinStepDatastore
from metaflow.datastore.spin_datastore.inputs_datastore import SpinInputsDatastore
Expand All @@ -432,6 +426,7 @@ def spin(
# We now set the parameters, step_name, and the constants for the flow
ctx.obj.step_name = step_name
ctx.obj.flow._set_constants(ctx.obj.graph, kwargs)

step_datastore, join_inputs_datastore = None, None
# We now generate the input datastore for our spin task
if spin_parser_validator.step_type == "join":
Expand All @@ -457,6 +452,8 @@ def spin(
step_datastore=step_datastore,
join_inputs_datastore=join_inputs_datastore,
step_func=step_func,
run_id_file=run_id_file,
runner_attribute_file=runner_attribute_file,
)
spin_runtime.execute()
end_spin_init_time = time.time()
Expand Down Expand Up @@ -514,13 +511,204 @@ def spin(
# print(f"Total Time: {end_time - start_time}")


# @spin.command(help="Internal command to execute a single spin task.", hidden=True)
# @click.argument("step-name")
# def spin_internal(
# ctx,
# step_name,
# ):
# pass
@cli.command(help="Internal command to execute a single spin task.", hidden=True)
@click.argument("step-name")
@click.option(
    "--run-id",
    default=None,
    required=True,
    help="ID for one execution of the spin step of the flow.",
)
@click.option(
    "--task-id",
    default=None,
    required=True,
    show_default=True,
    help="ID for this instance of the step.",
)
@click.option(
    "--input-paths",
    help="A comma-separated list of pathspecs specifying inputs for this step.",
)
@click.option(
    "--retry-count",
    default=0,
    help="How many times we have attempted to run this task.",
)
@click.option(
    "--max-user-code-retries",
    default=0,
    help="How many times we should attempt running the user code.",
)
@click.option(
    "--namespace",
    "opt_namespace",
    default=None,
    help="Change namespace from the default (your username) to the specified tag.",
)
@click.option(
    "--prev-run-id",
    default=None,
    required=True,
    help="Run ID of a previous execution to fetch the artifacts from.",
)
@click.option(
    "--ancestor-tasks",
    type=str,
    default=None,
    show_default=True,
    help="A JSON string consisting of the key-value pairs of the ancestor tasks.",
)
@click.option(
    "--foreach-index",
    type=int,
    default=None,
    show_default=True,
    help="For-each index to use for the spin task",
)
@click.option(
    "--foreach-value",
    type=int,
    default=None,
    show_default=True,
    help="The value of the for-each variable to use for the spin task",
)
@click.option(
    "--artifacts",
    type=str,
    default=None,
    help="A JSON string consisting of the key-value pairs of the artifacts.",
)
@click.option(
    "--artifacts-module",
    type=str,
    default=None,
    help="A python module that contains the artifacts in a dictionary called `mf_artifacts`.",
)
@click.option(
    "--skip-decorators/--no-skip-decorators",
    is_flag=True,
    default=True,
    show_default=True,
    help="Skip decorators attached to the step.",
)
@click.option(
    "--run-id-file",
    default=None,
    show_default=True,
    type=str,
    help="Write the ID of this run to the file specified.",
)
@click.option(
    "--runner-attribute-file",
    default=None,
    show_default=True,
    type=str,
    help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.",
)
@click.pass_context
def spin_internal(
    ctx,
    step_name,
    run_id=None,
    task_id=None,
    input_paths=None,
    retry_count=None,
    max_user_code_retries=None,
    opt_namespace=None,
    tags=None,
    decospecs=None,
    prev_run_id=None,
    ancestor_tasks=None,
    foreach_index=None,
    foreach_value=None,
    artifacts=None,
    artifacts_module=None,
    skip_decorators=False,
    run_id_file=None,
    runner_attribute_file=None,
    **kwargs,
):
    """Execute a single spin task for ``step_name``.

    The command validates the spin arguments with ``SpinParserValidator``,
    builds a ``MetaflowTask``, records the run ID (and, when requested, the
    runner attribute file), selects the input datastore that matches the
    step type and finally calls ``task.run_spin_step``.

    Parameters used by this body:
        ctx: Click context; ``ctx.obj`` supplies the flow, datastore,
            metadata provider, environment, echo/logger, event logger and
            monitor.
        step_name: Name of the step; looked up as an attribute on the flow.
        run_id: Run ID passed to ``run_spin_step`` as ``new_run_id`` and
            written to the latest-run and run-id files.
        task_id: Task ID passed to ``run_spin_step`` as ``new_task_id``.
        prev_run_id, ancestor_tasks, artifacts, artifacts_module,
        foreach_index, foreach_value, skip_decorators: Forwarded unchanged
            to ``SpinParserValidator``.
        run_id_file: Passed to ``write_file`` together with ``run_id``.
        runner_attribute_file: When truthy, a JSON document describing the
            task is written to this path.

    ``input_paths``, ``retry_count``, ``max_user_code_retries``,
    ``opt_namespace``, ``tags``, ``decospecs`` and ``**kwargs`` are accepted
    but not read anywhere in this function.
    """
    ctx.obj.echo("Running *spin_internal*")

    from .spin_utils import SpinParserValidator
    from metaflow.datastore.spin_datastore.step_datastore import SpinStepDatastore
    from metaflow.datastore.spin_datastore.inputs_datastore import SpinInputsDatastore

    # We first validate that appropriate parameters are passed
    step_func = getattr(ctx.obj.flow, step_name)
    spin_parser_validator = SpinParserValidator(
        ctx,
        step_name,
        prev_run_id,
        ancestor_tasks=ancestor_tasks,
        artifacts=artifacts,
        artifacts_module=artifacts_module,
        foreach_index=foreach_index,
        foreach_value=foreach_value,
        skip_decorators=skip_decorators,
        step_func=step_func,
    )
    start_time = time.time()
    spin_parser_validator.validate()
    end_validation_time = time.time()
    ctx.obj.echo(f"Validation Time: {end_validation_time - start_time}")

    task = MetaflowTask(
        ctx.obj.flow,
        ctx.obj.flow_datastore,  # local datastore
        ctx.obj.metadata,  # local metadata provider
        ctx.obj.environment,  # local environment
        ctx.obj.echo,
        ctx.obj.event_logger,  # null logger
        ctx.obj.monitor,  # null monitor
        None,  # no unbounded foreach context
    )
    ctx.obj.logger(
        f"Spinning up step `{step_name}` with run_id {run_id} and task_id {task_id}",
        system_msg=True,
    )

    write_latest_run_id(ctx.obj, run_id)
    write_file(run_id_file, run_id)

    local_metadata = (
        f"{ctx.obj.metadata.__class__.TYPE}@{ctx.obj.metadata.__class__.INFO}"
    )
    if runner_attribute_file:
        with open(runner_attribute_file, "w") as f:
            json.dump(
                {
                    # The IDs come from this command's own options; the
                    # previous `new_task_id` / `new_run_id` names were never
                    # bound in this scope and raised NameError.
                    "task_id": task_id,
                    "step_name": step_name,
                    "run_id": run_id,
                    "flow_name": ctx.obj.flow.name,
                    "metadata": local_metadata,
                },
                f,
            )

    step_datastore, join_inputs_datastore = None, None
    # We now generate the input datastore for our spin task
    if spin_parser_validator.step_type == "join":
        join_inputs_datastore = SpinInputsDatastore(spin_parser_validator)
    else:
        step_datastore = SpinStepDatastore(spin_parser_validator)

    task.run_spin_step(
        spin_parser_validator=spin_parser_validator,
        new_task_id=task_id,
        new_run_id=run_id,
        step_datastore=step_datastore,
        join_inputs_datastore=join_inputs_datastore,
    )
    print("-" * 100)
    end_time = time.time()
    print(f"Total Time: {end_time - start_time}")

# print(f"ctx.obj.flow._graph_info: {ctx.obj.flow._graph_info}")


# TODO - move step and init under a separate 'internal' subcommand

Expand Down
Loading

0 comments on commit 517203d

Please sign in to comment.