Skip to content

Commit db7a1bd

Browse files
bretambroseBret Ambrose
andauthored
Serialized scheduling (#754)
Co-authored-by: Bret Ambrose <[email protected]>
1 parent 300e8b4 commit db7a1bd

File tree

9 files changed

+411
-64
lines changed

9 files changed

+411
-64
lines changed

.github/workflows/proof-alarm.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ jobs:
1616
- name: Check
1717
run: |
1818
TMPFILE=$(mktemp)
19-
echo "5109c3b2748a98621ebdc05756fdfa51 source/linux/epoll_event_loop.c" > $TMPFILE
19+
echo "42e574284e4e0c4af33bf3ab16114bcc source/linux/epoll_event_loop.c" > $TMPFILE
2020
md5sum --check $TMPFILE
2121
2222
# No further steps if successful

include/aws/io/event_loop.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct aws_event_loop_vtable {
3535
int (*stop)(struct aws_event_loop *event_loop);
3636
int (*wait_for_stop_completion)(struct aws_event_loop *event_loop);
3737
void (*schedule_task_now)(struct aws_event_loop *event_loop, struct aws_task *task);
38+
void (*schedule_task_now_serialized)(struct aws_event_loop *event_loop, struct aws_task *task);
3839
void (*schedule_task_future)(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
3940
void (*cancel_task)(struct aws_event_loop *event_loop, struct aws_task *task);
4041
int (*connect_to_io_completion_port)(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
@@ -116,6 +117,18 @@ AWS_EXTERN_C_BEGIN
116117
AWS_IO_API
117118
void aws_event_loop_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
118119

120+
/**
121+
* Variant of aws_event_loop_schedule_task_now that forces all tasks to go through the cross thread task queue,
122+
* guaranteeing that order-of-submission is order-of-execution. If you need this guarantee, you must use this
123+
* function; the base function contains short-circuiting logic that breaks ordering invariants. Beyond that, all
124+
* properties of aws_event_loop_schedule_task_now apply to this function as well.
125+
*
126+
* Serialization guarantee does not apply to task cancellation (which can occur out-of-order or even out-of-thread in
127+
* certain cases).
128+
*/
129+
AWS_IO_API
130+
void aws_event_loop_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task);
131+
119132
/**
120133
* The event loop will schedule the task and run it at the specified time.
121134
* Use aws_event_loop_current_clock_time() to query the current time in nanoseconds.

source/bsd/kqueue_event_loop.c

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ static int s_run(struct aws_event_loop *event_loop);
3131
static int s_stop(struct aws_event_loop *event_loop);
3232
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
3333
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
34+
static void s_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task);
3435
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
3536
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
3637
static int s_connect_to_io_completion_port(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
@@ -134,6 +135,7 @@ struct aws_event_loop_vtable s_kqueue_vtable = {
134135
.stop = s_stop,
135136
.wait_for_stop_completion = s_wait_for_stop_completion,
136137
.schedule_task_now = s_schedule_task_now,
138+
.schedule_task_now_serialized = s_schedule_task_now_serialized,
137139
.schedule_task_future = s_schedule_task_future,
138140
.cancel_task = s_cancel_task,
139141
.connect_to_io_completion_port = s_connect_to_io_completion_port,
@@ -465,29 +467,12 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
465467
return AWS_OP_SUCCESS;
466468
}
467469

468-
/* Common functionality for "now" and "future" task scheduling.
469-
* If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
470-
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
471-
AWS_ASSERT(task);
470+
static void s_schedule_task_cross_thread(
471+
struct aws_event_loop *event_loop,
472+
struct aws_task *task,
473+
uint64_t run_at_nanos) {
472474
struct kqueue_loop *impl = event_loop->impl_data;
473475

474-
/* If we're on the event-thread, just schedule it directly */
475-
if (s_is_event_thread(event_loop)) {
476-
AWS_LOGF_TRACE(
477-
AWS_LS_IO_EVENT_LOOP,
478-
"id=%p: scheduling task %p in-thread for timestamp %llu",
479-
(void *)event_loop,
480-
(void *)task,
481-
(unsigned long long)run_at_nanos);
482-
if (run_at_nanos == 0) {
483-
aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
484-
} else {
485-
aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
486-
}
487-
return;
488-
}
489-
490-
/* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
491476
AWS_LOGF_TRACE(
492477
AWS_LS_IO_EVENT_LOOP,
493478
"id=%p: scheduling task %p cross-thread for timestamp %llu",
@@ -515,10 +500,40 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
515500
}
516501
}
517502

503+
/* Common functionality for "now" and "future" task scheduling.
504+
* If `run_at_nanos` is zero then the task is scheduled as a "now" task. */
505+
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
506+
AWS_ASSERT(task);
507+
struct kqueue_loop *impl = event_loop->impl_data;
508+
509+
/* If we're on the event-thread, just schedule it directly */
510+
if (s_is_event_thread(event_loop)) {
511+
AWS_LOGF_TRACE(
512+
AWS_LS_IO_EVENT_LOOP,
513+
"id=%p: scheduling task %p in-thread for timestamp %llu",
514+
(void *)event_loop,
515+
(void *)task,
516+
(unsigned long long)run_at_nanos);
517+
if (run_at_nanos == 0) {
518+
aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
519+
} else {
520+
aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
521+
}
522+
return;
523+
}
524+
525+
/* Otherwise, add it to cross_thread_data.tasks_to_schedule and signal the event-thread to process it */
526+
s_schedule_task_cross_thread(event_loop, task, run_at_nanos);
527+
}
528+
518529
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
519530
s_schedule_task_common(event_loop, task, 0); /* Zero is used to denote "now" tasks */
520531
}
521532

533+
static void s_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task) {
534+
s_schedule_task_cross_thread(event_loop, task, 0); /* Zero is used to denote "now" tasks */
535+
}
536+
522537
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
523538
s_schedule_task_common(event_loop, task, run_at_nanos);
524539
}

