Skip to content

Commit

Permalink
add a stat for packets dropped from socket receive buffer (holepuncht…
Browse files Browse the repository at this point in the history
…o#214)

* wip: added stats for dropped packets (SO_RXQ_OVFL)

* linux: add a socket->packets_dropped_by_kernel stat to detect receive buffer overrun. uses linux-only socket option SO_RXQ_OVFL

* fixes
  • Loading branch information
jthomas43 authored Nov 4, 2024
1 parent d0735c7 commit 19e0a36
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 0 deletions.
3 changes: 3 additions & 0 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ struct udx_socket_s {
udx_t *udx;
udx_cirbuf_t *streams_by_id; // for convenience

bool cmsg_wanted; // include a control buffer for recvmsg
int family;
int status;
int readers;
Expand All @@ -157,6 +158,8 @@ struct udx_socket_s {

uint64_t packets_rx;
uint64_t packets_tx;

int64_t packets_dropped_by_kernel;
};

typedef struct udx_cong_s {
Expand Down
3 changes: 3 additions & 0 deletions src/io.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ udx__sendmsg (udx_socket_t *handle, const uv_buf_t bufs[], unsigned int bufs_len
ssize_t
udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int addr_len);

int
udx__udp_set_rxq_ovfl (int fd);

#endif // UDX_IO_H
39 changes: 39 additions & 0 deletions src/io_posix.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,48 @@ udx__recvmsg (udx_socket_t *handle, uv_buf_t *buf, struct sockaddr *addr, int ad
h.msg_iov = (struct iovec *) buf;
h.msg_iovlen = 1;

union {
struct cmsghdr align;
uint8_t buf[2048];
} u;

h.msg_control = u.buf;
h.msg_controllen = sizeof(u.buf);

do {
size = recvmsg(handle->io_poll.io_watcher.fd, &h, 0);
} while (size == -1 && errno == EINTR);

#if defined(__linux__)

// relies on SO_RXQ_OVFL being set
uint32_t packets_dropped_by_kernel = 0;

for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&h); cmsg != NULL; cmsg = CMSG_NXTHDR(&h, cmsg)) {
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_RXQ_OVFL) {
packets_dropped_by_kernel = *(uint32_t *) CMSG_DATA(cmsg);
}
}

if (packets_dropped_by_kernel) {
handle->packets_dropped_by_kernel = packets_dropped_by_kernel;
}

#endif

return size == -1 ? uv_translate_sys_error(errno) : size;
}

#if defined(__linux__)
int
udx__udp_set_rxq_ovfl (int fd) {
int on = 1;
return setsockopt(fd, SOL_SOCKET, SO_RXQ_OVFL, &on, sizeof(on));
}
#else
int
udx__udp_set_rxq_ovfl (int fd) {
UDX_UNUSED(fd);
return -1;
}
#endif
6 changes: 6 additions & 0 deletions src/io_win.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,9 @@ udx__recvmsg (udx_socket_t *socket, uv_buf_t *buf, struct sockaddr *addr, int ad

return bytes;
}

int
udx__udp_set_rxq_ovfl (int fd) {
UDX_UNUSED(fd);
return -1;
}
9 changes: 9 additions & 0 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1991,6 +1991,9 @@ udx_socket_init (udx_t *udx, udx_socket_t *socket) {
socket->packets_rx = 0;
socket->packets_tx = 0;

socket->packets_dropped_by_kernel = -1;
socket->cmsg_wanted = false;

uv_udp_t *handle = &(socket->handle);
udx__queue_init(&socket->send_queue);

Expand Down Expand Up @@ -2080,6 +2083,12 @@ udx_socket_bind (udx_socket_t *socket, const struct sockaddr *addr, unsigned int
err = uv_poll_init_socket(socket->udx->loop, poll, (uv_os_sock_t) fd);
assert(err == 0);

err = udx__udp_set_rxq_ovfl(fd);
if (!err) {
socket->cmsg_wanted = true;
socket->packets_dropped_by_kernel = 0;
}

socket->status |= UDX_SOCKET_BOUND;
poll->data = socket;

Expand Down
5 changes: 5 additions & 0 deletions test/stream-write-read-perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,5 +149,10 @@ main () {
printf("stats: stream a: bytes_rx=%" PRIu64 " packets_rx=%" PRIu64 " bytes_tx=%" PRIu64 " packets_tx=%" PRIu64 "\n", astream.bytes_rx, astream.packets_rx, astream.bytes_tx, astream.packets_tx);
printf("stats: stream b: bytes_rx=%" PRIu64 " packets_rx=%" PRIu64 " bytes_tx=%" PRIu64 " packets_tx=%" PRIu64 "\n", bstream.bytes_rx, bstream.packets_rx, bstream.bytes_tx, bstream.packets_tx);

if (asock.packets_dropped_by_kernel != -1 && bsock.packets_dropped_by_kernel != -1) {
printf("stats: socket a: packets_dropped=%" PRIi64 "\n", asock.packets_dropped_by_kernel);
printf("stats: socket b: packets_dropped=%" PRIi64 "\n", bsock.packets_dropped_by_kernel);
}

return 0;
}

0 comments on commit 19e0a36

Please sign in to comment.