Skip to content

Commit

Permalink
chore(sdk): local task execution refactor + cleanup (kubeflow#10420)
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-mccarthy authored Jan 23, 2024
1 parent ddb2f9a commit a990446
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 38 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,6 @@ __pycache__
# Coverage
.coverage
.coverage*

# kfp local execution default directory
local_outputs/
2 changes: 1 addition & 1 deletion sdk/python/kfp/dsl/pipeline_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def _execute_locally(self, args: Dict[str, Any]) -> None:
raise NotImplementedError(
'Local pipeline execution is not currently supported.')

self._outputs = task_dispatcher.run_single_component(
self._outputs = task_dispatcher.run_single_task(
pipeline_spec=self.component_spec.to_pipeline_spec(),
arguments=args,
)
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/kfp/local/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import os
from typing import Union

from kfp import local


class LocalRunnerType(abc.ABC):
"""The ABC for user-facing Runner configurations.
Expand Down Expand Up @@ -85,6 +87,13 @@ def __init__(
self.pipeline_root = pipeline_root
self.raise_on_error = raise_on_error

@classmethod
def validate(cls):
if cls.instance is None:
raise RuntimeError(
f"Local environment not initialized. Please run '{local.__name__}.{init.__name__}()' before executing tasks locally."
)


def init(
# annotate with subclasses, not parent class, for more helpful ref docs
Expand Down
15 changes: 15 additions & 0 deletions sdk/python/kfp/local/config_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,21 @@ def test_local_runner_config_is_singleton(self):
local.SubprocessRunner(use_venv=False))
self.assertFalse(instance.raise_on_error, False)

def test_validate_success(self):
config.LocalExecutionConfig(
pipeline_root='other/local/root',
runner=local.SubprocessRunner(use_venv=False),
raise_on_error=False,
)
config.LocalExecutionConfig.validate()

def test_validate_fail(self):
with self.assertRaisesRegex(
RuntimeError,
f"Local environment not initialized. Please run 'kfp\.local\.init\(\)' before executing tasks locally\."
):
config.LocalExecutionConfig.validate()


class TestInitCalls(unittest.TestCase):

Expand Down
7 changes: 4 additions & 3 deletions sdk/python/kfp/local/executor_input_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Any, Dict

from google.protobuf import json_format
from google.protobuf import struct_pb2
from kfp.compiler import pipeline_spec_builder
from kfp.dsl import utils
from kfp.pipeline_spec import pipeline_spec_pb2
Expand Down Expand Up @@ -60,7 +61,7 @@ def construct_executor_input(
for param_name in output_parameter_keys
},
artifacts={
artifact_name: make_artifact_list(
artifact_name: artifact_type_schema_to_artifact_list(
name=artifact_name,
artifact_type=artifact_spec.artifact_type,
task_root=task_root,
Expand Down Expand Up @@ -116,7 +117,7 @@ def construct_local_task_root(
)


def make_artifact_list(
def artifact_type_schema_to_artifact_list(
name: str,
artifact_type: pipeline_spec_pb2.ArtifactTypeSchema,
task_root: str,
Expand All @@ -128,7 +129,7 @@ def make_artifact_list(
type=artifact_type,
uri=os.path.join(task_root, name),
# metadata always starts empty for output artifacts
metadata={},
metadata=struct_pb2.Struct(),
)
])

Expand Down
14 changes: 14 additions & 0 deletions sdk/python/kfp/local/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from typing import Any, Dict, Generator, List

from kfp import dsl
from kfp.local import status


class Color:
Expand Down Expand Up @@ -139,3 +140,16 @@ def make_log_lines_for_outputs(outputs: Dict[str, Any]) -> List[str]:
output_lines.append(f'{key_chars}{value}')

return output_lines


def format_task_name(task_name: str) -> str:
    """Return the task name quoted (via repr) and colored cyan for logs."""
    quoted_name = repr(task_name)
    return color_text(quoted_name, Color.CYAN)


def format_status(task_status: status.Status) -> str:
    """Return the status name colorized for log output.

    SUCCESS renders green, FAILURE renders red.

    Raises:
        ValueError: If task_status is neither SUCCESS nor FAILURE.
    """
    color_by_status = {
        status.Status.SUCCESS: Color.GREEN,
        status.Status.FAILURE: Color.RED,
    }
    if task_status not in color_by_status:
        raise ValueError(f'Got unknown status: {task_status}')
    return color_text(task_status.name, color_by_status[task_status])
27 changes: 27 additions & 0 deletions sdk/python/kfp/local/logging_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from kfp import dsl
from kfp.local import logging_utils
from kfp.local import status


class TestIndentedPrint(unittest.TestCase):
Expand Down Expand Up @@ -202,5 +203,31 @@ def test_mix_params_and_artifacts(self):
self.assertListEqual(actual, expected)


class TestFormatStatus(unittest.TestCase):
    """Tests for logging_utils.format_status."""

    def test_success_status(self):
        formatted = logging_utils.format_status(status.Status.SUCCESS)
        self.assertEqual('\x1b[92mSUCCESS\x1b[0m', formatted)

    def test_failure_status(self):
        formatted = logging_utils.format_status(status.Status.FAILURE)
        self.assertEqual('\x1b[91mFAILURE\x1b[0m', formatted)

    def test_invalid_status(self):
        with self.assertRaisesRegex(ValueError,
                                    r'Got unknown status: INVALID_STATUS'):
            logging_utils.format_status('INVALID_STATUS')


class TestFormatTaskName(unittest.TestCase):
    """Tests for logging_utils.format_task_name."""

    def test(self):
        formatted = logging_utils.format_task_name('my-task')
        self.assertEqual('\x1b[96m\'my-task\'\x1b[0m', formatted)


if __name__ == '__main__':
unittest.main()
10 changes: 8 additions & 2 deletions sdk/python/kfp/local/placeholder_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
},
'uri':
'/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/out_a',
# include metadata on outputs since it allows us to
# test the placeholder
# "{{$.outputs.artifacts[''out_a''].metadata[''foo'']}}"
# for comprehensive testing, but in practice metadata
# will never be set on output artifacts since they
# haven't been created yet
'metadata': {
'foo': {
'bar': 'baz'
Expand All @@ -62,7 +68,8 @@
'outputFile':
'/foo/bar/my-pipeline-2023-10-10-13-32-59-420710/comp/executor_output.json'
}
}, executor_input)
},
executor_input)

EXECUTOR_INPUT_DICT = json_format.MessageToDict(executor_input)

Expand Down Expand Up @@ -96,7 +103,6 @@ def test(self):
class TestResolveIndividualPlaceholder(parameterized.TestCase):

# TODO: consider supporting JSON escape
# TODO: update when input artifact constants supported
# TODO: update when output lists of artifacts are supported
@parameterized.parameters([
(
Expand Down
76 changes: 45 additions & 31 deletions sdk/python/kfp/local/task_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
"""Code for dispatching a local task execution."""
import logging
from typing import Any, Dict
from typing import Any, Dict, Tuple

from kfp import local
from kfp.local import config
Expand All @@ -25,10 +25,11 @@
from kfp.local import status
from kfp.local import subprocess_task_handler
from kfp.local import task_handler_interface
from kfp.local import utils
from kfp.pipeline_spec import pipeline_spec_pb2


def run_single_component(
def run_single_task(
pipeline_spec: pipeline_spec_pb2.PipelineSpec,
arguments: Dict[str, Any],
) -> Dict[str, Any]:
Expand All @@ -41,36 +42,59 @@ def run_single_component(
Returns:
A LocalTask instance.
"""
if config.LocalExecutionConfig.instance is None:
raise RuntimeError(
f"Local environment not initialized. Please run '{local.__name__}.{local.init.__name__}()' before executing tasks locally."
)
config.LocalExecutionConfig.validate()
component_name, component_spec = list(pipeline_spec.components.items())[0]
executor_spec = get_executor_spec(
pipeline_spec,
component_spec.executor_label,
)
executor_spec = utils.struct_to_executor_spec(executor_spec)

# all global state should be accessed here
# do not access local config state downstream
return _run_single_component_implementation(
pipeline_spec=pipeline_spec,
outputs, _ = _run_single_task_implementation(
pipeline_name=pipeline_spec.pipeline_info.name,
component_name=component_name,
component_spec=component_spec,
executor_spec=executor_spec,
arguments=arguments,
pipeline_root=config.LocalExecutionConfig.instance.pipeline_root,
runner=config.LocalExecutionConfig.instance.runner,
raise_on_error=config.LocalExecutionConfig.instance.raise_on_error,
)
return outputs


def _run_single_component_implementation(
def get_executor_spec(
    pipeline_spec: pipeline_spec_pb2.PipelineSpec,
    executor_label: str,
) -> pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec:
    """Look up the executor spec for executor_label in the pipeline's
    deployment config."""
    executors = pipeline_spec.deployment_spec['executors']
    return executors[executor_label]


Outputs = Dict[str, Any]


def _run_single_task_implementation(
pipeline_name: str,
component_name: str,
component_spec: pipeline_spec_pb2.ComponentSpec,
executor_spec: pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec,
arguments: Dict[str, Any],
pipeline_root: str,
runner: config.LocalRunnerType,
raise_on_error: bool,
) -> Dict[str, Any]:
"""The implementation of a single component runner."""
) -> Tuple[Outputs, status.Status]:
"""The implementation of a single component runner.
component_name, component_spec = list(pipeline_spec.components.items())[0]
Returns a tuple of (outputs, status). If status is FAILURE, outputs
is an empty dictionary.
"""

pipeline_resource_name = executor_input_utils.get_local_pipeline_resource_name(
pipeline_spec.pipeline_info.name)
task_resource_name = executor_input_utils.get_local_task_resource_name(
component_name)
pipeline_resource_name = executor_input_utils.get_local_pipeline_resource_name(
pipeline_name)
task_root = executor_input_utils.construct_local_task_root(
pipeline_root=pipeline_root,
pipeline_resource_name=pipeline_resource_name,
Expand All @@ -82,15 +106,9 @@ def _run_single_component_implementation(
task_root=task_root,
)

executor_spec = pipeline_spec.deployment_spec['executors'][
component_spec.executor_label]

container = executor_spec['container']
image = container['image']

command = list(container['command']) if 'command' in container else []
args = list(container['args']) if 'args' in container else []
full_command = command + args
container = executor_spec.container
image = container.image
full_command = list(container.command) + list(container.args)

executor_input_dict = executor_input_utils.executor_input_to_dict(
executor_input=executor_input,
Expand All @@ -115,10 +133,7 @@ def _run_single_component_implementation(
TaskHandler = task_handler_map[runner_type]

with logging_utils.local_logger_context():
task_name_for_logs = logging_utils.color_text(
f'{task_resource_name!r}',
logging_utils.Color.CYAN,
)
task_name_for_logs = logging_utils.format_task_name(task_resource_name)

logging.info(f'Executing task {task_name_for_logs}')
task_handler = TaskHandler(
Expand All @@ -137,7 +152,7 @@ def _run_single_component_implementation(

if task_status == status.Status.SUCCESS:
logging.info(
f'Task {task_name_for_logs} finished with status {logging_utils.color_text(task_status.value, logging_utils.Color.GREEN)}'
f'Task {task_name_for_logs} finished with status {logging_utils.format_status(task_status)}'
)

outputs = executor_output_utils.get_outputs_for_task(
Expand All @@ -148,14 +163,13 @@ def _run_single_component_implementation(
output_string = [
f'Task {task_name_for_logs} outputs:',
*logging_utils.make_log_lines_for_outputs(outputs),
'\n',
]
logging.info('\n'.join(output_string))
else:
logging.info(f'Task {task_name_for_logs} has no outputs')

elif task_status == status.Status.FAILURE:
msg = f'Task {task_name_for_logs} finished with status {logging_utils.color_text(task_status.value, logging_utils.Color.RED)}'
msg = f'Task {task_name_for_logs} finished with status {logging_utils.format_status(task_status)}'
if raise_on_error:
raise RuntimeError(msg)
else:
Expand All @@ -166,4 +180,4 @@ def _run_single_component_implementation(
# for developers; user should never hit this
raise ValueError(f'Got unknown status: {task_status}')

return outputs
return outputs, task_status
2 changes: 1 addition & 1 deletion sdk/python/kfp/local/task_dispatcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ def many_type_component(
r'Wrote executor output file to',
r'.*',
r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m finished with status \x1b\[92mSUCCESS\x1b\[0m\n",
r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: 'hellohello'\n model: Model\( name='model',\n uri='[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model',\n metadata={'foo': 'bar'} \)\n\n",
r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: 'hellohello'\n model: Model\( name='model',\n uri='[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model',\n metadata={'foo': 'bar'} \)\n",
]

self.assertRegex(
Expand Down
26 changes: 26 additions & 0 deletions sdk/python/kfp/local/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Assorted utilities."""

from google.protobuf import json_format
from google.protobuf import struct_pb2
from kfp.pipeline_spec import pipeline_spec_pb2


def struct_to_executor_spec(
    struct: struct_pb2.Struct,
) -> pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec:
    """Convert a protobuf Struct into a typed ExecutorSpec message.

    The struct is round-tripped through a plain dict and then parsed into
    the strongly typed ExecutorSpec proto.
    """
    spec_dict = json_format.MessageToDict(struct)
    spec = pipeline_spec_pb2.PipelineDeploymentConfig.ExecutorSpec()
    json_format.ParseDict(spec_dict, spec)
    return spec
Loading

0 comments on commit a990446

Please sign in to comment.