Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/proof-alarm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Check
run: |
TMPFILE=$(mktemp)
echo "e857a2e5f72ab77a94e56372d89abf99 source/linux/epoll_event_loop.c" > $TMPFILE
echo "844fcb2768afd3ab013fa49447c1b855 source/linux/epoll_event_loop.c" > $TMPFILE
md5sum --check $TMPFILE

# No further steps if successful
Expand Down
17 changes: 16 additions & 1 deletion include/aws/io/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct aws_event_loop_vtable {
void *user_data);
int (*unsubscribe_from_io_events)(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
void (*free_io_event_resources)(void *user_data);
void *(*get_base_event_loop_group)(struct aws_event_loop *event_loop);
bool (*is_on_callers_thread)(struct aws_event_loop *event_loop);
};

Expand Down Expand Up @@ -175,6 +174,22 @@ struct aws_event_loop_group *aws_event_loop_group_acquire(struct aws_event_loop_
AWS_IO_API
void aws_event_loop_group_release(struct aws_event_loop_group *el_group);

/**
* Increments the reference count on the event loop group from event loop, allowing the caller to take a reference to
* it.
*
* Returns the base event loop group of the event loop, or null if the event loop does not belong to a group.
*/
AWS_IO_API
struct aws_event_loop_group *aws_event_loop_group_acquire_from_event_loop(struct aws_event_loop *event_loop);

/**
* Decrements the ref count of the event loop's base event loop group. When the ref count drops to zero, the event loop
* group will be destroyed.
*/
AWS_IO_API
void aws_event_loop_group_release_from_event_loop(struct aws_event_loop *event_loop);

/**
* Returns the event loop at a particular index. If the index is out of bounds, null is returned.
*/
Expand Down
9 changes: 1 addition & 8 deletions include/aws/io/private/event_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct aws_event_loop {
uint64_t latest_tick_start;
size_t current_tick_latency_sum;
struct aws_atomic_var next_flush_time;
struct aws_event_loop_group *base_elg;
void *impl_data;
};

Expand Down Expand Up @@ -321,14 +322,6 @@ int aws_event_loop_unsubscribe_from_io_events(struct aws_event_loop *event_loop,
AWS_IO_API
void aws_event_loop_free_io_event_resources(struct aws_event_loop *event_loop, struct aws_io_handle *handle);

/**
* Retrieves the aws_event_loop_group that is the parent of the aws_event_loop. This is only supported when using a
* dispatch queue event loop as they are async and their sockets need to retain a refcount on the elg to keep it alive
* and insure it has not been asyncronously destroyed before anything that needs it.
*/
AWS_IO_API
void *get_base_event_loop_group(struct aws_event_loop *event_loop);

AWS_IO_API
struct aws_event_loop_group *aws_event_loop_group_new_internal(
struct aws_allocator *allocator,
Expand Down
13 changes: 3 additions & 10 deletions source/bsd/kqueue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,7 @@ static int s_subscribe_to_io_events(
void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
(void)event_loop;
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: get_base_event_loop_group() is not supported using KQueue Event Loops",
(void *)event_loop);
aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
return NULL;
}

static bool s_is_event_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *user_data);
Expand Down Expand Up @@ -148,7 +140,6 @@ struct aws_event_loop_vtable s_kqueue_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_event_thread,
};

Expand Down Expand Up @@ -275,6 +266,8 @@ struct aws_event_loop *aws_event_loop_new_with_kqueue(

event_loop->vtable = &s_kqueue_vtable;

event_loop->base_elg = options->parent_elg;

/* success */
return event_loop;

Expand Down
15 changes: 1 addition & 14 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ static void s_free_io_event_resources(void *user_data) {
/* No io event resources to free */
(void)user_data;
}
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop);
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static struct aws_event_loop_vtable s_vtable = {
Expand All @@ -80,7 +79,6 @@ static struct aws_event_loop_vtable s_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_on_callers_thread,
};

Expand Down Expand Up @@ -249,12 +247,12 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
}

loop->vtable = &s_vtable;
loop->base_elg = options->parent_elg;

dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_dispatch_loop));
dispatch_loop->allocator = alloc;
loop->impl_data = dispatch_loop;
dispatch_loop->base_loop = loop;
dispatch_loop->base_elg = options->parent_elg;
dispatch_loop->synced_data.execution_state = AWS_DLES_SUSPENDED;
aws_ref_count_init(&dispatch_loop->ref_count, dispatch_loop, s_dispatch_event_loop_on_zero_ref_count);

