diff --git a/include/udx.h b/include/udx.h index 7b52293..22efb61 100644 --- a/include/udx.h +++ b/include/udx.h @@ -96,6 +96,7 @@ typedef void (*udx_stream_send_cb)(udx_stream_send_t *req, int status); typedef void (*udx_stream_recv_cb)(udx_stream_t *stream, ssize_t read_len, const uv_buf_t *buf); typedef void (*udx_stream_close_cb)(udx_stream_t *stream, int status); typedef void (*udx_stream_finalize_cb)(udx_stream_t *stream); +typedef uint32_t (*udx_stream_get_read_buffer_size_cb)(udx_stream_t *stream); typedef void (*udx_lookup_cb)(udx_lookup_t *handle, int status, const struct sockaddr *addr, int addr_len); @@ -222,6 +223,7 @@ struct udx_stream_s { udx_stream_drain_cb on_drain; udx_stream_close_cb on_close; udx_stream_finalize_cb on_finalize; + udx_stream_get_read_buffer_size_cb get_read_buffer_size; // mtu. RFC8899 5.1.1 and 5.1.3 int mtu_state; // MTU_STATE_* @@ -268,8 +270,8 @@ struct udx_stream_s { uint32_t ssthresh; uint32_t cwnd; // packets uint32_t cwnd_cnt; - uint32_t recv_rwnd; // tcp: rcv.wnd. bytes - uint32_t send_rwnd; // remote advertised rwnd + uint32_t send_rwnd; // remote advertised rwnd + uint32_t recv_rwnd_max; // default: UDX_DEFAULT_RWND_MAX uint32_t send_wl1; // seq at last window update uint32_t send_wl2; // ack at last window update @@ -460,6 +462,12 @@ udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack); int udx_stream_set_ack (udx_stream_t *stream, uint32_t ack); +int +udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *rwnd_max); + +int +udx_stream_set_rwnd_max (udx_stream_t *stream, uint32_t rwnd_max); + int udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr); diff --git a/src/udx.c b/src/udx.c index 08538f7..1297e78 100644 --- a/src/udx.c +++ b/src/udx.c @@ -45,7 +45,7 @@ #define UDX_CONG_MAX_CWND 65536 #define UDX_RTO_MAX_MS 30000 #define UDX_RTT_MAX_MS 30000 -#define UDX_INIT_RWND_BYTES 131072 // arbitrary, ~90 1500 mtu packets, 52mbits/sec at 20millis latency +#define UDX_DEFAULT_RWND_MAX (256 * 1024) // arbitrary, ~175 1500 mtu packets, @20ms latency = 104 mbits/sec #define UDX_HIGH_WATERMARK 262144 @@ -480,6 +480,25 @@ clear_outgoing_packets (udx_stream_t *stream) { } } +// returns the rwnd to advertise to the sender +// to provide a rwnd value the user provides two values +// 1. a maximum buffer size. default is UDX_DEFAULT_RWND_MAX +// 2. a callback to return the number of bytes already in the buffer. +// the window is then set to the + +static uint32_t +get_recv_rwnd (udx_stream_t *stream) { + uint32_t bufsize = 0; + if (stream->get_read_buffer_size) { + bufsize = stream->get_read_buffer_size(stream); + } + if (stream->recv_rwnd_max > bufsize) { + return stream->recv_rwnd_max - bufsize; + } else { + return 0; + } +} + static void init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_buf_t *userbufs, int nuserbufs) { uint8_t *b = (uint8_t *) &(pkt->header); @@ -495,7 +514,7 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_ // 32 bit (le) remote id *(i++) = udx__swap_uint32_if_be(stream->remote_id); // 32 bit (le) recv window - *(i++) = udx__swap_uint32_if_be(stream->recv_rwnd); + *(i++) = udx__swap_uint32_if_be(get_recv_rwnd(stream)); // 32 bit (le) seq *(i++) = udx__swap_uint32_if_be(stream->seq); // 32 bit (le) ack @@ -2289,14 +2308,15 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream stream->ssthresh = 255; stream->cwnd = UDX_CONG_INIT_CWND; stream->cwnd_cnt = 0; - stream->recv_rwnd = UDX_INIT_RWND_BYTES; - stream->send_rwnd = UDX_INIT_RWND_BYTES; + stream->recv_rwnd_max = UDX_DEFAULT_RWND_MAX; + stream->send_rwnd = UDX_DEFAULT_RWND_MAX; stream->on_firewall = NULL; stream->on_read = NULL; stream->on_recv = NULL; stream->on_drain = NULL; stream->on_close = close_cb; stream->on_finalize = finalize_cb; + stream->get_read_buffer_size = NULL; // Clear congestion state memset(&(stream->cong), 0, sizeof(udx_cong_t)); @@ -2361,6 +2381,18 @@ udx_stream_set_ack (udx_stream_t *stream, uint32_t ack) { return 0; } +int +udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *size) { + *size = stream->recv_rwnd_max; + return 0; +} + +int +udx_stream_set_rwnd_max (udx_stream_t *stream, uint32_t size) { + stream->recv_rwnd_max = size; + return 0; +} + int udx_stream_firewall (udx_stream_t *stream, udx_stream_firewall_cb cb) { stream->on_firewall = cb; diff --git a/test/stream-write-read-receive-window.c b/test/stream-write-read-receive-window.c index 49a521e..578038e 100644 --- a/test/stream-write-read-receive-window.c +++ b/test/stream-write-read-receive-window.c @@ -5,6 +5,13 @@ #include "../include/udx.h" +/* + * this test is contrived to start with a receiver advertizing zero receive + * window to cause the sender to send zero window probes. after two zero-window + * probes are received we switch to advertizing an empty receive window (see `on_read` + * below) to allow the data to flow at max speed. + */ + uv_loop_t loop; udx_t udx; @@ -67,6 +74,16 @@ on_ack (udx_stream_write_t *req, int status, int unordered) { udx_stream_write_end(recv_end_req, &recv_stream, NULL, 0, on_end); } +uint32_t +pretend_buffer_is_full (udx_stream_t *stream) { + return stream->recv_rwnd_max; +} + +uint32_t +pretend_buffer_is_empty (udx_stream_t *stream) { + return 0; +} + void on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { read_counter++; @@ -78,7 +95,7 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { // read_counter 2: read fired from the first zero window probe triggered by timeout. if (read_counter == 2) { - recv_stream.recv_rwnd = 131072; + handle->get_read_buffer_size = &pretend_buffer_is_empty; } } @@ -115,7 +132,7 @@ main () { e = udx_stream_init(&udx, &send_stream, 2, on_close, on_finalize); assert(e == 0); - recv_stream.recv_rwnd = 0; + recv_stream.get_read_buffer_size = &pretend_buffer_is_full; send_stream.send_rwnd = 0; assert(recv_stream.rto == 1000); @@ -129,7 +146,7 @@ main () { assert(e == 0); int data_sz = UDX_MTU_MAX * 6; - uint8_t *data = malloc(data_sz); + char *data = malloc(data_sz); uv_buf_t buf = uv_buf_init(data, data_sz); e = udx_stream_write(req, &send_stream, &buf, 1, on_ack);