Skip to content

Commit 562bac6

Browse files
committed
Revert "Revert Convert some job_node functions to pybind"
This reverts commit 6b27f63.
1 parent 7b37647 commit 562bac6

File tree

3 files changed

+69
-82
lines changed

3 files changed

+69
-82
lines changed

src/clib/lib/include/ert/job_queue/job_node.hpp

-6
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,6 @@ struct job_queue_node_struct {
6262
typedef bool(job_callback_ftype)(void *);
6363
typedef struct job_queue_node_struct job_queue_node_type;
6464

65-
extern "C" PY_USED submit_status_type job_queue_node_submit_simple(
66-
job_queue_node_type *node, queue_driver_type *driver);
6765
void job_queue_node_fscanf_EXIT(job_queue_node_type *node);
6866
void job_queue_node_free_data(job_queue_node_type *node);
6967

@@ -73,13 +71,9 @@ job_queue_node_alloc(const char *job_name, const char *run_path,
7371
const stringlist_type *arguments, int num_cpu,
7472
const char *status_file, const char *exit_file);
7573

76-
extern "C" PY_USED bool job_queue_node_kill_simple(job_queue_node_type *node,
77-
queue_driver_type *driver);
7874
extern "C" void job_queue_node_free(job_queue_node_type *node);
7975
extern "C" job_status_type
8076
job_queue_node_get_status(const job_queue_node_type *node);
81-
extern "C" int
82-
job_queue_node_get_submit_attempt(const job_queue_node_type *node);
8377

8478
int job_queue_node_get_queue_index(const job_queue_node_type *node);
8579
void job_queue_node_set_queue_index(job_queue_node_type *node, int queue_index);

src/clib/lib/job_queue/job_node.cpp

+60-64
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,6 @@ job_status_type job_queue_node_get_status(const job_queue_node_type *node) {
142142
return node->job_status;
143143
}
144144

145-
int job_queue_node_get_submit_attempt(const job_queue_node_type *node) {
146-
return node->submit_attempt;
147-
}
148-
149145
job_queue_node_type *job_queue_node_alloc(const char *job_name,
150146
const char *run_path,
151147
const char *run_cmd, int argc,
@@ -201,66 +197,6 @@ void job_queue_node_set_status(job_queue_node_type *node,
201197
return;
202198
}
203199

204-
submit_status_type job_queue_node_submit_simple(job_queue_node_type *node,
205-
queue_driver_type *driver) {
206-
submit_status_type submit_status;
207-
pthread_mutex_lock(&node->data_mutex);
208-
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
209-
void *job_data = queue_driver_submit_job(
210-
driver, node->run_cmd, node->num_cpu, node->run_path, node->job_name,
211-
node->argc, (const char **)node->argv);
212-
213-
if (job_data == NULL) {
214-
// In this case the status of the job itself will be
215-
// unmodified; i.e. it will still be WAITING, and a new attempt
216-
// to submit it will be performed in the next round.
217-
submit_status = SUBMIT_DRIVER_FAIL;
218-
logger->warning("Failed to submit job {} (attempt {})", node->job_name,
219-
node->submit_attempt);
220-
pthread_mutex_unlock(&node->data_mutex);
221-
return submit_status;
222-
}
223-
224-
logger->info("Submitted job {} (attempt {})", node->job_name,
225-
node->submit_attempt);
226-
227-
node->job_data = job_data;
228-
node->submit_attempt++;
229-
// The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere.
230-
// The job_queue_update_status() will update this to PENDING or RUNNING at
231-
// the next call. The important difference between SUBMITTED and WAITING is
232-
// that SUBMITTED have job_data != NULL and the job_queue_node free
233-
// function must be called on it.
234-
submit_status = SUBMIT_OK;
235-
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
236-
pthread_mutex_unlock(&node->data_mutex);
237-
return submit_status;
238-
}
239-
240-
bool job_queue_node_kill_simple(job_queue_node_type *node,
241-
queue_driver_type *driver) {
242-
bool result = false;
243-
pthread_mutex_lock(&node->data_mutex);
244-
job_status_type current_status = job_queue_node_get_status(node);
245-
if (current_status & JOB_QUEUE_CAN_KILL) {
246-
// If the job is killed before it is even started no driver specific
247-
// job data has been assigned; we therefore must check the
248-
// node->job_data pointer before entering.
249-
if (node->job_data) {
250-
queue_driver_kill_job(driver, node->job_data);
251-
queue_driver_free_job(driver, node->job_data);
252-
node->job_data = NULL;
253-
}
254-
job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED);
255-
logger->info("job {} set to killed", node->job_name);
256-
result = true;
257-
} else {
258-
logger->warning("node_kill called but cannot kill {}", node->job_name);
259-
}
260-
pthread_mutex_unlock(&node->data_mutex);
261-
return result;
262-
}
263-
264200
ERT_CLIB_SUBMODULE("queue", m) {
265201
using namespace py::literals;
266202
m.def("_refresh_status", [](Cwrap<job_queue_node_type> node,
@@ -316,4 +252,64 @@ ERT_CLIB_SUBMODULE("queue", m) {
316252
return std::make_pair<int, std::optional<std::string>>(
317253
int(current_status), std::move(error_msg));
318254
});
255+
256+
m.def("_submit", [](Cwrap<job_queue_node_type> node,
257+
Cwrap<queue_driver_type> driver) {
258+
pthread_mutex_lock(&node->data_mutex);
259+
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
260+
void *job_data = queue_driver_submit_job(
261+
driver, node->run_cmd, node->num_cpu, node->run_path,
262+
node->job_name, node->argc, (const char **)node->argv);
263+
264+
if (job_data == nullptr) {
265+
// In this case the status of the job itself will be
266+
// unmodified; i.e. it will still be WAITING, and a new attempt
267+
// to submit it will be performed in the next round.
268+
logger->warning("Failed to submit job {} (attempt {})",
269+
node->job_name, node->submit_attempt);
270+
pthread_mutex_unlock(&node->data_mutex);
271+
return static_cast<int>(SUBMIT_DRIVER_FAIL);
272+
}
273+
274+
logger->info("Submitted job {} (attempt {})", node->job_name,
275+
node->submit_attempt);
276+
277+
node->job_data = job_data;
278+
node->submit_attempt++;
279+
// The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere.
280+
// The job_queue_update_status() will update this to PENDING or RUNNING at
281+
// the next call. The important difference between SUBMITTED and WAITING is
282+
// that SUBMITTED have job_data != NULL and the job_queue_node free
283+
// function must be called on it.
284+
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
285+
pthread_mutex_unlock(&node->data_mutex);
286+
return static_cast<int>(SUBMIT_OK);
287+
});
288+
m.def("_kill",
289+
[](Cwrap<job_queue_node_type> node, Cwrap<queue_driver_type> driver) {
290+
bool result = false;
291+
pthread_mutex_lock(&node->data_mutex);
292+
job_status_type current_status = job_queue_node_get_status(node);
293+
if (current_status & JOB_QUEUE_CAN_KILL) {
294+
// If the job is killed before it is even started no driver specific
295+
// job data has been assigned; we therefore must check the
296+
// node->job_data pointer before entering.
297+
if (node->job_data) {
298+
queue_driver_kill_job(driver, node->job_data);
299+
queue_driver_free_job(driver, node->job_data);
300+
node->job_data = NULL;
301+
}
302+
job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED);
303+
logger->info("job {} set to killed", node->job_name);
304+
result = true;
305+
} else {
306+
logger->warning("node_kill called but cannot kill {}",
307+
node->job_name);
308+
}
309+
pthread_mutex_unlock(&node->data_mutex);
310+
return result;
311+
});
312+
313+
m.def("_get_submit_attempt",
314+
[](Cwrap<job_queue_node_type> node) { return node->submit_attempt; });
319315
}

src/ert/job_queue/job_queue_node.py

+9-12
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,13 @@
99
from cwrap import BaseCClass
1010
from ecl.util.util import StringList
1111

12-
from ert._clib.queue import _refresh_status # pylint: disable=import-error
1312
from ert.callbacks import forward_model_ok
13+
from ert._clib.queue import ( # pylint: disable=import-error
14+
_get_submit_attempt,
15+
_kill,
16+
_refresh_status,
17+
_submit,
18+
)
1419
from ert.load_status import LoadStatus
1520

1621
from ..realization_state import RealizationState
@@ -71,20 +76,12 @@ class JobQueueNode(BaseCClass): # type: ignore
7176
bind=False,
7277
)
7378
_free = ResPrototype("void job_queue_node_free(job_queue_node)")
74-
_submit = ResPrototype(
75-
"job_submit_status_type_enum job_queue_node_submit_simple(job_queue_node, driver)" # noqa
76-
)
77-
_run_kill = ResPrototype("bool job_queue_node_kill_simple(job_queue_node, driver)")
78-
7979
_get_status = ResPrototype(
8080
"job_status_type_enum job_queue_node_get_status(job_queue_node)"
8181
)
8282
_set_queue_status = ResPrototype(
8383
"void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
8484
)
85-
_get_submit_attempt = ResPrototype(
86-
"int job_queue_node_get_submit_attempt(job_queue_node)"
87-
)
8885