Expand Down Expand Up @@ -738,17 +736,6 @@ static int s_connect_to_io_completion_port(struct aws_event_loop *event_loop, st
return AWS_OP_SUCCESS;
}

/*
* Because dispatch queue is async we may need to acquire a refcount of the parent event loop group to prevent
* the event loop or dispatch loop from being cleaned out from underneath something that needs it. We expose the
* base elg so anything that needs to insure the event loops and dispatch loops don't get prematurely cleaned can
* hold a refcount.
*/
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data;
return dispatch_loop->base_elg;
}

/*
* We use aws_thread_id_equal with syched_data.current_thread_id and synced_data.is_executing to determine
* if operation is being executed on the same dispatch queue thread.
Expand Down
1 change: 0 additions & 1 deletion source/darwin/dispatch_queue_event_loop_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ struct aws_dispatch_loop {
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_event_loop *base_loop;
struct aws_event_loop_group *base_elg;

struct aws_ref_count ref_count;

Expand Down
60 changes: 46 additions & 14 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -412,19 +412,28 @@ static bool s_validate_event_loop(struct aws_event_loop *event_loop) {
return event_loop && event_loop->vtable && event_loop->impl_data;
}

static void s_set_event_loop(struct aws_socket *aws_socket, struct aws_event_loop *event_loop) {
static bool s_set_event_loop(struct aws_socket *aws_socket, struct aws_event_loop *event_loop) {
aws_socket->event_loop = event_loop;
struct nw_socket *nw_socket = aws_socket->impl;
// Never re-assign an event loop
AWS_FATAL_ASSERT(nw_socket->event_loop == NULL);
nw_socket->event_loop = event_loop;

// Acquire the event loop group from the event loop. The event loop group will be released when the socket is
// destroyed.
if (!aws_event_loop_group_acquire_from_event_loop(event_loop)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p nw_socket=%p: failed to acquire event loop group.",
(void *)aws_socket,
(void *)nw_socket);
return false;
}

nw_socket->event_loop = event_loop;
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p nw_socket=%p: s_set_event_loop: socket acquire event loop group.",
(void *)aws_socket,
(void *)nw_socket);
aws_event_loop_group_acquire(get_base_event_loop_group(event_loop));
AWS_LS_IO_SOCKET, "id=%p nw_socket=%p: nw_socket set event loop.", (void *)aws_socket, (void *)nw_socket);

return true;
}

static void s_release_event_loop(struct nw_socket *nw_socket) {
Expand All @@ -433,7 +442,7 @@ static void s_release_event_loop(struct nw_socket *nw_socket) {
AWS_LS_IO_SOCKET, "nw_socket=%p: s_release_event_loop: socket has not event loop.", (void *)nw_socket);
return;
}
aws_event_loop_group_release(get_base_event_loop_group(nw_socket->event_loop));
aws_event_loop_group_release_from_event_loop(nw_socket->event_loop);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET, "nw_socket=%p: s_release_event_loop: socket release event loop group.", (void *)nw_socket);
nw_socket->event_loop = NULL;
Expand Down Expand Up @@ -1798,7 +1807,10 @@ static int s_socket_connect_fn(struct aws_socket *socket, struct aws_socket_conn
}

/* event_loop must be set prior to setup of socket parameters. */
s_set_event_loop(socket, event_loop);
if (!s_set_event_loop(socket, event_loop)) {
goto error;
}

if (s_setup_socket_params(nw_socket, &socket->options)) {
goto error;
}
Expand Down Expand Up @@ -2316,15 +2328,25 @@ static int s_socket_start_accept_fn(
return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
}

if (!s_set_event_loop(socket, accept_loop)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: failed to set event loop %p, invalid event loop. It is most likely the event loop does "
"not has a parent event loop group.",
(void *)socket,
socket->io_handle.data.handle,
(void *)accept_loop);
s_unlock_socket_synced_data(nw_socket);
return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
}

aws_event_loop_connect_handle_to_io_completion_port(accept_loop, &socket->io_handle);
socket->accept_result_fn = options.on_accept_result;
socket->connect_accept_user_data = options.on_accept_result_user_data;

nw_socket->on_accept_started_fn = options.on_accept_start;
nw_socket->listen_accept_started_user_data = options.on_accept_start_user_data;

s_set_event_loop(socket, accept_loop);

