Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/proof-alarm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Check
run: |
TMPFILE=$(mktemp)
echo "e857a2e5f72ab77a94e56372d89abf99 source/linux/epoll_event_loop.c" > $TMPFILE
echo "844fcb2768afd3ab013fa49447c1b855 source/linux/epoll_event_loop.c" > $TMPFILE
md5sum --check $TMPFILE

# No further steps if successful
Expand Down
17 changes: 16 additions & 1 deletion include/aws/io/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ struct aws_event_loop_vtable {
void *user_data);
int (*unsubscribe_from_io_events)(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
void (*free_io_event_resources)(void *user_data);
void *(*get_base_event_loop_group)(struct aws_event_loop *event_loop);
bool (*is_on_callers_thread)(struct aws_event_loop *event_loop);
};

Expand Down Expand Up @@ -175,6 +174,22 @@ struct aws_event_loop_group *aws_event_loop_group_acquire(struct aws_event_loop_
AWS_IO_API
void aws_event_loop_group_release(struct aws_event_loop_group *el_group);

/**
* Increments the reference count on the event loop group from event loop, allowing the caller to take a reference to
* it.
*
* Returns the base event loop group of the event loop, or null if the event loop does not belong to a group.
*/
AWS_IO_API
struct aws_event_loop_group *aws_event_loop_group_acquire_from_event_loop(struct aws_event_loop *event_loop);

/**
* Decrements the ref count of the event loop's base event loop group. When the ref count drops to zero, the event loop
* group will be destroyed.
*/
AWS_IO_API
void aws_event_loop_group_release_from_event_loop(struct aws_event_loop *event_loop);

/**
* Returns the event loop at a particular index. If the index is out of bounds, null is returned.
*/
Expand Down
9 changes: 1 addition & 8 deletions include/aws/io/private/event_loop_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct aws_event_loop {
uint64_t latest_tick_start;
size_t current_tick_latency_sum;
struct aws_atomic_var next_flush_time;
struct aws_event_loop_group *base_elg;
void *impl_data;
};

Expand Down Expand Up @@ -321,14 +322,6 @@ int aws_event_loop_unsubscribe_from_io_events(struct aws_event_loop *event_loop,
AWS_IO_API
void aws_event_loop_free_io_event_resources(struct aws_event_loop *event_loop, struct aws_io_handle *handle);

/**
* Retrieves the aws_event_loop_group that is the parent of the aws_event_loop. This is only supported when using a
* dispatch queue event loop as they are async and their sockets need to retain a refcount on the elg to keep it alive
* and insure it has not been asyncronously destroyed before anything that needs it.
*/
AWS_IO_API
void *get_base_event_loop_group(struct aws_event_loop *event_loop);

AWS_IO_API
struct aws_event_loop_group *aws_event_loop_group_new_internal(
struct aws_allocator *allocator,
Expand Down
13 changes: 3 additions & 10 deletions source/bsd/kqueue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,7 @@ static int s_subscribe_to_io_events(
void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
(void)event_loop;
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: get_base_event_loop_group() is not supported using KQueue Event Loops",
(void *)event_loop);
aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
return NULL;
}

static bool s_is_event_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *user_data);
Expand Down Expand Up @@ -148,7 +140,6 @@ struct aws_event_loop_vtable s_kqueue_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_event_thread,
};

Expand Down Expand Up @@ -275,6 +266,8 @@ struct aws_event_loop *aws_event_loop_new_with_kqueue(

event_loop->vtable = &s_kqueue_vtable;

event_loop->base_elg = options->parent_elg;

/* success */
return event_loop;

Expand Down
15 changes: 1 addition & 14 deletions source/darwin/dispatch_queue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ static void s_free_io_event_resources(void *user_data) {
/* No io event resources to free */
(void)user_data;
}
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop);
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static struct aws_event_loop_vtable s_vtable = {
Expand All @@ -80,7 +79,6 @@ static struct aws_event_loop_vtable s_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_on_callers_thread,
};

Expand Down Expand Up @@ -249,12 +247,12 @@ struct aws_event_loop *aws_event_loop_new_with_dispatch_queue(
}

loop->vtable = &s_vtable;
loop->base_elg = options->parent_elg;

dispatch_loop = aws_mem_calloc(alloc, 1, sizeof(struct aws_dispatch_loop));
dispatch_loop->allocator = alloc;
loop->impl_data = dispatch_loop;
dispatch_loop->base_loop = loop;
dispatch_loop->base_elg = options->parent_elg;
dispatch_loop->synced_data.execution_state = AWS_DLES_SUSPENDED;
aws_ref_count_init(&dispatch_loop->ref_count, dispatch_loop, s_dispatch_event_loop_on_zero_ref_count);

Expand Down Expand Up @@ -738,17 +736,6 @@ static int s_connect_to_io_completion_port(struct aws_event_loop *event_loop, st
return AWS_OP_SUCCESS;
}

