From ec907fc725c5b6cd7851ed4747c8824b561d9c73 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 28 Apr 2025 08:21:07 +0200 Subject: [PATCH 01/15] Update Skypilot orchestrator settings and features --- .../skypilot_orchestrator_base_vm_config.py | 53 ++++++++++++------- .../skypilot_base_vm_orchestrator.py | 28 +++++++++- .../skypilot_orchestrator_entrypoint.py | 26 ++++++++- 3 files changed, 85 insertions(+), 22 deletions(-) diff --git a/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py b/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py index c80926620ac..1a417147edd 100644 --- a/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py +++ b/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py @@ -13,9 +13,7 @@ # permissions and limitations under the License. """Skypilot orchestrator base config and settings.""" -from typing import Dict, List, Literal, Optional, Union - -from pydantic import Field +from typing import Any, Dict, List, Literal, Optional, Union from zenml.config.base_settings import BaseSettings from zenml.logger import get_logger @@ -67,6 +65,14 @@ class SkypilotBaseOrchestratorSettings(BaseSettings): disk_size: the size of the OS disk in GiB. disk_tier: the disk performance tier to use. If None, defaults to ``'medium'``. + ports: Ports to expose. Could be an integer, a range, or a list of + integers and ranges. All ports will be exposed to the public internet. + labels: Labels to apply to instances as key-value pairs. These are + mapped to cloud-specific implementations (instance tags in AWS, + instance labels in GCP, etc.) + any_of: List of candidate resources to try in order of preference based on + cost (determined by the optimizer). + ordered: List of candidate resources to try in the specified order. cluster_name: name of the cluster to create/reuse. If None, auto-generate a name. @@ -88,29 +94,32 @@ class SkypilotBaseOrchestratorSettings(BaseSettings): stream_logs: if True, show the logs in the terminal. docker_run_args: Optional arguments to pass to the `docker run` command running inside the VM. + workdir: Working directory to sync to the VM. Synced to ~/sky_workdir. + task_name: Task name used for display purposes. + num_nodes: Number of nodes to launch (including the head node). + file_mounts: File and storage mounts configuration for remote cluster. + envs: Environment variables for the task. Accessible in setup/run. """ # Resources instance_type: Optional[str] = None - cpus: Union[None, int, float, str] = Field( - default=None, union_mode="left_to_right" - ) - memory: Union[None, int, float, str] = Field( - default=None, union_mode="left_to_right" - ) - accelerators: Union[None, str, Dict[str, int]] = Field( - default=None, union_mode="left_to_right" - ) - accelerator_args: Optional[Dict[str, str]] = None + cpus: Union[None, int, float, str] = None + memory: Union[None, int, float, str] = None + accelerators: Union[None, str, Dict[str, int], List[str]] = None + accelerator_args: Optional[Dict[str, Any]] = None use_spot: Optional[bool] = None - job_recovery: Optional[str] = None + job_recovery: Union[None, str, Dict[str, Any]] = None region: Optional[str] = None zone: Optional[str] = None - image_id: Union[Dict[str, str], str, None] = Field( - default=None, union_mode="left_to_right" - ) + image_id: Union[Dict[str, str], str, None] = None disk_size: Optional[int] = None - disk_tier: Optional[Literal["high", "medium", "low"]] = None + disk_tier: Optional[Literal["high", "medium", "low", "ultra", "best"]] = ( + None + ) + ports: Union[None, int, str, List[Union[int, str]]] = None + labels: Optional[Dict[str, str]] = None + any_of: Optional[List[Dict[str, Any]]] = None + ordered: Optional[List[Dict[str, Any]]] = None # Run settings cluster_name: Optional[str] = None @@ -118,9 +127,15 @@ class SkypilotBaseOrchestratorSettings(BaseSettings): idle_minutes_to_autostop: Optional[int] = 30 down: bool = True stream_logs: bool = True - docker_run_args: List[str] = [] + # Additional SkyPilot features + workdir: Optional[str] = None + task_name: Optional[str] = None + num_nodes: Optional[int] = None + file_mounts: Optional[Dict[str, Any]] = None + envs: Optional[Dict[str, str]] = None + class SkypilotBaseOrchestratorConfig( BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 913b91b65c4..303efb1b52b 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -294,13 +294,29 @@ def prepare_or_run_pipeline( run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" down = settings.down idle_minutes_to_autostop = settings.idle_minutes_to_autostop + + # Merge envs from settings with existing task_envs + merged_envs = {} + # First add user-provided envs + if settings.envs: + merged_envs.update(settings.envs) + # Then add task_envs which take precedence + if task_envs: + merged_envs.update(task_envs) + + # Create the Task with all parameters task = sky.Task( run=run_command, setup=setup, - envs=task_envs, + envs=merged_envs, + name=settings.task_name or f"{orchestrator_run_name}", + workdir=settings.workdir, + file_mounts=settings.file_mounts, ) + logger.debug(f"Running run: {run_command}") + # Set resources with all parameters task = task.set_resources( sky.Resources( cloud=self.cloud, @@ -318,13 +334,20 @@ def prepare_or_run_pipeline( else settings.image_id, disk_size=settings.disk_size, disk_tier=settings.disk_tier, + ports=settings.ports, + labels=settings.labels, + any_of=settings.any_of, + ordered=settings.ordered, ) ) + # Do not detach run if logs are being streamed # Otherwise, the logs will not be streamed after the task is submitted - # Could also be a parameter in the settings to control this behavior detach_run = not settings.stream_logs + # Use num_nodes from settings or default to 1 + num_nodes = settings.num_nodes or 1 + launch_new_cluster = True if settings.cluster_name: cluster_info = sky.status( @@ -360,6 +383,7 @@ def prepare_or_run_pipeline( backend=None, detach_setup=True, detach_run=detach_run, + num_nodes=num_nodes, ) else: # Make sure the cluster is up - diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py index 8cfe80a1d13..47c6b0592ba 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py @@ -195,12 +195,27 @@ def run_step_on_skypilot_vm(step_name: str) -> None: # Set up the task run_command = f"docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" task_name = f"{deployment.id}-{step_name}-{time.time()}" + + # Merge envs from settings with existing task_envs + merged_envs = {} + # First add user-provided envs + if settings.envs: + merged_envs.update(settings.envs) + # Then add task_envs which take precedence + if task_envs: + merged_envs.update(task_envs) + + # Create the Task with all parameters task = sky.Task( run=run_command, setup=setup, - envs=task_envs, + envs=merged_envs, name=task_name, + workdir=settings.workdir, + file_mounts=settings.file_mounts, ) + + # Set resources with all parameters task = task.set_resources( sky.Resources( cloud=orchestrator.cloud, @@ -217,9 +232,16 @@ def run_step_on_skypilot_vm(step_name: str) -> None: region=settings.region, zone=settings.zone, image_id=settings.image_id, + ports=settings.ports, + labels=settings.labels, + any_of=settings.any_of, + ordered=settings.ordered, ) ) + # Use num_nodes from settings or default to 1 + num_nodes = settings.num_nodes or 1 + sky.launch( task, cluster_name, @@ -227,8 +249,10 @@ def run_step_on_skypilot_vm(step_name: str) -> None: idle_minutes_to_autostop=settings.idle_minutes_to_autostop, down=settings.down, stream_logs=settings.stream_logs, + backend=None, detach_setup=True, detach_run=True, + num_nodes=num_nodes, ) # Wait for pod to finish. From 333127e2222d969da81c5241919176fdd5970a01 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 28 Apr 2025 08:25:10 +0200 Subject: [PATCH 02/15] Add support for arbitrary task, resource, launch settings --- .../skypilot_orchestrator_base_vm_config.py | 14 ++ .../skypilot_base_vm_orchestrator.py | 127 +++++++++++------- .../skypilot_orchestrator_entrypoint.py | 103 ++++++++------ 3 files changed, 158 insertions(+), 86 deletions(-) diff --git a/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py b/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py index 1a417147edd..e81720fd0db 100644 --- a/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py +++ b/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py @@ -99,6 +99,15 @@ class SkypilotBaseOrchestratorSettings(BaseSettings): num_nodes: Number of nodes to launch (including the head node). file_mounts: File and storage mounts configuration for remote cluster. envs: Environment variables for the task. Accessible in setup/run. + task_settings: Dictionary of arbitrary settings to pass to sky.Task(). + This allows passing future parameters added by SkyPilot without + requiring updates to ZenML. + resources_settings: Dictionary of arbitrary settings to pass to + sky.Resources(). This allows passing future parameters added + by SkyPilot without requiring updates to ZenML. + launch_settings: Dictionary of arbitrary settings to pass to + sky.launch(). This allows passing future parameters added + by SkyPilot without requiring updates to ZenML. """ # Resources @@ -136,6 +145,11 @@ class SkypilotBaseOrchestratorSettings(BaseSettings): file_mounts: Optional[Dict[str, Any]] = None envs: Optional[Dict[str, str]] = None + # Future-proofing settings dictionaries + task_settings: Dict[str, Any] = {} + resources_settings: Dict[str, Any] = {} + launch_settings: Dict[str, Any] = {} + class SkypilotBaseOrchestratorConfig( BaseOrchestratorConfig, SkypilotBaseOrchestratorSettings diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 303efb1b52b..98fd4b685f3 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -304,42 +304,55 @@ def prepare_or_run_pipeline( if task_envs: merged_envs.update(task_envs) - # Create the Task with all parameters - task = sky.Task( - run=run_command, - setup=setup, - envs=merged_envs, - name=settings.task_name or f"{orchestrator_run_name}", - workdir=settings.workdir, - file_mounts=settings.file_mounts, - ) - + # Create the Task with all parameters and additional task settings + task_kwargs = { + "run": run_command, + "setup": setup, + "envs": merged_envs, + "name": settings.task_name or f"{orchestrator_run_name}", + "workdir": settings.workdir, + "file_mounts": settings.file_mounts, + **settings.task_settings, # Add any arbitrary task settings + } + + # Remove None values to avoid overriding SkyPilot defaults + task_kwargs = { + k: v for k, v in task_kwargs.items() if v is not None + } + + task = sky.Task(**task_kwargs) logger.debug(f"Running run: {run_command}") - # Set resources with all parameters - task = task.set_resources( - sky.Resources( - cloud=self.cloud, - instance_type=instance_type, - cpus=settings.cpus, - memory=settings.memory, - accelerators=settings.accelerators, - accelerator_args=settings.accelerator_args, - use_spot=settings.use_spot, - job_recovery=settings.job_recovery, - region=settings.region, - zone=settings.zone, - image_id=image - if isinstance(self.cloud, sky.clouds.Kubernetes) - else settings.image_id, - disk_size=settings.disk_size, - disk_tier=settings.disk_tier, - ports=settings.ports, - labels=settings.labels, - any_of=settings.any_of, - ordered=settings.ordered, - ) - ) + # Set resources with all parameters and additional resource settings + resources_kwargs = { + "cloud": self.cloud, + "instance_type": instance_type, + "cpus": settings.cpus, + "memory": settings.memory, + "accelerators": settings.accelerators, + "accelerator_args": settings.accelerator_args, + "use_spot": settings.use_spot, + "job_recovery": settings.job_recovery, + "region": settings.region, + "zone": settings.zone, + "image_id": image + if isinstance(self.cloud, sky.clouds.Kubernetes) + else settings.image_id, + "disk_size": settings.disk_size, + "disk_tier": settings.disk_tier, + "ports": settings.ports, + "labels": settings.labels, + "any_of": settings.any_of, + "ordered": settings.ordered, + **settings.resources_settings, # Add any arbitrary resource settings + } + + # Remove None values to avoid overriding SkyPilot defaults + resources_kwargs = { + k: v for k, v in resources_kwargs.items() if v is not None + } + + task = task.set_resources(sky.Resources(**resources_kwargs)) # Do not detach run if logs are being streamed # Otherwise, the logs will not be streamed after the task is submitted @@ -373,19 +386,44 @@ def prepare_or_run_pipeline( ) if launch_new_cluster: + # Prepare launch parameters with additional launch settings + launch_kwargs = { + "retry_until_up": settings.retry_until_up, + "idle_minutes_to_autostop": idle_minutes_to_autostop, + "down": down, + "stream_logs": settings.stream_logs, + "backend": None, + "detach_setup": True, + "detach_run": detach_run, + "num_nodes": num_nodes, + **settings.launch_settings, # Add any arbitrary launch settings + } + + # Remove None values to avoid overriding SkyPilot defaults + launch_kwargs = { + k: v for k, v in launch_kwargs.items() if v is not None + } + sky.launch( task, cluster_name, - retry_until_up=settings.retry_until_up, - idle_minutes_to_autostop=idle_minutes_to_autostop, - down=down, - stream_logs=settings.stream_logs, - backend=None, - detach_setup=True, - detach_run=detach_run, - num_nodes=num_nodes, + **launch_kwargs, ) else: + # Prepare exec parameters with additional launch settings + exec_kwargs = { + "down": down, + "stream_logs": settings.stream_logs, + "backend": None, + "detach_run": detach_run, + **settings.launch_settings, # Can reuse same settings for exec + } + + # Remove None values to avoid overriding SkyPilot defaults + exec_kwargs = { + k: v for k, v in exec_kwargs.items() if v is not None + } + # Make sure the cluster is up - # If the cluster is already up, this will not do anything sky.start( @@ -397,10 +435,7 @@ def prepare_or_run_pipeline( sky.exec( task, settings.cluster_name, - down=down, - stream_logs=settings.stream_logs, - backend=None, - detach_run=detach_run, + **exec_kwargs, ) except Exception as e: diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py index 47c6b0592ba..6b0597d807a 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py @@ -205,54 +205,77 @@ def run_step_on_skypilot_vm(step_name: str) -> None: if task_envs: merged_envs.update(task_envs) - # Create the Task with all parameters - task = sky.Task( - run=run_command, - setup=setup, - envs=merged_envs, - name=task_name, - workdir=settings.workdir, - file_mounts=settings.file_mounts, - ) + # Create the Task with all parameters and additional task settings + task_kwargs = { + "run": run_command, + "setup": setup, + "envs": merged_envs, + "name": task_name, + "workdir": settings.workdir, + "file_mounts": settings.file_mounts, + **settings.task_settings, # Add any arbitrary task settings + } - # Set resources with all parameters - task = task.set_resources( - sky.Resources( - cloud=orchestrator.cloud, - instance_type=settings.instance_type - or orchestrator.DEFAULT_INSTANCE_TYPE, - cpus=settings.cpus, - memory=settings.memory, - disk_size=settings.disk_size, - disk_tier=settings.disk_tier, - accelerators=settings.accelerators, - accelerator_args=settings.accelerator_args, - use_spot=settings.use_spot, - job_recovery=settings.job_recovery, - region=settings.region, - zone=settings.zone, - image_id=settings.image_id, - ports=settings.ports, - labels=settings.labels, - any_of=settings.any_of, - ordered=settings.ordered, - ) - ) + # Remove None values to avoid overriding SkyPilot defaults + task_kwargs = {k: v for k, v in task_kwargs.items() if v is not None} + + task = sky.Task(**task_kwargs) + + # Set resources with all parameters and additional resource settings + resources_kwargs = { + "cloud": orchestrator.cloud, + "instance_type": settings.instance_type + or orchestrator.DEFAULT_INSTANCE_TYPE, + "cpus": settings.cpus, + "memory": settings.memory, + "disk_size": settings.disk_size, + "disk_tier": settings.disk_tier, + "accelerators": settings.accelerators, + "accelerator_args": settings.accelerator_args, + "use_spot": settings.use_spot, + "job_recovery": settings.job_recovery, + "region": settings.region, + "zone": settings.zone, + "image_id": settings.image_id, + "ports": settings.ports, + "labels": settings.labels, + "any_of": settings.any_of, + "ordered": settings.ordered, + **settings.resources_settings, # Add any arbitrary resource settings + } + + # Remove None values to avoid overriding SkyPilot defaults + resources_kwargs = { + k: v for k, v in resources_kwargs.items() if v is not None + } + + task = task.set_resources(sky.Resources(**resources_kwargs)) # Use num_nodes from settings or default to 1 num_nodes = settings.num_nodes or 1 + # Prepare launch parameters with additional launch settings + launch_kwargs = { + "retry_until_up": settings.retry_until_up, + "idle_minutes_to_autostop": settings.idle_minutes_to_autostop, + "down": settings.down, + "stream_logs": settings.stream_logs, + "backend": None, + "detach_setup": True, + "detach_run": True, + "num_nodes": num_nodes, + **settings.launch_settings, # Add any arbitrary launch settings + } + + # Remove None values to avoid overriding SkyPilot defaults + launch_kwargs = { + k: v for k, v in launch_kwargs.items() if v is not None + } + sky.launch( task, cluster_name, - retry_until_up=settings.retry_until_up, - idle_minutes_to_autostop=settings.idle_minutes_to_autostop, - down=settings.down, - stream_logs=settings.stream_logs, - backend=None, - detach_setup=True, - detach_run=True, - num_nodes=num_nodes, + **launch_kwargs, ) # Wait for pod to finish. From ba5560a91fd7cc1ee35e8f5c65c4124fd16aa158 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 08:56:33 +0200 Subject: [PATCH 03/15] Add Field annotations to Skypilot Orchestrator Base VM config --- .../skypilot_orchestrator_base_vm_config.py | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py b/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py index e81720fd0db..6f677e31c05 100644 --- a/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py +++ b/src/zenml/integrations/skypilot/flavors/skypilot_orchestrator_base_vm_config.py @@ -15,6 +15,8 @@ from typing import Any, Dict, List, Literal, Optional, Union +from pydantic import Field + from zenml.config.base_settings import BaseSettings from zenml.logger import get_logger from zenml.orchestrators import BaseOrchestratorConfig @@ -112,23 +114,29 @@ class SkypilotBaseOrchestratorSettings(BaseSettings): # Resources instance_type: Optional[str] = None - cpus: Union[None, int, float, str] = None - memory: Union[None, int, float, str] = None - accelerators: Union[None, str, Dict[str, int], List[str]] = None + cpus: Union[None, int, float, str] = Field( + default=None, union_mode="left_to_right" + ) + memory: Union[None, int, float, str] = Field( + default=None, union_mode="left_to_right" + ) + accelerators: Union[None, str, Dict[str, int], List[str]] = Field( + default=None, union_mode="left_to_right" + ) accelerator_args: Optional[Dict[str, Any]] = None use_spot: Optional[bool] = None - job_recovery: Union[None, str, Dict[str, Any]] = None + job_recovery: Union[None, str, Dict[str, Any]] = Field( + default=None, union_mode="left_to_right" + ) region: Optional[str] = None zone: Optional[str] = None - image_id: Union[Dict[str, str], str, None] = None + image_id: Union[Dict[str, str], str, None] = Field( + default=None, union_mode="left_to_right" + ) disk_size: Optional[int] = None disk_tier: Optional[Literal["high", "medium", "low", "ultra", "best"]] = ( None ) - ports: Union[None, int, str, List[Union[int, str]]] = None - labels: Optional[Dict[str, str]] = None - any_of: Optional[List[Dict[str, Any]]] = None - ordered: Optional[List[Dict[str, Any]]] = None # Run settings cluster_name: Optional[str] = None @@ -139,6 +147,12 @@ class SkypilotBaseOrchestratorSettings(BaseSettings): docker_run_args: List[str] = [] # Additional SkyPilot features + ports: Union[None, int, str, List[Union[int, str]]] = Field( + default=None, union_mode="left_to_right" + ) + labels: Optional[Dict[str, str]] = None + any_of: Optional[List[Dict[str, Any]]] = None + ordered: Optional[List[Dict[str, Any]]] = None workdir: Optional[str] = None task_name: Optional[str] = None num_nodes: Optional[int] = None From 97f3ad8f18b271f4270c91c214a033ac5dbe14ec Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:05:10 +0200 Subject: [PATCH 04/15] Refactor Skypilot orchestrator utility functions --- .../skypilot_base_vm_orchestrator.py | 160 ++++++----------- .../skypilot_orchestrator_entrypoint.py | 131 +++++--------- .../skypilot/orchestrators/test_utils.py | 170 ++++++++++++++++++ 3 files changed, 261 insertions(+), 200 deletions(-) create mode 100644 tests/integration/integrations/skypilot/orchestrators/test_utils.py diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index 98fd4b685f3..fa114a61b55 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -14,7 +14,6 @@ """Implementation of the Skypilot base VM orchestrator.""" import os -import re from abc import abstractmethod from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, cast from uuid import uuid4 @@ -31,6 +30,14 @@ from zenml.integrations.skypilot.orchestrators.skypilot_orchestrator_entrypoint_configuration import ( SkypilotOrchestratorEntrypointConfiguration, ) +from zenml.integrations.skypilot.utils import ( + create_docker_run_command, + prepare_docker_setup, + prepare_launch_kwargs, + prepare_resources_kwargs, + prepare_task_kwargs, + sanitize_cluster_name, +) from zenml.logger import get_logger from zenml.orchestrators import ( ContainerizedOrchestrator, @@ -252,15 +259,7 @@ def prepare_or_run_pipeline( entrypoint_str = " ".join(command) arguments_str = " ".join(args) - task_envs = environment - docker_environment_str = " ".join( - f"-e {k}={v}" for k, v in environment.items() - ) - custom_run_args = " ".join(settings.docker_run_args) - if custom_run_args: - custom_run_args += " " - - instance_type = settings.instance_type or self.DEFAULT_INSTANCE_TYPE + task_envs = environment.copy() # Set up credentials self.setup_credentials() @@ -268,16 +267,15 @@ def prepare_or_run_pipeline( # Guaranteed by stack validation assert stack is not None and stack.container_registry is not None - if docker_creds := stack.container_registry.credentials: - docker_username, docker_password = docker_creds - setup = ( - f"sudo docker login --username $DOCKER_USERNAME --password " - f"$DOCKER_PASSWORD {stack.container_registry.config.uri}" - ) - task_envs["DOCKER_USERNAME"] = docker_username - task_envs["DOCKER_PASSWORD"] = docker_password - else: - setup = None + # Prepare Docker setup + setup, docker_creds_envs = prepare_docker_setup( + container_registry_uri=stack.container_registry.config.uri, + credentials=stack.container_registry.credentials, + ) + + # Update task_envs with Docker credentials + if docker_creds_envs: + task_envs.update(docker_creds_envs) # Run the entire pipeline @@ -291,73 +289,40 @@ def prepare_or_run_pipeline( down = False idle_minutes_to_autostop = None else: - run_command = f"sudo docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" + run_command = create_docker_run_command( + image=image, + entrypoint_str=entrypoint_str, + arguments_str=arguments_str, + environment=task_envs, + docker_run_args=settings.docker_run_args, + ) down = settings.down idle_minutes_to_autostop = settings.idle_minutes_to_autostop - # Merge envs from settings with existing task_envs - merged_envs = {} - # First add user-provided envs - if settings.envs: - merged_envs.update(settings.envs) - # Then add task_envs which take precedence - if task_envs: - merged_envs.update(task_envs) - - # Create the Task with all parameters and additional task settings - task_kwargs = { - "run": run_command, - "setup": setup, - "envs": merged_envs, - "name": settings.task_name or f"{orchestrator_run_name}", - "workdir": settings.workdir, - "file_mounts": settings.file_mounts, - **settings.task_settings, # Add any arbitrary task settings - } - - # Remove None values to avoid overriding SkyPilot defaults - task_kwargs = { - k: v for k, v in task_kwargs.items() if v is not None - } + # Create the Task with all parameters and task settings + task_kwargs = prepare_task_kwargs( + settings=settings, + run_command=run_command, + setup=setup, + task_envs=task_envs, + task_name=f"{orchestrator_run_name}", + ) task = sky.Task(**task_kwargs) logger.debug(f"Running run: {run_command}") - # Set resources with all parameters and additional resource settings - resources_kwargs = { - "cloud": self.cloud, - "instance_type": instance_type, - "cpus": settings.cpus, - "memory": settings.memory, - "accelerators": settings.accelerators, - "accelerator_args": settings.accelerator_args, - "use_spot": settings.use_spot, - "job_recovery": settings.job_recovery, - "region": settings.region, - "zone": settings.zone, - "image_id": image + # Set resources with all parameters and resource settings + resources_kwargs = prepare_resources_kwargs( + cloud=self.cloud, + settings=settings, + default_instance_type=self.DEFAULT_INSTANCE_TYPE, + kubernetes_image=image if isinstance(self.cloud, sky.clouds.Kubernetes) - else settings.image_id, - "disk_size": settings.disk_size, - "disk_tier": settings.disk_tier, - "ports": settings.ports, - "labels": settings.labels, - "any_of": settings.any_of, - "ordered": settings.ordered, - **settings.resources_settings, # Add any arbitrary resource settings - } - - # Remove None values to avoid overriding SkyPilot defaults - resources_kwargs = { - k: v for k, v in resources_kwargs.items() if v is not None - } + else None, + ) task = task.set_resources(sky.Resources(**resources_kwargs)) - # Do not detach run if logs are being streamed - # Otherwise, the logs will not be streamed after the task is submitted - detach_run = not settings.stream_logs - # Use num_nodes from settings or default to 1 num_nodes = settings.num_nodes or 1 @@ -378,7 +343,7 @@ def prepare_or_run_pipeline( ) cluster_name = settings.cluster_name else: - cluster_name = self.sanitize_cluster_name( + cluster_name = sanitize_cluster_name( f"{orchestrator_run_name}" ) logger.info( @@ -387,22 +352,13 @@ def prepare_or_run_pipeline( if launch_new_cluster: # Prepare launch parameters with additional launch settings - launch_kwargs = { - "retry_until_up": settings.retry_until_up, - "idle_minutes_to_autostop": idle_minutes_to_autostop, - "down": down, - "stream_logs": settings.stream_logs, - "backend": None, - "detach_setup": True, - "detach_run": detach_run, - "num_nodes": num_nodes, - **settings.launch_settings, # Add any arbitrary launch settings - } - - # Remove None values to avoid overriding SkyPilot defaults - launch_kwargs = { - k: v for k, v in launch_kwargs.items() if v is not None - } + launch_kwargs = prepare_launch_kwargs( + settings=settings, + stream_logs=settings.stream_logs, + down=down, + idle_minutes_to_autostop=idle_minutes_to_autostop, + num_nodes=num_nodes, + ) sky.launch( task, @@ -415,7 +371,7 @@ def prepare_or_run_pipeline( "down": down, "stream_logs": settings.stream_logs, "backend": None, - "detach_run": detach_run, + "detach_run": not settings.stream_logs, # detach_run is opposite of stream_logs **settings.launch_settings, # Can reuse same settings for exec } @@ -445,19 +401,3 @@ def prepare_or_run_pipeline( finally: # Unset the service connector AWS profile ENV variable self.prepare_environment_variable(set=False) - - def sanitize_cluster_name(self, name: str) -> str: - """Sanitize the value to be used in a cluster name. - - Args: - name: Arbitrary input cluster name. - - Returns: - Sanitized cluster name. - """ - name = re.sub( - r"[^a-z0-9-]", "-", name.lower() - ) # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen - name = re.sub(r"^[-]+", "", name) # trim leading hyphens - name = re.sub(r"[-]+$", "", name) # trim trailing hyphens - return name diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py index 6b0597d807a..3f65217e1f7 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py @@ -32,6 +32,14 @@ ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID, SkypilotBaseOrchestrator, ) +from zenml.integrations.skypilot.utils import ( + create_docker_run_command, + prepare_docker_setup, + prepare_launch_kwargs, + prepare_resources_kwargs, + prepare_task_kwargs, + sanitize_cluster_name, +) from zenml.logger import get_logger from zenml.orchestrators.dag_runner import ThreadedDagRunner from zenml.orchestrators.utils import get_config_environment_vars @@ -102,19 +110,11 @@ def main() -> None: if container_registry is None: raise ValueError("Container registry cannot be None.") - if docker_creds := container_registry.credentials: - docker_username, docker_password = docker_creds - setup = ( - f"docker login --username $DOCKER_USERNAME --password " - f"$DOCKER_PASSWORD {container_registry.config.uri}" - ) - task_envs = { - "DOCKER_USERNAME": docker_username, - "DOCKER_PASSWORD": docker_password, - } - else: - setup = None - task_envs = None + # Prepare Docker setup + setup, task_envs = prepare_docker_setup( + container_registry_uri=container_registry.config.uri, + credentials=container_registry.credentials, + ) unique_resource_configs: Dict[str, str] = {} for step_name, step in deployment.step_configurations.items(): @@ -142,7 +142,7 @@ def main() -> None: accelerators_hashable, ) cluster_name_parts = [ - orchestrator.sanitize_cluster_name(str(part)) + sanitize_cluster_name(str(part)) for part in resource_config if part is not None ] @@ -185,92 +185,43 @@ def run_step_on_skypilot_vm(step_name: str) -> None: env = get_config_environment_vars() env[ENV_ZENML_SKYPILOT_ORCHESTRATOR_RUN_ID] = orchestrator_run_id - docker_environment_str = " ".join( - f"-e {k}={v}" for k, v in env.items() + # Create the Docker run command + run_command = create_docker_run_command( + image=image, + entrypoint_str=entrypoint_str, + arguments_str=arguments_str, + environment=env, + docker_run_args=settings.docker_run_args, ) - custom_run_args = " ".join(settings.docker_run_args) - if custom_run_args: - custom_run_args += " " - # Set up the task - run_command = f"docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" task_name = f"{deployment.id}-{step_name}-{time.time()}" - # Merge envs from settings with existing task_envs - merged_envs = {} - # First add user-provided envs - if settings.envs: - merged_envs.update(settings.envs) - # Then add task_envs which take precedence - if task_envs: - merged_envs.update(task_envs) - - # Create the Task with all parameters and additional task settings - task_kwargs = { - "run": run_command, - "setup": setup, - "envs": merged_envs, - "name": task_name, - "workdir": settings.workdir, - "file_mounts": settings.file_mounts, - **settings.task_settings, # Add any arbitrary task settings - } - - # Remove None values to avoid overriding SkyPilot defaults - task_kwargs = {k: v for k, v in task_kwargs.items() if v is not None} + # Create task kwargs + task_kwargs = prepare_task_kwargs( + settings=settings, + run_command=run_command, + setup=setup, + task_envs=task_envs, + task_name=task_name, + ) task = sky.Task(**task_kwargs) - # Set resources with all parameters and additional resource settings - resources_kwargs = { - "cloud": orchestrator.cloud, - "instance_type": settings.instance_type - or orchestrator.DEFAULT_INSTANCE_TYPE, - "cpus": settings.cpus, - "memory": settings.memory, - "disk_size": settings.disk_size, - "disk_tier": settings.disk_tier, - "accelerators": settings.accelerators, - "accelerator_args": settings.accelerator_args, - "use_spot": settings.use_spot, - "job_recovery": settings.job_recovery, - "region": settings.region, - "zone": settings.zone, - "image_id": settings.image_id, - "ports": settings.ports, - "labels": settings.labels, - "any_of": settings.any_of, - "ordered": settings.ordered, - **settings.resources_settings, # Add any arbitrary resource settings - } - - # Remove None values to avoid overriding SkyPilot defaults - resources_kwargs = { - k: v for k, v in resources_kwargs.items() if v is not None - } + # Set resources + resources_kwargs = prepare_resources_kwargs( + cloud=orchestrator.cloud, + settings=settings, + default_instance_type=orchestrator.DEFAULT_INSTANCE_TYPE, + ) task = task.set_resources(sky.Resources(**resources_kwargs)) - # Use num_nodes from settings or default to 1 - num_nodes = settings.num_nodes or 1 - - # Prepare launch parameters with additional launch settings - launch_kwargs = { - "retry_until_up": settings.retry_until_up, - "idle_minutes_to_autostop": settings.idle_minutes_to_autostop, - "down": settings.down, - "stream_logs": settings.stream_logs, - "backend": None, - "detach_setup": True, - "detach_run": True, - "num_nodes": num_nodes, - **settings.launch_settings, # Add any arbitrary launch settings - } - - # Remove None values to avoid overriding SkyPilot defaults - launch_kwargs = { - k: v for k, v in launch_kwargs.items() if v is not None - } + # Prepare launch parameters + launch_kwargs = prepare_launch_kwargs( + settings=settings, + stream_logs=settings.stream_logs, + num_nodes=settings.num_nodes, + ) sky.launch( task, diff --git a/tests/integration/integrations/skypilot/orchestrators/test_utils.py b/tests/integration/integrations/skypilot/orchestrators/test_utils.py new file mode 100644 index 00000000000..895f7bcd7b6 --- /dev/null +++ b/tests/integration/integrations/skypilot/orchestrators/test_utils.py @@ -0,0 +1,170 @@ +"""Tests for Skypilot utility functions.""" + +from unittest.mock import patch + +from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import ( + SkypilotBaseOrchestratorSettings, +) +from zenml.integrations.skypilot.utils import ( + create_docker_run_command, + prepare_docker_setup, + prepare_launch_kwargs, + prepare_resources_kwargs, + prepare_task_kwargs, + sanitize_cluster_name, +) + + +def test_sanitize_cluster_name(): + """Test the sanitize_cluster_name function.""" + # Test with valid input + assert sanitize_cluster_name("test-cluster") == "test-cluster" + + # Test with uppercase + assert sanitize_cluster_name("Test-Cluster") == "test-cluster" + + # Test with special characters + assert ( + sanitize_cluster_name("test!@#$%^&*()cluster") + == "test---------cluster" + ) + + # Test with leading/trailing hyphens + assert sanitize_cluster_name("---test-cluster---") == "test-cluster" + + +def test_prepare_docker_setup(): + """Test the prepare_docker_setup function.""" + # Test with credentials + setup, envs = prepare_docker_setup( + container_registry_uri="registry.example.com", + credentials=("username", "password"), + ) + assert "sudo docker login" in setup + assert "registry.example.com" in setup + assert envs["DOCKER_USERNAME"] == "username" + assert envs["DOCKER_PASSWORD"] == "password" + + # Test without credentials + setup, envs = prepare_docker_setup( + container_registry_uri="registry.example.com", + ) + assert setup is None + assert envs == {} + + +def test_create_docker_run_command(): + """Test the create_docker_run_command function.""" + command = create_docker_run_command( + image="test-image:latest", + entrypoint_str="python -m app", + arguments_str="--arg1 value1 --arg2 value2", + environment={"ENV1": "val1", "ENV2": "val2"}, + docker_run_args=["--network=host", "--privileged"], + ) + + assert "sudo docker run" in command + assert "--rm" in command + assert "--network=host --privileged" in command + assert "-e ENV1=val1" in command + assert "-e ENV2=val2" in command + assert "test-image:latest" in command + assert "python -m app" in command + assert "--arg1 value1 --arg2 value2" in command + + +def test_prepare_task_kwargs(): + """Test the prepare_task_kwargs function.""" + settings = SkypilotBaseOrchestratorSettings( + envs={"SETTING_ENV": "setting_val"}, + task_name=None, + workdir="/workdir", + file_mounts={"/src": "/dest"}, + task_settings={"custom": "value"}, + ) + + task_kwargs = prepare_task_kwargs( + settings=settings, + run_command="echo hello", + setup="apt-get update", + task_envs={"TASK_ENV": "task_val"}, + task_name="test-task", + ) + + assert task_kwargs["run"] == "echo hello" + assert task_kwargs["setup"] == "apt-get update" + assert task_kwargs["name"] == "test-task" + assert task_kwargs["workdir"] == "/workdir" + assert task_kwargs["file_mounts"] == {"/src": "/dest"} + assert task_kwargs["custom"] == "value" + assert task_kwargs["envs"]["SETTING_ENV"] == "setting_val" + assert task_kwargs["envs"]["TASK_ENV"] == "task_val" + + +@patch("sky.clouds.Cloud") +def test_prepare_resources_kwargs(mock_cloud): + """Test the prepare_resources_kwargs function.""" + settings = SkypilotBaseOrchestratorSettings( + instance_type="m4.large", + cpus=4, + memory=16, + accelerators={"V100": 2}, + region="us-west-2", + disk_size=100, + resources_settings={"custom_resource": "value"}, + ) + + resources_kwargs = prepare_resources_kwargs( + cloud=mock_cloud, + settings=settings, + default_instance_type="t2.micro", + ) + + assert resources_kwargs["cloud"] == mock_cloud + assert resources_kwargs["instance_type"] == "m4.large" + assert resources_kwargs["cpus"] == 4 + assert resources_kwargs["memory"] == 16 + assert resources_kwargs["accelerators"] == {"V100": 2} + assert resources_kwargs["region"] == "us-west-2" + assert resources_kwargs["disk_size"] == 100 + assert resources_kwargs["custom_resource"] == "value" + + +def test_prepare_launch_kwargs(): + """Test the prepare_launch_kwargs function.""" + settings = SkypilotBaseOrchestratorSettings( + retry_until_up=True, + idle_minutes_to_autostop=60, + down=True, + stream_logs=True, + num_nodes=3, + launch_settings={"custom_launch": "value"}, + ) + + launch_kwargs = prepare_launch_kwargs( + settings=settings, + stream_logs=True, + ) + + assert launch_kwargs["retry_until_up"] is True + assert launch_kwargs["idle_minutes_to_autostop"] == 60 + assert launch_kwargs["down"] is True + assert launch_kwargs["stream_logs"] is True + assert launch_kwargs["detach_run"] is False # opposite of stream_logs + assert launch_kwargs["num_nodes"] == 3 + assert launch_kwargs["custom_launch"] == "value" + + # Test with override values + launch_kwargs = prepare_launch_kwargs( + settings=settings, + stream_logs=False, + down=False, + idle_minutes_to_autostop=30, + num_nodes=5, + ) + + assert launch_kwargs["idle_minutes_to_autostop"] == 30 + assert launch_kwargs["down"] is False + assert launch_kwargs["stream_logs"] is False + assert launch_kwargs["detach_run"] is True # opposite of stream_logs + assert launch_kwargs["num_nodes"] == 5 From ed78460f67f65ad0d0af02ca7cb3514760aa3ee0 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:08:03 +0200 Subject: [PATCH 05/15] Fix typo in sanitize_cluster_name test expected output --- .../integrations/skypilot/orchestrators/test_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/integrations/skypilot/orchestrators/test_utils.py b/tests/integration/integrations/skypilot/orchestrators/test_utils.py index 895f7bcd7b6..7e24db72338 100644 --- a/tests/integration/integrations/skypilot/orchestrators/test_utils.py +++ b/tests/integration/integrations/skypilot/orchestrators/test_utils.py @@ -26,7 +26,7 @@ def test_sanitize_cluster_name(): # Test with special characters assert ( sanitize_cluster_name("test!@#$%^&*()cluster") - == "test---------cluster" + == "test----------cluster" ) # Test with leading/trailing hyphens From 4aa8599864864c2120bf5dc3d9ae4cece28dd42f Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:10:22 +0200 Subject: [PATCH 06/15] Update Skypilot integrations to version 0.9.2 --- src/zenml/integrations/skypilot_aws/__init__.py | 2 +- src/zenml/integrations/skypilot_azure/__init__.py | 2 +- src/zenml/integrations/skypilot_gcp/__init__.py | 2 +- src/zenml/integrations/skypilot_kubernetes/__init__.py | 2 +- src/zenml/integrations/skypilot_lambda/__init__.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/zenml/integrations/skypilot_aws/__init__.py b/src/zenml/integrations/skypilot_aws/__init__.py index 1e572692fcc..f64ff12fe9b 100644 --- a/src/zenml/integrations/skypilot_aws/__init__.py +++ b/src/zenml/integrations/skypilot_aws/__init__.py @@ -32,7 +32,7 @@ class SkypilotAWSIntegration(Integration): NAME = SKYPILOT_AWS # all 0.6.x versions of skypilot[aws] are compatible - REQUIREMENTS = ["skypilot[aws]~=0.8.0"] + REQUIREMENTS = ["skypilot[aws]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_azure/__init__.py b/src/zenml/integrations/skypilot_azure/__init__.py index 3e92c100ef7..1053f447bc3 100644 --- a/src/zenml/integrations/skypilot_azure/__init__.py +++ b/src/zenml/integrations/skypilot_azure/__init__.py @@ -31,7 +31,7 @@ class SkypilotAzureIntegration(Integration): """Definition of Skypilot (Azure) Integration for ZenML.""" NAME = SKYPILOT_AZURE - REQUIREMENTS = ["skypilot[azure]~=0.8.0"] + REQUIREMENTS = ["skypilot[azure]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_gcp/__init__.py b/src/zenml/integrations/skypilot_gcp/__init__.py index 3cec89a5a0f..1bc8b71441e 100644 --- a/src/zenml/integrations/skypilot_gcp/__init__.py +++ b/src/zenml/integrations/skypilot_gcp/__init__.py @@ -31,7 +31,7 @@ class SkypilotGCPIntegration(Integration): """Definition of Skypilot (GCP) Integration for ZenML.""" NAME = SKYPILOT_GCP - REQUIREMENTS = ["skypilot[gcp]~=0.8.0"] + REQUIREMENTS = ["skypilot[gcp]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py index 5445884385d..1349a2b0268 100644 --- a/src/zenml/integrations/skypilot_kubernetes/__init__.py +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -32,7 +32,7 @@ class SkypilotKubernetesIntegration(Integration): NAME = SKYPILOT_KUBERNETES # all 0.6.x versions of skypilot[kubernetes] are compatible - REQUIREMENTS = ["skypilot[kubernetes]~=0.8.0"] + REQUIREMENTS = ["skypilot[kubernetes]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_lambda/__init__.py b/src/zenml/integrations/skypilot_lambda/__init__.py index 2af780772c5..d18010fa692 100644 --- a/src/zenml/integrations/skypilot_lambda/__init__.py +++ b/src/zenml/integrations/skypilot_lambda/__init__.py @@ -31,7 +31,7 @@ class SkypilotLambdaIntegration(Integration): """Definition of Skypilot Lambda Integration for ZenML.""" NAME = SKYPILOT_LAMBDA - REQUIREMENTS = ["skypilot[lambda]~=0.8.0"] + REQUIREMENTS = ["skypilot[lambda]~=0.9.2"] @classmethod def flavors(cls) -> List[Type[Flavor]]: From 89ac93e5135fff1d8c46063ad93cb2554dc83f43 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:12:00 +0200 Subject: [PATCH 07/15] Remove unnecessary lines in Skypilot integrations --- src/zenml/integrations/skypilot_aws/__init__.py | 1 - src/zenml/integrations/skypilot_azure/__init__.py | 1 - src/zenml/integrations/skypilot_kubernetes/__init__.py | 1 - src/zenml/integrations/skypilot_lambda/__init__.py | 1 - 4 files changed, 4 deletions(-) diff --git a/src/zenml/integrations/skypilot_aws/__init__.py b/src/zenml/integrations/skypilot_aws/__init__.py index f64ff12fe9b..72c8c9c31d2 100644 --- a/src/zenml/integrations/skypilot_aws/__init__.py +++ b/src/zenml/integrations/skypilot_aws/__init__.py @@ -47,4 +47,3 @@ def flavors(cls) -> List[Type[Flavor]]: ) return [SkypilotAWSOrchestratorFlavor] - diff --git a/src/zenml/integrations/skypilot_azure/__init__.py b/src/zenml/integrations/skypilot_azure/__init__.py index 1053f447bc3..34186a89c7d 100644 --- a/src/zenml/integrations/skypilot_azure/__init__.py +++ b/src/zenml/integrations/skypilot_azure/__init__.py @@ -46,4 +46,3 @@ def flavors(cls) -> List[Type[Flavor]]: ) return [SkypilotAzureOrchestratorFlavor] - diff --git a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py index 1349a2b0268..1f1e768d303 100644 --- a/src/zenml/integrations/skypilot_kubernetes/__init__.py +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -47,4 +47,3 @@ def flavors(cls) -> List[Type[Flavor]]: ) return [SkypilotKubernetesOrchestratorFlavor] - diff --git a/src/zenml/integrations/skypilot_lambda/__init__.py b/src/zenml/integrations/skypilot_lambda/__init__.py index d18010fa692..87071c06bf2 100644 --- a/src/zenml/integrations/skypilot_lambda/__init__.py +++ b/src/zenml/integrations/skypilot_lambda/__init__.py @@ -45,4 +45,3 @@ def flavors(cls) -> List[Type[Flavor]]: ) return [SkypilotLambdaOrchestratorFlavor] - From 5062c4f0c2806e33a9ae3c6f99de9a8996077b66 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:16:29 +0200 Subject: [PATCH 08/15] Add use_sudo parameter to prepare_docker_setup(). --- .../skypilot_base_vm_orchestrator.py | 2 + .../skypilot_orchestrator_entrypoint.py | 3 ++ .../skypilot/orchestrators/test_utils.py | 41 ++++++++++++++++++- 3 files changed, 45 insertions(+), 1 deletion(-) diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py index fa114a61b55..cc9322a8577 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_base_vm_orchestrator.py @@ -271,6 +271,7 @@ def prepare_or_run_pipeline( setup, docker_creds_envs = prepare_docker_setup( container_registry_uri=stack.container_registry.config.uri, credentials=stack.container_registry.credentials, + use_sudo=True, # Base orchestrator uses sudo ) # Update task_envs with Docker credentials @@ -295,6 +296,7 @@ def prepare_or_run_pipeline( arguments_str=arguments_str, environment=task_envs, docker_run_args=settings.docker_run_args, + use_sudo=True, # Base orchestrator uses sudo ) down = settings.down idle_minutes_to_autostop = settings.idle_minutes_to_autostop diff --git a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py index 3f65217e1f7..921de7242d4 100644 --- a/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py +++ b/src/zenml/integrations/skypilot/orchestrators/skypilot_orchestrator_entrypoint.py @@ -114,6 +114,7 @@ def main() -> None: setup, task_envs = prepare_docker_setup( container_registry_uri=container_registry.config.uri, credentials=container_registry.credentials, + use_sudo=False, # Entrypoint doesn't use sudo ) unique_resource_configs: Dict[str, str] = {} @@ -192,6 +193,7 @@ def run_step_on_skypilot_vm(step_name: str) -> None: arguments_str=arguments_str, environment=env, docker_run_args=settings.docker_run_args, + use_sudo=False, # Entrypoint doesn't use sudo ) task_name = f"{deployment.id}-{step_name}-{time.time()}" @@ -221,6 +223,7 @@ def run_step_on_skypilot_vm(step_name: str) -> None: settings=settings, stream_logs=settings.stream_logs, num_nodes=settings.num_nodes, + detach_run=True, # Entrypoint always detaches ) sky.launch( diff --git a/tests/integration/integrations/skypilot/orchestrators/test_utils.py b/tests/integration/integrations/skypilot/orchestrators/test_utils.py index 7e24db72338..f47010d25a4 100644 --- a/tests/integration/integrations/skypilot/orchestrators/test_utils.py +++ b/tests/integration/integrations/skypilot/orchestrators/test_utils.py @@ -35,16 +35,27 @@ def test_sanitize_cluster_name(): def test_prepare_docker_setup(): """Test the prepare_docker_setup function.""" - # Test with credentials + # Test with credentials and sudo setup, envs = prepare_docker_setup( container_registry_uri="registry.example.com", credentials=("username", "password"), + use_sudo=True, ) assert "sudo docker login" in setup assert "registry.example.com" in setup assert envs["DOCKER_USERNAME"] == "username" assert envs["DOCKER_PASSWORD"] == "password" + # Test with credentials and no sudo + setup, envs = prepare_docker_setup( + container_registry_uri="registry.example.com", + credentials=("username", "password"), + use_sudo=False, + ) + assert "docker login" in setup + assert "sudo" not in setup + assert "registry.example.com" in setup + # Test without credentials setup, envs = prepare_docker_setup( container_registry_uri="registry.example.com", @@ -55,12 +66,14 @@ def test_prepare_docker_setup(): def test_create_docker_run_command(): """Test the create_docker_run_command function.""" + # Test with sudo command = create_docker_run_command( image="test-image:latest", entrypoint_str="python -m app", arguments_str="--arg1 value1 --arg2 value2", environment={"ENV1": "val1", "ENV2": "val2"}, docker_run_args=["--network=host", "--privileged"], + use_sudo=True, ) assert "sudo docker run" in command @@ -72,6 +85,19 @@ def test_create_docker_run_command(): assert "python -m app" in command assert "--arg1 value1 --arg2 value2" in command + # Test without sudo + command = create_docker_run_command( + image="test-image:latest", + entrypoint_str="python -m app", + arguments_str="--arg1 value1 --arg2 value2", + environment={"ENV1": "val1", "ENV2": "val2"}, + docker_run_args=["--network=host", "--privileged"], + use_sudo=False, + ) + + assert "docker run" in command + assert "sudo" not in command + def test_prepare_task_kwargs(): """Test the prepare_task_kwargs function.""" @@ -141,6 +167,7 @@ def test_prepare_launch_kwargs(): launch_settings={"custom_launch": "value"}, ) + # Test default behavior (detach_run opposite of stream_logs) launch_kwargs = prepare_launch_kwargs( settings=settings, stream_logs=True, @@ -154,6 +181,18 @@ def test_prepare_launch_kwargs(): assert launch_kwargs["num_nodes"] == 3 assert launch_kwargs["custom_launch"] == "value" + # Test with explicit detach_run override + launch_kwargs = prepare_launch_kwargs( + settings=settings, + stream_logs=True, + detach_run=True, # Explicitly override + ) + + assert launch_kwargs["stream_logs"] is True + assert ( + launch_kwargs["detach_run"] is True + ) # Explicitly set, not derived from stream_logs + # Test with override values launch_kwargs = prepare_launch_kwargs( settings=settings, From c057a59a46f73cf68b2554bec3b23100d86087fe Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:26:14 +0200 Subject: [PATCH 09/15] Update Skypilot integrations with omegaconf>=2.4.0.dev3 --- src/zenml/integrations/skypilot_aws/__init__.py | 2 +- src/zenml/integrations/skypilot_azure/__init__.py | 2 +- src/zenml/integrations/skypilot_gcp/__init__.py | 2 +- src/zenml/integrations/skypilot_kubernetes/__init__.py | 2 +- src/zenml/integrations/skypilot_lambda/__init__.py | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/zenml/integrations/skypilot_aws/__init__.py b/src/zenml/integrations/skypilot_aws/__init__.py index 72c8c9c31d2..0c41d75aa1d 100644 --- a/src/zenml/integrations/skypilot_aws/__init__.py +++ b/src/zenml/integrations/skypilot_aws/__init__.py @@ -32,7 +32,7 @@ class SkypilotAWSIntegration(Integration): NAME = SKYPILOT_AWS # all 0.6.x versions of skypilot[aws] are compatible - REQUIREMENTS = ["skypilot[aws]~=0.9.2"] + REQUIREMENTS = ["skypilot[aws]~=0.9.2", "omegaconf>=2.4.0.dev3"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_azure/__init__.py b/src/zenml/integrations/skypilot_azure/__init__.py index 34186a89c7d..ae7ab33c2c9 100644 --- a/src/zenml/integrations/skypilot_azure/__init__.py +++ b/src/zenml/integrations/skypilot_azure/__init__.py @@ -31,7 +31,7 @@ class SkypilotAzureIntegration(Integration): """Definition of Skypilot (Azure) Integration for ZenML.""" NAME = SKYPILOT_AZURE - REQUIREMENTS = ["skypilot[azure]~=0.9.2"] + REQUIREMENTS = ["skypilot[azure]~=0.9.2", "omegaconf>=2.4.0.dev3"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_gcp/__init__.py b/src/zenml/integrations/skypilot_gcp/__init__.py index 1bc8b71441e..bed7c1278a1 100644 --- a/src/zenml/integrations/skypilot_gcp/__init__.py +++ b/src/zenml/integrations/skypilot_gcp/__init__.py @@ -31,7 +31,7 @@ class SkypilotGCPIntegration(Integration): """Definition of Skypilot (GCP) Integration for ZenML.""" NAME = SKYPILOT_GCP - REQUIREMENTS = ["skypilot[gcp]~=0.9.2"] + REQUIREMENTS = ["skypilot[gcp]~=0.9.2", "omegaconf>=2.4.0.dev3"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py index 1f1e768d303..c5d9fe59fcc 100644 --- a/src/zenml/integrations/skypilot_kubernetes/__init__.py +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -32,7 +32,7 @@ class SkypilotKubernetesIntegration(Integration): NAME = SKYPILOT_KUBERNETES # all 0.6.x versions of skypilot[kubernetes] are compatible - REQUIREMENTS = ["skypilot[kubernetes]~=0.9.2"] + REQUIREMENTS = ["skypilot[kubernetes]~=0.9.2", "omegaconf>=2.4.0.dev3"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_lambda/__init__.py b/src/zenml/integrations/skypilot_lambda/__init__.py index 87071c06bf2..d00d35ba07a 100644 --- a/src/zenml/integrations/skypilot_lambda/__init__.py +++ b/src/zenml/integrations/skypilot_lambda/__init__.py @@ -31,7 +31,7 @@ class SkypilotLambdaIntegration(Integration): """Definition of Skypilot Lambda Integration for ZenML.""" NAME = SKYPILOT_LAMBDA - REQUIREMENTS = ["skypilot[lambda]~=0.9.2"] + REQUIREMENTS = ["skypilot[lambda]~=0.9.2", "omegaconf>=2.4.0.dev3"] @classmethod def flavors(cls) -> List[Type[Flavor]]: From 90155a11cff50dfed4ce8c2017f0ef9d07a8d64a Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:37:15 +0200 Subject: [PATCH 10/15] Update omegaconf requirement to include version 2.3 --- src/zenml/integrations/skypilot_aws/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/zenml/integrations/skypilot_aws/__init__.py b/src/zenml/integrations/skypilot_aws/__init__.py index 0c41d75aa1d..5d6ba0edd24 100644 --- a/src/zenml/integrations/skypilot_aws/__init__.py +++ b/src/zenml/integrations/skypilot_aws/__init__.py @@ -32,7 +32,7 @@ class SkypilotAWSIntegration(Integration): NAME = SKYPILOT_AWS # all 0.6.x versions of skypilot[aws] are compatible - REQUIREMENTS = ["skypilot[aws]~=0.9.2", "omegaconf>=2.4.0.dev3"] + REQUIREMENTS = ["skypilot[aws]~=0.9.2", "omegaconf~=2.3, >=2.4.0.dev3"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod From 37879422079530a9d054285413cd7a0e617f4604 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Sun, 4 May 2025 09:57:39 +0200 Subject: [PATCH 11/15] Add Skypilot orchestrator utility functions --- src/zenml/integrations/skypilot/utils.py | 235 +++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 src/zenml/integrations/skypilot/utils.py diff --git a/src/zenml/integrations/skypilot/utils.py b/src/zenml/integrations/skypilot/utils.py new file mode 100644 index 00000000000..89e41ace7e5 --- /dev/null +++ b/src/zenml/integrations/skypilot/utils.py @@ -0,0 +1,235 @@ +"""Utility functions for Skypilot orchestrators.""" + +import re +from typing import Any, Dict, List, Optional, Tuple + +import sky + +from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import ( + SkypilotBaseOrchestratorSettings, +) +from zenml.logger import get_logger + +logger = get_logger(__name__) + + +def sanitize_cluster_name(name: str) -> str: + """Sanitize the value to be used in a cluster name. + + Args: + name: Arbitrary input cluster name. + + Returns: + Sanitized cluster name. + """ + name = re.sub( + r"[^a-z0-9-]", "-", name.lower() + ) # replaces any character that is not a lowercase letter, digit, or hyphen with a hyphen + name = re.sub(r"^[-]+", "", name) # trim leading hyphens + name = re.sub(r"[-]+$", "", name) # trim trailing hyphens + return name + + +def prepare_docker_setup( + container_registry_uri: str, + credentials: Optional[Tuple[str, str]] = None, + use_sudo: bool = True, +) -> Tuple[Optional[str], Dict[str, str]]: + """Prepare Docker login setup command and environment variables. + + Args: + container_registry_uri: URI of the container registry. + credentials: Optional credentials (username, password) tuple. + use_sudo: Whether to use sudo prefix in docker commands. + + Returns: + Tuple of (setup command, environment variables) + """ + if credentials: + docker_username, docker_password = credentials + sudo_prefix = "sudo " if use_sudo else "" + setup = ( + f"{sudo_prefix}docker login --username $DOCKER_USERNAME --password " + f"$DOCKER_PASSWORD {container_registry_uri}" + ) + task_envs = { + "DOCKER_USERNAME": docker_username, + "DOCKER_PASSWORD": docker_password, + } + else: + setup = None + task_envs = {} + + return setup, task_envs + + +def create_docker_run_command( + image: str, + entrypoint_str: str, + arguments_str: str, + environment: Dict[str, str], + docker_run_args: List[str], + use_sudo: bool = True, +) -> str: + """Create a Docker run command string. + + Args: + image: Docker image to run. + entrypoint_str: Entrypoint command. + arguments_str: Command arguments. + environment: Environment variables. + docker_run_args: Additional Docker run arguments. + use_sudo: Whether to use sudo prefix in docker commands. + + Returns: + Docker run command as string. + """ + docker_environment_str = " ".join( + f"-e {k}={v}" for k, v in environment.items() + ) + custom_run_args = " ".join(docker_run_args) + if custom_run_args: + custom_run_args += " " + + sudo_prefix = "sudo " if use_sudo else "" + return f"{sudo_prefix}docker run --rm {custom_run_args}{docker_environment_str} {image} {entrypoint_str} {arguments_str}" + + +def prepare_task_kwargs( + settings: SkypilotBaseOrchestratorSettings, + run_command: str, + setup: Optional[str], + task_envs: Dict[str, str], + task_name: str, +) -> Dict[str, Any]: + """Prepare task keyword arguments for sky.Task. + + Args: + settings: Skypilot orchestrator settings. + run_command: Command to run. + setup: Setup command. + task_envs: Task environment variables. + task_name: Task name. + + Returns: + Task keyword arguments dictionary. + """ + # Merge envs from settings with existing task_envs + merged_envs = {} + # First add user-provided envs + if settings.envs: + merged_envs.update(settings.envs) + # Then add task_envs which take precedence + if task_envs: + merged_envs.update(task_envs) + + task_kwargs = { + "run": run_command, + "setup": setup, + "envs": merged_envs, + "name": settings.task_name or task_name, + "workdir": settings.workdir, + "file_mounts": settings.file_mounts, + **settings.task_settings, # Add any arbitrary task settings + } + + # Remove None values to avoid overriding SkyPilot defaults + return {k: v for k, v in task_kwargs.items() if v is not None} + + +def prepare_resources_kwargs( + cloud: sky.clouds.Cloud, + settings: SkypilotBaseOrchestratorSettings, + default_instance_type: Optional[str] = None, + kubernetes_image: Optional[str] = None, +) -> Dict[str, Any]: + """Prepare resources keyword arguments for sky.Resources. + + Args: + cloud: Skypilot cloud. + settings: Skypilot orchestrator settings. + default_instance_type: Default instance type. + kubernetes_image: Image to use for Kubernetes (if applicable). + + Returns: + Resources keyword arguments dictionary. + """ + resources_kwargs = { + "cloud": cloud, + "instance_type": settings.instance_type or default_instance_type, + "cpus": settings.cpus, + "memory": settings.memory, + "accelerators": settings.accelerators, + "accelerator_args": settings.accelerator_args, + "use_spot": settings.use_spot, + "job_recovery": settings.job_recovery, + "region": settings.region, + "zone": settings.zone, + "image_id": kubernetes_image + if isinstance(cloud, sky.clouds.Kubernetes) + else settings.image_id, + "disk_size": settings.disk_size, + "disk_tier": settings.disk_tier, + "ports": settings.ports, + "labels": settings.labels, + "any_of": settings.any_of, + "ordered": settings.ordered, + **settings.resources_settings, # Add any arbitrary resource settings + } + + # Remove None values to avoid overriding SkyPilot defaults + return {k: v for k, v in resources_kwargs.items() if v is not None} + + +def prepare_launch_kwargs( + settings: SkypilotBaseOrchestratorSettings, + stream_logs: bool, + down: Optional[bool] = None, + idle_minutes_to_autostop: Optional[int] = None, + num_nodes: Optional[int] = None, + detach_run: Optional[bool] = None, +) -> Dict[str, Any]: + """Prepare launch keyword arguments for sky.launch. + + Args: + settings: Skypilot orchestrator settings. + stream_logs: Whether to stream logs. + down: Whether to tear down the cluster after job completion. + idle_minutes_to_autostop: Minutes to autostop after idleness. + num_nodes: Number of nodes to launch. + detach_run: Whether to detach from the run. If None, will be + determined as the opposite of stream_logs. + + Returns: + Launch keyword arguments dictionary. + """ + # Do not detach run if logs are being streamed + detach_run_value = ( + detach_run if detach_run is not None else not stream_logs + ) + + # Use provided values or settings if not provided + down_value = down if down is not None else settings.down + idle_value = ( + idle_minutes_to_autostop + if idle_minutes_to_autostop is not None + else settings.idle_minutes_to_autostop + ) + nodes_value = ( + num_nodes if num_nodes is not None else (settings.num_nodes or 1) + ) + + launch_kwargs = { + "retry_until_up": settings.retry_until_up, + "idle_minutes_to_autostop": idle_value, + "down": down_value, + "stream_logs": stream_logs, + "backend": None, + "detach_setup": True, + "detach_run": detach_run_value, + "num_nodes": nodes_value, + **settings.launch_settings, # Add any arbitrary launch settings + } + + # Remove None values to avoid overriding SkyPilot defaults + return {k: v for k, v in launch_kwargs.items() if v is not None} From 3cd356cbe41e9d7aea0d47d1f621339451b97be4 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 5 May 2025 11:53:46 +0200 Subject: [PATCH 12/15] Update Skypilot integrations requirements --- scripts/install-zenml-dev.sh | 2 +- src/zenml/integrations/skypilot/utils.py | 11 ++++++----- src/zenml/integrations/skypilot_aws/__init__.py | 2 +- src/zenml/integrations/skypilot_azure/__init__.py | 2 +- src/zenml/integrations/skypilot_gcp/__init__.py | 2 +- .../integrations/skypilot_kubernetes/__init__.py | 2 +- src/zenml/integrations/skypilot_lambda/__init__.py | 2 +- 7 files changed, 12 insertions(+), 11 deletions(-) diff --git a/scripts/install-zenml-dev.sh b/scripts/install-zenml-dev.sh index 45bfd391217..21c39a0edec 100755 --- a/scripts/install-zenml-dev.sh +++ b/scripts/install-zenml-dev.sh @@ -36,7 +36,7 @@ install_integrations() { # figure out the python version python_version=$(python -c "import sys; print('.'.join(map(str, sys.version_info[:2])))") - ignore_integrations="feast label_studio bentoml seldon pycaret skypilot_aws skypilot_gcp skypilot_azure pigeon prodigy argilla" + ignore_integrations="feast label_studio bentoml seldon pycaret skypilot_aws skypilot_gcp skypilot_azure skypilot_kubernetes skypilot_lambda pigeon prodigy argilla" # Ignore tensorflow and deepchecks only on Python 3.12 if [ "$python_version" = "3.12" ]; then diff --git a/src/zenml/integrations/skypilot/utils.py b/src/zenml/integrations/skypilot/utils.py index 89e41ace7e5..eb749dfb54a 100644 --- a/src/zenml/integrations/skypilot/utils.py +++ b/src/zenml/integrations/skypilot/utils.py @@ -1,9 +1,7 @@ """Utility functions for Skypilot orchestrators.""" import re -from typing import Any, Dict, List, Optional, Tuple - -import sky +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from zenml.integrations.skypilot.flavors.skypilot_orchestrator_base_vm_config import ( SkypilotBaseOrchestratorSettings, @@ -12,6 +10,9 @@ logger = get_logger(__name__) +if TYPE_CHECKING: + from sky.clouds.cloud import Cloud + def sanitize_cluster_name(name: str) -> str: """Sanitize the value to be used in a cluster name. @@ -138,7 +139,7 @@ def prepare_task_kwargs( def prepare_resources_kwargs( - cloud: sky.clouds.Cloud, + cloud: "Cloud", settings: SkypilotBaseOrchestratorSettings, default_instance_type: Optional[str] = None, kubernetes_image: Optional[str] = None, @@ -166,7 +167,7 @@ def prepare_resources_kwargs( "region": settings.region, "zone": settings.zone, "image_id": kubernetes_image - if isinstance(cloud, sky.clouds.Kubernetes) + if kubernetes_image else settings.image_id, "disk_size": settings.disk_size, "disk_tier": settings.disk_tier, diff --git a/src/zenml/integrations/skypilot_aws/__init__.py b/src/zenml/integrations/skypilot_aws/__init__.py index 5d6ba0edd24..72c8c9c31d2 100644 --- a/src/zenml/integrations/skypilot_aws/__init__.py +++ b/src/zenml/integrations/skypilot_aws/__init__.py @@ -32,7 +32,7 @@ class SkypilotAWSIntegration(Integration): NAME = SKYPILOT_AWS # all 0.6.x versions of skypilot[aws] are compatible - REQUIREMENTS = ["skypilot[aws]~=0.9.2", "omegaconf~=2.3, >=2.4.0.dev3"] + REQUIREMENTS = ["skypilot[aws]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_azure/__init__.py b/src/zenml/integrations/skypilot_azure/__init__.py index ae7ab33c2c9..34186a89c7d 100644 --- a/src/zenml/integrations/skypilot_azure/__init__.py +++ b/src/zenml/integrations/skypilot_azure/__init__.py @@ -31,7 +31,7 @@ class SkypilotAzureIntegration(Integration): """Definition of Skypilot (Azure) Integration for ZenML.""" NAME = SKYPILOT_AZURE - REQUIREMENTS = ["skypilot[azure]~=0.9.2", "omegaconf>=2.4.0.dev3"] + REQUIREMENTS = ["skypilot[azure]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_gcp/__init__.py b/src/zenml/integrations/skypilot_gcp/__init__.py index bed7c1278a1..1bc8b71441e 100644 --- a/src/zenml/integrations/skypilot_gcp/__init__.py +++ b/src/zenml/integrations/skypilot_gcp/__init__.py @@ -31,7 +31,7 @@ class SkypilotGCPIntegration(Integration): """Definition of Skypilot (GCP) Integration for ZenML.""" NAME = SKYPILOT_GCP - REQUIREMENTS = ["skypilot[gcp]~=0.9.2", "omegaconf>=2.4.0.dev3"] + REQUIREMENTS = ["skypilot[gcp]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_kubernetes/__init__.py b/src/zenml/integrations/skypilot_kubernetes/__init__.py index c5d9fe59fcc..1f1e768d303 100644 --- a/src/zenml/integrations/skypilot_kubernetes/__init__.py +++ b/src/zenml/integrations/skypilot_kubernetes/__init__.py @@ -32,7 +32,7 @@ class SkypilotKubernetesIntegration(Integration): NAME = SKYPILOT_KUBERNETES # all 0.6.x versions of skypilot[kubernetes] are compatible - REQUIREMENTS = ["skypilot[kubernetes]~=0.9.2", "omegaconf>=2.4.0.dev3"] + REQUIREMENTS = ["skypilot[kubernetes]~=0.9.2"] APT_PACKAGES = ["openssh-client", "rsync"] @classmethod diff --git a/src/zenml/integrations/skypilot_lambda/__init__.py b/src/zenml/integrations/skypilot_lambda/__init__.py index d00d35ba07a..87071c06bf2 100644 --- a/src/zenml/integrations/skypilot_lambda/__init__.py +++ b/src/zenml/integrations/skypilot_lambda/__init__.py @@ -31,7 +31,7 @@ class SkypilotLambdaIntegration(Integration): """Definition of Skypilot Lambda Integration for ZenML.""" NAME = SKYPILOT_LAMBDA - REQUIREMENTS = ["skypilot[lambda]~=0.9.2", "omegaconf>=2.4.0.dev3"] + REQUIREMENTS = ["skypilot[lambda]~=0.9.2"] @classmethod def flavors(cls) -> List[Type[Flavor]]: From 9074d93c0173faa163d09e1005798a26641bf9b8 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 5 May 2025 11:59:54 +0200 Subject: [PATCH 13/15] Update Skypilot VM Orchestrator installation command --- .../book/component-guide/orchestrators/skypilot-vm.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/docs/book/component-guide/orchestrators/skypilot-vm.md b/docs/book/component-guide/orchestrators/skypilot-vm.md index e3b4ddfe35d..24ff5f1ba5b 100644 --- a/docs/book/component-guide/orchestrators/skypilot-vm.md +++ b/docs/book/component-guide/orchestrators/skypilot-vm.md @@ -71,13 +71,18 @@ To use the SkyPilot VM Orchestrator, you need: {% tabs %} {% tab title="AWS" %} -We need first to install the SkyPilot integration for AWS and the AWS connectors extra, using the following two commands: +We need first to install the SkyPilot integration for AWS and the AWS connectors extra, using the following command: ```shell - pip install "zenml[connectors-aws]" - zenml integration install aws skypilot_aws + pip install "zenml[connectors-aws]" "skypilot[lambda]~=0.9.2" "aws-profile-manager" "boto3" ``` +{% hint style="warning" %} +Please note that currently the ZenML AWS and Skypilot integration are pip-incompatible, therefore executing `zenml integration install aws skypilot_aws` will not work. Please +install the requirements of AWS components like the container registry and artifact store +directly with pip to avoid any installation problems. +{% endhint %} + To provision VMs on AWS, your VM Orchestrator stack component needs to be configured to authenticate with [AWS Service Connector](https://docs.zenml.io/how-to/infrastructure-deployment/auth-management/aws-service-connector). To configure the AWS Service Connector, you need to register a new service connector configured with AWS credentials that have at least the minimum permissions required by SkyPilot as documented [here](https://skypilot.readthedocs.io/en/latest/cloud-setup/cloud-permissions/aws.html). First, check that the AWS service connector type is available using the following command: From 7a7ebaf125a466b2d408feafa46dbae272faf6d7 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 5 May 2025 12:04:08 +0200 Subject: [PATCH 14/15] Update Skypilot-VM integration instructions --- docs/book/component-guide/orchestrators/skypilot-vm.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/docs/book/component-guide/orchestrators/skypilot-vm.md b/docs/book/component-guide/orchestrators/skypilot-vm.md index 24ff5f1ba5b..68bb8503ae9 100644 --- a/docs/book/component-guide/orchestrators/skypilot-vm.md +++ b/docs/book/component-guide/orchestrators/skypilot-vm.md @@ -78,9 +78,7 @@ We need first to install the SkyPilot integration for AWS and the AWS connectors ``` {% hint style="warning" %} -Please note that currently the ZenML AWS and Skypilot integration are pip-incompatible, therefore executing `zenml integration install aws skypilot_aws` will not work. Please -install the requirements of AWS components like the container registry and artifact store -directly with pip to avoid any installation problems. +Please note that currently the ZenML AWS and Skypilot integration are pip-incompatible therefore executing `zenml integration install aws skypilot_aws` will not work. Please install the requirements of AWS components like the container registry and artifact store directly with pip to avoid any installation problems. {% endhint %} To provision VMs on AWS, your VM Orchestrator stack component needs to be configured to authenticate with [AWS Service Connector](https://docs.zenml.io/how-to/infrastructure-deployment/auth-management/aws-service-connector). To configure the AWS Service Connector, you need to register a new service connector configured with AWS credentials that have at least the minimum permissions required by SkyPilot as documented [here](https://skypilot.readthedocs.io/en/latest/cloud-setup/cloud-permissions/aws.html). From 4902413eb06a571accd807d37ebdcaa30f933282 Mon Sep 17 00:00:00 2001 From: Hamza Tahir Date: Mon, 5 May 2025 12:11:04 +0200 Subject: [PATCH 15/15] Add installation instructions for Skypilot AWS dependencies --- docs/book/component-guide/orchestrators/skypilot-vm.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/book/component-guide/orchestrators/skypilot-vm.md b/docs/book/component-guide/orchestrators/skypilot-vm.md index 68bb8503ae9..080e9b95e31 100644 --- a/docs/book/component-guide/orchestrators/skypilot-vm.md +++ b/docs/book/component-guide/orchestrators/skypilot-vm.md @@ -74,7 +74,8 @@ To use the SkyPilot VM Orchestrator, you need: We need first to install the SkyPilot integration for AWS and the AWS connectors extra, using the following command: ```shell - pip install "zenml[connectors-aws]" "skypilot[lambda]~=0.9.2" "aws-profile-manager" "boto3" + # Installs dependencies for Skypilot AWS, AWS Container Registry, and S3 Artifact Store + pip install "zenml[connectors-aws]" "skypilot[lambda]~=0.9.2" "aws-profile-manager" boto3 argparse fsspec ``` {% hint style="warning" %}