source/darwin/dispatch_queue_event_loop.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ static struct aws_event_loop_vtable s_vtable = {
7373
.stop = s_stop,
7474
.wait_for_stop_completion = s_wait_for_stop_completion,
7575
.schedule_task_now = s_schedule_task_now,
76+
/* dispatch queue event loop impl does not have any short-circuiting, so just use the base scheduling logic */
77+
.schedule_task_now_serialized = s_schedule_task_now,
7678
.schedule_task_future = s_schedule_task_future,
7779
.cancel_task = s_cancel_task,
7880
.connect_to_io_completion_port = s_connect_to_io_completion_port,

source/event_loop.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,12 @@ void aws_event_loop_schedule_task_now(struct aws_event_loop *event_loop, struct
625625
event_loop->vtable->schedule_task_now(event_loop, task);
626626
}
627627

628+
void aws_event_loop_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task) {
629+
AWS_ASSERT(task);
630+
AWS_ASSERT(event_loop->vtable && event_loop->vtable->schedule_task_now_serialized);
631+
event_loop->vtable->schedule_task_now_serialized(event_loop, task);
632+
}
633+
628634
void aws_event_loop_schedule_task_future(
629635
struct aws_event_loop *event_loop,
630636
struct aws_task *task,

source/linux/epoll_event_loop.c

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ static int s_run(struct aws_event_loop *event_loop);
5050
static int s_stop(struct aws_event_loop *event_loop);
5151
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
5252
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
53+
static void s_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task);
5354
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
5455
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
5556
static int s_connect_to_io_completion_port(struct aws_event_loop *event_loop, struct aws_io_handle *handle) {
@@ -79,6 +80,7 @@ static struct aws_event_loop_vtable s_vtable = {
7980
.stop = s_stop,
8081
.wait_for_stop_completion = s_wait_for_stop_completion,
8182
.schedule_task_now = s_schedule_task_now,
83+
.schedule_task_now_serialized = s_schedule_task_now_serialized,
8284
.schedule_task_future = s_schedule_task_future,
8385
.cancel_task = s_cancel_task,
8486
.connect_to_io_completion_port = s_connect_to_io_completion_port,
@@ -341,27 +343,12 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
341343
return result;
342344
}
343345

344-
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
346+
static void s_schedule_task_cross_thread(
347+
struct aws_event_loop *event_loop,
348+
struct aws_task *task,
349+
uint64_t run_at_nanos) {
345350
struct epoll_loop *epoll_loop = event_loop->impl_data;
346351

347-
/* if event loop and the caller are the same thread, just schedule and be done with it. */
348-
if (s_is_on_callers_thread(event_loop)) {
349-
AWS_LOGF_TRACE(
350-
AWS_LS_IO_EVENT_LOOP,
351-
"id=%p: scheduling %s task %p in-thread for timestamp %llu",
352-
(void *)event_loop,
353-
task->type_tag,
354-
(void *)task,
355-
(unsigned long long)run_at_nanos);
356-
if (run_at_nanos == 0) {
357-
/* zero denotes "now" task */
358-
aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
359-
} else {
360-
aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, run_at_nanos);
361-
}
362-
return;
363-
}
364-
365352
AWS_LOGF_TRACE(
366353
AWS_LS_IO_EVENT_LOOP,
367354
"id=%p: Scheduling %s task %p cross-thread for timestamp %llu",
@@ -391,10 +378,38 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
391378
aws_mutex_unlock(&epoll_loop->task_pre_queue_mutex);
392379
}
393380

381+
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
382+
struct epoll_loop *epoll_loop = event_loop->impl_data;
383+
384+
/* if event loop and the caller are the same thread, just schedule and be done with it. */
385+
if (s_is_on_callers_thread(event_loop)) {
386+
AWS_LOGF_TRACE(
387+
AWS_LS_IO_EVENT_LOOP,
388+
"id=%p: scheduling %s task %p in-thread for timestamp %llu",
389+
(void *)event_loop,
390+
task->type_tag,
391+
(void *)task,
392+
(unsigned long long)run_at_nanos);
393+
if (run_at_nanos == 0) {
394+
/* zero denotes "now" task */
395+
aws_task_scheduler_schedule_now(&epoll_loop->scheduler, task);
396+
} else {
397+
aws_task_scheduler_schedule_future(&epoll_loop->scheduler, task, run_at_nanos);
398+
}
399+
return;
400+
}
401+
402+
s_schedule_task_cross_thread(event_loop, task, run_at_nanos);
403+
}
404+
394405
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
395406
s_schedule_task_common(event_loop, task, 0 /* zero denotes "now" task */);
396407
}
397408