/*
* Because dispatch queue is async we may need to acquire a refcount of the parent event loop group to prevent
* the event loop or dispatch loop from being cleaned out from underneath something that needs it. We expose the
* base elg so anything that needs to insure the event loops and dispatch loops don't get prematurely cleaned can
* hold a refcount.
*/
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
struct aws_dispatch_loop *dispatch_loop = event_loop->impl_data;
return dispatch_loop->base_elg;
}

/*
* We use aws_thread_id_equal with syched_data.current_thread_id and synced_data.is_executing to determine
* if operation is being executed on the same dispatch queue thread.
Expand Down
1 change: 0 additions & 1 deletion source/darwin/dispatch_queue_event_loop_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ struct aws_dispatch_loop {
dispatch_queue_t dispatch_queue;
struct aws_task_scheduler scheduler;
struct aws_event_loop *base_loop;
struct aws_event_loop_group *base_elg;

struct aws_ref_count ref_count;

Expand Down
60 changes: 46 additions & 14 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -412,19 +412,28 @@ static bool s_validate_event_loop(struct aws_event_loop *event_loop) {
return event_loop && event_loop->vtable && event_loop->impl_data;
}

static void s_set_event_loop(struct aws_socket *aws_socket, struct aws_event_loop *event_loop) {
static int s_set_event_loop(struct aws_socket *aws_socket, struct aws_event_loop *event_loop) {
aws_socket->event_loop = event_loop;
struct nw_socket *nw_socket = aws_socket->impl;
// Never re-assign an event loop
AWS_FATAL_ASSERT(nw_socket->event_loop == NULL);
nw_socket->event_loop = event_loop;

// Acquire the event loop group from the event loop. The event loop group will be released when the socket is
// destroyed.
if (!aws_event_loop_group_acquire_from_event_loop(event_loop)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p nw_socket=%p: failed to acquire event loop group.",
(void *)aws_socket,
(void *)nw_socket);
return AWS_OP_ERR;
}

nw_socket->event_loop = event_loop;
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET,
"id=%p nw_socket=%p: s_set_event_loop: socket acquire event loop group.",
(void *)aws_socket,
(void *)nw_socket);
aws_event_loop_group_acquire(get_base_event_loop_group(event_loop));
AWS_LS_IO_SOCKET, "id=%p nw_socket=%p: nw_socket set event loop.", (void *)aws_socket, (void *)nw_socket);

return AWS_OP_SUCCESS;
}

static void s_release_event_loop(struct nw_socket *nw_socket) {
Expand All @@ -433,7 +442,7 @@ static void s_release_event_loop(struct nw_socket *nw_socket) {
AWS_LS_IO_SOCKET, "nw_socket=%p: s_release_event_loop: socket has not event loop.", (void *)nw_socket);
return;
}
aws_event_loop_group_release(get_base_event_loop_group(nw_socket->event_loop));
aws_event_loop_group_release_from_event_loop(nw_socket->event_loop);
AWS_LOGF_DEBUG(
AWS_LS_IO_SOCKET, "nw_socket=%p: s_release_event_loop: socket release event loop group.", (void *)nw_socket);
nw_socket->event_loop = NULL;
Expand Down Expand Up @@ -1798,7 +1807,10 @@ static int s_socket_connect_fn(struct aws_socket *socket, struct aws_socket_conn
}

/* event_loop must be set prior to setup of socket parameters. */
s_set_event_loop(socket, event_loop);
if (s_set_event_loop(socket, event_loop)) {
goto error;
}

if (s_setup_socket_params(nw_socket, &socket->options)) {
goto error;
}
Expand Down Expand Up @@ -2316,15 +2328,25 @@ static int s_socket_start_accept_fn(
return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
}

if (s_set_event_loop(socket, accept_loop)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: failed to set event loop %p, invalid event loop. It is most likely the event loop does "
"not has a parent event loop group.",
(void *)socket,
socket->io_handle.data.handle,
(void *)accept_loop);
s_unlock_socket_synced_data(nw_socket);
return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
}

aws_event_loop_connect_handle_to_io_completion_port(accept_loop, &socket->io_handle);
socket->accept_result_fn = options.on_accept_result;
socket->connect_accept_user_data = options.on_accept_result_user_data;

nw_socket->on_accept_started_fn = options.on_accept_start;
nw_socket->listen_accept_started_user_data = options.on_accept_start_user_data;

s_set_event_loop(socket, accept_loop);

