Skip to content

Commit

Permalink
Refactor: Rename private variables
Browse files Browse the repository at this point in the history
Signed-off-by: Li Junlin <[email protected]>
  • Loading branch information
L-Xiafeng committed Nov 17, 2024
1 parent ec93109 commit d7f7d82
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
40 changes: 21 additions & 19 deletions src/Craned/TaskManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,17 @@ TaskManager::TaskManager() {
// Only called once. Guaranteed by singleton pattern.
m_instance_ptr_ = this;

m_uvw_loop = uvw::loop::create();
m_uvw_loop_ = uvw::loop::create();

m_sigchld_handle_ = m_uvw_loop->resource<uvw::signal_handle>();
m_sigchld_handle_ = m_uvw_loop_->resource<uvw::signal_handle>();
m_sigchld_handle_->on<uvw::signal_event>(
[this](const uvw::signal_event&, uvw::signal_handle&) { SigchldCb_(); });

if (m_sigchld_handle_->start(SIGCLD) != 0) {
CRANE_ERROR("Failed to start the SIGCLD handle");
}

m_sigint_handle_ = m_uvw_loop->resource<uvw::signal_handle>();
m_sigint_handle_ = m_uvw_loop_->resource<uvw::signal_handle>();
m_sigint_handle_->on<uvw::signal_event>(
[this](const uvw::signal_event&, uvw::signal_handle&) { SigintCb_(); });
if (m_sigint_handle_->start(SIGINT) != 0) {
Expand All @@ -117,66 +117,68 @@ TaskManager::TaskManager() {

// gRPC: QueryTaskIdFromPid
m_query_task_id_from_pid_async_handle_ =
m_uvw_loop->resource<uvw::async_handle>();
m_uvw_loop_->resource<uvw::async_handle>();
m_query_task_id_from_pid_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanGrpcQueryTaskIdFromPidQueueCb_();
});

// gRPC: QueryTaskEnvironmentVariable
m_query_task_environment_variables_async_handle_ =
m_uvw_loop->resource<uvw::async_handle>();
m_uvw_loop_->resource<uvw::async_handle>();
m_query_task_environment_variables_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanGrpcQueryTaskEnvironmentVariableQueueCb_();
});

// gRPC Execute Task Event
m_grpc_execute_task_async_handle_ = m_uvw_loop->resource<uvw::async_handle>();
m_grpc_execute_task_async_handle_ =
m_uvw_loop_->resource<uvw::async_handle>();
m_grpc_execute_task_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanGrpcExecuteTaskQueueCb_();
});

m_process_sigchld_async_handle_ = m_uvw_loop->resource<uvw::async_handle>();
m_process_sigchld_async_handle_ = m_uvw_loop_->resource<uvw::async_handle>();
m_process_sigchld_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanSigchldQueueCb_();
});

// Task Status Change Event
m_task_status_change_async_handle_ =
m_uvw_loop->resource<uvw::async_handle>();
m_uvw_loop_->resource<uvw::async_handle>();
m_task_status_change_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanTaskStatusChangeQueueCb_();
});

m_change_task_time_limit_async_handle_ =
m_uvw_loop->resource<uvw::async_handle>();
m_uvw_loop_->resource<uvw::async_handle>();
m_change_task_time_limit_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanChangeTaskTimeLimitQueueCb_();
});

m_terminate_task_async_handle_ = m_uvw_loop->resource<uvw::async_handle>();
m_terminate_task_async_handle_ = m_uvw_loop_->resource<uvw::async_handle>();
m_terminate_task_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanTerminateTaskQueueCb_();
});

m_check_task_status_async_handle_ = m_uvw_loop->resource<uvw::async_handle>();
m_check_task_status_async_handle_ =
m_uvw_loop_->resource<uvw::async_handle>();
m_check_task_status_async_handle_->on<uvw::async_event>(
[this](const uvw::async_event&, uvw::async_handle&) {
CleanCheckTaskStatusQueueCb_();
});

