Skip to content

Commit 510f483

Browse files
authored
Move C functions to pybind with GIL release
* Revert "Move job_queue_node_refresh_status back to cwrap" This reverts commit 2a807bf. * Try to release the GIL * Refactor refresh function to store confirmed running state * Move job_node functions to pybind with GIL release Release GIL in submit and kill functions
1 parent 1e5e989 commit 510f483

File tree

3 files changed

+127
-133
lines changed

3 files changed

+127
-133
lines changed

src/clib/lib/include/ert/job_queue/job_node.hpp

+2-10
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct job_queue_node_struct {
4343
/** The commandline arguments. */
4444
char **argv;
4545
int queue_index = 0;
46+
bool confirmed_running = false;
4647

4748
std::optional<std::string> fail_message{};
4849

@@ -61,8 +62,6 @@ struct job_queue_node_struct {
6162
typedef bool(job_callback_ftype)(void *);
6263
typedef struct job_queue_node_struct job_queue_node_type;
6364

64-
extern "C" PY_USED submit_status_type job_queue_node_submit_simple(
65-
job_queue_node_type *node, queue_driver_type *driver);
6665
void job_queue_node_fscanf_EXIT(job_queue_node_type *node);
6766
void job_queue_node_free_data(job_queue_node_type *node);
6867

@@ -72,21 +71,14 @@ job_queue_node_alloc(const char *job_name, const char *run_path,
7271
const stringlist_type *arguments, int num_cpu,
7372
const char *status_file, const char *exit_file);
7473

75-
extern "C" PY_USED bool job_queue_node_kill_simple(job_queue_node_type *node,
76-
queue_driver_type *driver);
7774
extern "C" void job_queue_node_free(job_queue_node_type *node);
7875
extern "C" job_status_type
7976
job_queue_node_get_status(const job_queue_node_type *node);
80-
extern "C" PY_USED job_status_type job_queue_node_refresh_status(
81-
job_queue_node_type *node, queue_driver_type *driver);
82-
extern "C" int
83-
job_queue_node_get_submit_attempt(const job_queue_node_type *node);
8477

8578
int job_queue_node_get_queue_index(const job_queue_node_type *node);
8679
void job_queue_node_set_queue_index(job_queue_node_type *node, int queue_index);
8780

8881
extern "C" void job_queue_node_set_status(job_queue_node_type *node,
8982
job_status_type new_status);
90-
extern "C" const char *
91-
job_queue_node_get_failure_message(job_queue_node_type *node);
83+
9284
#endif

src/clib/lib/job_queue/job_node.cpp

+112-102
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,6 @@ job_status_type job_queue_node_get_status(const job_queue_node_type *node) {
142142
return node->job_status;
143143
}
144144

145-
int job_queue_node_get_submit_attempt(const job_queue_node_type *node) {
146-
return node->submit_attempt;
147-
}
148-
149145
job_queue_node_type *job_queue_node_alloc(const char *job_name,
150146
const char *run_path,
151147
const char *run_cmd, int argc,
@@ -196,116 +192,130 @@ void job_queue_node_set_status(job_queue_node_type *node,
196192
// which are registered in the state JOB_QUEUE_RUNNING.
197193
if (new_status == JOB_QUEUE_WAITING || new_status == JOB_QUEUE_RUNNING)
198194
node->sim_start = time(NULL);
195+
196+
if (!(new_status & JOB_QUEUE_COMPLETE_STATUS))
197+
return;
199198
}
200199

201-
submit_status_type job_queue_node_submit_simple(job_queue_node_type *node,
202-
queue_driver_type *driver) {
203-
submit_status_type submit_status;
204-
pthread_mutex_lock(&node->data_mutex);
205-
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
206-
void *job_data = queue_driver_submit_job(
207-
driver, node->run_cmd, node->num_cpu, node->run_path, node->job_name,
208-
node->argc, (const char **)node->argv);
209-
210-
if (job_data == NULL) {
211-
// In this case the status of the job itself will be
212-
// unmodified; i.e. it will still be WAITING, and a new attempt
213-
// to submit it will be performed in the next round.
214-
submit_status = SUBMIT_DRIVER_FAIL;
215-
logger->warning("Failed to submit job {} (attempt {})", node->job_name,
216-
node->submit_attempt);
217-
pthread_mutex_unlock(&node->data_mutex);
218-
return submit_status;
219-
}
200+
ERT_CLIB_SUBMODULE("queue", m) {
201+
using namespace py::literals;
202+
m.def("_refresh_status", [](Cwrap<job_queue_node_type> node,
203+
Cwrap<queue_driver_type> driver) {
204+
// release the GIL
205+
py::gil_scoped_release release;
220206

221-
logger->info("Submitted job {} (attempt {})", node->job_name,
222-
node->submit_attempt);
223-
224-
node->job_data = job_data;
225-
node->submit_attempt++;
226-
// The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere.
227-
// The job_queue_update_status() will update this to PENDING or RUNNING at
228-
// the next call. The important difference between SUBMITTED and WAITING is
229-
// that SUBMITTED have job_data != NULL and the job_queue_node free
230-
// function must be called on it.
231-
submit_status = SUBMIT_OK;
232-
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
233-
pthread_mutex_unlock(&node->data_mutex);
234-
return submit_status;
235-
}
207+
pthread_mutex_lock(&node->data_mutex);
208+
job_status_type current_status = job_queue_node_get_status(node);
236209

237-
bool job_queue_node_kill_simple(job_queue_node_type *node,
238-
queue_driver_type *driver) {
239-
bool result = false;
240-
pthread_mutex_lock(&node->data_mutex);
241-
job_status_type current_status = job_queue_node_get_status(node);
242-
if (current_status & JOB_QUEUE_CAN_KILL) {
243-
// If the job is killed before it is even started no driver specific
244-
// job data has been assigned; we therefore must check the
245-
// node->job_data pointer before entering.
246-
if (node->job_data) {
247-
queue_driver_kill_job(driver, node->job_data);
248-
queue_driver_free_job(driver, node->job_data);
249-
node->job_data = NULL;
210+
if (!node->job_data) {
211+
pthread_mutex_unlock(&node->data_mutex);
212+
return std::make_pair<int, std::optional<std::string>>(
213+
int(current_status), std::nullopt);
250214
}
251-
job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED);
252-
logger->info("job {} set to killed", node->job_name);
253-
result = true;
254-
} else {
255-
logger->warning("node_kill called but cannot kill {}", node->job_name);
256-
}
257-
pthread_mutex_unlock(&node->data_mutex);
258-
return result;
259-
}
260215

261-
const char *job_queue_node_get_failure_message(job_queue_node_type *node) {
262-
if (node->fail_message.has_value())
263-
return node->fail_message->c_str();
264-
else
265-
return "";
266-
}
216+
std::optional<std::string> error_msg = std::nullopt;
267217

268-
job_status_type job_queue_node_refresh_status(job_queue_node_type *node,
269-
queue_driver_type *driver) {
270-
pthread_mutex_lock(&node->data_mutex);
271-
job_status_type current_status = job_queue_node_get_status(node);
218+
if (current_status & JOB_QUEUE_RUNNING && !node->confirmed_running) {
219+
node->confirmed_running =
220+
!node->status_file || fs::exists(node->status_file);
272221

273-
if (!node->job_data) {
274-
pthread_mutex_unlock(&node->data_mutex);
275-
return current_status;
276-
}
222+
if (!node->confirmed_running) {
223+
if ((time(nullptr) - node->sim_start) >= MAX_CONFIRMED_WAIT) {
224+
error_msg = fmt::format(
225+
"max_confirm_wait ({}) has passed since sim_start"
226+
"without success; {} is assumed dead (attempt {})",
227+
MAX_CONFIRMED_WAIT, node->job_name,
228+
node->submit_attempt);
229+
logger->info(error_msg.value());
230+
job_queue_node_set_status(node,
231+
JOB_QUEUE_DO_KILL_NODE_FAILURE);
232+
current_status = JOB_QUEUE_DO_KILL_NODE_FAILURE;
233+
}
234+
}
235+
}
236+
237+
if (current_status & JOB_QUEUE_CAN_UPDATE_STATUS) {
238+
job_status_type new_status =
239+
queue_driver_get_status(driver, node->job_data);
240+
241+
if (new_status == JOB_QUEUE_EXIT)
242+
job_queue_node_fscanf_EXIT(node);
277243

278-
std::optional<std::string> msg = std::nullopt;
279-
280-
if ((current_status & JOB_QUEUE_RUNNING) &&
281-
(node->status_file && !(fs::exists(node->status_file)))) {
282-
// it's running, but not confirmed running.
283-
time_t runtime = time(nullptr) - node->sim_start;
284-
if (runtime >= MAX_CONFIRMED_WAIT) {
285-
std::string error_msg = fmt::format(
286-
"max_confirm_wait ({}) has passed since sim_start"
287-
"without success; {} is assumed dead (attempt {})",
288-
MAX_CONFIRMED_WAIT, node->job_name, node->submit_attempt);
289-
logger->info(error_msg);
290-
msg = error_msg;
291-
job_status_type new_status = JOB_QUEUE_DO_KILL_NODE_FAILURE;
292244
job_queue_node_set_status(node, new_status);
245+
current_status = new_status;
293246
}
294-
}
295247

296-
current_status = job_queue_node_get_status(node);
297-
if (current_status & JOB_QUEUE_CAN_UPDATE_STATUS) {
298-
job_status_type new_status =
299-
queue_driver_get_status(driver, node->job_data);
300-
if (new_status == JOB_QUEUE_EXIT)
301-
job_queue_node_fscanf_EXIT(node);
302-
job_queue_node_set_status(node, new_status);
303-
current_status = job_queue_node_get_status(node);
304-
}
248+
if (node->fail_message.has_value() and !error_msg.has_value())
249+
error_msg = node->fail_message;
305250

306-
if (msg.has_value())
307-
node->fail_message = std::move(msg);
251+
pthread_mutex_unlock(&node->data_mutex);
252+
return std::make_pair<int, std::optional<std::string>>(
253+
int(current_status), std::move(error_msg));
254+
});
255+
256+
m.def("_submit", [](Cwrap<job_queue_node_type> node,
257+
Cwrap<queue_driver_type> driver) {
258+
// release the GIL
259+
py::gil_scoped_release release;
260+
261+
pthread_mutex_lock(&node->data_mutex);
262+
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
263+
void *job_data = queue_driver_submit_job(
264+
driver, node->run_cmd, node->num_cpu, node->run_path,
265+
node->job_name, node->argc, (const char **)node->argv);
266+
267+
if (job_data == nullptr) {
268+
// In this case the status of the job itself will be
269+
// unmodified; i.e. it will still be WAITING, and a new attempt
270+
// to submit it will be performed in the next round.
271+
logger->warning("Failed to submit job {} (attempt {})",
272+
node->job_name, node->submit_attempt);
273+
pthread_mutex_unlock(&node->data_mutex);
274+
return static_cast<int>(SUBMIT_DRIVER_FAIL);
275+
}
308276

309-
pthread_mutex_unlock(&node->data_mutex);
310-
return current_status;
277+
logger->info("Submitted job {} (attempt {})", node->job_name,
278+
node->submit_attempt);
279+
280+
node->job_data = job_data;
281+
node->submit_attempt++;
282+
// The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere.
283+
// The job_queue_update_status() will update this to PENDING or RUNNING at
284+
// the next call. The important difference between SUBMITTED and WAITING is
285+
// that SUBMITTED have job_data != NULL and the job_queue_node free
286+
// function must be called on it.
287+
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
288+
pthread_mutex_unlock(&node->data_mutex);
289+
return static_cast<int>(SUBMIT_OK);
290+
});
291+
m.def("_kill",
292+
[](Cwrap<job_queue_node_type> node, Cwrap<queue_driver_type> driver) {
293+
// release the GIL
294+
py::gil_scoped_release release;
295+
296+
bool result = false;
297+
pthread_mutex_lock(&node->data_mutex);
298+
job_status_type current_status = job_queue_node_get_status(node);
299+
if (current_status & JOB_QUEUE_CAN_KILL) {
300+
// If the job is killed before it is even started no driver specific
301+
// job data has been assigned; we therefore must check the
302+
// node->job_data pointer before entering.
303+
if (node->job_data) {
304+
queue_driver_kill_job(driver, node->job_data);
305+
queue_driver_free_job(driver, node->job_data);
306+
node->job_data = NULL;
307+
}
308+
job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED);
309+
logger->info("job {} set to killed", node->job_name);
310+
result = true;
311+
} else {
312+
logger->warning("node_kill called but cannot kill {}",
313+
node->job_name);
314+
}
315+
pthread_mutex_unlock(&node->data_mutex);
316+
return result;
317+
});
318+
319+
m.def("_get_submit_attempt",
320+
[](Cwrap<job_queue_node_type> node) { return node->submit_attempt; });
311321
}

src/ert/job_queue/job_queue_node.py

+13-21
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
from cwrap import BaseCClass
1010
from ecl.util.util import StringList
1111

12+
from ert._clib.queue import ( # pylint: disable=import-error
13+
_get_submit_attempt,
14+
_kill,
15+
_refresh_status,
16+
_submit,
17+
)
1218
from ert.callbacks import forward_model_ok
1319
from ert.load_status import LoadStatus
1420

@@ -70,26 +76,12 @@ class JobQueueNode(BaseCClass): # type: ignore
7076
bind=False,
7177
)
7278
_free = ResPrototype("void job_queue_node_free(job_queue_node)")
73-
_submit = ResPrototype(
74-
"job_submit_status_type_enum job_queue_node_submit_simple(job_queue_node, driver)" # noqa
75-
)
76-
_run_kill = ResPrototype("bool job_queue_node_kill_simple(job_queue_node, driver)")
77-
7879
_get_status = ResPrototype(
7980
"job_status_type_enum job_queue_node_get_status(job_queue_node)"
8081
)
8182
_set_queue_status = ResPrototype(
8283
"void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
8384
)
84-
_get_submit_attempt = ResPrototype(
85-
"int job_queue_node_get_submit_attempt(job_queue_node)"
86-
)
87-
_refresh_status = ResPrototype(
88-
"job_status_type_enum job_queue_node_refresh_status(job_queue_node, driver)"
89-
)
90-
_get_failure_message = ResPrototype(
91-
"char* job_queue_node_get_failure_message(job_queue_node)"
92-
)
9385

9486
# pylint: disable=too-many-arguments
9587
def __init__(
@@ -155,13 +147,13 @@ def timed_out(self) -> bool:
155147

156148
@property
157149
def submit_attempt(self) -> int:
158-
return self._get_submit_attempt()
150+
return _get_submit_attempt(self)
159151

160152
def _poll_queue_status(self, driver: "Driver") -> JobStatus:
161-
status = self._refresh_status(driver)
162-
msg = self._get_failure_message()
163-
self._status_msg = msg if msg else self._status_msg
164-
return status
153+
result, msg = _refresh_status(self, driver)
154+
if msg is not None:
155+
self._status_msg = msg
156+
return JobStatus(result)
165157

166158
@property
167159
def queue_status(self) -> JobStatus:
@@ -172,7 +164,7 @@ def queue_status(self, value: JobStatus) -> None:
172164
return self._set_queue_status(value)
173165

174166
def submit(self, driver: "Driver") -> SubmitStatus:
175-
return self._submit(driver)
167+
return SubmitStatus(_submit(self, driver))
176168

177169
def run_done_callback(self) -> Optional[LoadStatus]:
178170
callback_status, status_msg = forward_model_ok(self.run_arg)
@@ -347,7 +339,7 @@ def _transition_status(
347339
self.thread_status = thread_status
348340

349341
def _kill(self, driver: "Driver") -> None:
350-
self._run_kill(driver)
342+
_kill(self, driver)
351343
self._tried_killing += 1
352344

353345
def run(self, driver: "Driver", pool_sema: Semaphore, max_submit: int = 2) -> None:

0 commit comments

Comments
 (0)