From 442c5719562f9d2f2dce77bcbf77d1f25eced110 Mon Sep 17 00:00:00 2001 From: lijunlin <70465472+L-Xiafeng@users.noreply.github.com> Date: Thu, 26 Sep 2024 23:33:42 +0800 Subject: [PATCH] Bugfix/sigchld (#316) * refactor sigchild handling Signed-off-by: xiafeng * refactor task status on lunch failed Signed-off-by: xiafeng * refactor. Signed-off-by: xiafeng * typo Signed-off-by: xiafeng * refactor Signed-off-by: xiafeng * typo Signed-off-by: xiafeng * Refactor. --------- Signed-off-by: xiafeng Co-authored-by: RileyWen --- src/Craned/CranedPublicDefs.h | 2 + src/Craned/TaskManager.cpp | 232 +++++++++++------- src/Craned/TaskManager.h | 27 +- .../PublicHeader/include/crane/PublicHeader.h | 2 - 4 files changed, 168 insertions(+), 95 deletions(-) diff --git a/src/Craned/CranedPublicDefs.h b/src/Craned/CranedPublicDefs.h index ab9336c65..e4b4a4e35 100644 --- a/src/Craned/CranedPublicDefs.h +++ b/src/Craned/CranedPublicDefs.h @@ -25,6 +25,8 @@ namespace Craned { +inline const uint64_t kEvSigChldResendMs = 500'000; /* NOTE(review): split into timeval {sec, usec} by the SIGCHLD resend timer, so this value is in microseconds (500 ms) despite the Ms suffix. */ + using EnvPair = std::pair; struct TaskStatusChange { diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index ed865802d..7d86b518f 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -111,6 +111,19 @@ TaskManager::TaskManager() { std::terminate(); } } + { + m_ev_process_sigchld_ = event_new(m_ev_base_, -1, EV_PERSIST | EV_READ, + EvProcessSigchldCb_, this); + if (!m_ev_process_sigchld_) { + CRANE_ERROR("Failed to create the Do SIGCHLD event!"); + std::terminate(); + } + + if (event_add(m_ev_process_sigchld_, nullptr) < 0) { + CRANE_ERROR("Could not add the Do SIGCHLD event to base!"); + std::terminate(); + } + } { // SIGINT m_ev_sigint_ = evsignal_new(m_ev_base_, SIGINT, EvSigintCb_, this); if (!m_ev_sigint_) { @@ -267,13 +280,13 @@ void TaskManager::TaskStopAndDoStatusChangeAsync(uint32_t task_id) { switch (instance->err_before_exec) { case CraneErr::kProtobufError: - EvActivateTaskStatusChange_(task_id, 
crane::grpc::TaskStatus::Cancelled, + EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodeSpawnProcessFail, std::nullopt); break; case CraneErr::kCgroupError: - EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Cancelled, + EvActivateTaskStatusChange_(task_id, crane::grpc::TaskStatus::Failed, ExitCode::kExitCodeCgroupError, std::nullopt); break; @@ -321,8 +334,6 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, assert(m_instance_ptr_->m_instance_ptr_ != nullptr); auto* this_ = reinterpret_cast(user_data); - ProcSigchldInfo sigchld_info{}; - int status; pid_t pid; while (true) { @@ -330,14 +341,22 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, /* TODO(More status tracing): | WUNTRACED | WCONTINUED */); if (pid > 0) { + auto sigchld_info = std::make_unique(); + if (WIFEXITED(status)) { // Exited with status WEXITSTATUS(status) - sigchld_info = {pid, false, WEXITSTATUS(status)}; + sigchld_info->pid = pid; + sigchld_info->is_terminated_by_signal = false; + sigchld_info->value = WEXITSTATUS(status); + CRANE_TRACE("Receiving SIGCHLD for pid {}. Signaled: false, Status: {}", pid, WEXITSTATUS(status)); } else if (WIFSIGNALED(status)) { // Killed by signal WTERMSIG(status) - sigchld_info = {pid, true, WTERMSIG(status)}; + sigchld_info->pid = pid; + sigchld_info->is_terminated_by_signal = true; + sigchld_info->value = WTERMSIG(status); + CRANE_TRACE("Receiving SIGCHLD for pid {}. 
Signaled: true, Signal: {}", pid, WTERMSIG(status)); } @@ -347,61 +366,8 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, } else if (WIFCONTINUED(status)) { printf("continued\n"); } */ - - this_->m_mtx_.Lock(); - - auto task_iter = this_->m_pid_task_map_.find(pid); - auto proc_iter = this_->m_pid_proc_map_.find(pid); - if (task_iter == this_->m_pid_task_map_.end() || - proc_iter == this_->m_pid_proc_map_.end()) { - CRANE_WARN("Failed to find task id for pid {}.", pid); - this_->m_mtx_.Unlock(); - } else { - TaskInstance* instance = task_iter->second; - ProcessInstance* proc = proc_iter->second; - uint32_t task_id = instance->task.task_id(); - - // Remove indexes from pid to ProcessInstance* - this_->m_pid_proc_map_.erase(proc_iter); - this_->m_pid_task_map_.erase(task_iter); - - this_->m_mtx_.Unlock(); - - instance->sigchld_info = sigchld_info; - proc->Finish(sigchld_info.is_terminated_by_signal, sigchld_info.value); - - // Free the ProcessInstance. ITask struct is not freed here because - // the ITask for an Interactive task can have no ProcessInstance. - auto pr_it = instance->processes.find(pid); - if (pr_it == instance->processes.end()) { - CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances", - pid, task_id); - } else { - instance->processes.erase(pr_it); - - if (!instance->processes.empty()) { - if (sigchld_info.is_terminated_by_signal) { - // If a task is terminated by a signal and there are other - // running processes belonging to this task, kill them. - this_->TerminateTaskAsync(task_id); - } - } else { - if (instance->task.interactive_meta().interactive_type() == - crane::grpc::Crun) - // TaskStatusChange of a crun task is triggered in - // CforedManager. - g_cfored_manager->TaskProcOnCforedStopped( - instance->task.interactive_meta().cfored_name(), - instance->task.task_id()); - else /* Batch / Calloc */ { - // If the ProcessInstance has no process left, - // send TaskStatusChange for this task. 
- // See the comment of EvActivateTaskStatusChange_. - this_->TaskStopAndDoStatusChangeAsync(task_id); - } - } - } - } + this_->m_sigchld_queue_.enqueue(std::move(sigchld_info)); + event_active(this_->m_ev_process_sigchld_, 0, 0); } else if (pid == 0) { // There's no child that needs reaping. // If Craned is exiting, check if there's any task remaining. @@ -420,6 +386,101 @@ void TaskManager::EvSigchldCb_(evutil_socket_t sig, short events, } } +void TaskManager::EvProcessSigchldCb_(int sig, short events, void* user_data) { + auto* this_ = reinterpret_cast(user_data); + + std::unique_ptr sigchld_info; + while (this_->m_sigchld_queue_.try_dequeue(sigchld_info)) { + auto pid = sigchld_info->pid; + + if (sigchld_info->resend_timer != nullptr) { + evtimer_del(sigchld_info->resend_timer); + event_free(sigchld_info->resend_timer); + sigchld_info->resend_timer = nullptr; + } + + this_->m_mtx_.Lock(); + auto task_iter = this_->m_pid_task_map_.find(pid); + auto proc_iter = this_->m_pid_proc_map_.find(pid); + + if (task_iter == this_->m_pid_task_map_.end() || + proc_iter == this_->m_pid_proc_map_.end()) { + this_->m_mtx_.Unlock(); + + EvQueueSigchldArg* arg = new EvQueueSigchldArg; + + timeval tv{kEvSigChldResendMs / 1000'000, kEvSigChldResendMs % 1000'000}; + sigchld_info->resend_timer = + event_new(this_->m_ev_base_, -1, 0, EvOnSigchldTimerCb_, arg); + + CRANE_ASSERT_MSG(sigchld_info->resend_timer != nullptr, + "Failed to create new timer."); + evtimer_add(sigchld_info->resend_timer, &tv); // armed only after the null check + + arg->task_manager = this_; + arg->sigchld_info = std::move(sigchld_info); + + CRANE_TRACE("Child Process {} exit too early, will do SigchldCb later", + pid); // sigchld_info was moved into arg above, so log the cached pid + continue; + } + + TaskInstance* instance = task_iter->second; + ProcessInstance* proc = proc_iter->second; + uint32_t task_id = instance->task.task_id(); + + // Remove indexes from pid to ProcessInstance* + this_->m_pid_proc_map_.erase(proc_iter); + this_->m_pid_task_map_.erase(task_iter); + + 
this_->m_mtx_.Unlock(); + + instance->sigchld_info = *sigchld_info; + proc->Finish(sigchld_info->is_terminated_by_signal, sigchld_info->value); + + // Free the ProcessInstance. ITask struct is not freed here because + // the ITask for an Interactive task can have no ProcessInstance. + auto pr_it = instance->processes.find(pid); + if (pr_it == instance->processes.end()) { + CRANE_ERROR("Failed to find pid {} in task #{}'s ProcessInstances", pid, + task_id); + } else { + instance->processes.erase(pr_it); + + if (!instance->processes.empty()) { + if (sigchld_info->is_terminated_by_signal) { + // If a task is terminated by a signal and there are other + // running processes belonging to this task, kill them. + this_->TerminateTaskAsync(task_id); + } + } else { + if (instance->IsCrun()) + // TaskStatusChange of a crun task is triggered in + // CforedManager. + g_cfored_manager->TaskProcOnCforedStopped( + instance->task.interactive_meta().cfored_name(), + instance->task.task_id()); + else /* Batch / Calloc */ { + // If the ProcessInstance has no process left, + // send TaskStatusChange for this task. + // See the comment of EvActivateTaskStatusChange_. 
+ this_->TaskStopAndDoStatusChangeAsync(task_id); + } + } + } + } +} + +void TaskManager::EvOnSigchldTimerCb_(int, short, void* arg_) { + auto* arg = reinterpret_cast(arg_); + auto* this_ = arg->task_manager; + + this_->m_sigchld_queue_.enqueue(std::move(arg->sigchld_info)); + event_active(this_->m_ev_process_sigchld_, 0, 0); + + delete arg; +} + void TaskManager::EvSubprocessReadCb_(struct bufferevent* bev, void* process) { auto* proc = reinterpret_cast(process); @@ -547,8 +608,8 @@ void TaskManager::SetSigintCallback(std::function cb) { m_sigint_cb_ = std::move(cb); } -CraneErr TaskManager::SpawnProcessInInstance_( - TaskInstance* instance, std::unique_ptr process) { +CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, + ProcessInstance* process) { using google::protobuf::io::FileInputStream; using google::protobuf::io::FileOutputStream; using google::protobuf::util::ParseDelimitedFromZeroCopyStream; @@ -636,25 +697,6 @@ CraneErr TaskManager::SpawnProcessInInstance_( instance->task.task_id(), meta->proc_in_fd, meta->proc_out_fd); } - // Note that the following code will move the child process into cgroup. - // Once the child process is moved into cgroup, it might be killed due to - // memory limitation. - // Since the task status change is triggered by SIGCHLD, - // we should put the child pid into the index map IMMEDIATELY when fork() is - // done. - // Otherwise, SIGCHLD handler will not find the pid if the child process - // stops really fast, for example, due to being killed by oom killer. - // In such case, the task status change for this quickly dying job will not - // be triggered, and it will cause infinitely running jobs which are - // actually dead. - m_mtx_.Lock(); - m_pid_task_map_.emplace(child_pid, instance); - m_pid_proc_map_.emplace(child_pid, process.get()); - m_mtx_.Unlock(); - - // Move the ownership of ProcessInstance into the TaskInstance. 
- instance->processes.emplace(child_pid, std::move(process)); - int ctrl_fd = ctrl_sock_pair[0]; close(ctrl_sock_pair[1]); if (instance->IsCrun()) { @@ -725,7 +767,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( // The child process will be reaped in SIGCHLD handler and // thus only ONE TaskStatusChange will be triggered! instance->err_before_exec = CraneErr::kProtobufError; - KillProcessInstance_(process.get(), SIGKILL); + KillProcessInstance_(process, SIGKILL); return CraneErr::kOk; } @@ -742,7 +784,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( // See comments above. instance->err_before_exec = CraneErr::kProtobufError; - KillProcessInstance_(process.get(), SIGKILL); + KillProcessInstance_(process, SIGKILL); return CraneErr::kOk; } @@ -760,7 +802,7 @@ CraneErr TaskManager::SpawnProcessInInstance_( // See comments above. instance->err_before_exec = CraneErr::kProtobufError; - KillProcessInstance_(process.get(), SIGKILL); + KillProcessInstance_(process, SIGKILL); } // See comments above. @@ -1072,7 +1114,7 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { // or fork() fails. // In this case, SIGCHLD will NOT be received for this task, and // we should send TaskStatusChange manually. - CraneErr err = SpawnProcessInInstance_(instance, std::move(process)); + CraneErr err = SpawnProcessInInstance_(instance, process.get()); if (err != CraneErr::kOk) { EvActivateTaskStatusChange_( task_id, crane::grpc::TaskStatus::Failed, @@ -1080,6 +1122,22 @@ void TaskManager::LaunchTaskInstanceMt_(TaskInstance* instance) { fmt::format( "Cannot spawn a new process inside the instance of task #{}", task_id)); + } else { + // kOk means that SpawnProcessInInstance_ has successfully forked a child + // process. + // Now we put the child pid into index maps. + // SIGCHLD sent just after fork() and before putting pid into maps + // will repeatedly be sent by timer and eventually be handled once the + // SIGCHLD processing callback sees the pid in index maps. 
+ m_mtx_.Lock(); + m_pid_task_map_.emplace(process->GetPid(), instance); + m_pid_proc_map_.emplace(process->GetPid(), process.get()); + + // Move the ownership of ProcessInstance into the TaskInstance. + // Make sure existing process can be found when handling SIGCHLD. + instance->processes.emplace(process->GetPid(), std::move(process)); + + m_mtx_.Unlock(); } } @@ -1221,7 +1279,7 @@ void TaskManager::EvGrpcQueryTaskIdFromPidCb_(int efd, short events, } } -void TaskManager::EvOnTimerCb_(int, short, void* arg_) { +void TaskManager::EvOnTaskTimerCb_(int, short, void* arg_) { auto* arg = reinterpret_cast(arg_); TaskManager* this_ = arg->task_manager; task_id_t task_id = arg->task_id; diff --git a/src/Craned/TaskManager.h b/src/Craned/TaskManager.h index 9241fba50..7076441b8 100644 --- a/src/Craned/TaskManager.h +++ b/src/Craned/TaskManager.h @@ -153,10 +153,13 @@ struct CrunMetaInTaskInstance : MetaInTaskInstance { ~CrunMetaInTaskInstance() override = default; }; +// Reaped-child info, queued by EvSigchldCb_ for EvProcessSigchldCb_. struct ProcSigchldInfo { pid_t pid; bool is_terminated_by_signal; int value; + + event* resend_timer{nullptr}; }; // Todo: Task may consists of multiple subtasks @@ -170,8 +173,7 @@ struct TaskInstance { termination_timer = nullptr; } - if (this->task.type() == crane::grpc::Interactive && - this->task.interactive_meta().interactive_type() == crane::grpc::Crun) { + if (this->IsCrun()) { close(dynamic_cast(meta.get())->proc_in_fd); } } @@ -279,6 +281,11 @@ class TaskManager { std::promise> status_prom; }; + struct EvQueueSigchldArg { + TaskManager* task_manager; + std::unique_ptr sigchld_info; + }; + static std::string ParseFilePathPattern_(const std::string& path_pattern, const std::string& cwd, task_id_t task_id); @@ -286,7 +293,7 @@ class TaskManager { void LaunchTaskInstanceMt_(TaskInstance* instance); CraneErr SpawnProcessInInstance_(TaskInstance* instance, - std::unique_ptr process); + ProcessInstance* process); const TaskInstance* FindInstanceByTaskId_(uint32_t task_id); 
@@ -324,7 +331,7 @@ class TaskManager { std::chrono::duration_cast(duration - sec) .count()}; - struct event* ev = event_new(m_ev_base_, -1, 0, EvOnTimerCb_, arg); + struct event* ev = event_new(m_ev_base_, -1, 0, EvOnTaskTimerCb_, arg); CRANE_ASSERT_MSG(ev != nullptr, "Failed to create new timer."); evtimer_add(ev, &tv); @@ -338,7 +345,7 @@ class TaskManager { timeval tv{static_cast<__time_t>(secs), 0}; - struct event* ev = event_new(m_ev_base_, -1, 0, EvOnTimerCb_, arg); + struct event* ev = event_new(m_ev_base_, -1, 0, EvOnTaskTimerCb_, arg); CRANE_ASSERT_MSG(ev != nullptr, "Failed to create new timer."); evtimer_add(ev, &tv); @@ -398,6 +405,9 @@ class TaskManager { static void EvSigchldCb_(evutil_socket_t sig, short events, void* user_data); + static void EvProcessSigchldCb_(evutil_socket_t sig, short events, + void* user_data); + // Callback function to handle SIGINT sent by Ctrl+C static void EvSigintCb_(evutil_socket_t sig, short events, void* user_data); @@ -427,7 +437,9 @@ class TaskManager { static void EvExitEventCb_(evutil_socket_t, short events, void* user_data); - static void EvOnTimerCb_(evutil_socket_t, short, void* arg); + static void EvOnTaskTimerCb_(evutil_socket_t, short, void* arg_); + + static void EvOnSigchldTimerCb_(evutil_socket_t, short, void* arg_); struct event_base* m_ev_base_{}; struct event* m_ev_sigchld_{}; @@ -444,6 +456,9 @@ class TaskManager { // ev_sigchld_cb_ will stop the event loop when there is no task running. 
std::atomic_bool m_is_ending_now_{false}; + struct event* m_ev_process_sigchld_{}; + ConcurrentQueue> m_sigchld_queue_; + struct event* m_ev_query_task_id_from_pid_{}; ConcurrentQueue m_query_task_id_from_pid_queue_; diff --git a/src/Utilities/PublicHeader/include/crane/PublicHeader.h b/src/Utilities/PublicHeader/include/crane/PublicHeader.h index ecd8285f5..7caa53ac2 100644 --- a/src/Utilities/PublicHeader/include/crane/PublicHeader.h +++ b/src/Utilities/PublicHeader/include/crane/PublicHeader.h @@ -20,9 +20,7 @@ #include #include -#include #include -#include #include "protos/Crane.pb.h"