Skip to content

Commit

Permalink
bugfix
Browse files Browse the repository at this point in the history
Signed-off-by: xiafeng <[email protected]>
  • Loading branch information
L-Xiafeng committed Jul 26, 2024
1 parent 1e10c39 commit f5b0d6e
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 55 deletions.
6 changes: 3 additions & 3 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1037,9 +1037,9 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
// calloc will not send TaskCompletionAckReply when task
// Complete.
// crun task will send TaskStatusChange from Craned,
// if (meta.interactive_type == InteractiveTaskType::Crun) {
// stream_writer.WriteTaskCompletionAckReply(task_id);
// }
if (auto writer = writer_weak_ptr.lock();
writer && meta.interactive_type == crane::grpc::Calloc)
writer->WriteTaskCompletionAckReply(task_id);
m_ctld_server_->m_mtx_.Lock();

// If cfored disconnected, the cfored_name should have be
Expand Down
115 changes: 63 additions & 52 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -690,15 +690,17 @@ void TaskScheduler::ScheduleThread_() {
} else {
const auto& meta = std::get<InteractiveMetaInTask>(task->meta);
for (auto const& craned_id : task->CranedIds())
task->executing_craned_ids.emplace_back(craned_id);
// if (meta.interactive_type == crane::grpc::Calloc)
// // For calloc tasks we still need to execute a dummy empty task to
// // set up a timer.
// task->executing_craned_ids.emplace_back(task->CranedIds().front());
// else
// // For crun tasks we need to execute tasks on all allocated nodes.
// for (auto const& craned_id : task->CranedIds())
// task->executing_craned_ids.emplace_back(craned_id);
task->executing_craned_ids.emplace_back(craned_id);
// if (meta.interactive_type == crane::grpc::Calloc)
// // For calloc tasks we still need to execute a dummy
// empty task to
// // set up a timer.
// task->executing_craned_ids.emplace_back(task->CranedIds().front());
// else
// // For crun tasks we need to execute tasks on all
// allocated nodes. for (auto const& craned_id :
// task->CranedIds())
// task->executing_craned_ids.emplace_back(craned_id);
}
}
end = std::chrono::steady_clock::now();
Expand Down Expand Up @@ -1418,6 +1420,58 @@ void TaskScheduler::CleanCancelQueueCb_() {
running_task_craned_id_map[craned_id].emplace_back(task_id);
}

if (!pending_tasks_vec.empty()) {
// Carry the ownership of TaskInCtld for automatic destruction.
std::vector<std::unique_ptr<TaskInCtld>> task_ptr_vec;
std::vector<TaskInCtld*> task_raw_ptr_vec;
{
// Allow temporary inconsistency on task querying here.
// In a very short duration, some cancelled tasks might not be visible
// immediately after changing to CANCELLED state.
// Also, since here we erase the task id from the pending task
// map with a little latency, some task id we retrieve might have been
// cancelled. Just ignore those who have already been cancelled.
LockGuard pending_guard(&m_pending_task_map_mtx_);
for (task_id_t task_id : pending_tasks_vec) {
auto it = m_pending_task_map_.find(task_id);
if (it == m_pending_task_map_.end()) {
CRANE_TRACE(
"Pending task #{} not found when doing actual cancelling. "
"Skipping it..",
task_id);
continue;
}

TaskInCtld* task = it->second.get();
task->SetStatus(crane::grpc::Cancelled);
task->SetEndTime(absl::Now());

if (task->type == crane::grpc::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Crun)
g_thread_pool->detach_task([cb = meta.cb_task_completed,
task_id = task_id] { cb(task_id); });
}

task_raw_ptr_vec.emplace_back(it->second.get());
task_ptr_vec.emplace_back(std::move(it->second));

m_pending_task_map_.erase(it);
}
}
PersistAndTransferTasksToMongodb_(task_raw_ptr_vec);
}

{
LockGuard running_guard(&m_running_task_map_mtx_);
for (auto task_id : pending_tasks_vec) {
auto it = m_running_task_map_.find(task_id);
if (it == m_running_task_map_.end()) continue;
for (const auto& craned_id : it->second->CranedIds())
running_task_craned_id_map[craned_id].emplace_back(task_id);
}
}

for (auto&& [craned_id, task_ids] : running_task_craned_id_map) {
g_thread_pool->detach_task(
[id = craned_id, task_ids_to_cancel = task_ids]() {
Expand All @@ -1428,49 +1482,6 @@ void TaskScheduler::CleanCancelQueueCb_() {
stub->TerminateTasks(task_ids_to_cancel);
});
}

if (pending_tasks_vec.empty()) return;

// Carry the ownership of TaskInCtld for automatic destruction.
std::vector<std::unique_ptr<TaskInCtld>> task_ptr_vec;
std::vector<TaskInCtld*> task_raw_ptr_vec;
{
// Allow temporary inconsistency on task querying here.
// In a very short duration, some cancelled tasks might not be visible
// immediately after changing to CANCELLED state.
// Also, since here we erase the task id from the pending task
// map with a little latency, some task id we retrieve might have been
// cancelled. Just ignore those who have already been cancelled.
LockGuard pending_guard(&m_pending_task_map_mtx_);
for (task_id_t task_id : pending_tasks_vec) {
auto it = m_pending_task_map_.find(task_id);
if (it == m_pending_task_map_.end()) {
CRANE_TRACE(
"Pending task #{} not found when doing actual cancelling. "
"Skipping it..",
task_id);
continue;
}

TaskInCtld* task = it->second.get();
task->SetStatus(crane::grpc::Cancelled);
task->SetEndTime(absl::Now());

if (task->type == crane::grpc::Interactive) {
auto& meta = std::get<InteractiveMetaInTask>(task->meta);
if (meta.interactive_type == crane::grpc::Crun)
g_thread_pool->detach_task([cb = meta.cb_task_completed,
task_id = task_id] { cb(task_id); });
}

task_raw_ptr_vec.emplace_back(it->second.get());
task_ptr_vec.emplace_back(std::move(it->second));

m_pending_task_map_.erase(it);
}
}

PersistAndTransferTasksToMongodb_(task_raw_ptr_vec);
}

void TaskScheduler::SubmitTaskTimerCb_() {
Expand Down

0 comments on commit f5b0d6e

Please sign in to comment.