Skip to content

Commit

Permalink
fix deadlock when launch task (#323)
Browse files Browse the repository at this point in the history
* fix deadlock when launch task

Signed-off-by: xiafeng <[email protected]>

* fix deadlock when launch task

Signed-off-by: xiafeng <[email protected]>

* Refactor.

* Remove useless header.

---------

Signed-off-by: xiafeng <[email protected]>
Co-authored-by: RileyWen <[email protected]>
  • Loading branch information
L-Xiafeng and RileyWen authored Sep 25, 2024
1 parent 54e662f commit 8a21a72
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 16 deletions.
33 changes: 24 additions & 9 deletions src/Craned/CgroupManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,20 +486,21 @@ std::optional<std::string> CgroupManager::QueryTaskExecutionNode(
return this->m_task_id_to_cg_spec_map_[task_id]->execution_node;
}

std::vector<EnvPair> CgroupManager::GetResourceEnvListOfTask(
std::optional<crane::grpc::ResourceInNode> CgroupManager::GetTaskResourceInNode(
task_id_t task_id) {
std::vector<EnvPair> env_vec;
std::optional<crane::grpc::ResourceInNode> res;

auto cg_spec_ptr = m_task_id_to_cg_spec_map_[task_id];
if (!cg_spec_ptr) {
CRANE_ERROR("Trying to get resource env list of a non-existent task #{}",
task_id);
return env_vec;
auto cg_spec_ptr = this->m_task_id_to_cg_spec_map_[task_id];
if (cg_spec_ptr) {
res = cg_spec_ptr->res_in_node;
}

const auto &res_in_node = cg_spec_ptr->res_in_node;
return res;
}

env_vec = DeviceManager::GetDevEnvListByResInNode(
std::vector<EnvPair> CgroupManager::GetResourceEnvListByResInNode(
const crane::grpc::ResourceInNode &res_in_node) {
std::vector<EnvPair> env_vec = DeviceManager::GetDevEnvListByResInNode(
res_in_node.dedicated_res_in_node());

env_vec.emplace_back(
Expand All @@ -511,6 +512,20 @@ std::vector<EnvPair> CgroupManager::GetResourceEnvListOfTask(
return env_vec;
}

std::vector<EnvPair> CgroupManager::GetResourceEnvListOfTask(
task_id_t task_id) {
std::vector<EnvPair> res;

auto cg_spec_ptr = m_task_id_to_cg_spec_map_[task_id];
if (cg_spec_ptr)
res = GetResourceEnvListByResInNode(cg_spec_ptr->res_in_node);
else
CRANE_ERROR("Trying to get resource env list of a non-existent task #{}",
task_id);

return res;
}

bool Cgroup::MigrateProcIn(pid_t pid) {
using CgroupConstant::Controller;
using CgroupConstant::GetControllerStringView;
Expand Down
6 changes: 6 additions & 0 deletions src/Craned/CgroupManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ class CgroupManager {

bool ReleaseCgroupByTaskIdOnly(task_id_t task_id);

std::optional<crane::grpc::ResourceInNode> GetTaskResourceInNode(
task_id_t task_id);

static std::vector<EnvPair> GetResourceEnvListByResInNode(
const crane::grpc::ResourceInNode &res_in_node);

std::vector<EnvPair> GetResourceEnvListOfTask(task_id_t task_id);

private:
Expand Down
26 changes: 20 additions & 6 deletions src/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
#include <fcntl.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/util/delimited_message_util.h>
#include <sys/stat.h>

#include "CforedClient.h"
#include "CgroupManager.h"
#include "crane/OS.h"
#include "crane/String.h"
#include "protos/CraneSubprocess.pb.h"

namespace Craned {
Expand All @@ -38,7 +37,7 @@ bool TaskInstance::IsCalloc() const {
crane::grpc::Calloc;
}

std::vector<EnvPair> TaskInstance::GetEnvList() const {
std::vector<EnvPair> TaskInstance::GetTaskEnvList() const {
std::vector<EnvPair> env_vec;
for (auto& [name, value] : this->task.env()) {
env_vec.emplace_back(name, value);
Expand Down Expand Up @@ -562,6 +561,21 @@ CraneErr TaskManager::SpawnProcessInInstance_(
int io_in_sock_pair[2]; // Socket pair for forwarding IO of crun tasks.
int io_out_sock_pair[2]; // Socket pair for forwarding IO of crun tasks.

// The ResourceInNode structure should be copied here for being accessed in
// the child process.
// Note that CgroupManager acquires a lock for this.
// If the lock is held in the parent process during fork, the forked thread in
// the child proc will block forever.
// That's why we should copy it here and the child proc should not hold any
// lock.
std::optional<crane::grpc::ResourceInNode> res_in_node =
g_cg_mgr->GetTaskResourceInNode(instance->task.task_id());
if (!res_in_node.has_value()) {
CRANE_ERROR("Failed to get resource info for task #{}",
instance->task.task_id());
return CraneErr::kCgroupError;
}

if (socketpair(AF_UNIX, SOCK_STREAM, 0, ctrl_sock_pair) != 0) {
CRANE_ERROR("Failed to create socket pair: {}", strerror(errno));
return CraneErr::kSystemErr;
Expand Down Expand Up @@ -854,9 +868,9 @@ CraneErr TaskManager::SpawnProcessInInstance_(
if (instance->task.type() == crane::grpc::Batch) close(0);
util::os::CloseFdFrom(3);

std::vector<EnvPair> task_env_vec = instance->GetEnvList();
std::vector<EnvPair> task_env_vec = instance->GetTaskEnvList();
std::vector<EnvPair> res_env_vec =
g_cg_mgr->GetResourceEnvListOfTask(instance->task.task_id());
CgroupManager::GetResourceEnvListByResInNode(res_in_node.value());

if (clearenv()) {
fmt::print("clearenv() failed!\n");
Expand Down Expand Up @@ -1168,7 +1182,7 @@ void TaskManager::EvGrpcQueryTaskEnvironmentVariableCb_(int efd, short events,
auto& instance = task_iter->second;

std::vector<std::pair<std::string, std::string>> env_opt;
for (const auto& [name, value] : instance->GetEnvList()) {
for (const auto& [name, value] : instance->GetTaskEnvList()) {
env_opt.emplace_back(name, value);
}
elem.env_prom.set_value(env_opt);
Expand Down
2 changes: 1 addition & 1 deletion src/Craned/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ struct TaskInstance {

bool IsCrun() const;
bool IsCalloc() const;
std::vector<EnvPair> GetEnvList() const;
std::vector<EnvPair> GetTaskEnvList() const;

crane::grpc::TaskToD task;

Expand Down

0 comments on commit 8a21a72

Please sign in to comment.