Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move C functions to pybind with GIL release #6204

Merged
merged 4 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 2 additions & 10 deletions src/clib/lib/include/ert/job_queue/job_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct job_queue_node_struct {
/** The commandline arguments. */
char **argv;
int queue_index = 0;
bool confirmed_running = false;

std::optional<std::string> fail_message{};

Expand All @@ -61,8 +62,6 @@ struct job_queue_node_struct {
typedef bool(job_callback_ftype)(void *);
typedef struct job_queue_node_struct job_queue_node_type;

extern "C" PY_USED submit_status_type job_queue_node_submit_simple(
job_queue_node_type *node, queue_driver_type *driver);
void job_queue_node_fscanf_EXIT(job_queue_node_type *node);
void job_queue_node_free_data(job_queue_node_type *node);

Expand All @@ -72,21 +71,14 @@ job_queue_node_alloc(const char *job_name, const char *run_path,
const stringlist_type *arguments, int num_cpu,
const char *status_file, const char *exit_file);

extern "C" PY_USED bool job_queue_node_kill_simple(job_queue_node_type *node,
queue_driver_type *driver);
extern "C" void job_queue_node_free(job_queue_node_type *node);
extern "C" job_status_type
job_queue_node_get_status(const job_queue_node_type *node);
extern "C" PY_USED job_status_type job_queue_node_refresh_status(
job_queue_node_type *node, queue_driver_type *driver);
extern "C" int
job_queue_node_get_submit_attempt(const job_queue_node_type *node);

int job_queue_node_get_queue_index(const job_queue_node_type *node);
void job_queue_node_set_queue_index(job_queue_node_type *node, int queue_index);

extern "C" void job_queue_node_set_status(job_queue_node_type *node,
job_status_type new_status);
extern "C" const char *
job_queue_node_get_failure_message(job_queue_node_type *node);

#endif
214 changes: 112 additions & 102 deletions src/clib/lib/job_queue/job_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ job_status_type job_queue_node_get_status(const job_queue_node_type *node) {
return node->job_status;
}

int job_queue_node_get_submit_attempt(const job_queue_node_type *node) {
return node->submit_attempt;
}

job_queue_node_type *job_queue_node_alloc(const char *job_name,
const char *run_path,
const char *run_cmd, int argc,
Expand Down Expand Up @@ -196,116 +192,130 @@ void job_queue_node_set_status(job_queue_node_type *node,
// which are registered in the state JOB_QUEUE_RUNNING.
if (new_status == JOB_QUEUE_WAITING || new_status == JOB_QUEUE_RUNNING)
node->sim_start = time(NULL);

if (!(new_status & JOB_QUEUE_COMPLETE_STATUS))
return;
}

submit_status_type job_queue_node_submit_simple(job_queue_node_type *node,
queue_driver_type *driver) {
submit_status_type submit_status;
pthread_mutex_lock(&node->data_mutex);
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
void *job_data = queue_driver_submit_job(
driver, node->run_cmd, node->num_cpu, node->run_path, node->job_name,
node->argc, (const char **)node->argv);

if (job_data == NULL) {
// In this case the status of the job itself will be
// unmodified; i.e. it will still be WAITING, and a new attempt
// to submit it will be performed in the next round.
submit_status = SUBMIT_DRIVER_FAIL;
logger->warning("Failed to submit job {} (attempt {})", node->job_name,
node->submit_attempt);
pthread_mutex_unlock(&node->data_mutex);
return submit_status;
}
ERT_CLIB_SUBMODULE("queue", m) {
using namespace py::literals;
m.def("_refresh_status", [](Cwrap<job_queue_node_type> node,
Cwrap<queue_driver_type> driver) {
// release the GIL
py::gil_scoped_release release;

logger->info("Submitted job {} (attempt {})", node->job_name,
node->submit_attempt);

node->job_data = job_data;
node->submit_attempt++;
// The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere.
// The job_queue_update_status() will update this to PENDING or RUNNING at
// the next call. The important difference between SUBMITTED and WAITING is
// that SUBMITTED have job_data != NULL and the job_queue_node free
// function must be called on it.
submit_status = SUBMIT_OK;
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
pthread_mutex_unlock(&node->data_mutex);
return submit_status;
}
pthread_mutex_lock(&node->data_mutex);
job_status_type current_status = job_queue_node_get_status(node);

bool job_queue_node_kill_simple(job_queue_node_type *node,
queue_driver_type *driver) {
bool result = false;
pthread_mutex_lock(&node->data_mutex);
job_status_type current_status = job_queue_node_get_status(node);
if (current_status & JOB_QUEUE_CAN_KILL) {
// If the job is killed before it is even started no driver specific
// job data has been assigned; we therefore must check the
// node->job_data pointer before entering.
if (node->job_data) {
queue_driver_kill_job(driver, node->job_data);
queue_driver_free_job(driver, node->job_data);
node->job_data = NULL;
if (!node->job_data) {
pthread_mutex_unlock(&node->data_mutex);
return std::make_pair<int, std::optional<std::string>>(
int(current_status), std::nullopt);
}
job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED);
logger->info("job {} set to killed", node->job_name);
result = true;
} else {
logger->warning("node_kill called but cannot kill {}", node->job_name);
}
pthread_mutex_unlock(&node->data_mutex);
return result;
}

const char *job_queue_node_get_failure_message(job_queue_node_type *node) {
if (node->fail_message.has_value())
return node->fail_message->c_str();
else
return "";
}
std::optional<std::string> error_msg = std::nullopt;

job_status_type job_queue_node_refresh_status(job_queue_node_type *node,
queue_driver_type *driver) {
pthread_mutex_lock(&node->data_mutex);
job_status_type current_status = job_queue_node_get_status(node);
if (current_status & JOB_QUEUE_RUNNING && !node->confirmed_running) {
node->confirmed_running =
!node->status_file || fs::exists(node->status_file);

if (!node->job_data) {
pthread_mutex_unlock(&node->data_mutex);
return current_status;
}
if (!node->confirmed_running) {
if ((time(nullptr) - node->sim_start) >= MAX_CONFIRMED_WAIT) {
error_msg = fmt::format(
"max_confirm_wait ({}) has passed since sim_start"
"without success; {} is assumed dead (attempt {})",
MAX_CONFIRMED_WAIT, node->job_name,
node->submit_attempt);
logger->info(error_msg.value());
job_queue_node_set_status(node,
JOB_QUEUE_DO_KILL_NODE_FAILURE);
current_status = JOB_QUEUE_DO_KILL_NODE_FAILURE;
}
}
}

if (current_status & JOB_QUEUE_CAN_UPDATE_STATUS) {
job_status_type new_status =
queue_driver_get_status(driver, node->job_data);

if (new_status == JOB_QUEUE_EXIT)
job_queue_node_fscanf_EXIT(node);

std::optional<std::string> msg = std::nullopt;

if ((current_status & JOB_QUEUE_RUNNING) &&
(node->status_file && !(fs::exists(node->status_file)))) {
// it's running, but not confirmed running.
time_t runtime = time(nullptr) - node->sim_start;
if (runtime >= MAX_CONFIRMED_WAIT) {
std::string error_msg = fmt::format(
"max_confirm_wait ({}) has passed since sim_start"
"without success; {} is assumed dead (attempt {})",
MAX_CONFIRMED_WAIT, node->job_name, node->submit_attempt);
logger->info(error_msg);
msg = error_msg;
job_status_type new_status = JOB_QUEUE_DO_KILL_NODE_FAILURE;
job_queue_node_set_status(node, new_status);
current_status = new_status;
}
}

current_status = job_queue_node_get_status(node);
if (current_status & JOB_QUEUE_CAN_UPDATE_STATUS) {
job_status_type new_status =
queue_driver_get_status(driver, node->job_data);
if (new_status == JOB_QUEUE_EXIT)
job_queue_node_fscanf_EXIT(node);
job_queue_node_set_status(node, new_status);
current_status = job_queue_node_get_status(node);
}
if (node->fail_message.has_value() and !error_msg.has_value())
error_msg = node->fail_message;

if (msg.has_value())
node->fail_message = std::move(msg);
pthread_mutex_unlock(&node->data_mutex);
return std::make_pair<int, std::optional<std::string>>(
int(current_status), std::move(error_msg));
});

m.def("_submit", [](Cwrap<job_queue_node_type> node,
Cwrap<queue_driver_type> driver) {
// release the GIL
py::gil_scoped_release release;

pthread_mutex_lock(&node->data_mutex);
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
void *job_data = queue_driver_submit_job(
driver, node->run_cmd, node->num_cpu, node->run_path,
node->job_name, node->argc, (const char **)node->argv);

if (job_data == nullptr) {
// In this case the status of the job itself will be
// unmodified; i.e. it will still be WAITING, and a new attempt
// to submit it will be performed in the next round.
logger->warning("Failed to submit job {} (attempt {})",
node->job_name, node->submit_attempt);
pthread_mutex_unlock(&node->data_mutex);
return static_cast<int>(SUBMIT_DRIVER_FAIL);
}

pthread_mutex_unlock(&node->data_mutex);
return current_status;
logger->info("Submitted job {} (attempt {})", node->job_name,
node->submit_attempt);

node->job_data = job_data;
node->submit_attempt++;
// The status JOB_QUEUE_SUBMITTED is internal, and not exported anywhere.
// The job_queue_update_status() will update this to PENDING or RUNNING at
// the next call. The important difference between SUBMITTED and WAITING is
// that SUBMITTED have job_data != NULL and the job_queue_node free
// function must be called on it.
job_queue_node_set_status(node, JOB_QUEUE_SUBMITTED);
pthread_mutex_unlock(&node->data_mutex);
return static_cast<int>(SUBMIT_OK);
});
m.def("_kill",
[](Cwrap<job_queue_node_type> node, Cwrap<queue_driver_type> driver) {
// release the GIL
py::gil_scoped_release release;

bool result = false;
pthread_mutex_lock(&node->data_mutex);
job_status_type current_status = job_queue_node_get_status(node);
if (current_status & JOB_QUEUE_CAN_KILL) {
// If the job is killed before it is even started no driver specific
// job data has been assigned; we therefore must check the
// node->job_data pointer before entering.
if (node->job_data) {
queue_driver_kill_job(driver, node->job_data);
queue_driver_free_job(driver, node->job_data);
node->job_data = NULL;
}
job_queue_node_set_status(node, JOB_QUEUE_IS_KILLED);
logger->info("job {} set to killed", node->job_name);
result = true;
} else {
logger->warning("node_kill called but cannot kill {}",
node->job_name);
}
pthread_mutex_unlock(&node->data_mutex);
return result;
});

m.def("_get_submit_attempt",
[](Cwrap<job_queue_node_type> node) { return node->submit_attempt; });
}
34 changes: 13 additions & 21 deletions src/ert/job_queue/job_queue_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@
from cwrap import BaseCClass
from ecl.util.util import StringList

from ert._clib.queue import ( # pylint: disable=import-error
_get_submit_attempt,
_kill,
_refresh_status,
_submit,
)
from ert.callbacks import forward_model_ok
from ert.load_status import LoadStatus

Expand Down Expand Up @@ -70,26 +76,12 @@ class JobQueueNode(BaseCClass): # type: ignore
bind=False,
)
_free = ResPrototype("void job_queue_node_free(job_queue_node)")
_submit = ResPrototype(
"job_submit_status_type_enum job_queue_node_submit_simple(job_queue_node, driver)" # noqa
)
_run_kill = ResPrototype("bool job_queue_node_kill_simple(job_queue_node, driver)")

_get_status = ResPrototype(
"job_status_type_enum job_queue_node_get_status(job_queue_node)"
)
_set_queue_status = ResPrototype(
"void job_queue_node_set_status(job_queue_node, job_status_type_enum)"
)
_get_submit_attempt = ResPrototype(
"int job_queue_node_get_submit_attempt(job_queue_node)"
)
_refresh_status = ResPrototype(
"job_status_type_enum job_queue_node_refresh_status(job_queue_node, driver)"
)
_get_failure_message = ResPrototype(
"char* job_queue_node_get_failure_message(job_queue_node)"
)

# pylint: disable=too-many-arguments
def __init__(
Expand Down Expand Up @@ -155,13 +147,13 @@ def timed_out(self) -> bool:

@property
def submit_attempt(self) -> int:
return self._get_submit_attempt()
return _get_submit_attempt(self)

def _poll_queue_status(self, driver: "Driver") -> JobStatus:
status = self._refresh_status(driver)
msg = self._get_failure_message()
self._status_msg = msg if msg else self._status_msg
return status
result, msg = _refresh_status(self, driver)
if msg is not None:
self._status_msg = msg
return JobStatus(result)

@property
def queue_status(self) -> JobStatus:
Expand All @@ -172,7 +164,7 @@ def queue_status(self, value: JobStatus) -> None:
return self._set_queue_status(value)

def submit(self, driver: "Driver") -> SubmitStatus:
return self._submit(driver)
return SubmitStatus(_submit(self, driver))

def run_done_callback(self) -> Optional[LoadStatus]:
callback_status, status_msg = forward_model_ok(self.run_arg)
Expand Down Expand Up @@ -347,7 +339,7 @@ def _transition_status(
self.thread_status = thread_status

def _kill(self, driver: "Driver") -> None:
self._run_kill(driver)
_kill(self, driver)
self._tried_killing += 1

def run(self, driver: "Driver", pool_sema: Semaphore, max_submit: int = 2) -> None:
Expand Down