409+
static void s_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task) {
410+
s_schedule_task_cross_thread(event_loop, task, 0);
411+
}
412+
398413
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
399414
s_schedule_task_common(event_loop, task, run_at_nanos);
400415
}

source/windows/iocp/iocp_event_loop.c

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ static int s_run(struct aws_event_loop *event_loop);
107107
static int s_stop(struct aws_event_loop *event_loop);
108108
static int s_wait_for_stop_completion(struct aws_event_loop *event_loop);
109109
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task);
110+
static void s_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task);
110111
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos);
111112
static void s_cancel_task(struct aws_event_loop *event_loop, struct aws_task *task);
112113
static int s_connect_to_io_completion_port(struct aws_event_loop *event_loop, struct aws_io_handle *handle);
@@ -159,6 +160,7 @@ struct aws_event_loop_vtable s_iocp_vtable = {
159160
.stop = s_stop,
160161
.wait_for_stop_completion = s_wait_for_stop_completion,
161162
.schedule_task_now = s_schedule_task_now,
163+
.schedule_task_now_serialized = s_schedule_task_now_serialized,
162164
.schedule_task_future = s_schedule_task_future,
163165
.cancel_task = s_cancel_task,
164166
.connect_to_io_completion_port = s_connect_to_io_completion_port,
@@ -451,30 +453,11 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
451453
return AWS_OP_SUCCESS;
452454
}
453455

