Skip to content

[core] Move locations when recovering and correct logs + comments #52394

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 25 additions & 28 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -868,30 +868,28 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
gcs_client_, *actor_task_submitter_, *reference_counter_);

std::function<Status(const ObjectID &object_id, const ObjectLookupCallback &callback)>
object_lookup_fn;

object_lookup_fn = [this, node_addr_factory](const ObjectID &object_id,
const ObjectLookupCallback &callback) {
std::vector<rpc::Address> locations;
const std::optional<absl::flat_hash_set<NodeID>> object_locations =
reference_counter_->GetObjectLocations(object_id);
if (object_locations.has_value()) {
locations.reserve(object_locations.value().size());
for (const auto &node_id : object_locations.value()) {
std::optional<rpc::Address> addr = node_addr_factory(node_id);
if (addr.has_value()) {
locations.emplace_back(std::move(addr.value()));
continue;
object_lookup_fn = [this, node_addr_factory](const ObjectID &object_id,
const ObjectLookupCallback &callback) {
std::vector<rpc::Address> locations;
const std::optional<absl::flat_hash_set<NodeID>> object_locations =
reference_counter_->GetObjectLocations(object_id);
if (object_locations.has_value()) {
locations.reserve(object_locations.value().size());
for (const auto &node_id : object_locations.value()) {
std::optional<rpc::Address> addr = node_addr_factory(node_id);
if (addr.has_value()) {
locations.emplace_back(std::move(addr.value()));
continue;
}
// We're getting potentially stale locations directly from the reference
// counter, so the location might be a dead node.
RAY_LOG(DEBUG).WithField(object_id).WithField(node_id)
<< "Object location is dead, not using it in the recovery of object";
}
}
// We're getting potentially stale locations directly from the reference
// counter, so the location might be a dead node.
RAY_LOG(DEBUG).WithField(object_id).WithField(node_id)
<< "Object location is dead, not using it in the recovery of object";
}
}
callback(object_id, locations);
return Status::OK();
};
callback(object_id, std::move(locations));
return Status::OK();
};
object_recovery_manager_ = std::make_unique<ObjectRecoveryManager>(
rpc_address_,
raylet_client_factory,
Expand All @@ -903,9 +901,7 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
[this](const ObjectID &object_id, rpc::ErrorType reason, bool pin_object) {
RAY_LOG(DEBUG).WithField(object_id)
<< "Failed to recover object due to " << rpc::ErrorType_Name(reason);
// NOTE(swang): Failure here means the local raylet is probably dead.
// We do not assert failure though, because we should throw the object
// error to the application.
// We should throw the object error to the application.
RAY_UNUSED(Put(RayObject(reason),
/*contained_object_ids=*/{},
object_id,
Expand Down Expand Up @@ -945,8 +941,9 @@ CoreWorker::CoreWorker(CoreWorkerOptions options, const WorkerID &worker_id)
if (!lost_objects.empty()) {
// Keep :info_message: in sync with LOG_PREFIX_INFO_MESSAGE in ray_constants.py.
RAY_LOG(ERROR) << ":info_message: Attempting to recover " << lost_objects.size()
<< " lost objects by resubmitting their tasks. To disable "
<< "object reconstruction, set @ray.remote(max_retries=0).";
<< " lost objects by resubmitting their tasks or setting a new "
"primary location from existing copies. To disable object "
"reconstruction, set @ray.remote(max_retries=0).";
// Delete the objects from the in-memory store to indicate that they are not
// available. The object recovery manager will guarantee that a new value
// will eventually be stored for the objects (either an
Expand Down
58 changes: 29 additions & 29 deletions src/ray/core_worker/object_recovery_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
}
RAY_LOG(INFO).WithField(object_id) << "Recovery complete for object";
});
// Lookup the object in the GCS to find another copy.
// Gets the node ids from reference_counter and then gets addresses from the local
// gcs_client.
RAY_CHECK_OK(object_lookup_(
object_id,
[this](const ObjectID &object_id, const std::vector<rpc::Address> &locations) {
PinOrReconstructObject(object_id, locations);
[this](const ObjectID &object_id, std::vector<rpc::Address> locations) {
PinOrReconstructObject(object_id, std::move(locations));
}));
} else if (requires_recovery) {
RAY_LOG(DEBUG).WithField(object_id) << "Recovery already started for object";
Expand All @@ -88,17 +89,16 @@ bool ObjectRecoveryManager::RecoverObject(const ObjectID &object_id) {
return true;
}

void ObjectRecoveryManager::PinOrReconstructObject(
const ObjectID &object_id, const std::vector<rpc::Address> &locations) {
void ObjectRecoveryManager::PinOrReconstructObject(const ObjectID &object_id,
std::vector<rpc::Address> locations) {
RAY_LOG(DEBUG).WithField(object_id)
<< "Lost object has " << locations.size() << " locations";
// The object to recovery has secondary copies, pin one copy to promote it to primary
// one.
if (!locations.empty()) {
auto locations_copy = locations;
const auto location = std::move(locations_copy.back());
locations_copy.pop_back();
PinExistingObjectCopy(object_id, location, locations_copy);
const auto location = std::move(locations.back());
locations.pop_back();
PinExistingObjectCopy(object_id, location, std::move(locations));
} else {
// There are no more copies to pin, try to reconstruct the object.
ReconstructObject(object_id);
Expand All @@ -108,7 +108,7 @@ void ObjectRecoveryManager::PinOrReconstructObject(
void ObjectRecoveryManager::PinExistingObjectCopy(
const ObjectID &object_id,
const rpc::Address &raylet_address,
const std::vector<rpc::Address> &other_locations) {
std::vector<rpc::Address> other_locations) {
// If a copy still exists, pin the object by sending a
// PinObjectIDs RPC.
const auto node_id = NodeID::FromBinary(raylet_address.raylet_id());
Expand All @@ -132,25 +132,25 @@ void ObjectRecoveryManager::PinExistingObjectCopy(
client = client_it->second;
}

client->PinObjectIDs(rpc_address_,
{object_id},
/*generator_id=*/ObjectID::Nil(),
[this, object_id, other_locations, node_id](
const Status &status, const rpc::PinObjectIDsReply &reply) {
if (status.ok() && reply.successes(0)) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_.Put(
RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), object_id));
reference_counter_.UpdateObjectPinnedAtRaylet(object_id,
node_id);
} else {
RAY_LOG(INFO).WithField(object_id)
<< "Error pinning secondary copy of lost object due to "
<< status << ", trying again with other locations";
PinOrReconstructObject(object_id, other_locations);
}
});
client->PinObjectIDs(
rpc_address_,
{object_id},
/*generator_id=*/ObjectID::Nil(),
[this, object_id, other_locations = std::move(other_locations), node_id](
const Status &status, const rpc::PinObjectIDsReply &reply) mutable {
if (status.ok() && reply.successes(0)) {
// TODO(swang): Make sure that the node is still alive when
// marking the object as pinned.
RAY_CHECK(in_memory_store_.Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA),
object_id));
reference_counter_.UpdateObjectPinnedAtRaylet(object_id, node_id);
} else {
RAY_LOG(INFO).WithField(object_id)
<< "Error pinning secondary copy of lost object due to " << status
<< ", trying again with other locations";
PinOrReconstructObject(object_id, std::move(other_locations));
}
});
}

void ObjectRecoveryManager::ReconstructObject(const ObjectID &object_id) {
Expand Down
6 changes: 3 additions & 3 deletions src/ray/core_worker/object_recovery_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ using ObjectPinningClientFactoryFn = std::function<std::shared_ptr<PinObjectsInt
const std::string &ip_address, int port)>;

using ObjectLookupCallback = std::function<void(
const ObjectID &object_id, const std::vector<rpc::Address> &raylet_locations)>;
const ObjectID &object_id, std::vector<rpc::Address> raylet_locations)>;

// A callback for if we fail to recover an object.
using ObjectRecoveryFailureCallback = std::function<void(
Expand Down Expand Up @@ -99,13 +99,13 @@ class ObjectRecoveryManager {
/// fails, attempt to reconstruct it by resubmitting the task that created
/// the object.
void PinOrReconstructObject(const ObjectID &object_id,
const std::vector<rpc::Address> &locations);
std::vector<rpc::Address> locations);

/// Pin a new copy for the object at the given location. If that fails, then
/// try one of the other locations.
void PinExistingObjectCopy(const ObjectID &object_id,
const rpc::Address &raylet_address,
const std::vector<rpc::Address> &other_locations);
std::vector<rpc::Address> other_locations);

/// Reconstruct an object by resubmitting the task that created it.
void ReconstructObject(const ObjectID &object_id);
Expand Down