From a8382e42f57a3796b8ea2f7cfdff1011422c73fb Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Tue, 18 Feb 2025 08:02:59 +0000 Subject: [PATCH 01/10] Check instance status before terminating nodes Signed-off-by: Ryan O'Leary --- .../ray/autoscaler/v2/instance_manager/reconciler.py | 5 ++++- python/ray/autoscaler/v2/scheduler.py | 10 ++++++++++ src/ray/protobuf/experimental/instance_manager.proto | 4 +++- 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/v2/instance_manager/reconciler.py b/python/ray/autoscaler/v2/instance_manager/reconciler.py index e13ebe7c07257..5b9979c6a907d 100644 --- a/python/ray/autoscaler/v2/instance_manager/reconciler.py +++ b/python/ray/autoscaler/v2/instance_manager/reconciler.py @@ -1143,9 +1143,12 @@ def _scale_cluster( # Add terminating instances. for terminate_request in to_terminate: instance_id = terminate_request.instance_id + new_instance_status = IMInstance.RAY_STOP_REQUESTED + if terminate_request.instance_status == IMInstance.ALLOCATED: + new_instance_status = IMInstance.TERMINATING updates[terminate_request.instance_id] = IMInstanceUpdateEvent( instance_id=instance_id, - new_instance_status=IMInstance.RAY_STOP_REQUESTED, + new_instance_status=new_instance_status, termination_request=terminate_request, details=f"draining ray: {terminate_request.details}", ) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index dac774c9446da..249da805a2684 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -169,6 +169,9 @@ class SchedulingNode: # The instance id of the IM(Instance Manager) instance. None if the node # is not yet in IM. im_instance_id: Optional[str] = None + # The instance status of the IM(Instance Manager) instance. None if the node + # is not yet in IM. + im_instance_status: Optional[str] = None # The ray node id of the ray node. None if the node is not included in # ray cluster's GCS report yet (not running ray yet). ray_node_id: Optional[str] = None @@ -187,6 +190,7 @@ def __init__( labels: Dict[str, str], status: SchedulingNodeStatus, im_instance_id: str = "", + im_instance_status: str = "", ray_node_id: str = "", idle_duration_ms: int = 0, launch_config_hash: str = "", @@ -206,6 +210,7 @@ def __init__( self.labels = labels self.status = status self.im_instance_id = im_instance_id + self.im_instance_status = im_instance_status self.ray_node_id = ray_node_id self.idle_duration_ms = idle_duration_ms self.launch_config_hash = launch_config_hash @@ -274,6 +279,7 @@ def new( labels=dict(instance.ray_node.dynamic_labels), status=SchedulingNodeStatus.SCHEDULABLE, im_instance_id=instance.im_instance.instance_id, + im_instance_status=instance.im_instance.status, ray_node_id=instance.im_instance.node_id, idle_duration_ms=instance.ray_node.idle_duration_ms, launch_config_hash=instance.im_instance.launch_config_hash, @@ -303,9 +309,11 @@ def new( labels={}, status=SchedulingNodeStatus.TO_TERMINATE, im_instance_id=instance.im_instance.instance_id, + im_instance_status=instance.im_instance.status, termination_request=TerminationRequest( id=str(uuid.uuid4()), instance_id=instance.im_instance.instance_id, + instance_status=instance.im_instance.status, cause=TerminationRequest.Cause.OUTDATED, instance_type=instance.im_instance.instance_type, ), @@ -1074,6 +1082,7 @@ def _select_nodes_to_terminate( ray_node_id=node.ray_node_id, cause=cause, instance_type=node.node_type, + instance_status=node.im_instance_status, details=( f"Terminating node due to {TerminationRequest.Cause.Name(cause)}: " f"max_num_nodes={max_num_nodes}, " @@ -1550,6 +1559,7 @@ def _terminate_outdated_nodes( instance_id=node.im_instance_id, ray_node_id=node.ray_node_id, instance_type=node.node_type, + instance_status=node.im_instance_status, cause=TerminationRequest.Cause.OUTDATED, details=f"node from {node.node_type} has outdated config", ) diff --git a/src/ray/protobuf/experimental/instance_manager.proto b/src/ray/protobuf/experimental/instance_manager.proto index dbabae6e76809..d4fd70f1c9e90 100644 --- a/src/ray/protobuf/experimental/instance_manager.proto +++ b/src/ray/protobuf/experimental/instance_manager.proto @@ -71,7 +71,7 @@ message Instance { RAY_STOPPING = 7; // Ray stopped - follows from the RAY_STOPPING/RAY_RUNNING state. RAY_STOPPED = 8; - // The instance is terminating - follows from the RAY_STOPPED state. + // The instance is terminating - follows from the RAY_STOPPED or ALLOCATED state. TERMINATING = 9; // The instance is terminated - follows from TERMINATING state or any other running // states when instance was preempted. @@ -202,6 +202,8 @@ message TerminationRequest { optional uint32 max_num_nodes = 8; // Details of the termination. string details = 9; + // The last instance status of the instance that's terminated. + optional string instance_status = 10; } message UpdateInstanceManagerStateRequest { From bbe5005c5457db839bb6b2191d6084cdd97d88b0 Mon Sep 17 00:00:00 2001 From: ryanaoleary <113500783+ryanaoleary@users.noreply.github.com> Date: Mon, 10 Mar 2025 14:26:31 -0700 Subject: [PATCH 02/10] Update python/ray/autoscaler/v2/scheduler.py Co-authored-by: Rueian Signed-off-by: ryanaoleary <113500783+ryanaoleary@users.noreply.github.com> --- python/ray/autoscaler/v2/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 249da805a2684..267ff73e398a0 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -190,7 +190,7 @@ def __init__( labels: Dict[str, str], status: SchedulingNodeStatus, im_instance_id: str = "", - im_instance_status: str = "", + im_instance_status: str = None, ray_node_id: str = "", idle_duration_ms: int = 0, launch_config_hash: str = "", From 5fbd206413a3b65f37bd62d7eaeb4cbdbc903f3a Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Mon, 10 Mar 2025 21:46:33 +0000 Subject: [PATCH 03/10] Add note explaining ALLOCATED->TERMINATING state transition justification Signed-off-by: Ryan O'Leary --- python/ray/autoscaler/v2/instance_manager/reconciler.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/autoscaler/v2/instance_manager/reconciler.py b/python/ray/autoscaler/v2/instance_manager/reconciler.py index 5b9979c6a907d..83bcea465e608 100644 --- a/python/ray/autoscaler/v2/instance_manager/reconciler.py +++ b/python/ray/autoscaler/v2/instance_manager/reconciler.py @@ -1144,6 +1144,8 @@ def _scale_cluster( for terminate_request in to_terminate: instance_id = terminate_request.instance_id new_instance_status = IMInstance.RAY_STOP_REQUESTED + # The instance is not yet running, so we can't request to stop/drain Ray. + # Therefore, we can skip the RAY_STOP_REQUESTED state and directly terminate the node. if terminate_request.instance_status == IMInstance.ALLOCATED: new_instance_status = IMInstance.TERMINATING updates[terminate_request.instance_id] = IMInstanceUpdateEvent( From 3cb668d282af671c97862db307c333d53f524b94 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Tue, 11 Mar 2025 18:16:22 +0000 Subject: [PATCH 04/10] Change type hint to Instance.InstanceStatus Signed-off-by: Ryan O'Leary --- python/ray/autoscaler/v2/scheduler.py | 4 ++-- src/ray/protobuf/experimental/instance_manager.proto | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 267ff73e398a0..46920ca062cf6 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -171,7 +171,7 @@ class SchedulingNode: im_instance_id: Optional[str] = None # The instance status of the IM(Instance Manager) instance. None if the node # is not yet in IM. - im_instance_status: Optional[str] = None + im_instance_status: Optional[Instance.InstanceStatus] = None # The ray node id of the ray node. None if the node is not included in # ray cluster's GCS report yet (not running ray yet). ray_node_id: Optional[str] = None @@ -190,7 +190,7 @@ def __init__( labels: Dict[str, str], status: SchedulingNodeStatus, im_instance_id: str = "", - im_instance_status: str = None, + im_instance_status: Optional[Instance.InstanceStatus] = None, ray_node_id: str = "", idle_duration_ms: int = 0, launch_config_hash: str = "", diff --git a/src/ray/protobuf/experimental/instance_manager.proto b/src/ray/protobuf/experimental/instance_manager.proto index d4fd70f1c9e90..9150f86c6dd44 100644 --- a/src/ray/protobuf/experimental/instance_manager.proto +++ b/src/ray/protobuf/experimental/instance_manager.proto @@ -203,7 +203,7 @@ message TerminationRequest { // Details of the termination. string details = 9; // The last instance status of the instance that's terminated. - optional string instance_status = 10; + optional Instance.InstanceStatus instance_status = 10; } message UpdateInstanceManagerStateRequest { From 3f7f1be5589a39ce3e1d14f29b2d06bc90e90540 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Wed, 12 Mar 2025 23:27:04 +0000 Subject: [PATCH 05/10] InstanceStatus should use ValueType Signed-off-by: Ryan O'Leary --- python/ray/autoscaler/v2/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 46920ca062cf6..d950d6e884c83 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -171,7 +171,7 @@ class SchedulingNode: im_instance_id: Optional[str] = None # The instance status of the IM(Instance Manager) instance. None if the node # is not yet in IM. - im_instance_status: Optional[Instance.InstanceStatus] = None + im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None # The ray node id of the ray node. None if the node is not included in # ray cluster's GCS report yet (not running ray yet). ray_node_id: Optional[str] = None @@ -190,7 +190,7 @@ def __init__( labels: Dict[str, str], status: SchedulingNodeStatus, im_instance_id: str = "", - im_instance_status: Optional[Instance.InstanceStatus] = None, + im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None, ray_node_id: str = "", idle_duration_ms: int = 0, launch_config_hash: str = "", From e02de1defdee2cb2914598e369cc952d602de32b Mon Sep 17 00:00:00 2001 From: ryanaoleary <113500783+ryanaoleary@users.noreply.github.com> Date: Fri, 14 Mar 2025 07:49:39 -0700 Subject: [PATCH 06/10] Update src/ray/protobuf/experimental/instance_manager.proto Co-authored-by: Kai-Hsun Chen Signed-off-by: ryanaoleary <113500783+ryanaoleary@users.noreply.github.com> --- src/ray/protobuf/experimental/instance_manager.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/protobuf/experimental/instance_manager.proto b/src/ray/protobuf/experimental/instance_manager.proto index 9150f86c6dd44..eddd83d2c1ce7 100644 --- a/src/ray/protobuf/experimental/instance_manager.proto +++ b/src/ray/protobuf/experimental/instance_manager.proto @@ -202,7 +202,7 @@ message TerminationRequest { optional uint32 max_num_nodes = 8; // Details of the termination. string details = 9; - // The last instance status of the instance that's terminated. + // The final instance status of the instance that's going to be terminated. optional Instance.InstanceStatus instance_status = 10; } From ee394dd33d554a9a1bdd0f5a8ba0af837b1c95ef Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Fri, 14 Mar 2025 16:46:51 +0000 Subject: [PATCH 07/10] Add longer comment to im_instance_status Signed-off-by: Ryan O'Leary --- python/ray/autoscaler/v2/scheduler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index d950d6e884c83..854fbb27d5cdd 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -169,8 +169,8 @@ class SchedulingNode: # The instance id of the IM(Instance Manager) instance. None if the node # is not yet in IM. im_instance_id: Optional[str] = None - # The instance status of the IM(Instance Manager) instance. None if the node - # is not yet in IM. + # The instance status of the IM(Instance Manager) instance. None if the in-flight node + # has not yet been assigned to an IM instance. im_instance_status: Optional[Instance.InstanceStatus.ValueType] = None # The ray node id of the ray node. None if the node is not included in # ray cluster's GCS report yet (not running ray yet). From 26dfff299a03e808a4da7f9a023b803c09d061ee Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Tue, 18 Mar 2025 02:15:17 +0000 Subject: [PATCH 08/10] Add missing instance_status to return and make instance_status non optional in proto Signed-off-by: Ryan O'Leary --- python/ray/autoscaler/v2/scheduler.py | 1 + src/ray/protobuf/experimental/instance_manager.proto | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 854fbb27d5cdd..7aa46ae8e25ea 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -1644,6 +1644,7 @@ def _enforce_idle_termination( ray_node_id=node.ray_node_id, cause=TerminationRequest.Cause.IDLE, instance_type=node.node_type, + instance_status=node.im_instance.status, idle_duration_ms=node.idle_duration_ms, details=f"idle for {node.idle_duration_ms/s_to_ms} secs > " f"timeout={idle_timeout_s} secs", diff --git a/src/ray/protobuf/experimental/instance_manager.proto b/src/ray/protobuf/experimental/instance_manager.proto index eddd83d2c1ce7..219fcb46e8fb5 100644 --- a/src/ray/protobuf/experimental/instance_manager.proto +++ b/src/ray/protobuf/experimental/instance_manager.proto @@ -203,7 +203,7 @@ message TerminationRequest { // Details of the termination. string details = 9; // The final instance status of the instance that's going to be terminated. - optional Instance.InstanceStatus instance_status = 10; + Instance.InstanceStatus instance_status = 10; } message UpdateInstanceManagerStateRequest { From 157638bb4a8c0462e6dab6395d178b6c54785dd4 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Wed, 19 Mar 2025 05:38:07 +0000 Subject: [PATCH 09/10] Fix naming Signed-off-by: Ryan O'Leary --- python/ray/autoscaler/v2/scheduler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/autoscaler/v2/scheduler.py b/python/ray/autoscaler/v2/scheduler.py index 7aa46ae8e25ea..30c92e5b64b27 100644 --- a/python/ray/autoscaler/v2/scheduler.py +++ b/python/ray/autoscaler/v2/scheduler.py @@ -1644,7 +1644,7 @@ def _enforce_idle_termination( ray_node_id=node.ray_node_id, cause=TerminationRequest.Cause.IDLE, instance_type=node.node_type, - instance_status=node.im_instance.status, + instance_status=node.im_instance_status, idle_duration_ms=node.idle_duration_ms, details=f"idle for {node.idle_duration_ms/s_to_ms} secs > " f"timeout={idle_timeout_s} secs", From 7736214c59e5bf8d982683e7f0572da52e0e0bc8 Mon Sep 17 00:00:00 2001 From: Ryan O'Leary Date: Wed, 19 Mar 2025 06:11:28 +0000 Subject: [PATCH 10/10] Add unit test for deleting allocated instances when max nodes changes Signed-off-by: Ryan O'Leary --- .../ray/autoscaler/v2/tests/test_scheduler.py | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/python/ray/autoscaler/v2/tests/test_scheduler.py b/python/ray/autoscaler/v2/tests/test_scheduler.py index 53e4cdf34de71..6940d746e4062 100644 --- a/python/ray/autoscaler/v2/tests/test_scheduler.py +++ b/python/ray/autoscaler/v2/tests/test_scheduler.py @@ -563,6 +563,100 @@ def test_max_workers_per_type(): ) +def test_terminate_max_allocated_workers_per_type(): + scheduler = ResourceDemandScheduler(event_logger) + node_type_configs = { + "type_1": NodeTypeConfig( + name="type_1", + resources={"CPU": 1}, + min_worker_nodes=0, + max_worker_nodes=2, + ), + } + + request = sched_request( + node_type_configs=node_type_configs, + ) + + reply = scheduler.schedule(request) + + # No instances created, no-op. + expected_to_terminate = [] + _, actual_to_terminate = _launch_and_terminate(reply) + assert sorted(actual_to_terminate) == sorted(expected_to_terminate) + + instances = [ + make_autoscaler_instance( + ray_node=NodeState( + ray_node_type_name="type_0", + available_resources={"CPU": 1}, + total_resources={"CPU": 1}, + node_id=b"r0", + ), + im_instance=Instance( + instance_type="type_1", + status=Instance.ALLOCATED, + instance_id="0", + node_id="r0", + ), + ), + make_autoscaler_instance( + ray_node=NodeState( + ray_node_type_name="type_1", + available_resources={"CPU": 1}, + total_resources={"CPU": 1}, + node_id=b"r1", + ), + im_instance=Instance( + instance_type="type_1", + status=Instance.ALLOCATED, + instance_id="1", + node_id="r1", + ), + ), + ] + + # 2 nodes in allocated state with max of 2 allowed for type 1. + # Scheduler should leave all of the allocated instances. + request = sched_request( + node_type_configs=node_type_configs, + instances=instances, + ) + + reply = scheduler.schedule(request) + _, actual_to_terminate = _launch_and_terminate(reply) + assert actual_to_terminate == [] + + # Max nodes is now 0 for type 1, scheduler should terminate + # both allocated instances to conform with max num nodes per type. + node_type_configs = { + "type_1": NodeTypeConfig( + name="type_1", + resources={"CPU": 1}, + min_worker_nodes=0, + max_worker_nodes=0, + ), + } + + request = sched_request( + node_type_configs=node_type_configs, + instances=instances, + ) + + reply = scheduler.schedule(request) + _, actual_to_terminate = _launch_and_terminate(reply) + assert sorted(actual_to_terminate) == sorted( + [ + ( + "0", + "", + TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE, + ), # allocated instance + ("1", "", TerminationRequest.Cause.MAX_NUM_NODE_PER_TYPE), + ] + ) + + def test_max_num_nodes(): scheduler = ResourceDemandScheduler(event_logger) node_type_configs = {