454-
/* Common function used by schedule_task_now() and schedule_task_future().
455-
* When run_at_nanos is 0, it's treated as a "now" task.
456-
* Called from any thread */
457-
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
456+
static void s_schedule_task_cross_thread(
457+
struct aws_event_loop *event_loop,
458+
struct aws_task *task,
459+
uint64_t run_at_nanos) {
458460
struct iocp_loop *impl = event_loop->impl_data;
459-
AWS_ASSERT(impl);
460-
AWS_ASSERT(task);
461-
462-
/* If we're on the event-thread, just schedule it directly */
463-
if (s_is_event_thread(event_loop)) {
464-
AWS_LOGF_TRACE(
465-
AWS_LS_IO_EVENT_LOOP,
466-
"id=%p: scheduling %s task %p in-thread for timestamp %llu",
467-
(void *)event_loop,
468-
task->type_tag,
469-
(void *)task,
470-
(unsigned long long)run_at_nanos);
471-
if (run_at_nanos == 0) {
472-
aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
473-
} else {
474-
aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
475-
}
476-
return;
477-
}
478461

479462
AWS_LOGF_TRACE(
480463
AWS_LS_IO_EVENT_LOOP,
@@ -506,11 +489,43 @@ static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws
506489
}
507490
}
508491

492+
/* Common function used by schedule_task_now() and schedule_task_future().
493+
* When run_at_nanos is 0, it's treated as a "now" task.
494+
* Called from any thread */
495+
static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
496+
struct iocp_loop *impl = event_loop->impl_data;
497+
AWS_ASSERT(impl);
498+
AWS_ASSERT(task);
499+
500+
/* If we're on the event-thread, just schedule it directly */
501+
if (s_is_event_thread(event_loop)) {
502+
AWS_LOGF_TRACE(
503+
AWS_LS_IO_EVENT_LOOP,
504+
"id=%p: scheduling %s task %p in-thread for timestamp %llu",
505+
(void *)event_loop,
506+
task->type_tag,
507+
(void *)task,
508+
(unsigned long long)run_at_nanos);
509+
if (run_at_nanos == 0) {
510+
aws_task_scheduler_schedule_now(&impl->thread_data.scheduler, task);
511+
} else {
512+
aws_task_scheduler_schedule_future(&impl->thread_data.scheduler, task, run_at_nanos);
513+
}
514+
return;
515+
}
516+
517+
s_schedule_task_cross_thread(event_loop, task, run_at_nanos);
518+
}
519+
509520
/* Called from any thread */
510521
static void s_schedule_task_now(struct aws_event_loop *event_loop, struct aws_task *task) {
511522
s_schedule_task_common(event_loop, task, 0 /* use zero to denote it's a "now" task */);
512523
}
513524

525+
static void s_schedule_task_now_serialized(struct aws_event_loop *event_loop, struct aws_task *task) {
526+
s_schedule_task_cross_thread(event_loop, task, 0 /* use zero to denote it's a "now" task */);
527+
}
528+
514529
/* Called from any thread */
515530
static void s_schedule_task_future(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
516531
s_schedule_task_common(event_loop, task, run_at_nanos);

tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ add_test_case(event_loop_epoll_creation)
5959
add_test_case(event_loop_iocp_creation)
6060
add_test_case(event_loop_kqueue_creation)
6161
add_test_case(event_loop_dispatch_queue_creation)
62+
add_test_case(event_loop_serialized_scheduling)
6263

6364
add_test_case(io_testing_channel)
6465

0 commit comments

Comments
 (0)