From 1fd028d2f78fb3fba876041b13cbd3325562f5fb Mon Sep 17 00:00:00 2001 From: db <1301189887@qq.com> Date: Thu, 26 Dec 2024 17:33:32 +0800 Subject: [PATCH 1/6] fix cacct priority val error --- protos/PublicDefs.proto | 1 + src/CraneCtld/DbClient.cpp | 6 ++++-- src/CraneCtld/TaskScheduler.cpp | 3 +++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 7e96a3aad..801b857fa 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -130,6 +130,7 @@ message TaskToCtld { bool requeue_if_failed = 12; bool get_user_env = 13; + uint32 priority = 14; oneof payload { BatchTaskAdditionalMeta batch_meta = 21; diff --git a/src/CraneCtld/DbClient.cpp b/src/CraneCtld/DbClient.cpp index 1bef5af95..62b0c5468 100644 --- a/src/CraneCtld/DbClient.cpp +++ b/src/CraneCtld/DbClient.cpp @@ -348,6 +348,8 @@ bool MongodbClient::FetchJobRecords( task->set_type((crane::grpc::TaskType)view["type"].get_int32().value); task->set_extra_attr(view["extra_attr"].get_string().value.data()); + + task->set_priority(view["priority"].get_int64().value); } } catch (const bsoncxx::exception& e) { PrintError_(e.what()); @@ -869,7 +871,7 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_( util::HostNameListToStr(runtime_attr.craned_ids()), runtime_attr.craned_ids().size(), 0, task_to_ctld.partition_name(), // 15-19 - 0, 0, runtime_attr.start_time().seconds(), + task_to_ctld.priority(), 0, runtime_attr.start_time().seconds(), runtime_attr.end_time().seconds(), 0, // 20-24 task_to_ctld.batch_meta().sh_script(), runtime_attr.status(), @@ -942,7 +944,7 @@ MongodbClient::document MongodbClient::TaskInCtldToDocument_(TaskInCtld* task) { static_cast(task->Gid()), task->allocated_craneds_regex, static_cast(task->nodes_alloc), 0, task->partition_id, // 15-19 - 0, 0, task->StartTimeInUnixSecond(), task->EndTimeInUnixSecond(), + static_cast(task->cached_priority), 0, task->StartTimeInUnixSecond(), task->EndTimeInUnixSecond(), 0, // 20-24 script, task->Status(), absl::ToInt64Seconds(task->time_limit), diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 46835eb25..0a2b5d386 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2904,6 +2904,9 @@ std::vector MultiFactorPriority::GetOrderedTaskIdList( task->cached_priority = priority; task->pending_reason = "Priority"; task_priority_vec.emplace_back(task.get(), priority); + task->MutableTaskToCtld()->set_priority(priority); + g_embedded_db_client->UpdateTaskToCtldIfExists(0, task->TaskDbId(), + task->TaskToCtld()); } std::sort(task_priority_vec.begin(), task_priority_vec.end(), From 3daab95f5c0da2fc9c9679abca99d460ee51ee6e Mon Sep 17 00:00:00 2001 From: db <1301189887@qq.com> Date: Thu, 6 Feb 2025 16:25:58 +0800 Subject: [PATCH 2/6] fix comments --- protos/PublicDefs.proto | 1 - src/CraneCtld/DbClient.cpp | 2 +- src/CraneCtld/TaskScheduler.cpp | 3 --- 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 801b857fa..7e96a3aad 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -130,7 +130,6 @@ message TaskToCtld { bool requeue_if_failed = 12; bool get_user_env = 13; - uint32 priority = 14; oneof payload { BatchTaskAdditionalMeta batch_meta = 21; diff --git a/src/CraneCtld/DbClient.cpp b/src/CraneCtld/DbClient.cpp index 62b0c5468..ac8731ef9 100644 --- a/src/CraneCtld/DbClient.cpp +++ b/src/CraneCtld/DbClient.cpp @@ -871,7 +871,7 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_( util::HostNameListToStr(runtime_attr.craned_ids()), runtime_attr.craned_ids().size(), 0, task_to_ctld.partition_name(), // 15-19 - task_to_ctld.priority(), 0, runtime_attr.start_time().seconds(), + 0, 0, runtime_attr.start_time().seconds(), runtime_attr.end_time().seconds(), 0, // 20-24 task_to_ctld.batch_meta().sh_script(), runtime_attr.status(), diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 0a2b5d386..46835eb25 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2904,9 +2904,6 @@ std::vector MultiFactorPriority::GetOrderedTaskIdList( task->cached_priority = priority; task->pending_reason = "Priority"; task_priority_vec.emplace_back(task.get(), priority); - task->MutableTaskToCtld()->set_priority(priority); - g_embedded_db_client->UpdateTaskToCtldIfExists(0, task->TaskDbId(), - task->TaskToCtld()); } std::sort(task_priority_vec.begin(), task_priority_vec.end(), From ea88412fe5a9f4c63abcc01ef28dc0e31864fd77 Mon Sep 17 00:00:00 2001 From: db <1301189887@qq.com> Date: Thu, 6 Feb 2025 16:59:49 +0800 Subject: [PATCH 3/6] fix comments --- protos/PublicDefs.proto | 1 + src/CraneCtld/CtldPublicDefs.h | 7 ++++++- src/CraneCtld/DbClient.cpp | 2 +- src/CraneCtld/TaskScheduler.cpp | 1 + 4 files changed, 9 insertions(+), 2 deletions(-) diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 7e96a3aad..a2e9cd7aa 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -170,6 +170,7 @@ message RuntimeAttrOfTask { bool held = 18; ResourceV2 resources = 19; + uint32 priority = 20; } message TaskToD { diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index 1e7b92386..276e6207e 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -464,6 +464,10 @@ struct TaskInCtld { } bool const& Held() const { return held; } + void SetPriority(uint32_t val) { + runtime_attr.set_priority(val); + } + void SetResources(ResourceV2&& val) { *runtime_attr.mutable_resources() = static_cast(val); @@ -535,7 +539,8 @@ struct TaskInCtld { status = runtime_attr.status(); held = runtime_attr.held(); - + cached_priority = runtime_attr.priority(); + if (status != crane::grpc::TaskStatus::Pending) { craned_ids.assign(runtime_attr.craned_ids().begin(), runtime_attr.craned_ids().end()); diff --git a/src/CraneCtld/DbClient.cpp b/src/CraneCtld/DbClient.cpp index ac8731ef9..162d36260 100644 --- a/src/CraneCtld/DbClient.cpp +++ b/src/CraneCtld/DbClient.cpp @@ -871,7 +871,7 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_( util::HostNameListToStr(runtime_attr.craned_ids()), runtime_attr.craned_ids().size(), 0, task_to_ctld.partition_name(), // 15-19 - 0, 0, runtime_attr.start_time().seconds(), + runtime_attr.priority(), 0, runtime_attr.start_time().seconds(), runtime_attr.end_time().seconds(), 0, // 20-24 task_to_ctld.batch_meta().sh_script(), runtime_attr.status(), diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 46835eb25..a0fd08f7e 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -2529,6 +2529,7 @@ void MinLoadFirst::NodeSelect( g_meta_container->MallocResourceFromNode(craned_id, task->TaskId(), task->Resources()); + task->SetPriority(task->cached_priority); std::unique_ptr moved_task; // Move task out of pending_task_map and insert it to the From 1379751f6e4da2adb0ac4df337c3b4ff9b565f84 Mon Sep 17 00:00:00 2001 From: db <1301189887@qq.com> Date: Thu, 6 Feb 2025 17:27:06 +0800 Subject: [PATCH 4/6] fix pending priority when recovery --- src/CraneCtld/CtldPublicDefs.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index 276e6207e..2019c9b49 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -565,6 +565,7 @@ struct TaskInCtld { start_time = absl::FromUnixSeconds(runtime_attr.start_time().seconds()); end_time = absl::FromUnixSeconds(runtime_attr.end_time().seconds()); + submit_time = absl::FromUnixSeconds(runtime_attr.submit_time().seconds()); } // Helper function to set the fields of TaskInfo using info in From b8ea5f1e35b1ddb421076e51bbcd5bd8947217e7 Mon Sep 17 00:00:00 2001 From: db <1301189887@qq.com> Date: Thu, 13 Feb 2025 13:57:54 +0800 Subject: [PATCH 5/6] fix comments --- src/CraneCtld/TaskScheduler.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 7bdcaaf70..baae7f124 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -677,6 +677,7 @@ void TaskScheduler::ScheduleThread_() { task->SetStatus(crane::grpc::TaskStatus::Running); task->SetCranedIds(std::move(it.second)); + task->SetPriority(task->cached_priority); task->nodes_alloc = task->CranedIds().size(); // CRANE_DEBUG( @@ -2278,8 +2279,6 @@ void MinLoadFirst::NodeSelect( for (CranedId const& craned_id : craned_ids) g_meta_container->MallocResourceFromNode(craned_id, task->TaskId(), task->Resources()); - - task->SetPriority(task->cached_priority); std::unique_ptr moved_task; // Move task out of pending_task_map and insert it to the From 5e2d5bea1dfe34becfc81a98d0dacc990e439644 Mon Sep 17 00:00:00 2001 From: RileyWen Date: Sun, 23 Feb 2025 00:42:40 +0800 Subject: [PATCH 6/6] Refactor. --- protos/PublicDefs.proto | 4 ++-- src/CraneCtld/CtldGrpcServer.cpp | 3 ++- src/CraneCtld/CtldPublicDefs.h | 14 +++++++++----- src/CraneCtld/DbClient.cpp | 7 ++++--- src/CraneCtld/TaskScheduler.cpp | 3 +-- 5 files changed, 18 insertions(+), 13 deletions(-) diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 523bd175f..1e7d12652 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -171,7 +171,7 @@ message RuntimeAttrOfTask { bool held = 18; ResourceV2 resources = 19; - uint32 priority = 20; + double cached_priority = 20; } message TaskToD { @@ -226,7 +226,7 @@ message InteractiveTaskAdditionalMeta { string sh_script = 2; string term_env = 3; InteractiveTaskType interactive_type = 4; - + bool pty = 5; bool x11 = 6; diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index f6810cb8c..2df2a895b 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -989,8 +989,9 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr task) { if (err == CraneErr::kOk) err = g_task_scheduler->CheckTaskValidity(task.get()); + task->SetSubmitTime(absl::Now()); + if (err == CraneErr::kOk) { - task->SetSubmitTime(absl::Now()); std::future future = g_task_scheduler->SubmitTaskAsync(std::move(task)); return {std::move(future)}; diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index a527940af..7ec2a62a8 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -343,6 +343,9 @@ struct TaskInCtld { absl::Time start_time; absl::Time end_time; + // persisted for querying priority of running tasks + double cached_priority{0.0}; + // Might change at each scheduling cycle. ResourceV2 resources; @@ -380,7 +383,6 @@ struct TaskInCtld { std::string pending_reason; double mandated_priority{0.0}; - double cached_priority{0.0}; // Helper function public: @@ -473,9 +475,11 @@ struct TaskInCtld { } bool const& Held() const { return held; } - void SetPriority(uint32_t val) { - runtime_attr.set_priority(val); + void SetCachedPriority(const double val) { + cached_priority = val; + runtime_attr.set_cached_priority(val); } + double CachedPriority() const { return cached_priority; } void SetResources(ResourceV2&& val) { *runtime_attr.mutable_resources() = @@ -558,8 +562,8 @@ struct TaskInCtld { status = runtime_attr.status(); held = runtime_attr.held(); - cached_priority = runtime_attr.priority(); - + cached_priority = runtime_attr.cached_priority(); + if (status != crane::grpc::TaskStatus::Pending) { craned_ids.assign(runtime_attr.craned_ids().begin(), runtime_attr.craned_ids().end()); diff --git a/src/CraneCtld/DbClient.cpp b/src/CraneCtld/DbClient.cpp index 71e1b7f2b..7c99ffb0a 100644 --- a/src/CraneCtld/DbClient.cpp +++ b/src/CraneCtld/DbClient.cpp @@ -871,7 +871,8 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_( util::HostNameListToStr(runtime_attr.craned_ids()), runtime_attr.craned_ids().size(), 0, task_to_ctld.partition_name(), // 15-19 - runtime_attr.priority(), 0, runtime_attr.start_time().seconds(), + runtime_attr.cached_priority(), 0, + runtime_attr.start_time().seconds(), runtime_attr.end_time().seconds(), 0, // 20-24 task_to_ctld.batch_meta().sh_script(), runtime_attr.status(), @@ -944,8 +945,8 @@ MongodbClient::document MongodbClient::TaskInCtldToDocument_(TaskInCtld* task) { static_cast(task->gid), task->allocated_craneds_regex, static_cast(task->nodes_alloc), 0, task->partition_id, // 15-19 - static_cast(task->cached_priority), 0, task->StartTimeInUnixSecond(), task->EndTimeInUnixSecond(), - 0, + static_cast(task->CachedPriority()), 0, + task->StartTimeInUnixSecond(), task->EndTimeInUnixSecond(), 0, // 20-24 script, task->Status(), absl::ToInt64Seconds(task->time_limit), task->SubmitTimeInUnixSecond(), task->cwd, diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index baae7f124..db0959bef 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -677,7 +677,6 @@ void TaskScheduler::ScheduleThread_() { task->SetStatus(crane::grpc::TaskStatus::Running); task->SetCranedIds(std::move(it.second)); - task->SetPriority(task->cached_priority); task->nodes_alloc = task->CranedIds().size(); // CRANE_DEBUG( @@ -2643,7 +2642,7 @@ std::vector MultiFactorPriority::GetOrderedTaskIdList( double priority = (task->mandated_priority == 0.0) ? CalculatePriority_(task.get(), now) : task->mandated_priority; - task->cached_priority = priority; + task->SetCachedPriority(priority); task->pending_reason = "Priority"; task_priority_vec.emplace_back(task.get(), priority); }