m_uvw_thread = std::thread([this]() {
m_uvw_thread_ = std::thread([this]() {
util::SetCurrentThreadName("TaskMgrLoopThr");
auto idle_handle = m_uvw_loop->resource<uvw::idle_handle>();
auto idle_handle = m_uvw_loop_->resource<uvw::idle_handle>();
idle_handle->on<uvw::idle_event>(
[this](const uvw::idle_event&, uvw::idle_handle& h) {
if (m_task_cleared) {
if (m_task_cleared_) {
h.parent().walk([](auto&& h) { h.close(); });
h.parent().stop();
}
Expand All @@ -185,12 +187,12 @@ TaskManager::TaskManager() {
if (idle_handle->start() != 0) {
CRANE_ERROR("Failed to start the idle event in TaskManager loop.");
}
m_uvw_loop->run();
m_uvw_loop_->run();
});
}

TaskManager::~TaskManager() {
if (m_uvw_thread.joinable()) m_uvw_thread.join();
if (m_uvw_thread_.joinable()) m_uvw_thread_.join();
}

const TaskInstance* TaskManager::FindInstanceByTaskId_(uint32_t task_id) {
Expand Down Expand Up @@ -339,7 +341,7 @@ void TaskManager::CleanSigchldQueueCb_() {

auto* sigchld_info_raw_ptr = sigchld_info.release();
sigchld_info_raw_ptr->resend_timer =
m_uvw_loop->resource<uvw::timer_handle>();
m_uvw_loop_->resource<uvw::timer_handle>();
sigchld_info_raw_ptr->resend_timer->on<uvw::timer_event>(
[this, sigchld_info_raw_ptr](const uvw::timer_event&,
uvw::timer_handle&) {
Expand Down Expand Up @@ -473,11 +475,11 @@ void TaskManager::SigintCb_() {
void TaskManager::ActivateShutdownAsync_() {
CRANE_TRACE("Triggering exit event...");
CRANE_ASSERT(m_is_ending_now_ == true);
m_task_cleared = true;
m_task_cleared_ = true;
}

void TaskManager::Wait() {
if (m_uvw_thread.joinable()) m_uvw_thread.join();
if (m_uvw_thread_.joinable()) m_uvw_thread_.join();
}

CraneErr TaskManager::KillProcessInstance_(const ProcessInstance* proc,
Expand Down
10 changes: 5 additions & 5 deletions src/Craned/TaskManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class TaskManager {

template <typename Duration>
void AddTerminationTimer_(TaskInstance* instance, Duration duration) {
auto termination_handel = m_uvw_loop->resource<uvw::timer_handle>();
auto termination_handel = m_uvw_loop_->resource<uvw::timer_handle>();
termination_handel->on<uvw::timer_event>(
[this, task_id = instance->task.task_id()](const uvw::timer_event&,
uvw::timer_handle& h) {
Expand All @@ -320,7 +320,7 @@ class TaskManager {
}

void AddTerminationTimer_(TaskInstance* instance, int64_t secs) {
auto termination_handel = m_uvw_loop->resource<uvw::timer_handle>();
auto termination_handel = m_uvw_loop_->resource<uvw::timer_handle>();
termination_handel->on<uvw::timer_event>(
[this, task_id = instance->task.task_id()](const uvw::timer_event&,
uvw::timer_handle& h) {
Expand Down Expand Up @@ -407,7 +407,7 @@ class TaskManager {

void SigchldTimerCb_(ProcSigchldInfo* sigchld_info);

std::shared_ptr<uvw::loop> m_uvw_loop;
std::shared_ptr<uvw::loop> m_uvw_loop_;

std::shared_ptr<uvw::signal_handle> m_sigchld_handle_;

Expand Down Expand Up @@ -451,9 +451,9 @@ class TaskManager {
std::atomic_bool m_is_ending_now_{false};

// After m_is_ending_now_ set to true, when all task are cleared, we can exit.
std::atomic_bool m_task_cleared{false};
std::atomic_bool m_task_cleared_{false};

std::thread m_uvw_thread;
std::thread m_uvw_thread_;

static inline TaskManager* m_instance_ptr_;
};
Expand Down

0 comments on commit d7f7d82

Please sign in to comment.