|
| 1 | +#include <assert.h> |
| 2 | +#include <inttypes.h> |
| 3 | +#include <stdbool.h> |
| 4 | +#include <stdlib.h> |
| 5 | +#include <string.h> |
| 6 | + |
| 7 | +#include "../include/udx.h" |
| 8 | +#include "helpers.h" |
| 9 | + |
| 10 | +uv_loop_t loop; |
| 11 | +udx_t udx; |
| 12 | + |
| 13 | +udx_socket_t asock; |
| 14 | +udx_stream_t astream; |
| 15 | + |
| 16 | +udx_socket_t bsock; |
| 17 | +udx_stream_t bstream; |
| 18 | + |
| 19 | +struct { |
| 20 | + uint64_t size_bytes; |
| 21 | +} options; |
| 22 | + |
| 23 | +struct { |
| 24 | + uint64_t bytes_read; |
| 25 | + uint64_t last_bytes_read; |
| 26 | + |
| 27 | + uint64_t last_print_ms; |
| 28 | + uint64_t time_zero_ms; |
| 29 | + |
| 30 | + uint64_t last_read_ms; |
| 31 | + int finished; |
| 32 | +} stats; |
| 33 | + |
| 34 | +uint64_t read_hash = HASH_INIT; |
| 35 | +uint64_t write_hash = HASH_INIT; |
| 36 | + |
| 37 | +void |
| 38 | +on_ack (udx_stream_write_t *r, int status, int unordered) { |
| 39 | + printf("write acked, status=%d %s\n", status, status == UV_ECANCELED ? "(UV_ECANCELED)" : ""); |
| 40 | + udx_stream_destroy(r->stream); |
| 41 | + udx_stream_destroy(&astream); |
| 42 | +} |
| 43 | + |
| 44 | +void |
| 45 | +on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { |
| 46 | + stats.bytes_read += read_len; |
| 47 | + stats.last_read_ms = uv_hrtime() / 1000000; |
| 48 | + |
| 49 | + assert(read_len == buf->len); |
| 50 | + |
| 51 | + read_hash = hash(read_hash, (uint8_t *) buf->base, read_len); |
| 52 | + |
| 53 | + if (stats.bytes_read == options.size_bytes) { |
| 54 | + printf("read all bytes\n"); |
| 55 | + } |
| 56 | +} |
| 57 | + |
| 58 | +static void |
| 59 | +on_b_sock_close () { |
| 60 | + printf("sending socket closing\n"); |
| 61 | +} |
| 62 | + |
| 63 | +static void |
| 64 | +on_b_stream_close () { |
| 65 | + printf("sending stream closing\n"); |
| 66 | + int e = udx_socket_close(&bsock); |
| 67 | + assert(e == 0 && "udx_socket_close (sender, 'b')"); |
| 68 | +} |
| 69 | + |
| 70 | +static void |
| 71 | +on_a_sock_close () { |
| 72 | + printf("receiving socket closing\n"); |
| 73 | +} |
| 74 | + |
| 75 | +static void |
| 76 | +on_a_stream_close () { |
| 77 | + printf("receiving stream closing\n"); |
| 78 | + int e = udx_socket_close(&asock); |
| 79 | + assert(e == 0 && "udx_socket_close (receiver, 'a')"); |
| 80 | +} |
| 81 | + |
| 82 | +int |
| 83 | +main () { |
| 84 | + int e; |
| 85 | + |
| 86 | + udx_stream_write_t *req = malloc(udx_stream_write_sizeof(1)); |
| 87 | + |
| 88 | + uv_loop_init(&loop); |
| 89 | + |
| 90 | + e = udx_init(&loop, &udx, NULL); |
| 91 | + udx.debug_flags |= UDX_DEBUG_FORCE_DROP_PROBES; |
| 92 | + assert(e == 0); |
| 93 | + |
| 94 | + e = udx_socket_init(&udx, &asock, on_a_sock_close); |
| 95 | + assert(e == 0); |
| 96 | + |
| 97 | + e = udx_socket_init(&udx, &bsock, on_b_sock_close); |
| 98 | + assert(e == 0); |
| 99 | + |
| 100 | + struct sockaddr_in baddr; |
| 101 | + uv_ip4_addr("127.0.0.1", 8082, &baddr); |
| 102 | + e = udx_socket_bind(&bsock, (struct sockaddr *) &baddr, 0); |
| 103 | + assert(e == 0); |
| 104 | + |
| 105 | + struct sockaddr_in aaddr; |
| 106 | + uv_ip4_addr("127.0.0.1", 8081, &aaddr); |
| 107 | + e = udx_socket_bind(&asock, (struct sockaddr *) &aaddr, 0); |
| 108 | + assert(e == 0); |
| 109 | + |
| 110 | + e = udx_stream_init(&udx, &astream, 1, on_a_stream_close, NULL); |
| 111 | + assert(e == 0); |
| 112 | + |
| 113 | + e = udx_stream_init(&udx, &bstream, 2, on_b_stream_close, NULL); |
| 114 | + assert(e == 0); |
| 115 | + |
| 116 | + e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr); |
| 117 | + assert(e == 0); |
| 118 | + |
| 119 | + e = udx_stream_connect(&bstream, &bsock, 1, (struct sockaddr *) &aaddr); |
| 120 | + assert(e == 0); |
| 121 | + |
| 122 | + e = udx_stream_read_start(&astream, on_read); |
| 123 | + assert(e == 0); |
| 124 | + |
| 125 | + options.size_bytes = 10 * 1024 * 1024L; |
| 126 | + printf("generating data ... (%" PRIu64 " bytes)\n", options.size_bytes); |
| 127 | + |
| 128 | + uint8_t *data = calloc(options.size_bytes, 1); |
| 129 | + |
| 130 | + write_hash = hash(write_hash, data, options.size_bytes); |
| 131 | + |
| 132 | + assert(data != NULL && "malloc"); |
| 133 | + |
| 134 | + printf("writing data\n"); |
| 135 | + |
| 136 | + uv_buf_t buf = uv_buf_init((char *) data, options.size_bytes); |
| 137 | + udx_stream_write(req, &bstream, &buf, 1, on_ack); |
| 138 | + |
| 139 | + e = uv_run(&loop, UV_RUN_DEFAULT); |
| 140 | + assert(e == 0 && "UV_RUN"); |
| 141 | + |
| 142 | + uv_loop_close(&loop); |
| 143 | + |
| 144 | + free(data); // valgrind |
| 145 | + free(req); // valgrind |
| 146 | + printf("readhash=%" PRIx64 " writehash=%" PRIx64 "\n", read_hash, write_hash); |
| 147 | + assert(read_hash == write_hash); |
| 148 | + |
| 149 | + printf("stats: udx: bytes_rx=%" PRIu64 " packets_rx=%" PRIu64 " bytes_tx=%" PRIu64 " packets_tx=%" PRIu64 "\n", udx.bytes_rx, udx.packets_rx, udx.bytes_tx, udx.packets_tx); |
| 150 | + printf("stats: stream a: bytes_rx=%" PRIu64 " packets_rx=%" PRIu64 " bytes_tx=%" PRIu64 " packets_tx=%" PRIu64 "\n", astream.bytes_rx, astream.packets_rx, astream.bytes_tx, astream.packets_tx); |
| 151 | + printf("stats: stream b: bytes_rx=%" PRIu64 " packets_rx=%" PRIu64 " bytes_tx=%" PRIu64 " packets_tx=%" PRIu64 "\n", bstream.bytes_rx, bstream.packets_rx, bstream.bytes_tx, bstream.packets_tx); |
| 152 | + |
| 153 | + if (asock.packets_dropped_by_kernel != -1 && bsock.packets_dropped_by_kernel != -1) { |
| 154 | + printf("stats: socket a: packets_dropped=%" PRIi64 "\n", asock.packets_dropped_by_kernel); |
| 155 | + printf("stats: socket b: packets_dropped=%" PRIi64 "\n", bsock.packets_dropped_by_kernel); |
| 156 | + |
| 157 | + assert(asock.packets_dropped_by_kernel + bsock.packets_dropped_by_kernel == udx.packets_dropped_by_kernel); |
| 158 | + } |
| 159 | + |
| 160 | + assert(bstream.mtu == UDX_MTU_BASE); |
| 161 | + |
| 162 | + return 0; |
| 163 | +} |
0 commit comments