From 9c8279042f4651002506fe8f202e549ded67816d Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Thu, 14 Nov 2024 11:05:39 +0800 Subject: [PATCH 01/11] Refactor: Crun support pty Signed-off-by: Li Junlin --- protos/PublicDefs.proto | 9 +++--- src/Craned/CforedClient.cpp | 61 ++++++++++++++++++++++--------------- src/Craned/CforedClient.h | 6 ++-- src/Craned/TaskManager.cpp | 37 +++++++++------------- src/Craned/TaskManager.h | 5 ++- 5 files changed, 59 insertions(+), 59 deletions(-) diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 8c328d860..0db577c9d 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -219,10 +219,11 @@ message BatchTaskAdditionalMeta { } message InteractiveTaskAdditionalMeta { - string cfored_name = 1; - string sh_script = 2; - string term_env = 3; - InteractiveTaskType interactive_type = 4; + InteractiveTaskType interactive_type = 1; + string cfored_name = 2; + string sh_script = 3; + string term_env = 4; + bool pty = 5; } message TaskInfo { diff --git a/src/Craned/CforedClient.cpp b/src/Craned/CforedClient.cpp index 2818c1e62..066b1b600 100644 --- a/src/Craned/CforedClient.cpp +++ b/src/Craned/CforedClient.cpp @@ -18,6 +18,8 @@ #include "CforedClient.h" +#include + #include "crane/String.h" namespace Craned { @@ -343,12 +345,8 @@ void CforedManager::EvLoopThread_(const std::shared_ptr& uvw_loop) { } void CforedManager::RegisterIOForward(std::string const& cfored, - task_id_t task_id, int in_fd, - int out_fd) { - RegisterElem elem{.cfored = cfored, - .task_id = task_id, - .in_fd = in_fd, - .out_fd = out_fd}; + task_id_t task_id, int fd) { + RegisterElem elem{.cfored = cfored, .task_id = task_id, .fd = fd}; std::promise done; std::future done_fut = done.get_future(); @@ -372,7 +370,7 @@ void CforedManager::RegisterCb_() { } m_cfored_client_map_[elem.cfored]->InitTaskFwdAndSetInputCb( - elem.task_id, [fd = elem.in_fd](const std::string& msg) -> bool { + elem.task_id, [fd = elem.fd](const std::string& msg) -> bool { ssize_t sz_sent = 0, sz_written; while (sz_sent != msg.size()) { sz_written = write(fd, msg.c_str() + sz_sent, msg.size() - sz_sent); @@ -386,9 +384,9 @@ void CforedManager::RegisterCb_() { return true; }); - CRANE_TRACE("Registering fd {} for outputs of task #{}", elem.out_fd, + CRANE_TRACE("Registering fd {} for outputs of task #{}", elem.fd, elem.task_id); - auto poll_handle = m_loop_->resource(elem.out_fd); + auto poll_handle = m_loop_->resource(elem.fd); poll_handle->on([this, elem = std::move(elem)]( const uvw::poll_event&, uvw::poll_handle& h) { @@ -397,25 +395,38 @@ void CforedManager::RegisterCb_() { constexpr int MAX_BUF_SIZE = 4096; char buf[MAX_BUF_SIZE]; - auto ret = read(elem.out_fd, buf, MAX_BUF_SIZE); + auto ret = read(elem.fd, buf, MAX_BUF_SIZE); if (ret == 0) { - CRANE_TRACE("Task #{} to cfored {} finished its output.", elem.task_id, - elem.cfored); - h.close(); - close(elem.out_fd); - - bool ok_to_free = - m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id); - if (ok_to_free) { - CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id, - elem.cfored); - UnregisterIOForward_(elem.cfored, elem.task_id); - } - return; + CRANE_ASSERT(false); } - if (ret == -1) - CRANE_ERROR("Error when reading task #{} output", elem.task_id); + if (ret == -1) { + if (errno == EIO) { + // For pty output, the read() will return -1 with errno set to EIO + // when process exit. + // ref: https://unix.stackexchange.com/questions/538198 + CRANE_TRACE("Task #{} to cfored {} finished its output.", + elem.task_id, elem.cfored); + h.close(); + close(elem.fd); + + bool ok_to_free = + m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id); + if (ok_to_free) { + CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id, + elem.cfored); + UnregisterIOForward_(elem.cfored, elem.task_id); + } + return; + } else if (errno == EAGAIN) { + // Read before the process begin. + return; + } else { + CRANE_ERROR("Error when reading task #{} output, error {}", + elem.task_id, std::strerror(errno)); + return; + } + } std::string output(buf, ret); CRANE_TRACE("Fwd to task #{}: {}", elem.task_id, output); diff --git a/src/Craned/CforedClient.h b/src/Craned/CforedClient.h index d61a0ce98..038811922 100644 --- a/src/Craned/CforedClient.h +++ b/src/Craned/CforedClient.h @@ -86,16 +86,14 @@ class CforedManager { bool Init(); - void RegisterIOForward(std::string const& cfored, task_id_t task_id, - int in_fd, int out_fd); + void RegisterIOForward(std::string const& cfored, task_id_t task_id, int fd); void TaskProcOnCforedStopped(std::string const& cfored, task_id_t task_id); private: struct RegisterElem { std::string cfored; task_id_t task_id; - int in_fd; - int out_fd; + int fd; }; struct TaskStopElem { diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 949e41213..491837468 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include "CforedClient.h" @@ -102,7 +103,9 @@ TaskManager::TaskManager() { m_sigchld_handle_ = m_uvw_loop_->resource(); m_sigchld_handle_->on( - [this](const uvw::signal_event&, uvw::signal_handle&) { EvSigchldCb_(); }); + [this](const uvw::signal_event&, uvw::signal_handle&) { + EvSigchldCb_(); + }); if (m_sigchld_handle_->start(SIGCLD) != 0) { CRANE_ERROR("Failed to start the SIGCLD handle"); @@ -539,26 +542,6 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, return CraneErr::kSystemErr; } - // Create IO socket pair for crun tasks. - if (instance->IsCrun()) { - if (socketpair(AF_UNIX, SOCK_STREAM, 0, io_in_sock_pair) != 0) { - CRANE_ERROR("Failed to create socket pair for task io forward: {}", - strerror(errno)); - return CraneErr::kSystemErr; - } - - if (socketpair(AF_UNIX, SOCK_STREAM, 0, io_out_sock_pair) != 0) { - CRANE_ERROR("Failed to create socket pair for task io forward: {}", - strerror(errno)); - return CraneErr::kSystemErr; - } - - auto* crun_meta = - dynamic_cast(instance->meta.get()); - crun_meta->proc_in_fd = io_in_sock_pair[0]; - crun_meta->proc_out_fd = io_out_sock_pair[0]; - } - // save the current uid/gid SavedPrivilege saved_priv{getuid(), getgid()}; @@ -575,7 +558,15 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, return CraneErr::kSystemErr; } - pid_t child_pid = fork(); + pid_t child_pid; + + if (instance->IsCrun()) { + auto* crun_meta = + dynamic_cast(instance->meta.get()); + child_pid = forkpty(&crun_meta->msg_fd, NULL, NULL, NULL); + } else { + child_pid = fork(); + } if (child_pid == -1) { CRANE_ERROR("fork() failed for task #{}: {}", instance->task.task_id(), strerror(errno)); @@ -591,7 +582,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, auto* meta = dynamic_cast(instance->meta.get()); g_cfored_manager->RegisterIOForward( instance->task.interactive_meta().cfored_name(), - instance->task.task_id(), meta->proc_in_fd, meta->proc_out_fd); + instance->task.task_id(), meta->msg_fd); } int ctrl_fd = ctrl_sock_pair[0]; diff --git a/src/Craned/TaskManager.h b/src/Craned/TaskManager.h index 2632db685..3ee9449b6 100644 --- a/src/Craned/TaskManager.h +++ b/src/Craned/TaskManager.h @@ -119,8 +119,7 @@ struct BatchMetaInTaskInstance : MetaInTaskInstance { }; struct CrunMetaInTaskInstance : MetaInTaskInstance { - int proc_in_fd; - int proc_out_fd; + int msg_fd; ~CrunMetaInTaskInstance() override = default; }; @@ -141,7 +140,7 @@ struct TaskInstance { } if (this->IsCrun()) { - close(dynamic_cast(meta.get())->proc_in_fd); + close(dynamic_cast(meta.get())->msg_fd); } } From 7473a953edfb337cbb81cf3235603db58e848b98 Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Thu, 14 Nov 2024 12:00:33 +0800 Subject: [PATCH 02/11] Refactor: Use original grpc message id order Signed-off-by: Li Junlin --- protos/PublicDefs.proto | 8 ++++---- src/CraneCtld/CtldPublicDefs.h | 5 ++++- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/protos/PublicDefs.proto b/protos/PublicDefs.proto index 0db577c9d..6e28d983a 100644 --- a/protos/PublicDefs.proto +++ b/protos/PublicDefs.proto @@ -219,10 +219,10 @@ message BatchTaskAdditionalMeta { } message InteractiveTaskAdditionalMeta { - InteractiveTaskType interactive_type = 1; - string cfored_name = 2; - string sh_script = 3; - string term_env = 4; + string cfored_name = 1; + string sh_script = 2; + string term_env = 3; + InteractiveTaskType interactive_type = 4; bool pty = 5; } diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index 2e3eeb252..56597006c 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -225,6 +225,7 @@ struct InteractiveMetaInTask { std::string sh_script; std::string term_env; + bool pty; std::function const&)> cb_task_res_allocated; @@ -493,8 +494,10 @@ struct TaskInCtld { InteractiveMeta.interactive_type = val.interactive_meta().interactive_type(); if (InteractiveMeta.interactive_type == - crane::grpc::InteractiveTaskType::Crun) + crane::grpc::InteractiveTaskType::Crun) { InteractiveMeta.term_env = val.interactive_meta().term_env(); + InteractiveMeta.pty = val.interactive_meta().pty(); + } } node_num = val.node_num(); From dfef36540bb871c611f53d31ed7ea92c28c04047 Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Fri, 15 Nov 2024 11:24:03 +0800 Subject: [PATCH 03/11] Fix: Fix ci compile error Signed-off-by: Li Junlin --- CMakeLists.txt | 1 + src/Craned/CMakeLists.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a981c68e..b065eef2d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -278,6 +278,7 @@ set(DEPENDENCIES_PRE_INSTALLED_DIR ${CMAKE_CURRENT_SOURCE_DIR}/dependencies/pre_ add_subdirectory(${DEPENDENCIES_PRE_INSTALLED_DIR}) find_package(Threads REQUIRED) +find_library(LIBUTIL_LIBRARY util) # New in version cmake3.24: # Set ZLIB_USE_STATIC_LIBS to ON to look for static libraries. Default is OFF. diff --git a/src/Craned/CMakeLists.txt b/src/Craned/CMakeLists.txt index f7d3902bc..a1d5accf6 100644 --- a/src/Craned/CMakeLists.txt +++ b/src/Craned/CMakeLists.txt @@ -44,6 +44,7 @@ target_link_libraries(craned cxxopts Threads::Threads + ${LIBUTIL_LIBRARY} nlohmann_json::nlohmann_json absl::flat_hash_map From f82b9bfb5955b036f7d50f3c562d40e8fbe03b22 Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Tue, 19 Nov 2024 16:35:22 +0800 Subject: [PATCH 04/11] feat: Crun support pty for single node Signed-off-by: Li Junlin --- src/CraneCtld/CranedKeeper.cpp | 1 + src/Craned/CforedClient.cpp | 50 +++++++++++++++++++++++----------- src/Craned/CforedClient.h | 4 ++- src/Craned/TaskManager.cpp | 45 ++++++++++++++++++------------ 4 files changed, 66 insertions(+), 34 deletions(-) diff --git a/src/CraneCtld/CranedKeeper.cpp b/src/CraneCtld/CranedKeeper.cpp index 863bfb5dd..dd67ce3d5 100644 --- a/src/CraneCtld/CranedKeeper.cpp +++ b/src/CraneCtld/CranedKeeper.cpp @@ -319,6 +319,7 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequests( mutable_meta->set_sh_script(meta_in_ctld.sh_script); mutable_meta->set_term_env(meta_in_ctld.term_env); mutable_meta->set_interactive_type(meta_in_ctld.interactive_type); + mutable_meta->set_pty(meta_in_ctld.pty); } } diff --git a/src/Craned/CforedClient.cpp b/src/Craned/CforedClient.cpp index 066b1b600..01c9447f7 100644 --- a/src/Craned/CforedClient.cpp +++ b/src/Craned/CforedClient.cpp @@ -345,8 +345,8 @@ void CforedManager::EvLoopThread_(const std::shared_ptr& uvw_loop) { } void CforedManager::RegisterIOForward(std::string const& cfored, - task_id_t task_id, int fd) { - RegisterElem elem{.cfored = cfored, .task_id = task_id, .fd = fd}; + task_id_t task_id, int fd, bool pty) { + RegisterElem elem{.cfored = cfored, .task_id = task_id, .fd = fd, .pty = pty}; std::promise done; std::future done_fut = done.get_future(); @@ -396,28 +396,30 @@ void CforedManager::RegisterCb_() { char buf[MAX_BUF_SIZE]; auto ret = read(elem.fd, buf, MAX_BUF_SIZE); + bool read_finished{false}; + if (ret == 0) { - CRANE_ASSERT(false); + if (!elem.pty) { + read_finished = true; + } else { + // For pty,do nothing, process exit on return -1 and error set to EIO + CRANE_TRACE("Read EOF from pty task #{} on cfored {}", elem.task_id, + elem.cfored); + } } if (ret == -1) { + if (!elem.pty) { + CRANE_ERROR("Error when reading task #{} output, error {}", + elem.task_id, std::strerror(errno)); + return; + } + if (errno == EIO) { // For pty output, the read() will return -1 with errno set to EIO // when process exit. // ref: https://unix.stackexchange.com/questions/538198 - CRANE_TRACE("Task #{} to cfored {} finished its output.", - elem.task_id, elem.cfored); - h.close(); - close(elem.fd); - - bool ok_to_free = - m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id); - if (ok_to_free) { - CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id, - elem.cfored); - UnregisterIOForward_(elem.cfored, elem.task_id); - } - return; + read_finished = true; } else if (errno == EAGAIN) { // Read before the process begin. return; @@ -428,6 +430,22 @@ void CforedManager::RegisterCb_() { } } + if (read_finished) { + CRANE_TRACE("Task #{} to cfored {} finished its output.", elem.task_id, + elem.cfored); + h.close(); + close(elem.fd); + + bool ok_to_free = + m_cfored_client_map_[elem.cfored]->TaskOutputFinish(elem.task_id); + if (ok_to_free) { + CRANE_TRACE("It's ok to unregister task #{} on {}", elem.task_id, + elem.cfored); + UnregisterIOForward_(elem.cfored, elem.task_id); + } + return; + } + std::string output(buf, ret); CRANE_TRACE("Fwd to task #{}: {}", elem.task_id, output); m_cfored_client_map_[elem.cfored]->TaskOutPutForward(elem.task_id, diff --git a/src/Craned/CforedClient.h b/src/Craned/CforedClient.h index 038811922..65a36cff9 100644 --- a/src/Craned/CforedClient.h +++ b/src/Craned/CforedClient.h @@ -86,7 +86,8 @@ class CforedManager { bool Init(); - void RegisterIOForward(std::string const& cfored, task_id_t task_id, int fd); + void RegisterIOForward(std::string const& cfored, task_id_t task_id, int fd, + bool pty); void TaskProcOnCforedStopped(std::string const& cfored, task_id_t task_id); private: @@ -94,6 +95,7 @@ class CforedManager { std::string cfored; task_id_t task_id; int fd; + bool pty; }; struct TaskStopElem { diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 491837468..20c623350 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -519,9 +519,10 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, using crane::grpc::subprocess::CanStartMessage; using crane::grpc::subprocess::ChildProcessReady; - int ctrl_sock_pair[2]; // Socket pair for passing control messages. - int io_in_sock_pair[2]; // Socket pair for forwarding IO of crun tasks. - int io_out_sock_pair[2]; // Socket pair for forwarding IO of crun tasks. + int ctrl_sock_pair[2]; // Socket pair for passing control messages. + + // Socket pair for forwarding IO of crun tasks. Craned read from index 0. + int crun_msg_sock_pair[2]; // The ResourceInNode structure should be copied here for being accessed in // the child process. @@ -559,11 +560,25 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, } pid_t child_pid; + bool launch_pty{false}; if (instance->IsCrun()) { auto* crun_meta = dynamic_cast(instance->meta.get()); - child_pid = forkpty(&crun_meta->msg_fd, NULL, NULL, NULL); + CRANE_DEBUG("Launching crun task #{} pty:{}", instance->task.task_id(), + instance->task.interactive_meta().pty()); + if (instance->task.interactive_meta().pty()) { + launch_pty = true; + child_pid = forkpty(&crun_meta->msg_fd, nullptr, nullptr, nullptr); + } else { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, crun_msg_sock_pair) != 0) { + CRANE_ERROR("Failed to create socket pair for task io forward: {}", + strerror(errno)); + return CraneErr::kSystemErr; + } + crun_meta->msg_fd = crun_msg_sock_pair[0]; + child_pid = fork(); + } } else { child_pid = fork(); } @@ -582,14 +597,13 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, auto* meta = dynamic_cast(instance->meta.get()); g_cfored_manager->RegisterIOForward( instance->task.interactive_meta().cfored_name(), - instance->task.task_id(), meta->msg_fd); + instance->task.task_id(), meta->msg_fd, launch_pty); } int ctrl_fd = ctrl_sock_pair[0]; close(ctrl_sock_pair[1]); - if (instance->IsCrun()) { - close(io_in_sock_pair[1]); - close(io_out_sock_pair[1]); + if (instance->IsCrun() && !launch_pty) { + close(crun_msg_sock_pair[1]); } setegid(saved_priv.gid); @@ -770,16 +784,13 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, } close(stdout_fd); - } else if (instance->IsCrun()) { - close(io_in_sock_pair[0]); - close(io_out_sock_pair[0]); - - dup2(io_in_sock_pair[1], 0); - close(io_in_sock_pair[1]); + } else if (instance->IsCrun() && !launch_pty) { + close(crun_msg_sock_pair[0]); - dup2(io_out_sock_pair[1], 1); - dup2(io_out_sock_pair[1], 2); - close(io_out_sock_pair[1]); + dup2(crun_msg_sock_pair[1], 0); + dup2(crun_msg_sock_pair[1], 1); + dup2(crun_msg_sock_pair[1], 2); + close(crun_msg_sock_pair[1]); } child_process_ready.set_ok(true); From 98c0d17ca7443c1ee9e2781930592dd43da6a3b7 Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Tue, 26 Nov 2024 10:20:32 +0800 Subject: [PATCH 05/11] feat: Support crun --pty with multi node Signed-off-by: Li Junlin --- src/CraneCtld/TaskScheduler.cpp | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index ffa3025e7..4acaf0c83 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -686,22 +686,37 @@ void TaskScheduler::ScheduleThread_() { task->allocated_craneds_regex = util::HostNameListToStr(task->CranedIds()); + // Task execute on all node, otherwise on the first node + bool launch_all_node; if (task->type == crane::grpc::Batch) { // For cbatch tasks whose --node > 1, // only execute the command at the first allocated node. - task->executing_craned_ids.emplace_back(task->CranedIds().front()); + launch_all_node = false; } else { const auto& meta = std::get(task->meta); if (meta.interactive_type == crane::grpc::Calloc) // For calloc tasks we still need to execute a dummy empty task to // set up a timer. - task->executing_craned_ids.emplace_back(task->CranedIds().front()); - else - // For crun tasks we need to execute tasks on all allocated nodes. - for (auto const& craned_id : task->CranedIds()) - task->executing_craned_ids.emplace_back(craned_id); + launch_all_node = false; + else { + // For crun tasks we need to execute tasks on all allocated + // nodes. + + // Crun task with pty only launch on first node + if (task->TaskToCtld().interactive_meta().pty()) + launch_all_node = false; + else + launch_all_node = true; + } } + + if (launch_all_node) { + for (auto const& craned_id : task->CranedIds()) + task->executing_craned_ids.emplace_back(craned_id); + } else + task->executing_craned_ids.emplace_back(task->CranedIds().front()); } + end = std::chrono::steady_clock::now(); CRANE_TRACE( "Set task fields costed {} ms", @@ -1636,7 +1651,7 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { } meta.cb_task_completed(task->TaskId()); } else { // Crun - if (++meta.status_change_cnt < task->node_num) { + if (++meta.status_change_cnt < task->excluded_nodes.size()) { CRANE_TRACE( "{}/{} TaskStatusChanges of Crun task #{} were received. " "Keep waiting...", From 83735bd64ec977147c75a01b9f35ead9e7b5647b Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Tue, 26 Nov 2024 21:29:47 +0800 Subject: [PATCH 06/11] refactor: Refactor cfored/crun/calloc task completion code. Signed-off-by: Li Junlin --- src/CraneCtld/CtldGrpcServer.cpp | 17 +++++---- src/CraneCtld/CtldPublicDefs.h | 5 +-- src/CraneCtld/TaskScheduler.cpp | 59 ++++++++++++++++---------------- src/CraneCtld/TaskScheduler.h | 19 ++++++++-- 4 files changed, 56 insertions(+), 44 deletions(-) diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index a27cae29d..a9946c2af 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -776,15 +776,16 @@ grpc::Status CraneCtldServiceImpl::CforedStream( }; meta.cb_task_cancel = [writer_weak_ptr](task_id_t task_id) { + CRANE_TRACE("Sending TaskCancelRequest in task_cancel", task_id); if (auto writer = writer_weak_ptr.lock(); writer) writer->WriteTaskCancelRequest(task_id); }; - meta.cb_task_completed = [this, i_type, cfored_name, - writer_weak_ptr](task_id_t task_id) { - CRANE_TRACE("Sending TaskCompletionAckReply in task_completed", - task_id); - if (auto writer = writer_weak_ptr.lock(); writer) + meta.cb_task_completed = [this, i_type, cfored_name, writer_weak_ptr]( + task_id_t task_id, + bool send_comlete_ack) { + if (auto writer = writer_weak_ptr.lock(); + writer && send_comlete_ack) writer->WriteTaskCompletionAckReply(task_id); m_ctld_server_->m_mtx_.Lock(); @@ -830,8 +831,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream( case StreamCforedRequest::TASK_COMPLETION_REQUEST: { auto const &payload = cfored_request.payload_task_complete_req(); CRANE_TRACE("Recv TaskCompletionReq of Task #{}", payload.task_id()); - - if (g_task_scheduler->TerminatePendingOrRunningTask( + if (g_task_scheduler->TerminateInteractivePendingOrRunningTask( payload.task_id()) != CraneErr::kOk) stream_writer->WriteTaskCompletionAckReply(payload.task_id()); } break; @@ -965,8 +965,7 @@ CtldServer::SubmitTaskToScheduler(std::unique_ptr task) { task->Username(), task->partition_id, task->account)); } - auto enable_res = - g_account_manager->CheckIfUserOfAccountIsEnabled( + auto enable_res = g_account_manager->CheckIfUserOfAccountIsEnabled( task->Username(), task->account); if (!enable_res) { return std::unexpected(enable_res.error()); diff --git a/src/CraneCtld/CtldPublicDefs.h b/src/CraneCtld/CtldPublicDefs.h index 56597006c..1e7b92386 100644 --- a/src/CraneCtld/CtldPublicDefs.h +++ b/src/CraneCtld/CtldPublicDefs.h @@ -229,9 +229,10 @@ struct InteractiveMetaInTask { std::function const&)> cb_task_res_allocated; - std::function cb_task_completed; - // only for calloc. + std::function cb_task_completed; + + // This will ask front end like crun/calloc to exit std::function cb_task_cancel; // only for crun. diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 4acaf0c83..23258d738 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1288,18 +1288,12 @@ crane::grpc::CancelTaskReply TaskScheduler::CancelPendingOrRunningTask( reply.add_not_cancelled_tasks(task_id); reply.add_not_cancelled_reasons("Permission Denied."); } else { - bool is_calloc = false; if (task->type == crane::grpc::Interactive) { auto& meta = std::get(task->meta); - if (meta.interactive_type == crane::grpc::Calloc) is_calloc = true; - - if (is_calloc && !meta.has_been_cancelled_on_front_end) { + if (!meta.has_been_cancelled_on_front_end) { meta.has_been_cancelled_on_front_end = true; meta.cb_task_cancel(task_id); } - } - - if (is_calloc) { reply.add_cancelled_tasks(task_id); } else { CraneErr err = TerminateRunningTaskNoLock_(task); @@ -1472,8 +1466,12 @@ void TaskScheduler::CleanCancelQueueCb_() { if (task->type == crane::grpc::Interactive) { auto& meta = std::get(task->meta); - g_thread_pool->detach_task([cb = meta.cb_task_cancel, - task_id = task->TaskId()] { cb(task_id); }); + // Cancel request may come from crun/calloc + if (!meta.has_been_cancelled_on_front_end) { + meta.has_been_cancelled_on_front_end = true; + g_thread_pool->detach_task([cb = meta.cb_task_cancel, + task_id = task->TaskId()] { cb(task_id); }); + } } } @@ -1634,34 +1632,35 @@ void TaskScheduler::CleanTaskStatusChangeQueueCb_() { task->SetStatus(new_status); } else { auto& meta = std::get(task->meta); - if (meta.interactive_type == crane::grpc::Calloc) { - // TaskStatusChange may indicate the time limit has been reached and - // the task has been terminated. No more TerminateTask RPC should be - // sent to the craned node if any further CancelTask or - // TaskCompletionRequest RPC is received. - meta.has_been_terminated_on_craned = true; - - if (new_status == crane::grpc::ExceedTimeLimit || - exit_code == ExitCode::kExitCodeCranedDown) { - meta.has_been_cancelled_on_front_end = true; - meta.cb_task_cancel(task->TaskId()); - task->SetStatus(new_status); - } else { - task->SetStatus(crane::grpc::Completed); - } - meta.cb_task_completed(task->TaskId()); - } else { // Crun - if (++meta.status_change_cnt < task->excluded_nodes.size()) { + if (meta.interactive_type == crane::grpc::Crun) { // Crun + if (++meta.status_change_cnt < task->executing_craned_ids.size()) { CRANE_TRACE( "{}/{} TaskStatusChanges of Crun task #{} were received. " "Keep waiting...", - meta.status_change_cnt, task->node_num, task->TaskId()); + meta.status_change_cnt, task->executing_craned_ids.size(), + task->TaskId()); continue; } + } - task->SetStatus(new_status); - meta.cb_task_completed(task->TaskId()); + // TaskStatusChange may indicate the time limit has been reached and + // the task has been terminated. No more TerminateTask RPC should be + // sent to the craned node if any further CancelTask or + // TaskCompletionRequest RPC is received. + + // Task end triggered by craned. + if (!meta.has_been_cancelled_on_front_end) { + meta.has_been_cancelled_on_front_end = true; + meta.cb_task_cancel(task->TaskId()); + // Completion ack will send in grpc server triggered by task complete + // req + meta.cb_task_completed(task->TaskId(), false); + } else { + // Send Completion Ack to frontend now. + meta.cb_task_completed(task->TaskId(), true); } + + task->SetStatus(new_status); } task->SetExitCode(exit_code); diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index 32eab6508..a133f42ad 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -254,21 +254,34 @@ class TaskScheduler { crane::grpc::CancelTaskReply CancelPendingOrRunningTask( const crane::grpc::CancelTaskRequest& request); - CraneErr TerminatePendingOrRunningTask(uint32_t task_id) { + CraneErr TerminateInteractivePendingOrRunningTask(uint32_t task_id) { LockGuard pending_guard(&m_pending_task_map_mtx_); LockGuard running_guard(&m_running_task_map_mtx_); auto pd_it = m_pending_task_map_.find(task_id); if (pd_it != m_pending_task_map_.end()) { + auto& task = pd_it->second; + if (task->type == crane::grpc::TaskType::Interactive) { + auto& meta = std::get(task->meta); + meta.has_been_cancelled_on_front_end = true; + } m_cancel_task_queue_.enqueue( - CancelPendingTaskQueueElem{.task = std::move(pd_it->second)}); + CancelPendingTaskQueueElem{.task = std::move(task)}); m_cancel_task_async_handle_->send(); m_pending_task_map_.erase(pd_it); return CraneErr::kOk; } auto rn_it = m_running_task_map_.find(task_id); - if (rn_it == m_running_task_map_.end()) return CraneErr::kNonExistent; + if (rn_it == m_running_task_map_.end()) + return CraneErr::kNonExistent; + else { + auto& task = rn_it->second; + if (task->type == crane::grpc::TaskType::Interactive) { + auto& meta = std::get(task->meta); + meta.has_been_cancelled_on_front_end = true; + } + } return TerminateRunningTaskNoLock_(rn_it->second.get()); } From cb873b019c7fd10f313f31abc442fd93d42a1bd7 Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Tue, 26 Nov 2024 21:37:29 +0800 Subject: [PATCH 07/11] fix: Fix crun/calloc fail when cancel pending task by ctrl c Signed-off-by: Li Junlin --- src/CraneCtld/TaskScheduler.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 23258d738..383c26453 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -1466,11 +1466,17 @@ void TaskScheduler::CleanCancelQueueCb_() { if (task->type == crane::grpc::Interactive) { auto& meta = std::get(task->meta); - // Cancel request may come from crun/calloc + // Cancel request may not come from crun/calloc, ask them to exit if (!meta.has_been_cancelled_on_front_end) { meta.has_been_cancelled_on_front_end = true; g_thread_pool->detach_task([cb = meta.cb_task_cancel, task_id = task->TaskId()] { cb(task_id); }); + } else { + // Cancel request from crun/calloc, reply CompletionAck + g_thread_pool->detach_task( + [cb = meta.cb_task_completed, task_id = task->TaskId()] { + cb(task_id, true); + }); } } } From be2fc468d79f0501ca9ffd9de756f71a7f4048ca Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Fri, 6 Dec 2024 09:32:49 +0800 Subject: [PATCH 08/11] refactor Signed-off-by: Li Junlin --- src/Craned/TaskManager.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 20c623350..729ea7efd 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -565,10 +565,11 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, if (instance->IsCrun()) { auto* crun_meta = dynamic_cast(instance->meta.get()); - CRANE_DEBUG("Launching crun task #{} pty:{}", instance->task.task_id(), - instance->task.interactive_meta().pty()); - if (instance->task.interactive_meta().pty()) { - launch_pty = true; + launch_pty = instance->task.interactive_meta().pty(); + CRANE_DEBUG("Launch crun task #{} pty:{}", instance->task.task_id(), + launch_pty); + + if (launch_pty) { child_pid = forkpty(&crun_meta->msg_fd, nullptr, nullptr, nullptr); } else { if (socketpair(AF_UNIX, SOCK_STREAM, 0, crun_msg_sock_pair) != 0) { @@ -582,6 +583,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, } else { child_pid = fork(); } + if (child_pid == -1) { CRANE_ERROR("fork() failed for task #{}: {}", instance->task.task_id(), strerror(errno)); From d5d0bd0265c184661a4dbd3678c1c9fd7e7b999f Mon Sep 17 00:00:00 2001 From: Li Junlin Date: Wed, 11 Dec 2024 14:03:55 +0800 Subject: [PATCH 09/11] chore: Typo Signed-off-by: Li Junlin --- src/CraneCtld/CtldGrpcServer.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index a9946c2af..6d21876bc 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -783,9 +783,9 @@ grpc::Status CraneCtldServiceImpl::CforedStream( meta.cb_task_completed = [this, i_type, cfored_name, writer_weak_ptr]( task_id_t task_id, - bool send_comlete_ack) { + bool send_complete_ack) { if (auto writer = writer_weak_ptr.lock(); - writer && send_comlete_ack) + writer && send_complete_ack) writer->WriteTaskCompletionAckReply(task_id); m_ctld_server_->m_mtx_.Lock(); From 01c1d8a7e9d82e7e24e948cc3212be94bb58c090 Mon Sep 17 00:00:00 2001 From: RileyW Date: Wed, 11 Dec 2024 16:36:24 +0800 Subject: [PATCH 10/11] Refactor. Signed-off-by: RileyW --- src/CraneCtld/CtldGrpcServer.cpp | 35 ++++++++++++++++---------------- src/CraneCtld/CtldGrpcServer.h | 10 +++------ src/CraneCtld/TaskScheduler.cpp | 12 +++++------ src/CraneCtld/TaskScheduler.h | 2 +- src/Craned/TaskManager.cpp | 18 ++++++++-------- 5 files changed, 37 insertions(+), 40 deletions(-) diff --git a/src/CraneCtld/CtldGrpcServer.cpp b/src/CraneCtld/CtldGrpcServer.cpp index 6d21876bc..00e31b9fc 100644 --- a/src/CraneCtld/CtldGrpcServer.cpp +++ b/src/CraneCtld/CtldGrpcServer.cpp @@ -71,7 +71,7 @@ grpc::Status CraneCtldServiceImpl::SubmitBatchTasks( results.emplace_back(std::move(result)); } - for (auto& res : results) { + for (auto &res : results) { if (res.has_value()) response->mutable_task_id_list()->Add(res.value().get()); else @@ -732,21 +732,22 @@ grpc::Status CraneCtldServiceImpl::CforedStream( CRANE_ERROR("Expect type CFORED_REGISTRATION from peer {}.", context->peer()); return Status::CANCELLED; - } else { - cfored_name = cfored_request.payload_cfored_reg().cfored_name(); - CRANE_INFO("Cfored {} registered.", cfored_name); + } - ok = stream_writer->WriteCforedRegistrationAck({}); - if (ok) { - state = StreamState::kWaitMsg; - } else { - CRANE_ERROR( - "Failed to send msg to cfored {}. Connection is broken. " - "Exiting...", - cfored_name); - state = StreamState::kCleanData; - } + cfored_name = cfored_request.payload_cfored_reg().cfored_name(); + CRANE_INFO("Cfored {} registered.", cfored_name); + + ok = stream_writer->WriteCforedRegistrationAck({}); + if (ok) { + state = StreamState::kWaitMsg; + } else { + CRANE_ERROR( + "Failed to send msg to cfored {}. Connection is broken. " + "Exiting...", + cfored_name); + state = StreamState::kCleanData; } + } else { state = StreamState::kCleanData; } @@ -783,9 +784,9 @@ grpc::Status CraneCtldServiceImpl::CforedStream( meta.cb_task_completed = [this, i_type, cfored_name, writer_weak_ptr]( task_id_t task_id, - bool send_complete_ack) { + bool send_completion_ack) { if (auto writer = writer_weak_ptr.lock(); - writer && send_complete_ack) + writer && send_completion_ack) writer->WriteTaskCompletionAckReply(task_id); m_ctld_server_->m_mtx_.Lock(); @@ -831,7 +832,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream( case StreamCforedRequest::TASK_COMPLETION_REQUEST: { auto const &payload = cfored_request.payload_task_complete_req(); CRANE_TRACE("Recv TaskCompletionReq of Task #{}", payload.task_id()); - if (g_task_scheduler->TerminateInteractivePendingOrRunningTask( + if (g_task_scheduler->TerminatePendingOrRunningIaTask( payload.task_id()) != CraneErr::kOk) stream_writer->WriteTaskCompletionAckReply(payload.task_id()); } break; diff --git a/src/CraneCtld/CtldGrpcServer.h b/src/CraneCtld/CtldGrpcServer.h index fdc24d0ac..a4a3c018c 100644 --- a/src/CraneCtld/CtldGrpcServer.h +++ b/src/CraneCtld/CtldGrpcServer.h @@ -44,8 +44,7 @@ class CforedStreamWriter { crane::grpc::StreamCforedRequest> *stream) : m_stream_(stream), m_valid_(true) {} - bool WriteTaskIdReply(pid_t calloc_pid, - std::expected res) { + bool WriteTaskIdReply(pid_t calloc_pid, CraneExpected res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -67,9 +66,7 @@ class CforedStreamWriter { bool WriteTaskResAllocReply( task_id_t task_id, - std::expected>, - std::string> - res) { + CraneExpected>> res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -121,8 +118,7 @@ class CforedStreamWriter { return m_stream_->Write(reply); } - bool WriteCforedRegistrationAck( - const std::expected &res) { + bool WriteCforedRegistrationAck(const CraneExpected &res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; diff --git a/src/CraneCtld/TaskScheduler.cpp b/src/CraneCtld/TaskScheduler.cpp index 383c26453..46835eb25 100644 --- a/src/CraneCtld/TaskScheduler.cpp +++ b/src/CraneCtld/TaskScheduler.cpp @@ -687,30 +687,30 @@ void TaskScheduler::ScheduleThread_() { util::HostNameListToStr(task->CranedIds()); // Task execute on all node, otherwise on the first node - bool launch_all_node; + bool launch_on_all_nodes; if (task->type == crane::grpc::Batch) { // For cbatch tasks whose --node > 1, // only execute the command at the first allocated node. - launch_all_node = false; + launch_on_all_nodes = false; } else { const auto& meta = std::get(task->meta); if (meta.interactive_type == crane::grpc::Calloc) // For calloc tasks we still need to execute a dummy empty task to // set up a timer. - launch_all_node = false; + launch_on_all_nodes = false; else { // For crun tasks we need to execute tasks on all allocated // nodes. // Crun task with pty only launch on first node if (task->TaskToCtld().interactive_meta().pty()) - launch_all_node = false; + launch_on_all_nodes = false; else - launch_all_node = true; + launch_on_all_nodes = true; } } - if (launch_all_node) { + if (launch_on_all_nodes) { for (auto const& craned_id : task->CranedIds()) task->executing_craned_ids.emplace_back(craned_id); } else diff --git a/src/CraneCtld/TaskScheduler.h b/src/CraneCtld/TaskScheduler.h index a133f42ad..1896e59de 100644 --- a/src/CraneCtld/TaskScheduler.h +++ b/src/CraneCtld/TaskScheduler.h @@ -254,7 +254,7 @@ class TaskScheduler { crane::grpc::CancelTaskReply CancelPendingOrRunningTask( const crane::grpc::CancelTaskRequest& request); - CraneErr TerminateInteractivePendingOrRunningTask(uint32_t task_id) { + CraneErr TerminatePendingOrRunningIaTask(uint32_t task_id) { LockGuard pending_guard(&m_pending_task_map_mtx_); LockGuard running_guard(&m_running_task_map_mtx_); diff --git a/src/Craned/TaskManager.cpp b/src/Craned/TaskManager.cpp index 729ea7efd..1ef5d37b8 100644 --- a/src/Craned/TaskManager.cpp +++ b/src/Craned/TaskManager.cpp @@ -522,7 +522,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, int ctrl_sock_pair[2]; // Socket pair for passing control messages. // Socket pair for forwarding IO of crun tasks. Craned read from index 0. - int crun_msg_sock_pair[2]; + int crun_io_sock_pair[2]; // The ResourceInNode structure should be copied here for being accessed in // the child process. @@ -572,12 +572,12 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, if (launch_pty) { child_pid = forkpty(&crun_meta->msg_fd, nullptr, nullptr, nullptr); } else { - if (socketpair(AF_UNIX, SOCK_STREAM, 0, crun_msg_sock_pair) != 0) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, crun_io_sock_pair) != 0) { CRANE_ERROR("Failed to create socket pair for task io forward: {}", strerror(errno)); return CraneErr::kSystemErr; } - crun_meta->msg_fd = crun_msg_sock_pair[0]; + crun_meta->msg_fd = crun_io_sock_pair[0]; child_pid = fork(); } } else { @@ -605,7 +605,7 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, int ctrl_fd = ctrl_sock_pair[0]; close(ctrl_sock_pair[1]); if (instance->IsCrun() && !launch_pty) { - close(crun_msg_sock_pair[1]); + close(crun_io_sock_pair[1]); } setegid(saved_priv.gid); @@ -787,12 +787,12 @@ CraneErr TaskManager::SpawnProcessInInstance_(TaskInstance* instance, close(stdout_fd); } else if (instance->IsCrun() && !launch_pty) { - close(crun_msg_sock_pair[0]); + close(crun_io_sock_pair[0]); - dup2(crun_msg_sock_pair[1], 0); - dup2(crun_msg_sock_pair[1], 1); - dup2(crun_msg_sock_pair[1], 2); - close(crun_msg_sock_pair[1]); + dup2(crun_io_sock_pair[1], 0); + dup2(crun_io_sock_pair[1], 1); + dup2(crun_io_sock_pair[1], 2); + close(crun_io_sock_pair[1]); } child_process_ready.set_ok(true); From 64acca1de59a688b5c97d028631bcf5a3dbfaf37 Mon Sep 17 00:00:00 2001 From: RileyW Date: Wed, 11 Dec 2024 16:49:06 +0800 Subject: [PATCH 11/11] Fix compilation error. Signed-off-by: RileyW --- src/CraneCtld/CtldGrpcServer.h | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/CraneCtld/CtldGrpcServer.h b/src/CraneCtld/CtldGrpcServer.h index a4a3c018c..d35493fec 100644 --- a/src/CraneCtld/CtldGrpcServer.h +++ b/src/CraneCtld/CtldGrpcServer.h @@ -44,7 +44,8 @@ class CforedStreamWriter { crane::grpc::StreamCforedRequest> *stream) : m_stream_(stream), m_valid_(true) {} - bool WriteTaskIdReply(pid_t calloc_pid, CraneExpected res) { + bool WriteTaskIdReply(pid_t calloc_pid, + std::expected res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -66,7 +67,8 @@ class CforedStreamWriter { bool WriteTaskResAllocReply( task_id_t task_id, - CraneExpected>> res) { + std::expected>, std::string> + res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false; @@ -118,7 +120,7 @@ class CforedStreamWriter { return m_stream_->Write(reply); } - bool WriteCforedRegistrationAck(const CraneExpected &res) { + bool WriteCforedRegistrationAck(const std::expected &res) { LockGuard guard(&m_stream_mtx_); if (!m_valid_) return false;