Skip to content

Commit

Permalink
fix lock issue
Browse files Browse the repository at this point in the history
  • Loading branch information
NamelessOIer committed Nov 12, 2024
1 parent a3dab1d commit 2f5fd58
Showing 1 changed file with 59 additions and 52 deletions.
111 changes: 59 additions & 52 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,8 @@ crane::grpc::CreateReservationReply TaskScheduler::CreateReservation(
reply.set_reason(fmt::format("Partition {} not found", partition));
return reply;
}
auto& partition_meta_ptr = all_partitions_meta_map->at(partition);
const util::Synchronized<PartitionMeta>& partition_meta_ptr =
all_partitions_meta_map->at(partition);
auto craned_meta_map = g_meta_container->GetCranedMetaMapConstPtr();
auto reservation_meta_map = g_meta_container->GetReservationMetaMapPtr();

Expand All @@ -1383,8 +1384,6 @@ crane::grpc::CreateReservationReply TaskScheduler::CreateReservation(
std::vector<std::pair<CranedMetaContainer::CranedMetaPtr, ResourceInNode>>
craned_meta_res_vec;

LockGuard running_guard(&m_running_task_map_mtx_);

ResourceV2 allocated_res;
for (CranedId const& craned_id : craned_ids) {
if (!partition_meta_ptr.GetExclusivePtr()->craned_ids.contains(craned_id)) {
Expand All @@ -1393,62 +1392,70 @@ crane::grpc::CreateReservationReply TaskScheduler::CreateReservation(
fmt::format("Node {} is not in partition {}", craned_id, partition));
return reply;
}
auto craned_meta = g_meta_container->GetCranedMetaPtr(craned_id);
if (craned_meta.get() == nullptr) {
reply.set_ok(false);
reply.set_reason(fmt::format("Node {} not found", craned_id));
return reply;
}
ResourceInNode res_avail = craned_meta->res_total;
for (const auto& [task_id, res] : craned_meta->running_task_resource_map) {
const auto& task = m_running_task_map_.at(task_id);
absl::Time task_end_time = task->StartTime() + task->time_limit;
if (task_end_time > start_time) {
if (whole_node) {
reply.set_ok(false);
reply.set_reason(
fmt::format("Node {} has running tasks that end after the "
"reservation start time",
craned_id));
return reply;
}

{
LockGuard running_guard(&m_running_task_map_mtx_);

for (CranedId const& craned_id : craned_ids) {
auto craned_meta = g_meta_container->GetCranedMetaPtr(craned_id);
if (craned_meta.get() == nullptr) {
reply.set_ok(false);
reply.set_reason(fmt::format("Node {} not found", craned_id));
return reply;
}
ResourceInNode res_avail = craned_meta->res_total;
for (const auto& [task_id, res] :
craned_meta->running_task_resource_map) {
const auto& task = m_running_task_map_.at(task_id);
absl::Time task_end_time = task->StartTime() + task->time_limit;
if (task_end_time > start_time) {
if (whole_node) {
reply.set_ok(false);
reply.set_reason(
fmt::format("Node {} has running tasks that end after the "
"reservation start time",
craned_id));
return reply;
}
res_avail -= res;
}
res_avail -= res;
}
}
for (const auto& [reservation_name, res] :
craned_meta->reservation_resource_map) {
const auto& reservation =
reservation_meta_map->at(reservation_name).GetExclusivePtr();
if (reservation->start_time < end_time &&
reservation->end_time > start_time) {
if (whole_node) {
for (const auto& [reservation_name, res] :
craned_meta->reservation_resource_map) {
const auto& reservation =
reservation_meta_map->at(reservation_name).GetExclusivePtr();
if (reservation->start_time < end_time &&
reservation->end_time > start_time) {
if (whole_node) {
reply.set_ok(false);
reply.set_reason(
fmt::format("Node {} has reservations that overlap with the "
"new reservation",
craned_id));
return reply;
}
res_avail -= res;
}
}
ResourceInNode feasible_res;
if (whole_node) {
feasible_res = res_avail;
} else {
bool ok = resources.GetFeasibleResourceInNode(res_avail, &feasible_res);
if (!ok) {
reply.set_ok(false);
reply.set_reason(
fmt::format("Node {} has reservations that overlap with the "
"new reservation",
craned_id));
reply.set_reason(fmt::format(
"Node {} does not have enough resources for the reservation",
craned_id));
return reply;
}
res_avail -= res;
}
}
ResourceInNode feasible_res;
if (whole_node) {
feasible_res = res_avail;
} else {
bool ok = resources.GetFeasibleResourceInNode(res_avail, &feasible_res);
if (!ok) {
reply.set_ok(false);
reply.set_reason(fmt::format(
"Node {} does not have enough resources for the reservation",
craned_id));
return reply;
}
}

allocated_res.AddResourceInNode(craned_id, feasible_res);
craned_meta_res_vec.emplace_back(std::move(craned_meta),
std::move(feasible_res));
allocated_res.AddResourceInNode(craned_id, feasible_res);
craned_meta_res_vec.emplace_back(std::move(craned_meta),
std::move(feasible_res));
}
}

const auto& [it, ok] = reservation_meta_map->emplace(
Expand Down

0 comments on commit 2f5fd58

Please sign in to comment.