Skip to content

Commit

Permalink
Bugfix: fix sched algo assert failed and skipping task (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
NamelessOIer authored Sep 20, 2024
1 parent 4852039 commit 54e662f
Showing 1 changed file with 134 additions and 82 deletions.
216 changes: 134 additions & 82 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "TaskScheduler.h"

#include <absl/time/time.h>
#include <google/protobuf/util/time_util.h>

#include <future>
Expand Down Expand Up @@ -155,6 +156,7 @@ bool TaskScheduler::Init() {
task->nodes_alloc = 0;
task->allocated_craneds_regex.clear();
task->CranedIdsClear();
task->executing_craned_ids.clear();

ok = g_embedded_db_client->UpdateRuntimeAttrOfTask(0, task->TaskDbId(),
task->RuntimeAttr());
Expand Down Expand Up @@ -262,6 +264,7 @@ bool TaskScheduler::Init() {
task->nodes_alloc = 0;
task->allocated_craneds_regex.clear();
task->CranedIdsClear();
task->executing_craned_ids.clear();

ok = g_embedded_db_client->UpdateRuntimeAttrOfTask(
0, task->TaskDbId(), task->RuntimeAttr());
Expand Down Expand Up @@ -1861,8 +1864,17 @@ void MinLoadFirst::CalculateNodeSelectionInfoOfPartition_(
std::vector<std::string> running_task_ids_str;
for (const auto& [task_id, res] : craned_meta->running_task_resource_map) {
const auto& task = running_tasks.at(task_id);
end_time_task_id_vec.emplace_back(task->StartTime() + task->time_limit,
task_id);

// For some completing tasks,
// task->StartTime() + task->time_limit <= absl::Now().
// In this case,
// max(task->StartTime() + task->time_limit, now + absl::Seconds(1))
// should be taken for end time,
// otherwise, tasks might be scheduled and executed even when
// res_avail = 0 and will cause a severe error where res_avail < 0.
absl::Time end_time = std::max(task->StartTime() + task->time_limit,
now + absl::Seconds(1));
end_time_task_id_vec.emplace_back(end_time, task_id);

running_task_ids_str.emplace_back(std::to_string(task_id));
}
Expand Down Expand Up @@ -2005,16 +2017,13 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
++task_num_node_id_it;
continue;
}

auto& time_avail_res_map =
node_selection_info.node_time_avail_res_map.at(craned_index);
auto craned_meta = craned_meta_map.at(craned_index).GetExclusivePtr();

// TODO: Trim here after finishing ResourceView.
AllocatableResource const& task_alloc_res =
task->Resources().EachNodeResMap().begin()->second.allocatable_res;

// If any of the follow `if` is true, skip this node.
if (!(task->requested_node_res_view <= craned_meta->res_avail)) {
if (!(task->requested_node_res_view <= craned_meta->res_total)) {
if constexpr (kAlgoTraceOutput) {
CRANE_TRACE(
"Task #{} needs more resource than that of craned {}. "
Expand Down Expand Up @@ -2047,6 +2056,7 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
"selected_node_cnt != task->node_num");

ResourceV2 allocated_res;
task->allocated_res_view.SetToZero();

for (const auto& craned_id : craned_indexes_) {
const auto& craned_meta = craned_meta_map.at(craned_id).GetExclusivePtr();
Expand Down Expand Up @@ -2076,7 +2086,6 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(

auto& time_avail_res_map =
node_selection_info.node_time_avail_res_map.at(craned_id);
// auto& node_meta = craned_meta_map.at(craned_id);

// Find all valid time segments in this node for this task.
// The expected start time must exist because all tasks in
Expand Down Expand Up @@ -2150,12 +2159,10 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(

if constexpr (kAlgoTraceOutput) {
std::vector<std::string> valid_seg_str;
for (auto& seg : intersected_time_segments) {
valid_seg_str.emplace_back(
fmt::format("[start: {}, duration: {}]",
absl::ToInt64Seconds(seg.start - now),
absl::ToInt64Seconds(seg.duration)));
}
for (auto& seg : intersected_time_segments)
valid_seg_str.emplace_back(fmt::format(
"[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now),
absl::ToInt64Seconds(seg.start - now + seg.duration)));
CRANE_TRACE("After looping craned {}, valid time segments: {}",
craned_id, absl::StrJoin(valid_seg_str, ", "));
}
Expand All @@ -2165,26 +2172,35 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
for (auto&& seg : time_segments) {
absl::Time start = seg.start;
absl::Time end = seg.start + seg.duration;

if constexpr (kAlgoTraceOutput) {
CRANE_TRACE(
"Trying to intersect time segment: [start: {}, end: "
"{})",
absl::ToInt64Seconds(start - now),
absl::ToInt64Seconds(start - now + seg.duration));
}

// Segment: [start, end)
// e.g. segment.start=5, segment.duration=1s => [5,6)

// Find the first time point that >= seg.start + seg.duration
auto it2 = std::lower_bound(intersected_time_segments.begin(),
intersected_time_segments.end(), end);

// If it2 == intersected_time_segments.begin(), this time segment has
// no overlap with any time segment in intersected_time_segments. Just
// skip it.
// it2
// V
// seg *------* *----* .... intersected_time_segments
// *----------*
// ^
// end
//
// Do nothing under such situation.

if (it2 == intersected_time_segments.begin()) {
// If it2 == intersected_time_segments.begin(),
// this time segment has no overlap with any time segment
// in intersected_time_segments.
// Just skip it.
// it2
// V
// seg *------* *----* .... intersected_time_segments
// *----------*
// ^
// end
//
// Do nothing under such situation.
continue;
} else {
it2 = std::prev(it2);
Expand All @@ -2201,80 +2217,117 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
// We first find a segment (it1) in `intersected_time_segments`
// whose start < seg.start and is closet to seg.start...
// There may be 2 situation:
// 1.
// A1.
// start
// V
// *-------* seg
// *--------* *----*
// ^
// it1 ( != intersected_time_segment.end() )
//
// 2.
// A2.
// *-------* seg *-------* seg
// *--* or *---*
// ^ ^
// it1 it1 ( == intersected_time_segment.begin())
auto it1 = std::upper_bound(intersected_time_segments.begin(),
auto it1 = std::lower_bound(intersected_time_segments.begin(),
intersected_time_segments.end(), start);
if (it1 != intersected_time_segments.begin())
// If it1 == intersected_time_segments.begin(), there is no time
// segment that starts previous to seg.start but there are some
// segments immediately after seg.start. The first one of them is
// the beginning of intersected_time_segments.
// begin() it2
// V
// *---* *----*
// *-------------------*
// ^
// start
{
if (it1 == intersected_time_segments.begin()) {
// Case A2:
//
// If it1 == intersected_time_segments.begin(), there is no time
// segment that starts previous to seg.start but there are some
// segments immediately after seg.start. The first one of them is
// the beginning of intersected_time_segments.
// it1 == begin() it2
// V V
// *---* *----*
// *--------------------* seg
// ^
// start
} else {
// Case A1:
// If there is an overlap between first segment and `seg`, take
// the intersection. std::prev(it1) it1 V V
// the intersection.

// Case A1-1 (end >= it1->start):
//
// std::prev(it1) it1
// V V
// *----------------------------* *--------*
// *-----------------------------*
// |<-intersected part->| ^
// ^ |
// start end
//
// OR
//
// Case A1-2 (end < it1->start):
// it0 == std::prev(it1) it1
// V V
// *--------------------------------* *--------*
// *--------------------*
// |<-intersected part->|
// ^
// start
it1 = std::prev(it1);
if (it1->start + it1->duration > start) {
// Note: If start == seg.start, there's no intersection.
new_intersected_time_segments.emplace_back(
start, it1->start + it1->duration - start);
// ^ ^
// start end
auto it0 = std::prev(it1);
if (it0->start + it0->duration > start) {
// Note: If it0->start + it0->duration == seg.start,
// there's no intersection.

absl::Duration intersected_duration;
if (end < it0->start + it0->duration)
// Case A1-2
intersected_duration = end - start;
else
// Case A1-1
intersected_duration = it0->start + it0->duration - start;

new_intersected_time_segments.emplace_back(start,
intersected_duration);
}
}

// |<-- range -->|
// it1 it should start here it2
// v v v
// *~~~~~~~* *----* *--------* *~~~~~~~~~~~~*
// *--------------------------------*
// If std::distance(it1, it2) >= 2, there are other segments
// between it1 and it2.
if (std::distance(it1, it2) >= 2) {
auto it = std::next(it1);
do {
new_intersected_time_segments.emplace_back(it->start,
it->duration);
++it;
} while (it != it2);
}

// the last insertion handles the following 2 situations.
// it2
// *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------------*
// *--------------------------------*
// OR
// it2
// *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------*
// *-------------------------------------*
// |<-- intersected range -->|
// it1 it2
// v v
// *~~~~~~~* *----* *--------* *~~~~~~~~~~~~*
// *----------------------------------------*
//
//
// The following situation will NOT happen.
// it2
// *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------*
// *-------------------------------------*
if (std::distance(it1, it2) >= 1)
// Case A1-3 (There's no half-intersected tail segment):
// it2 it1
// v v
// *-------------* *--------------*
// *----------*
//
// Or
//
// it2 it1 == end()
// v v
// *-------------*
// *----------*
//
// Note: In case A1-3, it2 < it1.
// Thus, termination condition should be (it2 < it1).
for (auto it = it1; it < it2; ++it)
new_intersected_time_segments.emplace_back(it->start, it->duration);

if (it2 < it1) {
// Case A1-3.
// No half-intersected tail segment should be handled.
} else {
// the last insertion handles the following 2 situations.
// it2
// *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------------*
// *--------------------------------*
// OR
// it2
// *~~~~~~~* *~~~~~~~* *~~~~~~~~* *------*
// *-------------------------------------*
new_intersected_time_segments.emplace_back(
it2->start, std::min(it2->duration, end - it2->start));
}
}
}

Expand All @@ -2283,10 +2336,9 @@ bool MinLoadFirst::CalculateRunningNodesAndStartTime_(
if constexpr (kAlgoTraceOutput) {
std::vector<std::string> valid_seg_str;
for (auto& seg : intersected_time_segments) {
valid_seg_str.emplace_back(
fmt::format("[start: {}, duration: {}]",
absl::ToInt64Seconds(seg.start - now),
absl::ToInt64Seconds(seg.duration)));
valid_seg_str.emplace_back(fmt::format(
"[start: {}, end: {})", absl::ToInt64Seconds(seg.start - now),
absl::ToInt64Seconds(seg.start - now + seg.duration)));
}
CRANE_TRACE("After looping craned {}, valid time segments: {}",
craned_id, absl::StrJoin(valid_seg_str, ", "));
Expand Down

0 comments on commit 54e662f

Please sign in to comment.