Skip to content

Commit

Permalink
[core] Remove unused release_resources flag (#50854)
Browse files Browse the repository at this point in the history
This flag has only been set to `true` for a long time (there's even an
assertion in the code).

---------

Signed-off-by: Edward Oakes <[email protected]>
  • Loading branch information
edoakes authored Feb 24, 2025
1 parent 2325ed9 commit 9008eb6
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 33 deletions.
3 changes: 1 addition & 2 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1171,8 +1171,7 @@ void CoreWorker::Exit(
exiting_detail_ = std::optional<std::string>{detail};
}
// Release the resources early in case draining takes a long time.
auto status =
local_raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources*/ true);
auto status = local_raylet_client_->NotifyDirectCallTaskBlocked();
if (!status.ok()) {
RAY_LOG(WARNING)
<< "Failed to notify Raylet. It is either the raylet is already dead or the "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ Status CoreWorkerMemoryStore::GetImpl(const std::vector<ObjectID> &object_ids,
(raylet_client_ != nullptr && ctx.ShouldReleaseResourcesOnBlockingCalls());
// Wait for remaining objects (or timeout).
if (should_notify_raylet) {
RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked(/*release_resources=*/true));
RAY_CHECK_OK(raylet_client_->NotifyDirectCallTaskBlocked());
}

bool done = false;
Expand Down
1 change: 0 additions & 1 deletion src/ray/raylet/format/node_manager.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ table NotifyUnblocked {
}

table NotifyDirectCallTaskBlocked {
release_resources: bool;
}

table NotifyDirectCallTaskUnblocked {
Expand Down
19 changes: 4 additions & 15 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1236,11 +1236,10 @@ void NodeManager::ProcessClientMessage(const std::shared_ptr<ClientConnection> &
ProcessFetchOrReconstructMessage(client, message_data);
} break;
case protocol::MessageType::NotifyDirectCallTaskBlocked: {
ProcessDirectCallTaskBlocked(client, message_data);
HandleDirectCallTaskBlocked(registered_worker);
} break;
case protocol::MessageType::NotifyDirectCallTaskUnblocked: {
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
HandleDirectCallTaskUnblocked(worker);
HandleDirectCallTaskUnblocked(registered_worker);
} break;
case protocol::MessageType::NotifyUnblocked: {
// TODO(ekl) this is still used from core worker even in direct call mode to
Expand Down Expand Up @@ -1725,15 +1724,6 @@ void NodeManager::ProcessFetchOrReconstructMessage(
}
}

void NodeManager::ProcessDirectCallTaskBlocked(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
auto message =
flatbuffers::GetRoot<protocol::NotifyDirectCallTaskBlocked>(message_data);
RAY_CHECK(message->release_resources());
std::shared_ptr<WorkerInterface> worker = worker_pool_.GetRegisteredWorker(client);
HandleDirectCallTaskBlocked(worker, true);
}

void NodeManager::ProcessWaitRequestMessage(
const std::shared_ptr<ClientConnection> &client, const uint8_t *message_data) {
// Read the data.
Expand Down Expand Up @@ -2301,9 +2291,8 @@ void NodeManager::MarkObjectsAsFailed(
}

void NodeManager::HandleDirectCallTaskBlocked(
const std::shared_ptr<WorkerInterface> &worker, bool release_resources) {
if (!worker || worker->IsBlocked() || worker->GetAssignedTaskId().IsNil() ||
!release_resources) {
const std::shared_ptr<WorkerInterface> &worker) {
if (!worker || worker->IsBlocked() || worker->GetAssignedTaskId().IsNil()) {
return; // The worker may have died or is no longer processing the task.
}
local_task_manager_->ReleaseCpuResourcesFromBlockedWorker(worker);
Expand Down
10 changes: 1 addition & 9 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// arrive after the worker lease has been returned to the node manager.
///
/// \param worker Shared ptr to the worker, or nullptr if lost.
void HandleDirectCallTaskBlocked(const std::shared_ptr<WorkerInterface> &worker,
bool release_resources);
void HandleDirectCallTaskBlocked(const std::shared_ptr<WorkerInterface> &worker);

/// Handle a direct call task that is unblocked. Note that this callback may
/// arrive after the worker lease has been returned to the node manager.
Expand Down Expand Up @@ -419,13 +418,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \return Void.
void HandleJobFinished(const JobID &job_id, const JobTableData &job_data);

/// Process client message of NotifyDirectCallTaskBlocked
///
/// \param message_data A pointer to the message data.
/// \return Void.
void ProcessDirectCallTaskBlocked(const std::shared_ptr<ClientConnection> &client,
const uint8_t *message_data);

/// Process client message of RegisterClientRequest
///
/// \param client The client that sent the message.
Expand Down
4 changes: 2 additions & 2 deletions src/ray/raylet_client/raylet_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ Status RayletClient::NotifyUnblocked(const TaskID &current_task_id) {
return conn_->WriteMessage(MessageType::NotifyUnblocked, &fbb);
}

Status RayletClient::NotifyDirectCallTaskBlocked(bool release_resources) {
Status RayletClient::NotifyDirectCallTaskBlocked() {
flatbuffers::FlatBufferBuilder fbb;
auto message = protocol::CreateNotifyDirectCallTaskBlocked(fbb, release_resources);
auto message = protocol::CreateNotifyDirectCallTaskBlocked(fbb);
fbb.Finish(message);
return conn_->WriteMessage(MessageType::NotifyDirectCallTaskBlocked, &fbb);
}
Expand Down
5 changes: 2 additions & 3 deletions src/ray/raylet_client/raylet_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,8 @@ class RayletClient : public RayletClientInterface {
/// Notify the raylet that this client is blocked. This is only used for direct task
/// calls. Note that ordering of this with respect to Unblock calls is important.
///
/// \param release_resources: true if the dirct call blocking needs to release
/// resources. \return ray::Status.
ray::Status NotifyDirectCallTaskBlocked(bool release_resources);
/// \return ray::Status.
ray::Status NotifyDirectCallTaskBlocked();

/// Notify the raylet that this client is unblocked. This is only used for direct task
/// calls. Note that ordering of this with respect to Block calls is important.
Expand Down

0 comments on commit 9008eb6

Please sign in to comment.