diff --git a/CMakeLists.txt b/CMakeLists.txt index ae9e52d..331c0f7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,6 +36,7 @@ target_sources( src/queue.h src/link.h src/io.h + src/drain_thread.c src/udx.c ) diff --git a/include/udx.h b/include/udx.h index 088486e..cc93cdd 100644 --- a/include/udx.h +++ b/include/udx.h @@ -57,6 +57,8 @@ extern "C" { #define UDX_DEBUG_FORCE_DROP_PROBES 0x02 #define UDX_DEBUG_FORCE_DROP_DATA 0x04 +// #define USE_DRAIN_THREAD // experimental + typedef struct { uint32_t seq; } udx_cirbuf_val_t; @@ -109,6 +111,68 @@ typedef void (*udx_lookup_cb)(udx_lookup_t *handle, int status, const struct soc typedef void (*udx_interface_event_cb)(udx_interface_event_t *handle, int status); typedef void (*udx_interface_event_close_cb)(udx_interface_event_t *handle); +#ifdef USE_DRAIN_THREAD + +#include + +typedef struct { + udx_socket_t *socket; + struct sockaddr_storage addr; + uint16_t len; + char buffer[2048]; +} udx__drain_slot_t; + +typedef struct udx_reader_s { + atomic_int status; + uv_thread_t thread_id; + uv_loop_t loop; + + // signals main->main + uv_async_t launch_thread; + + // signals sub->main + uv_async_t signal_drain; + uv_async_t signal_thread_stopped; + + // signals main->sub + uv_async_t signal_control; + void * _Atomic ctrl_queue; + + udx__drain_slot_t *buffer; + uint16_t buffer_len; + + struct { + atomic_int read; + atomic_int drained; + } cursors; + + int64_t perf_load; + int64_t perf_ndrains; +} udx_reader_t; + +int +udx__drainer_init (udx_t *udx); + +int +udx__drainer_destroy (udx_t *udx); + +int +udx__drainer_socket_init (udx_socket_t *socket); + +int +udx__drainer_socket_stop (udx_socket_t *socket); + +void +udx__drainer__on_packet(udx__drain_slot_t *slot); + +void +udx__drainer__on_poll_stop (udx_socket_t *socket); + +void +udx__drainer__on_thread_stop (); + +#endif // USE_DRAIN_THREAD + struct udx_s { uv_loop_t *loop; @@ -133,6 +197,11 @@ struct udx_s { uint64_t packets_tx; int64_t packets_dropped_by_kernel; + +#ifdef USE_DRAIN_THREAD + udx_reader_t thread; + int64_t packets_dropped_by_thread; +#endif }; struct udx_queue_node_s { @@ -179,6 +248,13 @@ struct udx_socket_s { uint64_t packets_tx; int64_t packets_dropped_by_kernel; + +#ifdef USE_DRAIN_THREAD + uv_poll_t drain_poll; + bool drain_poll_initialized; + uv_async_t signal_poll_stopped; + int64_t packets_dropped_by_thread; +#endif }; typedef struct udx_cong_s { diff --git a/src/drain_thread.c b/src/drain_thread.c new file mode 100644 index 0000000..797fc57 --- /dev/null +++ b/src/drain_thread.c @@ -0,0 +1,345 @@ +#include +#include +#include +#include + +#include "../include/udx.h" + +#ifdef USE_DRAIN_THREAD + +#include "debug.h" +#include "io.h" +#include "internal.h" + +// upfront allocated receive buffer size in amount of packets +// size = N_SLOTS * sizeof(udx__drain_slot_t) +#define N_SLOTS 2048 // ~4MB + +enum thread_status { + STOPPED = 0, + INITIALIZED = 1, + RUNNING = 2, +}; + +enum command_id { + SOCKET_INIT = 0, + SOCKET_REMOVE, + THREAD_STOP +}; + +typedef struct command_s { + enum command_id type; + void *data; + struct command_s *next; +} command_t; + +static void +on_drain (uv_async_t *signal) { + udx_t *udx = signal->data; + udx_reader_t *thread = &udx->thread; + + thread->perf_load += (thread->buffer_len + thread->cursors.read - thread->cursors.drained) % thread->buffer_len; + thread->perf_ndrains++; + + udx__drain_slot_t *slot; + + while (thread->cursors.drained != thread->cursors.read) { + slot = &thread->buffer[thread->cursors.drained]; + udx__drainer__on_packet(slot); + thread->cursors.drained = (thread->cursors.drained + 1) % thread->buffer_len; + } +} + +static int +thread_update_read_poll (udx_socket_t *socket); + +static void +thread_on_uv_read_poll (uv_poll_t *handle, int status, int events) { + UDX_UNUSED(status); + udx_socket_t *socket = handle->data; + + udx_t *udx = socket->udx; + udx_reader_t *thread = &udx->thread; + + if (!(events & UV_READABLE)) goto reset_poll; + + ssize_t size; + int current; + udx__drain_slot_t *slot; + uv_buf_t buf; + + // drain socket buffers + do { + if (socket->status & UDX_SOCKET_CLOSED) break; + + current = thread->cursors.read; + slot = &thread->buffer[current]; + + slot->socket = socket; + memset(&slot->addr, 0, sizeof(slot->addr)); + buf.base = (char *) &slot->buffer; + buf.len = sizeof(slot->buffer); + + size = udx__recvmsg(socket, &buf, (struct sockaddr *) &slot->addr, sizeof(slot->addr)); + if (size < 0) break; + + slot->len = size; + + int err = uv_async_send(&thread->signal_drain); + assert(err == 0); + + int next = (thread->cursors.read + 1) % thread->buffer_len; + + if (thread->cursors.drained == next) { + udx->packets_dropped_by_thread++; + socket->packets_dropped_by_thread++; + continue; + } + + thread->cursors.read = next; + } while(1); + +reset_poll: + assert(thread_update_read_poll(socket) == 0); +} + +static int +thread_update_read_poll (udx_socket_t *socket) { + if (socket->status & UDX_SOCKET_CLOSED) return 0; + + uv_poll_t *poll = &socket->drain_poll; + return uv_poll_start(poll, UV_READABLE, thread_on_uv_read_poll); +} + +static void +_on_poll_stopped_main (uv_async_t *signal) { // main loop + debug_printf("drain thread=%zu socket poll stopped\n", uv_thread_self()); + udx_socket_t *socket = signal->data; + + uv_close((uv_handle_t *) signal, NULL); // close the jump signal + + udx__drainer__on_poll_stop(socket); // return to udx.c +} + +static void +thread_read_poll_start (udx_socket_t *socket) { // aka drainer_socket_init + int err = 0; + + assert(socket->udx->thread.status == RUNNING); + assert(socket->drain_poll_initialized == false); + + if (socket->status & UDX_SOCKET_CLOSED) return; + + udx_t *udx = socket->udx; + uv_loop_t *subloop = &udx->thread.loop; + + uv_poll_t *poll = &socket->drain_poll; + uv_os_fd_t fd; + + err = uv_fileno((const uv_handle_t *) &socket->handle, &fd); + assert(err == 0); + + debug_printf("drain thread=%zu socket poll fd=%i start\n", uv_thread_self(), fd); + + err = uv_poll_init_socket(subloop, poll, (uv_os_sock_t) fd); + assert(err == 0); + + poll->data = socket; + + err = thread_update_read_poll(socket); + assert(err == 0); + + socket->drain_poll_initialized = true; +} + +static void +thread_on_uv_poll_close (uv_handle_t *handle) { // sub loop + udx_socket_t *socket = handle->data; + int err = uv_async_send(&socket->signal_poll_stopped); + assert(err == 0); +} + +static inline void +thread_read_poll_stop (udx_socket_t *socket) { // sub loop + int err; + + if (!socket->drain_poll_initialized) { + err = uv_async_send(&socket->signal_poll_stopped); + assert(err == 0); + return; + } + + err = uv_poll_stop(&socket->drain_poll); + assert(err == 0); + + uv_close((uv_handle_t *) &socket->drain_poll, thread_on_uv_poll_close); +} + +static void +thread_on_control (uv_async_t *signal) { + udx_t *udx = signal->data; + + command_t *head = atomic_exchange(&udx->thread.ctrl_queue, NULL); + assert(head != NULL); + + // process queue + while (head != NULL) { + switch (head->type) { + case SOCKET_INIT: + thread_read_poll_start(head->data); + break; + + case SOCKET_REMOVE: + thread_read_poll_stop(head->data); + break; + + case THREAD_STOP: + // close control handle that keeps subloop active. + uv_close((uv_handle_t *) &udx->thread.signal_control, NULL); + break; + + default: + assert(0); + } + + command_t *prev = head; + head = prev->next; + free(prev); + } +} + +static inline int +run_command (udx_t *udx, enum command_id type, void *data) { + command_t *cmd = malloc(sizeof(command_t)); + cmd->type = type; + cmd->data = data; + cmd->next = NULL; + + command_t *head = atomic_load(&udx->thread.ctrl_queue); + + if (head == NULL) { + atomic_store(&udx->thread.ctrl_queue, cmd); + } else { + while (head->next != NULL) head = head->next; + head->next = cmd; + } + + if (udx->thread.status != RUNNING) { + return 0; + } else { + return uv_async_send(&udx->thread.signal_control); + } +} + +static void +on_thread_stop(uv_async_t *signal) { // main loop + udx_t *udx = signal->data; + + assert(udx->thread.status == RUNNING); + udx->thread.status = STOPPED; + + uv_close((uv_handle_t *) signal, NULL); // release main loop +} + +static void +reader_thread (void *data) { + debug_printf("drain thread=%zu start\n", uv_thread_self()); + udx_t *udx = data; + int err; + + uv_loop_t *subloop = &udx->thread.loop; + + err = uv_loop_init(subloop); + assert(err == 0); + + err = uv_async_init(subloop, &udx->thread.signal_control, thread_on_control); + assert(err == 0); + udx->thread.signal_control.data = udx; + + + udx->thread.buffer = malloc(sizeof(udx__drain_slot_t) * N_SLOTS); + udx->thread.buffer_len = N_SLOTS; + + udx->thread.status = RUNNING; + + err = uv_async_send(&udx->thread.signal_control); // queue pending cmds + assert(err == 0); + + err = uv_run(subloop, UV_RUN_DEFAULT); + assert(err == 0); + + free(udx->thread.buffer); + + debug_printf("drain thread=%zu exit\n", uv_thread_self()); + + err = uv_async_send(&udx->thread.signal_thread_stopped); // notify main + assert(err == 0); +} + +static void launch_thread (uv_async_t *signal) { + udx_t *udx = signal->data; + uv_close((uv_handle_t *) signal, NULL); + + int err = uv_thread_create(&udx->thread.thread_id, reader_thread, udx); + assert(err == 0); + + debug_printf("thread launched, id=%zu loop=%p handles{ drain=%p, ctrl=%p }\n", udx->thread.thread_id, &udx->thread.loop, &udx->thread.signal_drain, &udx->thread.signal_control); +} + +/// exports + +int +udx__drainer_init(udx_t *udx) { + if (udx->thread.status != STOPPED) return 0; // do nothing + int err; + uv_loop_t *loop = udx->loop; + + err = uv_async_init(loop, &udx->thread.signal_drain, on_drain); + assert(err == 0); + udx->thread.signal_drain.data = udx; + + err = uv_async_init(loop, &udx->thread.signal_thread_stopped, on_thread_stop); + assert(err == 0); + udx->thread.signal_thread_stopped.data = udx; + + // queue thread start onto main loop (don't spin up subloop before main loop starts) + err = uv_async_init(loop, &udx->thread.launch_thread, launch_thread); // mainloop + assert(err == 0); + udx->thread.launch_thread.data = udx; + + udx->thread.status = INITIALIZED; + + uv_async_send(&udx->thread.launch_thread); + + return 0; +} + +int +udx__drainer_socket_init (udx_socket_t *socket) { + udx_t *udx = socket->udx; + uv_os_fd_t fd; + assert(0 == uv_fileno((uv_handle_t *) &socket->handle, &fd)); + + // initialize poll close signal + int err = uv_async_init(udx->loop, &socket->signal_poll_stopped, _on_poll_stopped_main); + assert(err == 0); + socket->signal_poll_stopped.data = socket; + + return run_command(udx, SOCKET_INIT, socket); +} + +int +udx__drainer_socket_stop (udx_socket_t *socket) { + udx_t *udx = socket->udx; + return run_command(udx, SOCKET_REMOVE, socket); +} + +int +udx__drainer_destroy (udx_t *udx) { + if (udx->thread.status == STOPPED) return 0; + + uv_close((uv_handle_t *) &udx->thread.signal_drain, NULL); + + return run_command(udx, THREAD_STOP, NULL); +} +#endif // USE_DRAIN_THREAD diff --git a/src/udx.c b/src/udx.c index 6b9a4ac..ad01b55 100644 --- a/src/udx.c +++ b/src/udx.c @@ -163,6 +163,10 @@ ref_dec (udx_t *udx) { udx->has_streams = false; } +#ifdef USE_DRAIN_THREAD + udx__drainer_destroy(udx); +#endif + if (udx->on_idle != NULL) { udx->on_idle(udx); } @@ -229,7 +233,12 @@ update_poll (udx_socket_t *socket) { assert(!uv_is_active((uv_handle_t *) &socket->io_poll)); return 0; } - int events = UV_READABLE; + + int events = 0; + +#ifndef USE_DRAIN_THREAD + events = UV_READABLE; +#endif if (socket_write_wanted(socket)) { events |= UV_WRITABLE; @@ -2025,6 +2034,10 @@ udx_teardown (udx_t *udx) { udx_stream_destroy(stream); } +#ifdef USE_DRAIN_THREAD + udx__drainer_destroy(udx); +#endif + udx__link_foreach(udx->listeners, listener) { udx_interface_event_close(listener); } @@ -2034,6 +2047,10 @@ int udx_socket_init (udx_t *udx, udx_socket_t *socket, udx_socket_close_cb cb) { if (udx->teardown) return UV_EINVAL; +#ifdef USE_DRAIN_THREAD + assert(udx__drainer_init(udx) == 0); +#endif + udx->refs++; udx__link_add(udx->sockets, socket); @@ -2148,6 +2165,11 @@ udx_socket_bind (udx_socket_t *socket, const struct sockaddr *addr, unsigned int err = uv_poll_init_socket(socket->udx->loop, poll, (uv_os_sock_t) fd); assert(err == 0); +#ifdef USE_DRAIN_THREAD + err = udx__drainer_socket_init(socket); + assert(err == 0); +#endif + err = udx__udp_set_rxq_ovfl((uv_os_sock_t) fd); if (!err) { socket->cmsg_wanted = true; @@ -2281,6 +2303,11 @@ udx_socket_close (udx_socket_t *socket) { socket->pending_closes++; uv_poll_stop(&(socket->io_poll)); uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close); + +#ifdef USE_DRAIN_THREAD + socket->pending_closes++; // also close separate read poll + udx__drainer_socket_stop(socket); +#endif } socket->pending_closes++; @@ -2970,3 +2997,27 @@ udx_interface_event_close (udx_interface_event_t *handle) { return 0; } + +#ifdef USE_DRAIN_THREAD +void +udx__drainer__on_poll_stop (udx_socket_t *socket) { // identical to on_uv_close(handle) ? + trigger_socket_close(socket); +} + +void udx__drainer__on_packet(udx__drain_slot_t *slot) { + udx_socket_t *socket = slot->socket; + struct sockaddr_storage *addr = &slot->addr; + if (!process_packet(socket, slot->buffer, slot->len, (struct sockaddr *) addr) && socket->on_recv != NULL) { + if (is_addr_v4_mapped((struct sockaddr *) addr)) { + addr_to_v4((struct sockaddr_in6 *) addr); + } + + uv_buf_t buf = { + .base = slot->buffer, + .len = slot->len + }; + socket->on_recv(socket, slot->len, &buf, (struct sockaddr *) addr); + } +} +#endif +