Skip to content

Commit

Permalink
Test commit
Browse files Browse the repository at this point in the history
  • Loading branch information
talsperre committed Oct 18, 2024
1 parent ee1e09d commit 60eba72
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 36 deletions.
26 changes: 21 additions & 5 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ def cli(ctx):
pass


# @cli.group(help="Commands related to Spin step")
# def spin():
# pass


@cli.command(help="Check that the flow is valid (default).")
@click.option(
"--warnings/--no-warnings",
Expand Down Expand Up @@ -324,7 +329,7 @@ def dump(obj, input_path, private=None, max_value_size=None, include=None, file=
"the run.",
)
@click.option(
"--run-id",
"--prev-run-id",
default=None,
required=True,
help="Run ID of a previous execution to fetch the artifacts from.",
Expand Down Expand Up @@ -389,7 +394,7 @@ def spin(
step_name,
tags=None,
decospecs=None,
run_id=None,
prev_run_id=None,
ancestor_tasks=None,
foreach_index=None,
foreach_value=None,
Expand All @@ -410,7 +415,7 @@ def spin(
spin_parser_validator = SpinParserValidator(
ctx,
step_name,
run_id,
prev_run_id,
ancestor_tasks=ancestor_tasks,
artifacts=artifacts,
artifacts_module=artifacts_module,
Expand All @@ -422,6 +427,7 @@ def spin(
start_time = time.time()
spin_parser_validator.validate()
end_validation_time = time.time()
ctx.obj.echo(f"Validation Time: {end_validation_time - start_time}")

# We now set the parameters, step_name, and the constants for the flow
ctx.obj.step_name = step_name
Expand Down Expand Up @@ -453,6 +459,10 @@ def spin(
step_func=step_func,
)
spin_runtime.execute()
end_spin_init_time = time.time()
ctx.obj.echo(
f"Spin Runtime Initialization Time: {end_spin_init_time - end_validation_time}"
)

# task = MetaflowTask(
# ctx.obj.flow,
Expand Down Expand Up @@ -504,6 +514,14 @@ def spin(
# print(f"Total Time: {end_time - start_time}")


# @spin.command(help="Internal command to execute a single spin task.", hidden=True)
# @click.argument("step-name")
# def spin_internal(
# ctx,
# step_name,
# ):
# pass

# TODO - move step and init under a separate 'internal' subcommand


Expand Down Expand Up @@ -1269,7 +1287,6 @@ def start(
# things they provide may be used by some of the objects initialized after.
# In case of spin subcommand, we want to initialize flow decorators after
# the metadata, environment, and datastore are modified.
print(f"Before init flow decorators: {ctx.obj.flow}")
decorators._init_flow_decorators(
ctx.obj.flow,
ctx.obj.graph,
Expand All @@ -1280,7 +1297,6 @@ def start(
echo,
deco_options,
)
print(f"After init flow decorators: {ctx.obj.flow}")

if ctx.invoked_subcommand not in ("run", "resume", "spin"):
# run/resume/spin are special cases because they can add more decorators with
Expand Down
58 changes: 29 additions & 29 deletions metaflow/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,28 +111,31 @@ def step_name(self):

@property
def input_paths(self):
def _format_input_paths(task):
return "/".join(task.path_components[1:])

if self._input_paths:
return self._input_paths

# Special logic for start step
if self.step_name == "start":
from metaflow import Step

task = Step(f"{self._flow.name}/{self.prev_run_id}/_parameters").task
self._input_paths = [_format_input_paths(task)]
elif self.spin_parser_validator.step_type == "join":
self._input_paths = []
for task in self.join_inputs_datastore.get_previous_tasks:
self._input_paths.append(_format_input_paths(task))
else:
self._input_paths = [
_format_input_paths(self.step_datastore.previous_task())
]
print(f"Input paths are: {self._input_paths}")
return self._input_paths
# For spin steps, we don't need to fetch the input paths
# But if we do need to fetch the input paths, we can use the following code
# def _format_input_paths(task):
# return "/".join(task.path_components[1:])
#
# if self._input_paths:
# return self._input_paths
#
# start_time = time.time()
# # Special logic for start step
# if self.step_name == "start":
# from metaflow import Step
#
# task = Step(f"{self._flow.name}/{self.prev_run_id}/_parameters").task
# self._input_paths = [_format_input_paths(task)]
# elif self.spin_parser_validator.step_type == "join":
# self._input_paths = []
# for task in self.join_inputs_datastore.get_previous_tasks:
# self._input_paths.append(_format_input_paths(task))
# else:
# self._input_paths = [
# _format_input_paths(self.step_datastore.previous_task())
# ]
# return self._input_paths
return []

@property
def split_index(self):
Expand All @@ -158,16 +161,12 @@ def _new_task(self, step, input_paths=None, **kwargs):
)

def execute(self):
# We also create a new task_id for the spin task
self._new_task_id = str(
self._metadata.new_task_id(self._new_run_id, self.step_name)
)
print(f"New task id is: {self._new_task_id}")

task = self._new_task(self.step_name, {})
print(f"New task id is: {task.task_id}")
for deco in self.spin_parser_validator.step_decorators:
deco.runtime_task_created(
self._ds,
task.task_id,
self._new_task_id,
self.split_index,
self.input_paths,
Expand All @@ -187,7 +186,7 @@ def execute(self):
)

# We now set the command to be spin_internal to execute our spin step
cli_args.commands = ["spin-internal"]
args.commands = ["spin-internal"]
env.update(args.get_env())
env["PYTHONUNBUFFERED"] = "x"
cmdline = args.get_args()
Expand Down Expand Up @@ -1635,6 +1634,7 @@ def _options(mapping):
args.extend(self.commands)
args.extend(self.command_args)
args.extend(_options(self.command_options))
print(f"Command Options: {self.command_options}")
return args

def get_env(self):
Expand Down
4 changes: 2 additions & 2 deletions metaflow/spin_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ def __init__(
self,
ctx,
step_name,
run_id,
prev_run_id,
ancestor_tasks=None,
artifacts=None,
artifacts_module=None,
Expand All @@ -19,7 +19,7 @@ def __init__(
):
self.ctx = ctx
self.step_name = step_name
self.run_id = run_id
self.run_id = prev_run_id
self.ancestor_tasks = ancestor_tasks
self.artifacts = artifacts if artifacts else []
self.artifacts_module = artifacts_module
Expand Down

0 comments on commit 60eba72

Please sign in to comment.