From 9008eb665bdcc978456211f01f2e93ebe052c6fa Mon Sep 17 00:00:00 2001 From: Edward Oakes Date: Mon, 24 Feb 2025 15:03:18 -0600 Subject: [PATCH] [core] Remove unused `release_resources` flag (#50854) This flag has only been set to `true` for a long time (there's even an assertion in the code). --------- Signed-off-by: Edward Oakes --- src/ray/core_worker/core_worker.cc | 3 +-- .../memory_store/memory_store.cc | 2 +- src/ray/raylet/format/node_manager.fbs | 1 - src/ray/raylet/node_manager.cc | 19 ++++--------------- src/ray/raylet/node_manager.h | 10 +--------- src/ray/raylet_client/raylet_client.cc | 4 ++-- src/ray/raylet_client/raylet_client.h | 5 ++--- 7 files changed, 11 insertions(+), 33 deletions(-) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 5990bf8249e45..944bf84b05c47 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1171,8 +1171,7 @@ void CoreWorker::Exit( exiting_detail_ = std::optional{detail}; } // Release the resources early in case draining takes a long time. - auto status = - local_raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources*/ true); + auto status = local_raylet_client_->NotifyDirectCallTaskBlocked(); if (!status.ok()) { RAY_LOG(WARNING) << "Failed to notify Raylet. It is either the raylet is already dead or the " diff --git a/src/ray/core_worker/store_provider/memory_store/memory_store.cc b/src/ray/core_worker/store_provider/memory_store/memory_store.cc index 928edfd65e823..d20572c323f98 100644 --- a/src/ray/core_worker/store_provider/memory_store/memory_store.cc +++ b/src/ray/core_worker/store_provider/memory_store/memory_store.cc @@ -347,7 +347,7 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector &object_ids, (raylet_client_ != nullptr && ctx.ShouldReleaseResourcesOnBlockingCalls()); // Wait for remaining objects (or timeout). if (should_notify_raylet) { - RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources=*/true)); + RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked()); } bool done = false; diff --git a/src/ray/raylet/format/node_manager.fbs b/src/ray/raylet/format/node_manager.fbs index 8076187e22028..f09eaa7a6088a 100644 --- a/src/ray/raylet/format/node_manager.fbs +++ b/src/ray/raylet/format/node_manager.fbs @@ -211,7 +211,6 @@ table NotifyUnblocked { } table NotifyDirectCallTaskBlocked { - release_resources: bool; } table NotifyDirectCallTaskUnblocked { diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 0c1bb0c512b39..0bce9f02d4060 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1236,11 +1236,10 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr & ProcessFetchOrReconstructMessage(client, message_data); } break; case protocol::MessageType::NotifyDirectCallTaskBlocked: { - ProcessDirectCallTaskBlocked(client, message_data); + HandleDirectCallTaskBlocked(registered_worker); } break; case protocol::MessageType::NotifyDirectCallTaskUnblocked: { - std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); - HandleDirectCallTaskUnblocked(worker); + HandleDirectCallTaskUnblocked(registered_worker); } break; case protocol::MessageType::NotifyUnblocked: { // TODO(ekl) this is still used from core worker even in direct call mode to @@ -1725,15 +1724,6 @@ void NodeManager::ProcessFetchOrReconstructMessage( } } -void NodeManager::ProcessDirectCallTaskBlocked( - const std::shared_ptr &client, const uint8_t *message_data) { - auto message = - flatbuffers::GetRoot(message_data); - RAY_CHECK(message->release_resources()); - std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); - HandleDirectCallTaskBlocked(worker, true); -} - void NodeManager::ProcessWaitRequestMessage( const std::shared_ptr &client, const uint8_t *message_data) { // Read the data. @@ -2301,9 +2291,8 @@ void NodeManager::MarkObjectsAsFailed( } void NodeManager::HandleDirectCallTaskBlocked( - const std::shared_ptr &worker, bool release_resources) { - if (!worker || worker->IsBlocked() || worker->GetAssignedTaskId().IsNil() || - !release_resources) { + const std::shared_ptr &worker) { + if (!worker || worker->IsBlocked() || worker->GetAssignedTaskId().IsNil()) { return; // The worker may have died or is no longer processing the task. } local_task_manager_->ReleaseCpuResourcesFromBlockedWorker(worker); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 7bde7baa28e26..0ab684032381d 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -349,8 +349,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// arrive after the worker lease has been returned to the node manager. /// /// \param worker Shared ptr to the worker, or nullptr if lost. - void HandleDirectCallTaskBlocked(const std::shared_ptr &worker, - bool release_resources); + void HandleDirectCallTaskBlocked(const std::shared_ptr &worker); /// Handle a direct call task that is unblocked. Note that this callback may /// arrive after the worker lease has been returned to the node manager. @@ -419,13 +418,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// \return Void. void HandleJobFinished(const JobID &job_id, const JobTableData &job_data); - /// Process client message of NotifyDirectCallTaskBlocked - /// - /// \param message_data A pointer to the message data. - /// \return Void. - void ProcessDirectCallTaskBlocked(const std::shared_ptr &client, - const uint8_t *message_data); - /// Process client message of RegisterClientRequest /// /// \param client The client that sent the message. diff --git a/src/ray/raylet_client/raylet_client.cc b/src/ray/raylet_client/raylet_client.cc index e6925180de230..8bb6d0e2bffe6 100644 --- a/src/ray/raylet_client/raylet_client.cc +++ b/src/ray/raylet_client/raylet_client.cc @@ -165,9 +165,9 @@ Status RayletClient::NotifyUnblocked(const TaskID ¤t_task_id) { return conn_->WriteMessage(MessageType::NotifyUnblocked, &fbb); } -Status RayletClient::NotifyDirectCallTaskBlocked(bool release_resources) { +Status RayletClient::NotifyDirectCallTaskBlocked() { flatbuffers::FlatBufferBuilder fbb; - auto message = protocol::CreateNotifyDirectCallTaskBlocked(fbb, release_resources); + auto message = protocol::CreateNotifyDirectCallTaskBlocked(fbb); fbb.Finish(message); return conn_->WriteMessage(MessageType::NotifyDirectCallTaskBlocked, &fbb); } diff --git a/src/ray/raylet_client/raylet_client.h b/src/ray/raylet_client/raylet_client.h index 5ec00a81cab44..aeffe5d5456de 100644 --- a/src/ray/raylet_client/raylet_client.h +++ b/src/ray/raylet_client/raylet_client.h @@ -350,9 +350,8 @@ class RayletClient : public RayletClientInterface { /// Notify the raylet that this client is blocked. This is only used for direct task /// calls. Note that ordering of this with respect to Unblock calls is important. /// - /// \param release_resources: true if the dirct call blocking needs to release - /// resources. \return ray::Status. - ray::Status NotifyDirectCallTaskBlocked(bool release_resources); + /// \return ray::Status. + ray::Status NotifyDirectCallTaskBlocked(); /// Notify the raylet that this client is unblocked. This is only used for direct task /// calls. Note that ordering of this with respect to Block calls is important.