8986
# pylint: disable=too-many-arguments
9087
def __init__(
@@ -150,7 +147,7 @@ def timed_out(self) -> bool:
150147

151148
@property
152149
def submit_attempt(self) -> int:
153-
return self._get_submit_attempt()
150+
return _get_submit_attempt(self)
154151

155152
def _poll_queue_status(self, driver: "Driver") -> JobStatus:
156153
result, msg = _refresh_status(self, driver)
@@ -167,7 +164,7 @@ def queue_status(self, value: JobStatus) -> None:
167164
return self._set_queue_status(value)
168165

169166
def submit(self, driver: "Driver") -> SubmitStatus:
170-
return self._submit(driver)
167+
return SubmitStatus(_submit(self, driver))
171168

172169
def run_done_callback(self) -> Optional[LoadStatus]:
173170
callback_status, status_msg = forward_model_ok(self.run_arg)
@@ -342,7 +339,7 @@ def _transition_status(
342339
self.thread_status = thread_status
343340

344341
def _kill(self, driver: "Driver") -> None:
345-
self._run_kill(driver)
342+
_kill(self, driver)
346343
self._tried_killing += 1
347344

348345
def run(self, driver: "Driver", pool_sema: Semaphore, max_submit: int = 2) -> None:

0 commit comments

Comments
 (0)