
Commit 1ce2dc1

[Core] Support Multiple Resources (skypilot-org#2498)
* merge master into multi-acc
* debug and bash format.sh
* fix bug initialize resources_pref_list
* fix bug
* format
* improve printing
* format
* tmp
* fix spot launch issue
* fuzzing target
* bug fix
* fix bugs refactor
* add in new_task_resources
* fix list copy
* len(self.resources
* spacing
* is_resources_ordered
* Refactor a bit
* remove print
* fix bug
* updated optimization table
* comment out logger info
* bug fix unordered list
* format
* debug and added launched accelerators in recovery strategy
* bug fix
* sort rows by resources_pref_list
* UI tweak
* multi_resources
* using use_spot to sort the tables
* added mixed spot and demand
* address weilin comments
* format.sh
* fix bug
* added default spot recovery to make
* added test smoke, removed mixed spot, addressed comments
* format.sh
* added smoke test and resources_ordered
* make the output log use 1 line vs. 2.
* region lists, fix bug with managed spot dashboard
* conditions for len(region) <= 1
* multi_resources.yaml
* merge master
* change yaml example
* Refactor backend_utils and pytest
* address comments
* addressed comments
* fix pytest
* fix test
* format
* disable sky.exec on multiple resources
* minor formatting changes
* refactor
* format.sh
* style nit: avoid abbreviation
* added check_resources_fit_cluster to exec
* combine get_resources
* added usage.lib collection
* update comment
* update comment
* combine multi resources checks
* fix multiple resources, same accelerator type
* format.sh and move exec position
* update list and set for resources
* debugging
* address comment
* fix format and test
* fix bug
* update test
* support exec
* using dict to represent res
* support {'A100-40GB:1', 'K80:1', 'V100:1', 'T4:1'}
* address Zhanghao's code review
* tmp
* fix bugs
* _get_resource_group_hash
* added _execute
* added tpu back
* did the tests pass
* added monkeypatch
* enabled all clouds
* comment out unordered
* comment out unordered
* fix get_task_resources_str
* overwrite resources with valid_resource
* removed regions from this PR
* used v[chosen_resources
* fix key hash issue
* move check to cloud.py
* assert
* remove assert
* refactor optimizer _optimize_objective
* added in assert for optimize by cost
* addressed most of the code review
* fix resources import
* optimize_dag_with_user
* fix bug
* update yaml config
* fix node_to_cost_map out of sync with best_plan
* check
* fix bug
* fix basic changes
* refactor _optimize_dag
* fix compare_optimization_results
* format
* fix bug
* fix test
* fix
* addressed some pr comments
* fix check schemas
* added single resource schema
* fix bug
* added test_multiple_resources_unordered
* address code reviews
* fix ports issue
* address code review
* fix smoke test
* smoke
1 parent aecb2de commit 1ce2dc1

22 files changed (+851 / -242 lines)
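The headline change lets a task carry several candidate resources instead of exactly one. A minimal sketch of how the new example tasks below might be driven from the Python API, assuming `sky.Task.from_yaml` and `sky.launch` behave as in the public SkyPilot API (the cluster name is illustrative only):

import sky

# Load the new example task that lists several candidate accelerators.
task = sky.Task.from_yaml('examples/multi_accelerators.yaml')

# The optimizer considers every candidate together and provisions the
# cheapest one that is available; a later `sky exec` reuses whichever
# candidate fits the existing cluster.
sky.launch(task, cluster_name='multi-acc-demo')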

docs/source/reference/yaml-spec.rst

+10 / -2
@@ -43,8 +43,16 @@ Available fields:
   # Accelerator name and count per node (optional).
   #
   # Use `sky show-gpus` to view available accelerator configurations.
-  #
-  # Format: <name>:<count> (or simply <name>, short for a count of 1).
+  # The following three ways are valid for specifying accelerators for a cluster:
+  # To specify a single accelerator:
+  #   Format: <name>:<count> (or simply <name>, short for a count of 1).
+  #   accelerators: V100:4
+  # To specify an ordered list of accelerators: Try the accelerators in the specified order.
+  #   Format: [<name>:<count>, ...]
+  #   accelerators: ['K80:1', 'V100:1', 'T4:1']
+  # To specify an unordered set of accelerators: Optimize all specified accelerators together, and try the accelerator with the lowest cost first.
+  #   Format: {<name>:<count>, ...}
+  #   accelerators: {'K80:1', 'V100:1', 'T4:1'}
   accelerators: V100:4
 
   # Number of vCPUs per node (optional).
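For readers using the Python API rather than YAML, the same three forms can be sketched roughly as follows, assuming `sky.Resources(accelerators=...)` and that `Task.set_resources` accepts a single `Resources`, a list, or a set, as the backend changes later in this diff suggest:

import sky

task = sky.Task(run='nvidia-smi')

# Single accelerator.
task.set_resources(sky.Resources(accelerators='V100:4'))

# Ordered list: candidates are tried in the order given.
task.set_resources([
    sky.Resources(accelerators='K80:1'),
    sky.Resources(accelerators='V100:1'),
    sky.Resources(accelerators='T4:1'),
])

# Unordered set: candidates are optimized together, cheapest first.
task.set_resources({
    sky.Resources(accelerators='K80:1'),
    sky.Resources(accelerators='V100:1'),
    sky.Resources(accelerators='T4:1'),
})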

examples/multi_accelerators.yaml

+12
@@ -0,0 +1,12 @@
+name: multi-accelerators
+
+resources:
+
+  # Ordered list of accelerators: Try the accelerators in the specified order.
+  # accelerators: ['A100-40GB:1', 'V100:1', 'K80:1', 'T4:1']
+
+  # Unordered set of accelerators: Optimize all specified accelerators together, and try accelerator with lowest cost first.
+  accelerators: {'A100-40GB:1', 'K80:1', 'V100:1', 'T4:1', 'T4:4'}
+
+run: |
+  nvidia-smi

examples/multi_resources.yaml

+18
@@ -0,0 +1,18 @@
+name: multi-resources
+
+resources:
+  ordered:
+    - cloud: AWS
+      accelerators: A10g
+    - cloud: GCP
+      accelerators: L4
+
+# resources:
+#   any_of:
+#     - cloud: AWS
+#       accelerators: A10g
+#     - cloud: GCP
+#       accelerators: L4
+
+run: |
+  nvidia-smi
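The `ordered:` and `any_of:` blocks in this example map naturally onto a list versus a set of `Resources` objects. A hypothetical Python equivalent (cloud classes such as `sky.AWS()` and `sky.GCP()` are assumed to be exposed as in the public API):

import sky

task = sky.Task(run='nvidia-smi')

# ordered: try AWS A10G first, then fall back to GCP L4.
task.set_resources([
    sky.Resources(cloud=sky.AWS(), accelerators='A10g'),
    sky.Resources(cloud=sky.GCP(), accelerators='L4'),
])

# any_of: let the optimizer pick whichever candidate is cheapest/available.
# task.set_resources({
#     sky.Resources(cloud=sky.AWS(), accelerators='A10g'),
#     sky.Resources(cloud=sky.GCP(), accelerators='L4'),
# })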

sky/backends/backend_utils.py

+80 / -36
@@ -1967,43 +1967,66 @@ def check_can_clone_disk_and_override_task(
                 'disk is only supported when creating a new cluster. To fix: specify '
                 'a new target cluster name.')
 
-    assert len(task.resources) == 1, task.resources
-    task_resources = list(task.resources)[0]
-    if handle.launched_resources.disk_size > task_resources.disk_size:
-        # The target cluster's disk should be at least as large as the source.
-        with ux_utils.print_exception_no_traceback():
-            target_cluster_name_str = f' {target_cluster_name!r}'
-            if target_cluster_name is None:
-                target_cluster_name_str = ''
-            raise exceptions.NotSupportedError(
-                f'The target cluster{target_cluster_name_str} should have a disk size '
-                f'of at least {handle.launched_resources.disk_size} GB to clone the '
-                f'disk from {cluster_name!r}.')
-    override_param = {}
+    new_task_resources = []
     original_cloud = handle.launched_resources.cloud
-    assert original_cloud is not None, handle.launched_resources
-    if task_resources.cloud is None:
-        override_param['cloud'] = original_cloud
-    else:
-        if not original_cloud.is_same_cloud(task_resources.cloud):
-            with ux_utils.print_exception_no_traceback():
-                raise ValueError(
-                    f'Cannot clone disk across cloud from {original_cloud} to '
-                    f'{task_resources.cloud}.')
     original_cloud.check_features_are_supported(
         {clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER})
 
-    if task_resources.region is None:
-        override_param['region'] = handle.launched_resources.region
-
-    if override_param:
-        logger.info(
-            f'No cloud/region specified for the task. Using the same region '
-            f'as source cluster {cluster_name!r}: '
-            f'{handle.launched_resources.cloud}'
-            f'({handle.launched_resources.region}).')
+    assert original_cloud is not None, handle.launched_resources
+    has_override = False
+    has_disk_size_met = False
+    has_cloud_met = False
+    for task_resources in task.resources:
+        if handle.launched_resources.disk_size > task_resources.disk_size:
+            # The target cluster's disk should be at least as large as the source.
+            continue
+        has_disk_size_met = True
+        if task_resources.cloud is not None and not original_cloud.is_same_cloud(
+                task_resources.cloud):
+            continue
+        has_cloud_met = True
+
+        override_param = {}
+        if task_resources.cloud is None:
+            override_param['cloud'] = original_cloud
+        if task_resources.region is None:
+            override_param['region'] = handle.launched_resources.region
+
+        if override_param:
+            logger.info(
+                f'No cloud/region specified for the task {task_resources}. Using the same region '
+                f'as source cluster {cluster_name!r}: '
+                f'{handle.launched_resources.cloud}'
+                f'({handle.launched_resources.region}).')
+            has_override = True
         task_resources = task_resources.copy(**override_param)
-    task.set_resources({task_resources})
+        new_task_resources.append(task_resources)
+
+    if not new_task_resources:
+        if not has_disk_size_met:
+            with ux_utils.print_exception_no_traceback():
+                target_cluster_name_str = f' {target_cluster_name!r}'
+                if target_cluster_name is None:
+                    target_cluster_name_str = ''
+                raise exceptions.NotSupportedError(
+                    f'The target cluster{target_cluster_name_str} should have a disk size '
+                    f'of at least {handle.launched_resources.disk_size} GB to clone the '
+                    f'disk from {cluster_name!r}.')
+        if not has_cloud_met:
+            task_resources_cloud_str = '[' + ','.join(
+                [f'{res.cloud}' for res in task.resources]) + ']'
+            task_resources_str = '[' + ','.join(
+                [f'{res}' for res in task.resources]) + ']'
+            with ux_utils.print_exception_no_traceback():
+                raise ValueError(
+                    f'Cannot clone disk across cloud from {original_cloud} to '
+                    f'{task_resources_cloud_str} for resources {task_resources_str}.'
+                )
+        assert False, 'Should not reach here.'
+    # Set the new_task_resources to be the same type (list or set) as the
+    # original task.resources.
+    if has_override:
+        task.set_resources(type(task.resources)(new_task_resources))
     # Reset the best_resources to trigger re-optimization
     # later, so that the new task_resources will be used.
     task.best_resources = None
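The rewritten function now keeps every candidate resource that satisfies both the disk-size and the cloud constraint and only raises when none survive. A simplified, standalone sketch of that filtering rule (using hypothetical plain-data stand-ins rather than the real `Resources` class):

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class CandidateResource:
    cloud: Optional[str]
    disk_size: int


def filter_clone_candidates(candidates: List[CandidateResource],
                            source_cloud: str,
                            source_disk_size: int) -> List[CandidateResource]:
    """Keep candidates whose disk is large enough and whose cloud matches."""
    kept = []
    for res in candidates:
        if source_disk_size > res.disk_size:
            # The target disk must be at least as large as the source disk.
            continue
        if res.cloud is not None and res.cloud != source_cloud:
            # Disks cannot be cloned across clouds.
            continue
        kept.append(res)
    if not kept:
        raise ValueError('No candidate resource can clone the source disk.')
    return kept


# Example: only the 256 GB AWS candidate survives for a 200 GB AWS source disk.
survivors = filter_clone_candidates(
    [CandidateResource('GCP', 256), CandidateResource('AWS', 256),
     CandidateResource(None, 128)],
    source_cloud='AWS', source_disk_size=200)
print(survivors)  # [CandidateResource(cloud='AWS', disk_size=256)]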
@@ -2724,11 +2747,32 @@ def get_task_demands_dict(task: 'task_lib.Task') -> Optional[Dict[str, float]]:
 
 
 def get_task_resources_str(task: 'task_lib.Task') -> str:
-    resources_dict = get_task_demands_dict(task)
-    if resources_dict is None:
-        resources_str = f'CPU:{DEFAULT_TASK_CPU_DEMAND}'
+    if task.best_resources is not None:
+        accelerator_dict = task.best_resources.accelerators
+        if accelerator_dict is None:
+            resources_str = f'CPU:{DEFAULT_TASK_CPU_DEMAND}'
+        else:
+            resources_str = ', '.join(
+                f'{k}:{v}' for k, v in accelerator_dict.items())
+    elif len(task.resources) == 1:
+        resources_dict = list(task.resources)[0].accelerators
+        if resources_dict is None:
+            resources_str = f'CPU:{DEFAULT_TASK_CPU_DEMAND}'
+        else:
+            resources_str = ', '.join(
+                f'{k}:{v}' for k, v in resources_dict.items())
     else:
-        resources_str = ', '.join(f'{k}:{v}' for k, v in resources_dict.items())
+        resource_accelerators = []
+        for resource in task.resources:
+            if resource.accelerators is None:
+                continue
+            for k, v in resource.accelerators.items():
+                resource_accelerators.append(f'{k}:{v}')
+
+        if resource_accelerators:
+            resources_str = ', '.join(set(resource_accelerators))
+        else:
+            resources_str = f'CPU:{DEFAULT_TASK_CPU_DEMAND}'
     resources_str = f'{task.num_nodes}x [{resources_str}]'
     return resources_str
 
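With several candidate resources and no chosen `best_resources`, the accelerators from all candidates are merged into one de-duplicated string. A quick sketch of the resulting format, using hypothetical minimal stand-ins (the real default CPU demand constant lives in backend_utils.py):

from dataclasses import dataclass, field
from typing import Dict, List

DEFAULT_TASK_CPU_DEMAND = 0.5  # assumed default, mirroring backend_utils


@dataclass
class FakeResources:
    accelerators: Dict[str, int] = field(default_factory=dict)


@dataclass
class FakeTask:
    resources: List[FakeResources]
    num_nodes: int = 1


def task_resources_str(task: FakeTask) -> str:
    """Mirrors the multi-resource branch of get_task_resources_str()."""
    accelerators = []
    for res in task.resources:
        accelerators.extend(f'{k}:{v}' for k, v in res.accelerators.items())
    body = (', '.join(sorted(set(accelerators)))
            if accelerators else f'CPU:{DEFAULT_TASK_CPU_DEMAND}')
    return f'{task.num_nodes}x [{body}]'


task = FakeTask([FakeResources({'K80': 1}), FakeResources({'V100': 1})],
                num_nodes=2)
print(task_resources_str(task))  # -> 2x [K80:1, V100:1]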

sky/backends/cloud_vm_ray_backend.py

+73 / -47
@@ -2199,7 +2199,7 @@ def provision_with_retries(
             config_dict = self._retry_zones(
                 to_provision,
                 num_nodes,
-                requested_resources=task.resources,
+                requested_resources=set(task.resources),
                 dryrun=dryrun,
                 stream_logs=stream_logs,
                 cluster_name=cluster_name,
@@ -2706,21 +2706,23 @@ def check_resources_fit_cluster(
         handle: CloudVmRayResourceHandle,
         task: task_lib.Task,
         check_ports: bool = False,
-    ):
+    ) -> resources_lib.Resources:
         """Check if resources requested by the task fit the cluster.
 
         The resources requested by the task should be smaller than the existing
         cluster.
+        If multiple resources are specified, this check passes when at least
+        one of the resources fits the cluster.
 
         Raises:
             exceptions.ResourcesMismatchError: If the resources in the task
                 does not match the existing cluster.
         """
-        assert len(task.resources) == 1, task.resources
 
         launched_resources = handle.launched_resources
-        task_resources = list(task.resources)[0]
         cluster_name = handle.cluster_name
+
+        # Usage Collection:
         usage_lib.messages.usage.update_cluster_resources(
             handle.launched_nodes, launched_resources)
         record = global_user_state.get_cluster_from_name(cluster_name)
@@ -2739,40 +2741,55 @@ def check_resources_fit_cluster(
                     launched_resources)
             mismatch_str = ('To fix: use accelerators/number of nodes that can '
                             'be satisfied by the local cluster')
-        # Requested_resources <= actual_resources.
-        # Special handling for local cloud case, which assumes a cluster can
-        # be heterogeneous. Here, launched_resources is a list of custom
-        # accelerators per node, and Resources.less_demanding_than determines
-        # how many nodes satisfy task resource requirements.
-        if not (task.num_nodes <= handle.launched_nodes and
-                task_resources.less_demanding_than(
-                    launched_resources,
-                    requested_num_nodes=task.num_nodes,
-                    check_ports=check_ports)):
-            if (task_resources.region is not None and
-                    task_resources.region != launched_resources.region):
-                with ux_utils.print_exception_no_traceback():
-                    raise exceptions.ResourcesMismatchError(
-                        'Task requested resources in region '
-                        f'{task_resources.region!r}, but the existing cluster '
-                        f'is in region {launched_resources.region!r}.')
-            if (task_resources.zone is not None and
-                    task_resources.zone != launched_resources.zone):
-                zone_str = (f'is in zone {launched_resources.zone!r}.'
-                            if launched_resources.zone is not None else
-                            'does not have zone specified.')
-                with ux_utils.print_exception_no_traceback():
-                    raise exceptions.ResourcesMismatchError(
-                        'Task requested resources in zone '
-                        f'{task_resources.zone!r}, but the existing cluster '
-                        f'{zone_str}')
+
+        valid_resource = None
+        requested_resource_list = []
+        for resource in task.resources:
+            if (task.num_nodes <= handle.launched_nodes and
+                    resource.less_demanding_than(
+                        launched_resources,
+                        requested_num_nodes=task.num_nodes,
+                        check_ports=check_ports)):
+                valid_resource = resource
+                break
+            else:
+                requested_resource_list.append(f'{task.num_nodes}x {resource}')
+
+        if valid_resource is None:
+            for example_resource in task.resources:
+                if (example_resource.region is not None and
+                        example_resource.region != launched_resources.region):
+                    with ux_utils.print_exception_no_traceback():
+                        raise exceptions.ResourcesMismatchError(
+                            f'Task requested resources {example_resource} in region '  # pylint: disable=line-too-long
+                            f'{example_resource.region!r}'
+                            ', but the existing cluster '
+                            f'is in region {launched_resources.region!r}.')
+                if (example_resource.zone is not None and
+                        example_resource.zone != launched_resources.zone):
+                    zone_str = (f'is in zone {launched_resources.zone!r}.'
+                                if launched_resources.zone is not None else
+                                'does not have zone specified.')
+                    with ux_utils.print_exception_no_traceback():
+                        raise exceptions.ResourcesMismatchError(
+                            f'Task requested resources {example_resource} in zone '  # pylint: disable=line-too-long
+                            f'{example_resource.zone!r},'
+                            ' but the existing cluster '
+                            f'{zone_str}')
+            requested_resource_str = ', '.join(requested_resource_list)
+            if isinstance(task.resources, list):
+                requested_resource_str = f'[{requested_resource_str}]'
+            elif isinstance(task.resources, set):
+                requested_resource_str = f'{{{requested_resource_str}}}'
             with ux_utils.print_exception_no_traceback():
                 raise exceptions.ResourcesMismatchError(
-                    'Requested resources do not match the existing cluster.\n'
-                    f'  Requested:\t{task.num_nodes}x {task_resources} \n'
+                    'Requested resources do not match the existing '
+                    'cluster.\n'
+                    f'  Requested:\t{requested_resource_str}\n'
                     f'  Existing:\t{handle.launched_nodes}x '
                     f'{handle.launched_resources}\n'
                     f'{mismatch_str}')
+        return valid_resource
 
     def _provision(
             self,
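The fit check now walks the candidates in order and returns the first one that is no more demanding than what the cluster already runs; only when nothing fits does it raise a mismatch error. A simplified standalone sketch of that selection rule (a hypothetical `fits` predicate stands in for `Resources.less_demanding_than`):

from typing import Callable, Iterable, Optional, TypeVar

R = TypeVar('R')


def pick_valid_resource(candidates: Iterable[R],
                        fits: Callable[[R], bool]) -> Optional[R]:
    """Return the first candidate that fits the existing cluster, else None."""
    rejected = []
    for resource in candidates:
        if fits(resource):
            return resource
        rejected.append(resource)
    # The real backend raises ResourcesMismatchError listing `rejected` here.
    return None


# Example: the cluster has 4 V100s; the A100 request is rejected, V100 fits.
cluster_gpus = {'V100': 4}
candidates = [{'A100': 1}, {'V100': 1}]
chosen = pick_valid_resource(
    candidates,
    fits=lambda acc: all(cluster_gpus.get(g, 0) >= n for g, n in acc.items()))
print(chosen)  # -> {'V100': 1}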
@@ -3092,7 +3109,7 @@ def _update_after_cluster_provisioned(
         global_user_state.add_or_update_cluster(
             handle.cluster_name,
             handle,
-            task.resources,
+            set(task.resources),
             ready=True,
         )
         usage_lib.messages.usage.update_final_cluster_status(
@@ -3499,23 +3516,31 @@ def _execute(
         # Check the task resources vs the cluster resources. Since `sky exec`
         # will not run the provision and _check_existing_cluster
         # We need to check ports here since sky.exec shouldn't change resources
-        self.check_resources_fit_cluster(handle, task, check_ports=True)
-
-        resources_str = backend_utils.get_task_resources_str(task)
+        valid_resource = self.check_resources_fit_cluster(handle,
+                                                          task,
+                                                          check_ports=True)
+        task_copy = copy.copy(task)
+        # Handle multiple resources exec case.
+        task_copy.set_resources(valid_resource)
+        if len(task.resources) > 1:
+            logger.info('Multiple resources are specified '
+                        f'for the task, using: {valid_resource}')
+        task_copy.best_resources = None
+        resources_str = backend_utils.get_task_resources_str(task_copy)
 
         if dryrun:
             logger.info(f'Dryrun complete. Would have run:\n{task}')
             return None
 
-        job_id = self._add_job(handle, task.name, resources_str)
+        job_id = self._add_job(handle, task_copy.name, resources_str)
 
         is_tpu_vm_pod = tpu_utils.is_tpu_vm_pod(handle.launched_resources)
         # Case: task_lib.Task(run, num_nodes=N) or TPU VM Pods
-        if task.num_nodes > 1 or is_tpu_vm_pod:
-            self._execute_task_n_nodes(handle, task, job_id, detach_run)
+        if task_copy.num_nodes > 1 or is_tpu_vm_pod:
+            self._execute_task_n_nodes(handle, task_copy, job_id, detach_run)
         else:
             # Case: task_lib.Task(run, num_nodes=1)
-            self._execute_task_one_node(handle, task, job_id, detach_run)
+            self._execute_task_one_node(handle, task_copy, job_id, detach_run)
 
         return job_id
 
@@ -4343,7 +4368,9 @@ def _check_existing_cluster(
             self.check_resources_fit_cluster(handle, task)
             # Use the existing cluster.
             assert handle.launched_resources is not None, (cluster_name, handle)
-            assert len(task.resources) == 1
+            # Assume resources share the same ports.
+            for resource in task.resources:
+                assert resource.ports == list(task.resources)[0].ports
             all_ports = resources_utils.port_set_to_ranges(
                 resources_utils.port_ranges_to_set(
                     handle.launched_resources.ports) |
@@ -4359,13 +4386,12 @@ def _check_existing_cluster(
                 prev_cluster_status=prev_cluster_status,
                 prev_handle=handle)
         usage_lib.messages.usage.set_new_cluster()
-        assert len(task.resources) == 1, task.resources
         # Use the task_cloud, because the cloud in `to_provision` can be changed
         # later during the retry.
-        resources = list(task.resources)[0]
-        task_cloud = (resources.cloud
-                      if resources.cloud is not None else clouds.Cloud)
-        task_cloud.check_cluster_name_is_valid(cluster_name)
+        for resources in task.resources:
+            task_cloud = (resources.cloud
+                          if resources.cloud is not None else clouds.Cloud)
+            task_cloud.check_cluster_name_is_valid(cluster_name)
 
         if to_provision is None:
             # The cluster is recently terminated either by autostop or manually