nw_listener_set_state_changed_handler(
socket->io_handle.data.handle, ^(nw_listener_state_t state, nw_error_t error) {
s_handle_listener_state_changed_fn(nw_socket, state, error);
Expand Down Expand Up @@ -2433,9 +2455,20 @@ static int s_socket_assign_to_event_loop_fn(struct aws_socket *socket, struct aw
socket->io_handle.data.handle,
(void *)event_loop);

if (aws_event_loop_connect_handle_to_io_completion_port(event_loop, &socket->io_handle)) {
if (!s_set_event_loop(socket, event_loop)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: assigning event loop %p failed. Invalid event loop. It is likely the event loop "
"does not has a parent event loop group.",
(void *)socket,
socket->io_handle.data.handle,
(void *)event_loop);
aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
return AWS_IO_SOCKET_INVALID_OPTIONS;
}

AWS_LOGF_DEBUG(
if (aws_event_loop_connect_handle_to_io_completion_port(event_loop, &socket->io_handle)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: assigning event loop %p failed",
(void *)socket,
Expand All @@ -2444,7 +2477,6 @@ static int s_socket_assign_to_event_loop_fn(struct aws_socket *socket, struct aw
return AWS_OP_ERR;
}

s_set_event_loop(socket, event_loop);
nw_connection_start(socket->io_handle.data.handle);
return AWS_OP_SUCCESS;
}
Expand Down
15 changes: 10 additions & 5 deletions source/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,16 @@ void aws_event_loop_group_release(struct aws_event_loop_group *el_group) {
}
}

struct aws_event_loop_group *aws_event_loop_group_acquire_from_event_loop(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop);
return aws_event_loop_group_acquire(event_loop->base_elg);
}

void aws_event_loop_group_release_from_event_loop(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop);
aws_event_loop_group_release(event_loop->base_elg);
}

size_t aws_event_loop_group_get_loop_count(const struct aws_event_loop_group *el_group) {
return aws_array_list_length(&el_group->event_loops);
}
Expand Down Expand Up @@ -655,11 +665,6 @@ void aws_event_loop_free_io_event_resources(struct aws_event_loop *event_loop, s
event_loop->vtable->free_io_event_resources(handle->additional_data);
}

void *get_base_event_loop_group(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop && event_loop->vtable->get_base_event_loop_group);
return event_loop->vtable->get_base_event_loop_group(event_loop);
}

bool aws_event_loop_thread_is_callers_thread(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop->vtable && event_loop->vtable->is_on_callers_thread);
return event_loop->vtable->is_on_callers_thread(event_loop);
Expand Down
11 changes: 1 addition & 10 deletions source/linux/epoll_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ static int s_subscribe_to_io_events(
void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
(void)event_loop;
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: get_base_event_loop_group() is not supported using Epoll Event Loops",
(void *)event_loop);
aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
return NULL;
}
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *args);
Expand All @@ -94,7 +85,6 @@ static struct aws_event_loop_vtable s_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_on_callers_thread,
};

Expand Down Expand Up @@ -218,6 +208,7 @@ struct aws_event_loop *aws_event_loop_new_with_epoll(

loop->impl_data = epoll_loop;
loop->vtable = &s_vtable;
loop->base_elg = options->parent_elg;

return loop;

Expand Down
11 changes: 1 addition & 10 deletions source/windows/iocp/iocp_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,6 @@ static int s_subscribe_to_io_events(
}
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
(void)event_loop;
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: get_base_event_loop_group() is not supported using IOCP Event Loops",
(void *)event_loop);
aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
return NULL;
}
static void aws_event_loop_thread(void *user_data);

void aws_overlapped_init(
Expand Down Expand Up @@ -169,7 +160,6 @@ struct aws_event_loop_vtable s_iocp_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_event_thread,
};

Expand Down Expand Up @@ -272,6 +262,7 @@ struct aws_event_loop *aws_event_loop_new_with_iocp(
event_loop->impl_data = impl;

event_loop->vtable = &s_iocp_vtable;
event_loop->base_elg = options->parent_elg;

return event_loop;

Expand Down
1 change: 1 addition & 0 deletions tests/vcc/new_destroy.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ struct aws_event_loop *aws_event_loop_new_with_epoll(

loop->impl_data = epoll_loop;
loop->vtable = &s_vtable;
loop->base_elg = options->parent_elg;

_(wrap(&epoll_loop->task_pre_queue.head))
_(wrap(&epoll_loop->task_pre_queue.tail))
Expand Down