nw_listener_set_state_changed_handler(
socket->io_handle.data.handle, ^(nw_listener_state_t state, nw_error_t error) {
s_handle_listener_state_changed_fn(nw_socket, state, error);
Expand Down Expand Up @@ -2433,9 +2455,20 @@ static int s_socket_assign_to_event_loop_fn(struct aws_socket *socket, struct aw
socket->io_handle.data.handle,
(void *)event_loop);

if (aws_event_loop_connect_handle_to_io_completion_port(event_loop, &socket->io_handle)) {
if (s_set_event_loop(socket, event_loop)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: assigning event loop %p failed. Invalid event loop. It is likely the event loop "
"does not has a parent event loop group.",
(void *)socket,
socket->io_handle.data.handle,
(void *)event_loop);
aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
return AWS_IO_SOCKET_INVALID_OPTIONS;
}

AWS_LOGF_DEBUG(
if (aws_event_loop_connect_handle_to_io_completion_port(event_loop, &socket->io_handle)) {
AWS_LOGF_ERROR(
AWS_LS_IO_SOCKET,
"id=%p handle=%p: assigning event loop %p failed",
(void *)socket,
Expand All @@ -2444,7 +2477,6 @@ static int s_socket_assign_to_event_loop_fn(struct aws_socket *socket, struct aw
return AWS_OP_ERR;
}

s_set_event_loop(socket, event_loop);
nw_connection_start(socket->io_handle.data.handle);
return AWS_OP_SUCCESS;
}
Expand Down
15 changes: 10 additions & 5 deletions source/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,16 @@ void aws_event_loop_group_release(struct aws_event_loop_group *el_group) {
}
}

struct aws_event_loop_group *aws_event_loop_group_acquire_from_event_loop(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop);
return aws_event_loop_group_acquire(event_loop->base_elg);
}

void aws_event_loop_group_release_from_event_loop(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop);
aws_event_loop_group_release(event_loop->base_elg);
}

size_t aws_event_loop_group_get_loop_count(const struct aws_event_loop_group *el_group) {
return aws_array_list_length(&el_group->event_loops);
}
Expand Down Expand Up @@ -655,11 +665,6 @@ void aws_event_loop_free_io_event_resources(struct aws_event_loop *event_loop, s
event_loop->vtable->free_io_event_resources(handle->additional_data);
}

void *get_base_event_loop_group(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop && event_loop->vtable->get_base_event_loop_group);
return event_loop->vtable->get_base_event_loop_group(event_loop);
}

bool aws_event_loop_thread_is_callers_thread(struct aws_event_loop *event_loop) {
AWS_ASSERT(event_loop->vtable && event_loop->vtable->is_on_callers_thread);
return event_loop->vtable->is_on_callers_thread(event_loop);
Expand Down
11 changes: 1 addition & 10 deletions source/linux/epoll_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ static int s_subscribe_to_io_events(
void *user_data);
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
(void)event_loop;
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: get_base_event_loop_group() is not supported using Epoll Event Loops",
(void *)event_loop);
aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
return NULL;
}
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop);

static void aws_event_loop_thread(void *args);
Expand All @@ -94,7 +85,6 @@ static struct aws_event_loop_vtable s_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_on_callers_thread,
};

Expand Down Expand Up @@ -218,6 +208,7 @@ struct aws_event_loop *aws_event_loop_new_with_epoll(

loop->impl_data = epoll_loop;
loop->vtable = &s_vtable;
loop->base_elg = options->parent_elg;

return loop;

Expand Down
11 changes: 1 addition & 10 deletions source/windows/iocp/iocp_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,6 @@ static int s_subscribe_to_io_events(
}
static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
static void s_free_io_event_resources(void *user_data);
static void *s_get_base_event_loop_group(struct aws_event_loop *event_loop) {
(void)event_loop;
AWS_LOGF_ERROR(
AWS_LS_IO_EVENT_LOOP,
"id=%p: get_base_event_loop_group() is not supported using IOCP Event Loops",
(void *)event_loop);
aws_raise_error(AWS_ERROR_PLATFORM_NOT_SUPPORTED);
return NULL;
}
static void aws_event_loop_thread(void *user_data);

void aws_overlapped_init(
Expand Down Expand Up @@ -169,7 +160,6 @@ struct aws_event_loop_vtable s_iocp_vtable = {
.subscribe_to_io_events = s_subscribe_to_io_events,
.unsubscribe_from_io_events = s_unsubscribe_from_io_events,
.free_io_event_resources = s_free_io_event_resources,
.get_base_event_loop_group = s_get_base_event_loop_group,
.is_on_callers_thread = s_is_event_thread,
};

Expand Down Expand Up @@ -272,6 +262,7 @@ struct aws_event_loop *aws_event_loop_new_with_iocp(
event_loop->impl_data = impl;

event_loop->vtable = &s_iocp_vtable;
event_loop->base_elg = options->parent_elg;

return event_loop;

Expand Down
1 change: 1 addition & 0 deletions tests/vcc/new_destroy.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ struct aws_event_loop *aws_event_loop_new_with_epoll(

loop->impl_data = epoll_loop;
loop->vtable = &s_vtable;
loop->base_elg = options->parent_elg;

_(wrap(&epoll_loop->task_pre_queue.head))
_(wrap(&epoll_loop->task_pre_queue.tail))
Expand Down