Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rwnd p2 simple #226

Merged
merged 3 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ typedef void (*udx_stream_send_cb)(udx_stream_send_t *req, int status);
typedef void (*udx_stream_recv_cb)(udx_stream_t *stream, ssize_t read_len, const uv_buf_t *buf);
typedef void (*udx_stream_close_cb)(udx_stream_t *stream, int status);
typedef void (*udx_stream_finalize_cb)(udx_stream_t *stream);
typedef uint32_t (*udx_stream_get_read_buffer_size_cb)(udx_stream_t *stream);

typedef void (*udx_lookup_cb)(udx_lookup_t *handle, int status, const struct sockaddr *addr, int addr_len);

Expand Down Expand Up @@ -222,6 +223,7 @@ struct udx_stream_s {
udx_stream_drain_cb on_drain;
udx_stream_close_cb on_close;
udx_stream_finalize_cb on_finalize;
udx_stream_get_read_buffer_size_cb get_read_buffer_size;

// mtu. RFC8899 5.1.1 and 5.1.3
int mtu_state; // MTU_STATE_*
Expand Down Expand Up @@ -268,8 +270,8 @@ struct udx_stream_s {
uint32_t ssthresh;
uint32_t cwnd; // packets
uint32_t cwnd_cnt;
uint32_t recv_rwnd; // tcp: rcv.wnd. bytes
uint32_t send_rwnd; // remote advertised rwnd
uint32_t send_rwnd; // remote advertised rwnd
uint32_t recv_rwnd_max; // default: UDX_DEFAULT_RWND_MAX

uint32_t send_wl1; // seq at last window update
uint32_t send_wl2; // ack at last window update
Expand Down Expand Up @@ -460,6 +462,12 @@ udx_stream_get_ack (udx_stream_t *stream, uint32_t *ack);
int
udx_stream_set_ack (udx_stream_t *stream, uint32_t ack);

int
udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *rwnd_max);

int
udx_stream_set_rwnd_max (udx_stream_t *stream, uint32_t rwnd_max);

int
udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr);

Expand Down
40 changes: 36 additions & 4 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
#define UDX_CONG_MAX_CWND 65536
#define UDX_RTO_MAX_MS 30000
#define UDX_RTT_MAX_MS 30000
#define UDX_INIT_RWND_BYTES 131072 // arbitrary, ~90 1500 mtu packets, 52mbits/sec at 20millis latency
#define UDX_DEFAULT_RWND_MAX (256 * 1024) // arbitrary, ~175 1500 mtu packets, @20ms latency = 104 mbits/sec

#define UDX_HIGH_WATERMARK 262144

Expand Down Expand Up @@ -480,6 +480,25 @@ clear_outgoing_packets (udx_stream_t *stream) {
}
}

// returns the rwnd to advertise to the sender
// to provide a rwnd value the user provides two values
// 1. a maximum buffer size. default is UDX_DEFAULT_RWND_MAX
// 2. a callback to return the number of bytes already in the buffer.
// the window is then set to the

static uint32_t
get_recv_rwnd (udx_stream_t *stream) {
uint32_t bufsize = 0;
if (stream->get_read_buffer_size) {
bufsize = stream->get_read_buffer_size(stream);
}
if (stream->recv_rwnd_max > bufsize) {
return stream->recv_rwnd_max - bufsize;
} else {
return 0;
}
}

static void
init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_buf_t *userbufs, int nuserbufs) {
uint8_t *b = (uint8_t *) &(pkt->header);
Expand All @@ -495,7 +514,7 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_
// 32 bit (le) remote id
*(i++) = udx__swap_uint32_if_be(stream->remote_id);
// 32 bit (le) recv window
*(i++) = udx__swap_uint32_if_be(stream->recv_rwnd);
*(i++) = udx__swap_uint32_if_be(get_recv_rwnd(stream));
// 32 bit (le) seq
*(i++) = udx__swap_uint32_if_be(stream->seq);
// 32 bit (le) ack
Expand Down Expand Up @@ -2289,14 +2308,15 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
stream->ssthresh = 255;
stream->cwnd = UDX_CONG_INIT_CWND;
stream->cwnd_cnt = 0;
stream->recv_rwnd = UDX_INIT_RWND_BYTES;
stream->send_rwnd = UDX_INIT_RWND_BYTES;
stream->recv_rwnd_max = UDX_DEFAULT_RWND_MAX;
stream->send_rwnd = UDX_DEFAULT_RWND_MAX;
stream->on_firewall = NULL;
stream->on_read = NULL;
stream->on_recv = NULL;
stream->on_drain = NULL;
stream->on_close = close_cb;
stream->on_finalize = finalize_cb;
stream->get_read_buffer_size = NULL;

// Clear congestion state
memset(&(stream->cong), 0, sizeof(udx_cong_t));
Expand Down Expand Up @@ -2361,6 +2381,18 @@ udx_stream_set_ack (udx_stream_t *stream, uint32_t ack) {
return 0;
}

int
udx_stream_get_rwnd_max (udx_stream_t *stream, uint32_t *size) {
*size = stream->recv_rwnd_max;
return 0;
}

int
udx_stream_set_rwnd_max (udx_stream_t *stream, uint32_t size) {
stream->recv_rwnd_max = size;
return 0;
}

int
udx_stream_firewall (udx_stream_t *stream, udx_stream_firewall_cb cb) {
stream->on_firewall = cb;
Expand Down
23 changes: 20 additions & 3 deletions test/stream-write-read-receive-window.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@

#include "../include/udx.h"

/*
* this test is contrived to start with a receiver advertizing zero receive
* window to cause the sender to send zero window probes. after two zero-window
* probes are received we switch to advertizing an empty receive window (see `on_read`
* below) to allow the data to flow at max speed.
*/

uv_loop_t loop;
udx_t udx;

Expand Down Expand Up @@ -67,6 +74,16 @@ on_ack (udx_stream_write_t *req, int status, int unordered) {
udx_stream_write_end(recv_end_req, &recv_stream, NULL, 0, on_end);
}

uint32_t
pretend_buffer_is_full (udx_stream_t *stream) {
return stream->recv_rwnd_max;
}

uint32_t
pretend_buffer_is_empty (udx_stream_t *stream) {
return 0;
}

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
read_counter++;
Expand All @@ -78,7 +95,7 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
// read_counter 2: read fired from the first zero window probe triggered by timeout.

if (read_counter == 2) {
recv_stream.recv_rwnd = 131072;
handle->get_read_buffer_size = &pretend_buffer_is_empty;
}
}

Expand Down Expand Up @@ -115,7 +132,7 @@ main () {
e = udx_stream_init(&udx, &send_stream, 2, on_close, on_finalize);
assert(e == 0);

recv_stream.recv_rwnd = 0;
recv_stream.get_read_buffer_size = &pretend_buffer_is_full;
send_stream.send_rwnd = 0;
assert(recv_stream.rto == 1000);

Expand All @@ -129,7 +146,7 @@ main () {
assert(e == 0);

int data_sz = UDX_MTU_MAX * 6;
uint8_t *data = malloc(data_sz);
char *data = malloc(data_sz);
uv_buf_t buf = uv_buf_init(data, data_sz);

e = udx_stream_write(req, &send_stream, &buf, 1, on_ack);
Expand Down