From ea2edbd4f57bf051a699b5cafea2d2b3d7c05635 Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Thu, 28 Sep 2023 12:58:40 +0200 Subject: [PATCH 1/4] Revert "Move job_queue_node_refresh_status back to cwrap" This reverts commit 2a807bf617aa2f6bce276f96709197b4c8dbf6c7. --- .../lib/include/ert/job_queue/job_node.hpp | 5 +- src/clib/lib/job_queue/job_node.cpp | 90 +++++++++---------- src/ert/job_queue/job_queue_node.py | 15 ++-- 3 files changed, 51 insertions(+), 59 deletions(-) diff --git a/src/clib/lib/include/ert/job_queue/job_node.hpp b/src/clib/lib/include/ert/job_queue/job_node.hpp index c00b8e556ea..a9c5c0e51f5 100644 --- a/src/clib/lib/include/ert/job_queue/job_node.hpp +++ b/src/clib/lib/include/ert/job_queue/job_node.hpp @@ -77,8 +77,6 @@ extern "C" PY_USED bool job_queue_node_kill_simple(job_queue_node_type *node, extern "C" void job_queue_node_free(job_queue_node_type *node); extern "C" job_status_type job_queue_node_get_status(const job_queue_node_type *node); -extern "C" PY_USED job_status_type job_queue_node_refresh_status( - job_queue_node_type *node, queue_driver_type *driver); extern "C" int job_queue_node_get_submit_attempt(const job_queue_node_type *node); @@ -87,6 +85,5 @@ void job_queue_node_set_queue_index(job_queue_node_type *node, int queue_index); extern "C" void job_queue_node_set_status(job_queue_node_type *node, job_status_type new_status); -extern "C" const char * -job_queue_node_get_failure_message(job_queue_node_type *node); + #endif diff --git a/src/clib/lib/job_queue/job_node.cpp b/src/clib/lib/job_queue/job_node.cpp index 9e6f19558d4..7c4f7753258 100644 --- a/src/clib/lib/job_queue/job_node.cpp +++ b/src/clib/lib/job_queue/job_node.cpp @@ -196,6 +196,9 @@ void job_queue_node_set_status(job_queue_node_type *node, // which are registered in the state JOB_QUEUE_RUNNING. if (new_status == JOB_QUEUE_WAITING || new_status == JOB_QUEUE_RUNNING) node->sim_start = time(NULL); + + if (!(new_status & JOB_QUEUE_COMPLETE_STATUS)) + return; } submit_status_type job_queue_node_submit_simple(job_queue_node_type *node, @@ -258,54 +261,51 @@ bool job_queue_node_kill_simple(job_queue_node_type *node, return result; } -const char *job_queue_node_get_failure_message(job_queue_node_type *node) { - if (node->fail_message.has_value()) - return node->fail_message->c_str(); - else - return ""; -} - -job_status_type job_queue_node_refresh_status(job_queue_node_type *node, - queue_driver_type *driver) { - pthread_mutex_lock(&node->data_mutex); - job_status_type current_status = job_queue_node_get_status(node); - - if (!node->job_data) { - pthread_mutex_unlock(&node->data_mutex); - return current_status; - } +ERT_CLIB_SUBMODULE("queue", m) { + using namespace py::literals; + m.def("_refresh_status", [](Cwrap node, + Cwrap driver) { + pthread_mutex_lock(&node->data_mutex); + job_status_type current_status = job_queue_node_get_status(node); + + if (!node->job_data) { + pthread_mutex_unlock(&node->data_mutex); + return std::make_pair>( + int(current_status), std::nullopt); + } - std::optional msg = std::nullopt; - - if ((current_status & JOB_QUEUE_RUNNING) && - (node->status_file && !(fs::exists(node->status_file)))) { - // it's running, but not confirmed running. - time_t runtime = time(nullptr) - node->sim_start; - if (runtime >= MAX_CONFIRMED_WAIT) { - std::string error_msg = fmt::format( - "max_confirm_wait ({}) has passed since sim_start" - "without success; {} is assumed dead (attempt {})", - MAX_CONFIRMED_WAIT, node->job_name, node->submit_attempt); - logger->info(error_msg); - msg = error_msg; - job_status_type new_status = JOB_QUEUE_DO_KILL_NODE_FAILURE; - job_queue_node_set_status(node, new_status); + std::optional msg = std::nullopt; + + if ((current_status & JOB_QUEUE_RUNNING) && + (node->status_file && !(fs::exists(node->status_file)))) { + // it's running, but not confirmed running. + time_t runtime = time(nullptr) - node->sim_start; + if (runtime >= MAX_CONFIRMED_WAIT) { + std::string error_msg = fmt::format( + "max_confirm_wait ({}) has passed since sim_start" + "without success; {} is assumed dead (attempt {})", + MAX_CONFIRMED_WAIT, node->job_name, node->submit_attempt); + logger->info(error_msg); + msg = error_msg; + job_status_type new_status = JOB_QUEUE_DO_KILL_NODE_FAILURE; + job_queue_node_set_status(node, new_status); + } } - } - current_status = job_queue_node_get_status(node); - if (current_status & JOB_QUEUE_CAN_UPDATE_STATUS) { - job_status_type new_status = - queue_driver_get_status(driver, node->job_data); - if (new_status == JOB_QUEUE_EXIT) - job_queue_node_fscanf_EXIT(node); - job_queue_node_set_status(node, new_status); current_status = job_queue_node_get_status(node); - } - - if (msg.has_value()) - node->fail_message = std::move(msg); + if (current_status & JOB_QUEUE_CAN_UPDATE_STATUS) { + job_status_type new_status = + queue_driver_get_status(driver, node->job_data); + if (new_status == JOB_QUEUE_EXIT) + job_queue_node_fscanf_EXIT(node); + job_queue_node_set_status(node, new_status); + current_status = job_queue_node_get_status(node); + } + if (node->fail_message.has_value() and !msg.has_value()) + msg = node->fail_message; - pthread_mutex_unlock(&node->data_mutex); - return current_status; + pthread_mutex_unlock(&node->data_mutex); + return std::make_pair>( + int(current_status), std::move(msg)); + }); } diff --git a/src/ert/job_queue/job_queue_node.py b/src/ert/job_queue/job_queue_node.py index 7a0dea3399a..a46050b5549 100644 --- a/src/ert/job_queue/job_queue_node.py +++ b/src/ert/job_queue/job_queue_node.py @@ -9,6 +9,7 @@ from cwrap import BaseCClass from ecl.util.util import StringList +from ert._clib.queue import _refresh_status # pylint: disable=import-error from ert.callbacks import forward_model_ok from ert.load_status import LoadStatus @@ -84,12 +85,6 @@ class JobQueueNode(BaseCClass): # type: ignore _get_submit_attempt = ResPrototype( "int job_queue_node_get_submit_attempt(job_queue_node)" ) - _refresh_status = ResPrototype( - "job_status_type_enum job_queue_node_refresh_status(job_queue_node, driver)" - ) - _get_failure_message = ResPrototype( - "char* job_queue_node_get_failure_message(job_queue_node)" - ) # pylint: disable=too-many-arguments def __init__( @@ -158,10 +153,10 @@ def submit_attempt(self) -> int: return self._get_submit_attempt() def _poll_queue_status(self, driver: "Driver") -> JobStatus: - status = self._refresh_status(driver) - msg = self._get_failure_message() - self._status_msg = msg if msg else self._status_msg - return status + result, msg = _refresh_status(self, driver) + if msg is not None: + self._status_msg = msg + return JobStatus(result) @property def queue_status(self) -> JobStatus: From daae97329758525eaf169db3cabf7b43fe430c52 Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Thu, 28 Sep 2023 13:03:28 +0200 Subject: [PATCH 2/4] Try to release the GIL --- src/clib/lib/job_queue/job_node.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/clib/lib/job_queue/job_node.cpp b/src/clib/lib/job_queue/job_node.cpp index 7c4f7753258..bbeb542cc34 100644 --- a/src/clib/lib/job_queue/job_node.cpp +++ b/src/clib/lib/job_queue/job_node.cpp @@ -265,6 +265,9 @@ ERT_CLIB_SUBMODULE("queue", m) { using namespace py::literals; m.def("_refresh_status", [](Cwrap node, Cwrap driver) { + // release the GIL + py::gil_scoped_release release; + pthread_mutex_lock(&node->data_mutex); job_status_type current_status = job_queue_node_get_status(node); From 83cc358eda6aa41e12e5b9a8c30a4ecd76747244 Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Thu, 28 Sep 2023 14:14:13 +0200 Subject: [PATCH 3/4] Refactor refresh function to store confirmed running state --- .../lib/include/ert/job_queue/job_node.hpp | 1 + src/clib/lib/job_queue/job_node.cpp | 45 ++++++++++--------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/src/clib/lib/include/ert/job_queue/job_node.hpp b/src/clib/lib/include/ert/job_queue/job_node.hpp index a9c5c0e51f5..1f500d52bc7 100644 --- a/src/clib/lib/include/ert/job_queue/job_node.hpp +++ b/src/clib/lib/include/ert/job_queue/job_node.hpp @@ -43,6 +43,7 @@ struct job_queue_node_struct { /** The commandline arguments. */ char **argv; int queue_index = 0; + bool confirmed_running = false; std::optional fail_message{}; diff --git a/src/clib/lib/job_queue/job_node.cpp b/src/clib/lib/job_queue/job_node.cpp index bbeb542cc34..a6ed16c035d 100644 --- a/src/clib/lib/job_queue/job_node.cpp +++ b/src/clib/lib/job_queue/job_node.cpp @@ -277,38 +277,43 @@ ERT_CLIB_SUBMODULE("queue", m) { int(current_status), std::nullopt); } - std::optional msg = std::nullopt; - - if ((current_status & JOB_QUEUE_RUNNING) && - (node->status_file && !(fs::exists(node->status_file)))) { - // it's running, but not confirmed running. - time_t runtime = time(nullptr) - node->sim_start; - if (runtime >= MAX_CONFIRMED_WAIT) { - std::string error_msg = fmt::format( - "max_confirm_wait ({}) has passed since sim_start" - "without success; {} is assumed dead (attempt {})", - MAX_CONFIRMED_WAIT, node->job_name, node->submit_attempt); - logger->info(error_msg); - msg = error_msg; - job_status_type new_status = JOB_QUEUE_DO_KILL_NODE_FAILURE; - job_queue_node_set_status(node, new_status); + std::optional error_msg = std::nullopt; + + if (current_status & JOB_QUEUE_RUNNING && !node->confirmed_running) { + node->confirmed_running = + !node->status_file || fs::exists(node->status_file); + + if (!node->confirmed_running) { + if ((time(nullptr) - node->sim_start) >= MAX_CONFIRMED_WAIT) { + error_msg = fmt::format( + "max_confirm_wait ({}) has passed since sim_start" + "without success; {} is assumed dead (attempt {})", + MAX_CONFIRMED_WAIT, node->job_name, + node->submit_attempt); + logger->info(error_msg.value()); + job_queue_node_set_status(node, + JOB_QUEUE_DO_KILL_NODE_FAILURE); + current_status = JOB_QUEUE_DO_KILL_NODE_FAILURE; + } } } - current_status = job_queue_node_get_status(node); if (current_status & JOB_QUEUE_CAN_UPDATE_STATUS) { job_status_type new_status = queue_driver_get_status(driver, node->job_data); + if (new_status == JOB_QUEUE_EXIT) job_queue_node_fscanf_EXIT(node); + job_queue_node_set_status(node, new_status); - current_status = job_queue_node_get_status(node); + current_status = new_status; } - if (node->fail_message.has_value() and !msg.has_value()) - msg = node->fail_message; + + if (node->fail_message.has_value() and !error_msg.has_value()) + error_msg = node->fail_message; pthread_mutex_unlock(&node->data_mutex); return std::make_pair>( - int(current_status), std::move(msg)); + int(current_status), std::move(error_msg)); }); } From b373edd8256ec30501e5ee626c1c46cdb2faed4f Mon Sep 17 00:00:00 2001 From: Andreas Eknes Lie Date: Thu, 28 Sep 2023 15:30:46 +0200 Subject: [PATCH 4/4] Move job_node functions to pybind with GIL release Release GIL in submit and kill functions --- .../lib/include/ert/job_queue/job_node.hpp | 6 - src/clib/lib/job_queue/job_node.cpp | 130 +++++++++--------- src/ert/job_queue/job_queue_node.py | 21 ++- 3 files changed, 75 insertions(+), 82 deletions(-) diff --git a/src/clib/lib/include/ert/job_queue/job_node.hpp b/src/clib/lib/include/ert/job_queue/job_node.hpp index 1f500d52bc7..b3e5053e400 100644 --- a/src/clib/lib/include/ert/job_queue/job_node.hpp +++ b/src/clib/lib/include/ert/job_queue/job_node.hpp @@ -62,8 +62,6 @@ struct job_queue_node_struct { typedef bool(job_callback_ftype)(void *); typedef struct job_queue_node_struct job_queue_node_type; -extern "C" PY_USED submit_status_type job_queue_node_submit_simple( - job_queue_node_type *node, queue_driver_type *driver); void job_queue_node_fscanf_EXIT(job_queue_node_type *node); void job_queue_node_free_data(job_queue_node_type *node); @@ -73,13 +71,9 @@ job_queue_node_alloc(const char *job_name, const char *run_path, const stringlist_type *arguments, int num_cpu, const char *status_file, const char *exit_file); -extern "C" PY_USED bool job_queue_node_kill_simple(job_queue_node_type *node, - queue_driver_type *driver); extern "C" void job_queue_node_free(job_queue_node_type *node); extern "C" job_status_type job_queue_node_get_status(const job_queue_node_type *node); -extern "C" int -job_queue_node_get_submit_attempt(const job_queue_node_type *node); int job_queue_node_get_queue_index(const job_queue_node_type *node); void job_queue_node_set_queue_index(job_queue_node_type *node, int queue_index); diff --git a/src/clib/lib/job_queue/job_node.cpp b/src/clib/lib/job_queue/job_node.cpp index a6ed16c035d..102b925244d 100644 --- a/src/clib/lib/job_queue/job_node.cpp +++ b/src/clib/lib/job_queue/job_node.cpp @@ -142,10 +142,6 @@ job_status_type job_queue_node_get_status(const job_queue_node_type *node) { return node->job_status; } -int job_queue_node_get_submit_attempt(const job_queue_node_type *node) { - return node->submit_attempt; -} - job_queue_node_type *job_queue_node_alloc(const char *job_name, const char *run_path, const char *run_cmd, int argc, @@ -201,66 +197,6 @@ void job_queue_node_set_status(job_queue_node_type *node, return; } -submit_status_type job_queue_node_submit_simple(job_queue_node_type *node, - queue_driver_type *driver) { - submit_status_type submit_status; - pthread_mutex_lock(&node->data_mutex); - job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED); - void *job_data = queue_driver_submit_job( - driver, node->run_cmd, node->num_cpu, node->run_path, node->job_name, - node->argc, (const char **)node->argv); - - if (job_data == NULL) { - // In this case the status of the job itself will be - // unmodified; i.e. it will still be WAITING, and a new attempt - // to submit it will be performed in the next round. - submit_status = SUBMIT_DRIVER_FAIL; - logger->warning("Failed to submit job {} (attempt {})", node->job_name, - node->submit_attempt); - pthread_mutex_unlock(&node->data_mutex); - return submit_status; - } - - logger->info("Submitted job {} (attempt {})", node->job_name, - node->submit_attempt); - - node->job_data = job_data; - node->submit_attempt++; - // The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere. - // The job_queue_update_status() will update this to PENDING or RUNNING at - // the next call. The important difference between SUBMITTED and WAITING is - // that SUBMITTED have job_data != NULL and the job_queue_node free - // function must be called on it. - submit_status = SUBMIT_OK; - job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED); - pthread_mutex_unlock(&node->data_mutex); - return submit_status; -} - -bool job_queue_node_kill_simple(job_queue_node_type *node, - queue_driver_type *driver) { - bool result = false; - pthread_mutex_lock(&node->data_mutex); - job_status_type current_status = job_queue_node_get_status(node); - if (current_status & JOB_QUEUE_CAN_KILL) { - // If the job is killed before it is even started no driver specific - // job data has been assigned; we therefore must check the - // node->job_data pointer before entering. - if (node->job_data) { - queue_driver_kill_job(driver, node->job_data); - queue_driver_free_job(driver, node->job_data); - node->job_data = NULL; - } - job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED); - logger->info("job {} set to killed", node->job_name); - result = true; - } else { - logger->warning("node_kill called but cannot kill {}", node->job_name); - } - pthread_mutex_unlock(&node->data_mutex); - return result; -} - ERT_CLIB_SUBMODULE("queue", m) { using namespace py::literals; m.def("_refresh_status", [](Cwrap node, @@ -316,4 +252,70 @@ ERT_CLIB_SUBMODULE("queue", m) { return std::make_pair>( int(current_status), std::move(error_msg)); }); + + m.def("_submit", [](Cwrap node, + Cwrap driver) { + // release the GIL + py::gil_scoped_release release; + + pthread_mutex_lock(&node->data_mutex); + job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED); + void *job_data = queue_driver_submit_job( + driver, node->run_cmd, node->num_cpu, node->run_path, + node->job_name, node->argc, (const char **)node->argv); + + if (job_data == nullptr) { + // In this case the status of the job itself will be + // unmodified; i.e. it will still be WAITING, and a new attempt + // to submit it will be performed in the next round. + logger->warning("Failed to submit job {} (attempt {})", + node->job_name, node->submit_attempt); + pthread_mutex_unlock(&node->data_mutex); + return static_cast(SUBMIT_DRIVER_FAIL); + } + + logger->info("Submitted job {} (attempt {})", node->job_name, + node->submit_attempt); + + node->job_data = job_data; + node->submit_attempt++; + // The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere. + // The job_queue_update_status() will update this to PENDING or RUNNING at + // the next call. The important difference between SUBMITTED and WAITING is + // that SUBMITTED have job_data != NULL and the job_queue_node free + // function must be called on it. + job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED); + pthread_mutex_unlock(&node->data_mutex); + return static_cast(SUBMIT_OK); + }); + m.def("_kill", + [](Cwrap node, Cwrap driver) { + // release the GIL + py::gil_scoped_release release; + + bool result = false; + pthread_mutex_lock(&node->data_mutex); + job_status_type current_status = job_queue_node_get_status(node); + if (current_status & JOB_QUEUE_CAN_KILL) { + // If the job is killed before it is even started no driver specific + // job data has been assigned; we therefore must check the + // node->job_data pointer before entering. + if (node->job_data) { + queue_driver_kill_job(driver, node->job_data); + queue_driver_free_job(driver, node->job_data); + node->job_data = NULL; + } + job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED); + logger->info("job {} set to killed", node->job_name); + result = true; + } else { + logger->warning("node_kill called but cannot kill {}", + node->job_name); + } + pthread_mutex_unlock(&node->data_mutex); + return result; + }); + + m.def("_get_submit_attempt", + [](Cwrap node) { return node->submit_attempt; }); } diff --git a/src/ert/job_queue/job_queue_node.py b/src/ert/job_queue/job_queue_node.py index a46050b5549..cda6bee9c85 100644 --- a/src/ert/job_queue/job_queue_node.py +++ b/src/ert/job_queue/job_queue_node.py @@ -9,7 +9,12 @@ from cwrap import BaseCClass from ecl.util.util import StringList -from ert._clib.queue import _refresh_status # pylint: disable=import-error +from ert._clib.queue import ( # pylint: disable=import-error + _get_submit_attempt, + _kill, + _refresh_status, + _submit, +) from ert.callbacks import forward_model_ok from ert.load_status import LoadStatus @@ -71,20 +76,12 @@ class JobQueueNode(BaseCClass): # type: ignore bind=False, ) _free = ResPrototype("void job_queue_node_free(job_queue_node)") - _submit = ResPrototype( - "job_submit_status_type_enum job_queue_node_submit_simple(job_queue_node, driver)" # noqa - ) - _run_kill = ResPrototype("bool job_queue_node_kill_simple(job_queue_node, driver)") - _get_status = ResPrototype( "job_status_type_enum job_queue_node_get_status(job_queue_node)" ) _set_queue_status = ResPrototype( "void job_queue_node_set_status(job_queue_node, job_status_type_enum)" ) - _get_submit_attempt = ResPrototype( - "int job_queue_node_get_submit_attempt(job_queue_node)" - ) # pylint: disable=too-many-arguments def __init__( @@ -150,7 +147,7 @@ def timed_out(self) -> bool: @property def submit_attempt(self) -> int: - return self._get_submit_attempt() + return _get_submit_attempt(self) def _poll_queue_status(self, driver: "Driver") -> JobStatus: result, msg = _refresh_status(self, driver) @@ -167,7 +164,7 @@ def queue_status(self, value: JobStatus) -> None: return self._set_queue_status(value) def submit(self, driver: "Driver") -> SubmitStatus: - return self._submit(driver) + return SubmitStatus(_submit(self, driver)) def run_done_callback(self) -> Optional[LoadStatus]: callback_status, status_msg = forward_model_ok(self.run_arg) @@ -342,7 +339,7 @@ def _transition_status( self.thread_status = thread_status def _kill(self, driver: "Driver") -> None: - self._run_kill(driver) + _kill(self, driver) self._tried_killing += 1 def run(self, driver: "Driver", pool_sema: Semaphore, max_submit: int = 2) -> None: