Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 31 additions & 9 deletions erts/emulator/sys/common/erl_check_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ typedef struct {
EventStateFlags flags;
int count; /* Number of times this fd has triggered
without being deselected. */
Eterm last_select_pid; /* Last process that called enif_select */
} ErtsDrvEventState;

struct drv_ev_state_shared {
Expand Down Expand Up @@ -290,6 +291,7 @@ static ERTS_INLINE int grow_drv_ev_state(ErtsSysFdType fd) {
sizeof(ErtsDrvEventState) * (new_len - old_len));
for (i = old_len; i < new_len; i++) {
drv_ev_state.v[i].fd = (ErtsSysFdType) i;
drv_ev_state.v[i].last_select_pid = NIL;
}
erts_atomic_set_nob(&drv_ev_state.len, new_len);
for (i=0; i<ERTS_CHECK_IO_DRV_EV_STATE_LOCK_CNT; i++) {
Expand Down Expand Up @@ -333,6 +335,8 @@ static ERTS_INLINE ErtsDrvEventState* new_drv_ev_state(ErtsDrvEventState *state,
tmpl.active_events = 0;
tmpl.type = ERTS_EV_TYPE_NONE;
tmpl.flags = 0;
tmpl.count = 0;
tmpl.last_select_pid = NIL;

return (ErtsDrvEventState *) safe_hash_put(&drv_ev_state.tab, (void *) &tmpl);
}
Expand Down Expand Up @@ -922,6 +926,7 @@ deselect(ErtsDrvEventState *state, int mode)
state->type = ERTS_EV_TYPE_NONE;
state->flags = ERTS_EV_FLAG_CLEAR;
state->count = 0;
state->last_select_pid = NIL;
} else {
ErtsPollEvents new_events =
erts_io_control(state, ERTS_POLL_OP_MOD, state->active_events);
Expand Down Expand Up @@ -1248,6 +1253,7 @@ enif_select_x(ErlNifEnv* env,
ErtsDrvSelectDataState *free_select = NULL;
ErtsNifSelectDataState *free_nif = NULL;
ErtsPollEvents new_events = 0;
const Eterm recipient = pid ? pid->pid : env->proc->common.id;
#ifdef DEBUG_PRINT_MODE
char tmp_buff[255];
#endif
Expand Down Expand Up @@ -1340,15 +1346,24 @@ enif_select_x(ErlNifEnv* env,
old_events = state->events;

if (on) {
ASSERT(is_internal_pid(recipient));

if (state->type == ERTS_EV_TYPE_NONE)
ctl_op = ERTS_POLL_OP_ADD;
#if ERTS_POLL_USE_SCHEDULER_POLLING
else {
if (!(state->flags & ERTS_EV_FLAG_SCHEDULER) &&
!(state->flags & ERTS_EV_FLAG_FALLBACK) &&
(ctl_events & ERTS_POLL_EV_IN)) {
if (erts_sched_poll_enabled() && (state->flags & ERTS_EV_FLAG_NIF_SELECT) &&
state->count++ > 10) {
else if (ctl_events & ERTS_POLL_EV_IN) {
if ((state->flags & (ERTS_EV_FLAG_SCHEDULER |
ERTS_EV_FLAG_FALLBACK |
ERTS_EV_FLAG_NIF_SELECT)) == ERTS_EV_FLAG_NIF_SELECT
&& erts_sched_poll_enabled()) {
/* Check if this is a different process than last time.
* If so, reset the counter to prevent scheduler pollset migration
* in multi-process scenarios (e.g., multiple socket:accept callers). */
if (state->last_select_pid != recipient) {
state->count = 0;
state->last_select_pid = recipient;
}
else if (++(state->count) > 10) {
int wake_poller = 0;
DEBUG_PRINT_FD("moving to scheduler ps", state);
new_events = erts_poll_control(get_scheduler_pollset(), fd, ERTS_POLL_OP_ADD,
Expand All @@ -1359,7 +1374,7 @@ enif_select_x(ErlNifEnv* env,
old_events = state->events;
}
}
if (ctl_events & ERTS_POLL_EV_IN) {
{
ErtsSchedulerData *esdp = erts_get_scheduler_data();
erts_io_clear_nif_select(fd, state);
/* Clear the marker in scheduler data so that the scheduler
Expand Down Expand Up @@ -1408,6 +1423,8 @@ enif_select_x(ErlNifEnv* env,
if (state->type == ERTS_EV_TYPE_NIF && !old_events) {
state->type = ERTS_EV_TYPE_NONE;
state->flags = 0;
state->count = 0;
state->last_select_pid = NIL;
state->driver.nif->in.pid = NIL;
state->driver.nif->out.pid = NIL;
state->driver.nif->err.pid = NIL;
Expand All @@ -1421,8 +1438,6 @@ enif_select_x(ErlNifEnv* env,
|| state->type == ERTS_EV_TYPE_NONE);

if (on) {
const Eterm recipient = pid ? pid->pid : env->proc->common.id;
ASSERT(is_internal_pid(recipient));
if (!state->driver.nif)
state->driver.nif = alloc_nif_select_data();
if (state->type == ERTS_EV_TYPE_NONE) {
Expand Down Expand Up @@ -1497,6 +1512,7 @@ enif_select_x(ErlNifEnv* env,
ret |= ERL_NIF_SELECT_STOP_SCHEDULED;
}
state->count = 0;
state->last_select_pid = NIL;
state->flags &= ~ERTS_EV_FLAG_WANT_ERROR;
}
else
Expand Down Expand Up @@ -1777,6 +1793,8 @@ steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix,
}
state->type = ERTS_EV_TYPE_NONE;
state->flags = 0;
state->count = 0;
state->last_select_pid = NIL;
state->driver.stop.drv_ptr = NULL;
}
else {
Expand Down Expand Up @@ -1816,6 +1834,8 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource,
enif_release_resource(state->driver.stop.resource->data);
state->type = ERTS_EV_TYPE_NONE;
state->flags = 0;
state->count = 0;
state->last_select_pid = NIL;
state->driver.stop.resource = NULL;
}
else {
Expand Down Expand Up @@ -2014,6 +2034,7 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only
erts_poll_control(get_scheduler_pollset(), fd, ERTS_POLL_OP_DEL, 0, &wake_poller);
state->flags &= ~(ERTS_EV_FLAG_SCHEDULER|ERTS_EV_FLAG_IN_SCHEDULER);
state->count = 0;
state->last_select_pid = NIL;
}
#endif
} else {
Expand All @@ -2034,6 +2055,7 @@ erts_check_io(ErtsPollThread *psi, ErtsMonotonicTime timeout_time, int poll_only
state->flags &= ~(ERTS_EV_FLAG_IN_SCHEDULER|ERTS_EV_FLAG_SCHEDULER);
state->active_events &= ~ERTS_POLL_EV_IN;
state->count = 0;
state->last_select_pid = NIL;
}
} else
#endif
Expand Down
Loading