From bac51cfb1afab93a93926b1ca66601cc1ef0d18f Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sun, 10 May 2020 15:55:01 +0200 Subject: [PATCH 01/15] added local changes --- r2p2/inc/r2p2/api-internal.h | 82 ++++++++++ r2p2/inc/r2p2/api.h | 5 +- r2p2/inc/r2p2/r2p2-linux.h | 14 ++ r2p2/linux-backend.c | 148 +++++++++++++++++- r2p2/r2p2-common.c | 281 ++++++++++++++++++++++++++++++----- 5 files changed, 480 insertions(+), 50 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 19d54e0..34ec1e1 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -38,12 +38,24 @@ #define L_FLAG 0x40 #define MAGIC 0xCC +#define EXCT_ONCE_FLAG 0x02 +#define EO_MAX_RETRY_REQUEST 5 +#define EO_MAX_RETRY_REPLY 5 +#define EO_TO_REPLY 2500000 +#define EO_TO_NETWORK_FLUSH 5000000 +#define ACK_NOT_RECEIVED UINT16_MAX + +#define MIN_HEADER_SIZE (sizeof(struct r2p2_header))// - sizeof(uint16_t)) + enum { REQUEST_MSG = 0, RESPONSE_MSG, CONTROL_MSG, ACK_MSG, DROP_MSG, + REQUEST_EXCT_ONCE, + RESPONSE_EXCT_ONCE, + ACK_EXCT_ONCE }; typedef void *generic_buffer; @@ -55,6 +67,8 @@ struct r2p2_header { uint8_t flags; uint16_t rid; uint16_t p_order; +// uint16_t extended_rid; + // add session ID? }; struct r2p2_msg { @@ -64,6 +78,11 @@ struct r2p2_msg { generic_buffer tail_buffer; }; +struct r2p2_cp_exct_once_info { + // extended rid + uint16_t req_resent; +}; + struct r2p2_client_pair { struct r2p2_msg request; struct r2p2_msg reply; @@ -77,6 +96,16 @@ struct r2p2_client_pair { void *timer; void *impl_data; // Used to hold the socket used in linux void (*on_free)(void *impl_data); + struct r2p2_cp_exct_once_info *eo_info; +}; + + +struct r2p2_sp_exct_once_info { + // extended rid + uint16_t req_received; + uint16_t req_resent; + uint16_t reply_resent; + void *timer; }; struct r2p2_server_pair { @@ -84,12 +113,26 @@ struct r2p2_server_pair { struct r2p2_msg reply; uint16_t request_expected_packets; uint16_t request_received_packets; + struct r2p2_sp_exct_once_info *eo_info; // Add here fields for garbage collection, e.g. last received }; +enum { + EO_NEW = 0, + EO_IN_PROGRESS, + EO_COMPLETED, + EO_STALE +}; + +struct r2p2_eo_client_info { + uint16_t next_seq; +// uint16_t extended_rid; // exclusive +}; + static inline int is_response(struct r2p2_header *h) { return ((h->type_policy & 0xF0) == (RESPONSE_MSG << 4)) || + ((h->type_policy & 0xF0) == (RESPONSE_EXCT_ONCE << 4)) || ((h->type_policy & 0xF0) == (ACK_MSG << 4)) || ((h->type_policy & 0xF0) == (DROP_MSG << 4)); } @@ -109,6 +152,11 @@ static inline uint8_t get_msg_type(struct r2p2_header *h) return (h->type_policy & 0xF0) >> 4; } +static inline unsigned int get_header_size(const char* buf) +{ + return ((struct r2p2_header*) buf)->header_size; +} + /* * Generic buffer API */ @@ -141,6 +189,7 @@ void handle_incoming_pck(generic_buffer gb, int len, struct r2p2_host_tuple *local_host); #endif void timer_triggered(struct r2p2_client_pair *cp); +void sp_timer_triggered(struct r2p2_server_pair *sp); /* Exposed only for lancet */ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, uint8_t req_type, uint8_t policy, uint16_t req_id); @@ -152,4 +201,37 @@ int prepare_to_send(struct r2p2_client_pair *cp); int buf_list_send(generic_buffer first_buf, struct r2p2_host_tuple *dest, void *socket_info); int disarm_timer(void *timer); +int cp_restart_timer(struct r2p2_client_pair *cp, long timeout); + +void sp_get_timer(struct r2p2_server_pair *sp); +int sp_restart_timer(struct r2p2_server_pair *sp, long timeout); +void sp_free_timer(struct r2p2_server_pair *sp); + void router_notify(void); + +/* + * Exactly Once specific + */ +static inline int is_exct_once(struct r2p2_ctx *ctx) +{ + return (ctx->routing_policy & EXCT_ONCE_FLAG) != 0; +} + +static inline int is_ack_exct_once(struct r2p2_header *h) +{ + return ((h->type_policy & 0xF0) == (ACK_EXCT_ONCE << 4)); +} + +void send_eo_ack(struct r2p2_client_pair *cp); + +void handle_ack_eo(generic_buffer gb, int len, struct r2p2_header *r2p2h, + struct r2p2_host_tuple *source); + +int eo_try_garbage_collect(struct r2p2_server_pair *sp); + + +#define DEBUG 1 + +#if DEBUG +void __debug_dump(); +#endif diff --git a/r2p2/inc/r2p2/api.h b/r2p2/inc/r2p2/api.h index d5fecf0..35252e1 100644 --- a/r2p2/inc/r2p2/api.h +++ b/r2p2/inc/r2p2/api.h @@ -47,12 +47,13 @@ enum { enum { ERR_NO_SOCKET=1, ERR_DROP_MSG, + ERR_FULL_EXCT_ONCE_BUFFER }; struct r2p2_ctx { success_cb_f success_cb; error_cb_f error_cb; - timeout_cb_f timeout_cb; + timeout_cb_f timeout_cb; // with EO, triggered only in case of failure, i.e. after enough timeouts void *arg; long timeout; int routing_policy; @@ -79,3 +80,5 @@ void r2p2_set_app_flow_control_fn(app_flow_control fn); void r2p2_send_req(struct iovec *iov, int iovcnt, struct r2p2_ctx *ctx); void r2p2_send_response(long handle, struct iovec *iov, int iovcnt); void r2p2_recv_resp_done(long handle); + +void use_exct_once(struct r2p2_ctx *ctx); diff --git a/r2p2/inc/r2p2/r2p2-linux.h b/r2p2/inc/r2p2/r2p2-linux.h index c3c9371..21ab191 100644 --- a/r2p2/inc/r2p2/r2p2-linux.h +++ b/r2p2/inc/r2p2/r2p2-linux.h @@ -33,6 +33,8 @@ #define BUFLEN 2048 //(PAYLOAD_SIZE + sizeof(struct r2p2_header) + sizeof(struct // linux_buf_hdr)) half a page +#define TIMERPOOL_SIZE 128 + struct __attribute__((packed)) linux_buf_hdr { uint32_t payload_size; struct linux_buf_hdr *next; @@ -54,3 +56,15 @@ struct socket_pool { uint32_t idx; struct r2p2_socket sockets[SOCKPOOL_SIZE]; }; + +struct free_timer { + int fd; + int taken; + void* data; +}; + +struct timer_pool { + uint32_t count; + uint32_t idx; + struct free_timer timers[TIMERPOOL_SIZE]; +}; diff --git a/r2p2/linux-backend.c b/r2p2/linux-backend.c index 17aa99f..834dfda 100644 --- a/r2p2/linux-backend.c +++ b/r2p2/linux-backend.c @@ -22,16 +22,22 @@ * SOFTWARE. */ -#include #include #include #include #include #include #include -#include + +# if __APPLE__ +#include "linuxsys4mac.h" +#else #include +#include #include +#include +#endif + #include #include @@ -47,6 +53,7 @@ struct sockaddr_in router_addr; static __thread int efd; static __thread struct socket_pool sp; +static __thread struct timer_pool tp; static __thread struct fixed_mempool *buf_pool; #ifdef WITH_TIMESTAMPING @@ -101,6 +108,14 @@ int r2p2_init_per_core(int core_id, int core_count) return -1; } +#if DEBUG + event.events = EPOLLIN; + event.data.fd = 0; + ret = epoll_ctl(efd, EPOLL_CTL_ADD, 0, &event); + if (ret) + return -1; +#endif + // Add the server socket event.events = EPOLLIN; event.data.ptr = (void *)&sock.fd; @@ -178,6 +193,25 @@ int r2p2_init_per_core(int core_id, int core_count) } } + tp.count = 0; + tp.idx = 0; + // Create the free timers + for (i = 0; i < TIMERPOOL_SIZE; i++) { + tfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK); + + tp.timers[i].fd = tfd; + tp.timers[i].taken = 0; + tp.timers[i].data = NULL; + + event.events = EPOLLIN; + event.data.ptr = (void *)&(tp.timers[i]); + ret = epoll_ctl(efd, EPOLL_CTL_ADD, tfd, &event); + if (ret) { + perror("epoll_ctl"); + return -1; + } + } + return 0; } @@ -202,6 +236,24 @@ static struct r2p2_socket *get_socket(void) return res; } +static struct free_timer *get_timer(void) +{ + struct free_timer *res; + uint32_t idx; + + if (tp.count >= TIMERPOOL_SIZE) + return NULL; + + while (tp.timers[tp.idx++ & (TIMERPOOL_SIZE - 1)].taken) + ; + idx = (tp.idx - 1) & (TIMERPOOL_SIZE - 1); + res = &tp.timers[idx]; + tp.timers[idx].taken = 1; + tp.count++; + + return res; +} + static int __disarm_timer(int timerfd) { struct itimerspec ts = {0}; @@ -222,6 +274,7 @@ static void free_socket(struct r2p2_socket *s) static void linux_on_client_pair_free(void *data) { + printf("Free client pair\n"); struct r2p2_socket *sock = (struct r2p2_socket *)data; __disarm_timer(sock->tfd); free_socket(sock); @@ -235,6 +288,32 @@ static void handle_timer_for_socket(struct r2p2_socket *s) timer_triggered(s->cp); } +static void handle_free_timer(struct free_timer *ft) { + assert(ft && ft->data && ft->taken); + __disarm_timer(ft->fd); + sp_timer_triggered((struct r2p2_server_pair *)ft->data); +} + +void sp_get_timer(struct r2p2_server_pair *sp) { + struct free_timer *t; + t = get_timer(); + if (t == NULL) { + perror("Could not allocate free timer"); + } + t->data = sp; + sp->eo_info->timer = t; +} + +void sp_free_timer(struct r2p2_server_pair *sp) { + struct free_timer *t = sp->eo_info->timer; + if (t) { + t->taken = 0; + t->data = NULL; + __disarm_timer(t->fd); + sp->eo_info->timer = NULL; + } +} + /* * Generic buffer implementation */ @@ -339,7 +418,7 @@ int r2p2_init(int listen_port) void r2p2_poll(void) { struct epoll_event events[MAX_EVENTS]; - int ready, i, recvlen, is_timer_event; + int ready, i, recvlen, is_socket_timer_event, is_free_timer_event; struct r2p2_socket *s; generic_buffer gb; void *buf, *event_arg; @@ -353,12 +432,29 @@ void r2p2_poll(void) ready = epoll_wait(efd, events, MAX_EVENTS, 0); for (i = 0; i < ready; i++) { - event_arg = (struct r2p2_socket *)events[i].data.ptr; - assert(event_arg); + +#if DEBUG + if (events[i].data.fd == 0) { + int c; + while ((c = getchar()) != '\n' && c != EOF) { } + __debug_dump(); + continue; + } +#endif + + event_arg = (struct r2p2_socket *)events[i].data.ptr; + assert(event_arg); if (events[i].events & EPOLLIN) { - is_timer_event = - (unsigned long)event_arg % sizeof(struct r2p2_socket); - if (is_timer_event) { + is_socket_timer_event = + (unsigned long) event_arg % sizeof(struct r2p2_socket); + is_free_timer_event = + (void *) tp.timers <= events[i].data.ptr && + events[i].data.ptr < (void *) (tp.timers + TIMERPOOL_SIZE); + + if (is_free_timer_event) { + handle_free_timer((struct free_timer *)events[i].data.ptr); + + } else if (is_socket_timer_event) { assert((unsigned long)event_arg % sizeof(struct r2p2_socket) == 4); s = container_of(event_arg, struct r2p2_socket, tfd); @@ -480,6 +576,42 @@ int disarm_timer(void *timer) return __disarm_timer(tfd); } +static void __restart_timer(int tfd, long timeout) { + struct itimerspec ts; + + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + ts.it_value.tv_sec = timeout / 1000000; + ts.it_value.tv_nsec = (timeout % 1000000) * 1000; + + if (timerfd_settime(tfd, 0, &ts, NULL) < 0) { + perror("Error resetting timer"); + assert(0); + } +} + + +int cp_restart_timer(struct r2p2_client_pair *cp, long timeout) +{ + struct r2p2_socket *s; + + s = (struct r2p2_socket *)cp->impl_data; + assert(s->taken && s->cp == cp); + __restart_timer(s->tfd, timeout); + cp->timer = (void *)(long)s->tfd; + return 0; +} + +int sp_restart_timer(struct r2p2_server_pair *sp, long timeout) { + struct free_timer *timer; + assert(sp && sp->eo_info && sp->eo_info->timer); + + timer = (struct free_timer *)sp->eo_info->timer; + assert(timer->data == sp && timer->taken); + __restart_timer(timer->fd, timeout); + return 0; +} + void router_notify(void) { #ifdef WITH_ROUTER diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 0f1d496..34c6209 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -49,7 +49,38 @@ static __thread struct fixed_linked_list pending_client_pairs = {0}; static __thread struct fixed_linked_list pending_server_pairs = {0}; static __thread struct iovec to_app_iovec[0xFF]; // change this to 0xFF; -static struct r2p2_client_pair *alloc_client_pair(void) +static __thread struct r2p2_eo_client_info eo_client_info = {0}; // possible improvement: alloc only for client + +#if DEBUG +static void print_cp(void* __cp) { + struct r2p2_client_pair* cp = (struct r2p2_client_pair*) __cp; + if (cp->eo_info) + printf("{req_id: %d, retries: %d}", cp->request.req_id, cp->eo_info->req_resent); +} + +static void print_sp(void* __sp) { + struct r2p2_server_pair* sp = (struct r2p2_server_pair*) __sp; + if (sp->eo_info) + printf("{req_id: %d, retries: %d, req_received: %d}", sp->request.req_id, sp->eo_info->req_resent, sp->eo_info->req_received); +} + +static void print_linked_list(const char* str, struct fixed_linked_list *ll, void(*print_fun)(void*)) { + printf("%s: ", str); + for (struct fixed_obj *obj = ll->head; obj; obj = obj->next) { + print_fun(obj->elem); + } + printf("\n"); +} + +void __debug_dump() +{ + if (pending_client_pairs.head != NULL) print_linked_list("cp", &pending_client_pairs, print_cp); + else if (pending_server_pairs.head != NULL) print_linked_list("sp", &pending_server_pairs, print_sp); + else printf("empty debug\n"); +} +#endif + +static struct r2p2_client_pair *alloc_client_pair(int with_eo_info) { struct r2p2_client_pair *cp; @@ -58,6 +89,11 @@ static struct r2p2_client_pair *alloc_client_pair(void) bzero(cp, sizeof(struct r2p2_client_pair)); + if (with_eo_info) { + cp->eo_info = malloc(sizeof(struct r2p2_cp_exct_once_info)); // TODO: use alloc_object + assert(cp->eo_info); + } + return cp; } @@ -85,10 +121,15 @@ static void free_client_pair(struct r2p2_client_pair *cp) if (cp->on_free) cp->on_free(cp->impl_data); + if (cp->eo_info) { + free(cp->eo_info); // TODO: use free_object + cp->eo_info = NULL; + } + free_object(cp); } -static struct r2p2_server_pair *alloc_server_pair(void) +static struct r2p2_server_pair *alloc_server_pair(int with_eo_info) { struct r2p2_server_pair *sp; @@ -97,6 +138,11 @@ static struct r2p2_server_pair *alloc_server_pair(void) bzero(sp, sizeof(struct r2p2_server_pair)); + if (with_eo_info) { + sp->eo_info = malloc(sizeof(struct r2p2_sp_exct_once_info)); // TODO: use alloc_object + assert(sp->eo_info); + } + return sp; } @@ -120,6 +166,12 @@ static void free_server_pair(struct r2p2_server_pair *sp) } #endif + if (sp->eo_info) { + sp_free_timer(sp); + free(sp->eo_info); // TODO: use free_object + sp->eo_info = NULL; + } + free_object(sp); } @@ -195,8 +247,8 @@ static int prepare_to_app_iovec(struct r2p2_msg *msg) buf = get_buffer_payload(gb); assert(buf); len = get_buffer_payload_size(gb); - to_app_iovec[iovcnt].iov_base = ((struct r2p2_header *)buf) + 1; - to_app_iovec[iovcnt++].iov_len = len - sizeof(struct r2p2_header); + to_app_iovec[iovcnt].iov_base = buf + get_header_size(buf); + to_app_iovec[iovcnt++].iov_len = len - get_header_size(buf); gb = get_buffer_next(gb); assert(iovcnt < 0xFF); } @@ -236,11 +288,13 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, uint8_t req_type, uint8_t policy, uint16_t req_id) { unsigned int iov_idx, bufferleft, copied, tocopy, buffer_cnt, total_payload, - single_packet_msg, is_first, should_small_first; + single_packet_msg, is_first, should_small_first, header_size; struct r2p2_header *r2p2h; generic_buffer gb, new_gb; char *target, *src; + header_size = MIN_HEADER_SIZE; + // Compute the total payload total_payload = 0; for (int i = 0; i < iovcnt; i++) @@ -266,12 +320,10 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, // Set the last buffer to full size if (gb) { if (is_first && should_small_first) { - set_buffer_payload_size(gb, MIN_PAYLOAD_SIZE + - sizeof(struct r2p2_header)); + set_buffer_payload_size(gb, MIN_PAYLOAD_SIZE + header_size); is_first = 0; } else - set_buffer_payload_size(gb, PAYLOAD_SIZE + - sizeof(struct r2p2_header)); + set_buffer_payload_size(gb, PAYLOAD_SIZE + header_size); } new_gb = get_buffer(); assert(new_gb); @@ -284,14 +336,15 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, bufferleft = PAYLOAD_SIZE; // FIX the header r2p2h = (struct r2p2_header *)target; - bzero(r2p2h, sizeof(struct r2p2_header)); + bzero(r2p2h, header_size); + r2p2h->magic = MAGIC; r2p2h->rid = req_id; - r2p2h->header_size = sizeof(struct r2p2_header); + r2p2h->header_size = header_size; r2p2h->type_policy = (req_type << 4) | (0x0F & policy); r2p2h->p_order = buffer_cnt++; r2p2h->flags = 0; - target += sizeof(struct r2p2_header); + target += header_size; } src = iov[iov_idx].iov_base; tocopy = min(bufferleft, iov[iov_idx].iov_len - copied); @@ -306,8 +359,7 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, } // Set the len of the last buffer - set_buffer_payload_size(gb, PAYLOAD_SIZE + sizeof(struct r2p2_header) - - bufferleft); + set_buffer_payload_size(gb, PAYLOAD_SIZE + header_size - bufferleft); // Fix the header of the first and last packet r2p2h = (struct r2p2_header *)get_buffer_payload(msg->head_buffer); @@ -376,6 +428,11 @@ static void handle_response(generic_buffer gb, int len, cp->reply.sender = *source; switch(get_msg_type(r2p2h)) { + case RESPONSE_EXCT_ONCE: + // todo: handle already received response + assert(cp->eo_info); + send_eo_ack(cp); + // no break, continue like regular response case RESPONSE_MSG: assert(cp->state == R2P2_W_RESPONSE); set_buffer_payload_size(gb, len); @@ -427,9 +484,9 @@ static void handle_response(generic_buffer gb, int len, case ACK_MSG: // Send the rest packets assert(cp->state == R2P2_W_ACK); - if (len != (sizeof(struct r2p2_header) + 3)) + if (len != (MIN_HEADER_SIZE + 3)) printf("ACK msg size is %d\n", len); - assert(len == (sizeof(struct r2p2_header) + 3)); + assert(len == (MIN_HEADER_SIZE + 3)); free_buffer(gb); #ifdef LINUX rest_to_send = get_buffer_next(cp->request.head_buffer); @@ -454,13 +511,31 @@ static void handle_request(generic_buffer gb, int len, struct r2p2_header *r2p2h, struct r2p2_host_tuple *source) { + printf("handle_request start: "); __debug_dump(); struct r2p2_server_pair *sp; uint16_t req_id; char ack_payload[] = "ACK"; struct iovec ack; struct r2p2_msg ack_msg = {0}; + int exct_once; + exct_once = get_msg_type(r2p2h) == REQUEST_EXCT_ONCE; req_id = r2p2h->rid; + + if (exct_once) { + sp = find_in_pending_server_pairs(req_id, source); + if (sp != NULL) { + assert(sp->eo_info); + if (is_first(r2p2h)) { + sp->eo_info->req_received++; +// buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); + eo_try_garbage_collect(sp); + } + free_buffer(gb); + return; + } + } + if (is_first(r2p2h)) { /* * FIXME @@ -468,12 +543,18 @@ static void handle_request(generic_buffer gb, int len, * src ip port is already there * remove before starting the new one */ - sp = alloc_server_pair(); + sp = alloc_server_pair(exct_once); assert(sp); sp->request.sender = *source; sp->request.req_id = req_id; sp->request_expected_packets = r2p2h->p_order; sp->request_received_packets = 1; + if (exct_once) { + sp->eo_info->req_received = 1; + sp->eo_info->req_resent = ACK_NOT_RECEIVED; + sp->eo_info->reply_resent = 0; + sp_get_timer(sp); + } if (!should_keep_req(sp)) { set_buffer_payload_size(gb, len); @@ -495,6 +576,9 @@ static void handle_request(generic_buffer gb, int len, #ifdef LINUX free_buffer(ack_msg.head_buffer); #endif + } else if (exct_once) { + // add to pending request + add_to_pending_server_pairs(sp); } } else { // find in pending msgs @@ -536,11 +620,14 @@ void handle_incoming_pck(generic_buffer gb, int len, struct r2p2_header *r2p2h; char *buf; - if ((unsigned)len < sizeof(struct r2p2_header)) + if ((unsigned)len < MIN_HEADER_SIZE) printf("I received %d\n", len); - assert((unsigned)len >= sizeof(struct r2p2_header)); - buf = get_buffer_payload(gb); - r2p2h = (struct r2p2_header *)buf; + assert((unsigned)len >= MIN_HEADER_SIZE); + + buf = get_buffer_payload(gb); + r2p2h = (struct r2p2_header *)buf; + printf("\nReceived packet from %d:%d, seq=%d, len=%d\n", source->ip, source->port, r2p2h->rid, len); +// assert(r2p2h->header_size == (get_msg_type(r2p2h) == REQUEST_EXCT_ONCE ? sizeof(struct r2p2_header) : MIN_HEADER_SIZE)); if (is_response(r2p2h)) #ifdef WITH_TIMESTAMPING @@ -548,6 +635,8 @@ void handle_incoming_pck(generic_buffer gb, int len, #else handle_response(gb, len, r2p2h, source, local_host); #endif + else if (is_ack_exct_once(r2p2h)) + handle_ack_eo(gb, len, r2p2h, source); else handle_request(gb, len, r2p2h, source); } @@ -568,17 +657,33 @@ int r2p2_backend_init_per_core(void) void timer_triggered(struct r2p2_client_pair *cp) { - struct fixed_obj *fo = get_object_meta(cp); - if (!fo->taken) - return; - - assert(cp->ctx->timeout_cb); - cp->ctx->timeout_cb(cp->ctx->arg); - //printf("Timer triggered: received packets %d expected %d\n", - // cp->reply_received_packets, cp->reply_expected_packets); + struct fixed_obj *fo = get_object_meta(cp); + if (!fo->taken) + return; + + if (cp->eo_info) { + if (cp->eo_info->req_resent < EO_MAX_RETRY_REQUEST) { + printf("EO timeout, retry\n"); + cp_restart_timer(cp, cp->ctx->timeout); + buf_list_send(cp->request.head_buffer, cp->ctx->destination, cp->impl_data); + } else { + cp->ctx->timeout_cb(cp->ctx->arg); + // Flush the data + remove_from_pending_client_pairs(cp); + free_client_pair(cp); + return; + } + cp->eo_info->req_resent++; - remove_from_pending_client_pairs(cp); - free_client_pair(cp); + } else { + assert(cp->ctx->timeout_cb); + cp->ctx->timeout_cb(cp->ctx->arg); + //printf("Timer triggered: received packets %d expected %d\n", + // cp->reply_received_packets, cp->reply_expected_packets); + + remove_from_pending_client_pairs(cp); + free_client_pair(cp); + } } /* @@ -589,15 +694,27 @@ void r2p2_send_response(long handle, struct iovec *iov, int iovcnt) struct r2p2_server_pair *sp; sp = (struct r2p2_server_pair *)handle; - r2p2_prepare_msg(&sp->reply, iov, iovcnt, RESPONSE_MSG, FIXED_ROUTE, - sp->request.req_id); - buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); + + printf("send response for %d\n", sp->request.req_id); + + int exct_once = sp->eo_info != NULL; + + r2p2_prepare_msg(&sp->reply, iov, iovcnt, + exct_once ? RESPONSE_EXCT_ONCE : RESPONSE_MSG, + FIXED_ROUTE, sp->request.req_id); + + buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); // Notify router router_notify(); - remove_from_pending_server_pairs(sp); - free_server_pair(sp); + if (!exct_once) { + remove_from_pending_server_pairs(sp); + free_server_pair(sp); + } else { + sp->eo_info->reply_resent = 0; + sp_restart_timer(sp, EO_TO_REPLY); + } } void r2p2_send_req(struct iovec *iov, int iovcnt, struct r2p2_ctx *ctx) @@ -605,8 +722,9 @@ void r2p2_send_req(struct iovec *iov, int iovcnt, struct r2p2_ctx *ctx) generic_buffer second_buffer; struct r2p2_client_pair *cp; uint16_t rid; + uint8_t req_type; - cp = alloc_client_pair(); + cp = alloc_client_pair(is_exct_once(ctx)); assert(cp); cp->ctx = ctx; @@ -615,10 +733,19 @@ void r2p2_send_req(struct iovec *iov, int iovcnt, struct r2p2_ctx *ctx) return; } - rid = rand(); - r2p2_prepare_msg(&cp->request, iov, iovcnt, REQUEST_MSG, - ctx->routing_policy, rid); - cp->state = cp->request.head_buffer == cp->request.tail_buffer + if (is_exct_once(ctx)) { + cp->eo_info->req_resent = 0; + rid = eo_client_info.next_seq++; + req_type = REQUEST_EXCT_ONCE; + printf("send exct once request. rid=%d\n", rid); + } else { + rid = rand(); + req_type = REQUEST_MSG; + } + + r2p2_prepare_msg(&cp->request, iov, iovcnt, req_type, + ctx->routing_policy, rid); + cp->state = cp->request.head_buffer == cp->request.tail_buffer ? R2P2_W_RESPONSE : R2P2_W_ACK; @@ -652,3 +779,75 @@ void r2p2_set_app_flow_control_fn(app_flow_control fn) { afc_fn = fn; } + +void use_exct_once(struct r2p2_ctx *ctx) +{ + ctx->routing_policy |= EXCT_ONCE_FLAG; +} + +void send_eo_ack(struct r2p2_client_pair *cp) +{ + assert(cp->eo_info); + struct iovec ack; + struct r2p2_msg ack_msg = {0}; + + ack.iov_base = &(cp->eo_info->req_resent); + ack.iov_len = sizeof(uint16_t); + r2p2_prepare_msg(&ack_msg, &ack, 1, ACK_EXCT_ONCE, FIXED_ROUTE, + cp->request.req_id); + buf_list_send(ack_msg.head_buffer, &cp->reply.sender, cp->impl_data); +#ifdef LINUX + free_buffer(ack_msg.head_buffer); +#endif +} + +void handle_ack_eo(generic_buffer gb, int len, + struct r2p2_header *r2p2h, + struct r2p2_host_tuple *source) +{ + struct r2p2_server_pair *sp; + uint16_t nb_retries; + assert(len == r2p2h->header_size + sizeof(uint16_t)); + + nb_retries = *(uint16_t *) (get_buffer_payload(gb) + r2p2h->header_size); + printf("Received ack from %d for %d with %d retries\n", source->port, r2p2h->rid, nb_retries); + + sp = find_in_pending_server_pairs(r2p2h->rid, source); + assert(sp && sp->eo_info); + sp->eo_info->req_resent = nb_retries; + if (!eo_try_garbage_collect(sp)) { + sp_restart_timer(sp, EO_TO_NETWORK_FLUSH); + } +} + +int eo_try_garbage_collect(struct r2p2_server_pair *sp) +{ + assert(sp != NULL || sp->eo_info != NULL); + + if (sp->eo_info->req_received > sp->eo_info->req_resent) { + printf("GC early for req %d, received %d/%d\n", sp->request.req_id, sp->eo_info->req_received, sp->eo_info->req_resent); + remove_from_pending_server_pairs(sp); + free_server_pair(sp); + return 1; + } else return 0; + +} + +void sp_timer_triggered(struct r2p2_server_pair *sp) +{ + int timeout; + assert(sp && sp->eo_info); + + if (sp->eo_info->req_resent == ACK_NOT_RECEIVED && sp->eo_info->reply_resent < EO_MAX_RETRY_REPLY) { + printf("Retransmits based on timeout "); __debug_dump(); + buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); + timeout = ++sp->eo_info->reply_resent == EO_MAX_RETRY_REPLY ? + EO_TO_NETWORK_FLUSH : EO_TO_REPLY; + sp_restart_timer(sp, timeout); + } else { + printf("GC by timeout for req %d, received %d/%d\n", sp->request.req_id, sp->eo_info->req_received, sp->eo_info->req_resent); + remove_from_pending_server_pairs(sp); + free_server_pair(sp); + } + +} From b56a8145bdbf8af358d95391340ecf6de4e1c547 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Mon, 11 May 2020 23:36:00 +0200 Subject: [PATCH 02/15] compile --- r2p2/inc/r2p2/api-internal.h | 7 +-- r2p2/r2p2-common.c | 114 ++++++++++++++++++----------------- 2 files changed, 62 insertions(+), 59 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 4c4e0a6..e97ebd0 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -238,8 +238,9 @@ void handle_incoming_pck(generic_buffer gb, int len, #endif void timer_triggered(struct r2p2_client_pair *cp); void sp_timer_triggered(struct r2p2_server_pair *sp); -/* Exposed only for lancet */ -void forward_request(struct r2p2_server_pair *sp); // FIXME + +void forward_request(struct r2p2_server_pair *sp); + struct r2p2_server_pair *alloc_server_pair(void); void free_server_pair(struct r2p2_server_pair *sp); void r2p2_msg_add_payload(struct r2p2_msg *msg, generic_buffer gb); @@ -261,8 +262,6 @@ void sp_get_timer(struct r2p2_server_pair *sp); int sp_restart_timer(struct r2p2_server_pair *sp, long timeout); void sp_free_timer(struct r2p2_server_pair *sp); -void router_notify(void); - /* * Exactly Once specific */ diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 8794f67..5e048b6 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -67,8 +67,6 @@ static __thread struct fixed_linked_list pending_server_pairs = {0}; static __thread struct iovec to_app_iovec[0xFF]; static __thread uint16_t rid = 0; -static __thread struct r2p2_eo_client_info eo_client_info = {0}; // possible improvement: alloc only for client - #if DEBUG static void print_cp(void* __cp) { struct r2p2_client_pair* cp = (struct r2p2_client_pair*) __cp; @@ -147,7 +145,7 @@ static void free_client_pair(struct r2p2_client_pair *cp) free_object(cp); } -static struct r2p2_server_pair *alloc_server_pair(int with_eo_info) +static struct r2p2_server_pair *__alloc_server_pair(int with_eo_info) { struct r2p2_server_pair *sp; @@ -156,14 +154,19 @@ static struct r2p2_server_pair *alloc_server_pair(int with_eo_info) bzero(sp, sizeof(struct r2p2_server_pair)); - if (with_eo_info) { - sp->eo_info = malloc(sizeof(struct r2p2_sp_exct_once_info)); // TODO: use alloc_object - assert(sp->eo_info); - } + if (with_eo_info) { + sp->eo_info = malloc(sizeof(struct r2p2_sp_exct_once_info)); // TODO: use alloc_object + assert(sp->eo_info); + } return sp; } +struct r2p2_server_pair *alloc_server_pair() { + return __alloc_server_pair(0); +} + + void free_server_pair(struct r2p2_server_pair *sp) { generic_buffer gb; @@ -308,13 +311,13 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, uint8_t req_type, uint8_t policy, uint16_t req_id) { unsigned int iov_idx, buffer_cnt, total_payload, single_packet_msg, - is_first, should_small_first; + is_first, should_small_first, header_size; int i, bufferleft, copied, tocopy; struct r2p2_header *r2p2h; generic_buffer gb, new_gb; char *target, *src; - header_size = MIN_HEADER_SIZE; + header_size = MIN_HEADER_SIZE; // Compute the total payload msg->req_id = req_id; @@ -542,24 +545,24 @@ static void handle_request(generic_buffer gb, int len, char ack_payload[] = "ACK"; struct iovec ack; struct r2p2_msg ack_msg = {0}; - int exct_once; + int exct_once; int was_in_pending_sp = 0; - exct_once = get_msg_type(r2p2h) == REQUEST_EXCT_ONCE; + exct_once = get_msg_type(r2p2h) == REQUEST_EXCT_ONCE; req_id = r2p2h->rid; if (exct_once) { - sp = find_in_pending_server_pairs(req_id, source); - if (sp != NULL) { - assert(sp->eo_info); - if (is_first(r2p2h)) { - sp->eo_info->req_received++; -// buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); - eo_try_garbage_collect(sp); - } - free_buffer(gb); - return; - } + sp = find_in_pending_server_pairs(req_id, source); + if (sp != NULL) { + assert(sp->eo_info); + if (is_first(r2p2h)) { + sp->eo_info->req_received++; + // buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); + eo_try_garbage_collect(sp); + } + free_buffer(gb); + return; + } } if (is_first(r2p2h)) { @@ -569,17 +572,17 @@ static void handle_request(generic_buffer gb, int len, * src ip port is already there * remove before starting the new one */ - sp = alloc_server_pair(exct_once); + sp = __alloc_server_pair(exct_once); assert(sp); sp->request.sender = *source; sp->request.req_id = req_id; sp->request_expected_packets = r2p2h->p_order; sp->request_received_packets = 1; if (exct_once) { - sp->eo_info->req_received = 1; - sp->eo_info->req_resent = ACK_NOT_RECEIVED; - sp->eo_info->reply_resent = 0; - sp_get_timer(sp); + sp->eo_info->req_received = 1; + sp->eo_info->req_resent = ACK_NOT_RECEIVED; + sp->eo_info->reply_resent = 0; + sp_get_timer(sp); } /* Flow control only request messages, not Raft reqs */ @@ -600,14 +603,16 @@ static void handle_request(generic_buffer gb, int len, // send ACK ack.iov_base = ack_payload; ack.iov_len = 3; - r2p2_prepare_msg(&ack_msg, &ack, 1, ACK_MSG, FIXED_ROUTE, req_id); + r2p2_prepare_msg(&ack_msg, &ack, 1, ACK_MSG, FIXED_ROUTE, + req_id); buf_list_send(ack_msg.head_buffer, source, NULL); #ifdef LINUX free_buffer(ack_msg.head_buffer); #endif - } else if (exct_once) { - // add to pending request - add_to_pending_server_pairs(sp); + } else if (exct_once) { + // add to pending request + add_to_pending_server_pairs(sp); + } } } else { // find in pending msgs @@ -794,22 +799,23 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, } else { r2p2_prepare_msg(&sp->reply, iov, iovcnt, - exct_once ? RESPONSE_EXCT_ONCE : RESPONSE_MSG, - FIXED_ROUTE, sp->request.req_id); + exct_once ? RESPONSE_EXCT_ONCE : RESPONSE_MSG, + FIXED_ROUTE, sp->request.req_id); - buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); + buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); // Notify router not for Raft requests if (!is_raft_msg(r2p2h)) router_notify(sp->request.sender.ip, sp->request.sender.port, - sp->request.req_id); + sp->request.req_id); - if (!exct_once) { - remove_from_pending_server_pairs(sp); - free_server_pair(sp); - } else { - sp->eo_info->reply_resent = 0; - sp_restart_timer(sp, EO_TO_REPLY); + if (!exct_once) { + // remove_from_pending_server_pairs(sp); + free_server_pair(sp); + } else { + sp->eo_info->reply_resent = 0; + sp_restart_timer(sp, EO_TO_REPLY); + } } } @@ -838,8 +844,6 @@ static inline void __r2p2_send_req(struct iovec *iov, int iovcnt, { generic_buffer second_buffer; struct r2p2_client_pair *cp; - uint16_t rid; - uint8_t req_type; cp = alloc_client_pair(is_exct_once(ctx)); assert(cp); @@ -850,19 +854,18 @@ static inline void __r2p2_send_req(struct iovec *iov, int iovcnt, return; } - if (is_exct_once(ctx)) { - cp->eo_info->req_resent = 0; - rid = eo_client_info.next_seq++; - req_type = REQUEST_EXCT_ONCE; - printf("send exct once request. rid=%d\n", rid); - } else { - rid = rand(); - req_type = REQUEST_MSG; - } + // FIXME: Make sure the rid is available + //rid = rand(); + rid++; + + if (req_type == REQUEST_EXCT_ONCE) { + cp->eo_info->req_resent = 0; + printf("send exct once request. rid=%d\n", rid); + } - r2p2_prepare_msg(&cp->request, iov, iovcnt, req_type, + r2p2_prepare_msg(&cp->request, iov, iovcnt, req_type, ctx->routing_policy, rid); - cp->state = cp->request.head_buffer == cp->request.tail_buffer + cp->state = cp->request.head_buffer == cp->request.tail_buffer ? R2P2_W_RESPONSE : R2P2_W_ACK; @@ -890,7 +893,8 @@ static inline void __r2p2_send_req(struct iovec *iov, int iovcnt, void r2p2_send_req(struct iovec *iov, int iovcnt, struct r2p2_ctx *ctx) { - __r2p2_send_req(iov, iovcnt, ctx, REQUEST_MSG); + __r2p2_send_req(iov, iovcnt, ctx, + is_exct_once(ctx) ? REQUEST_EXCT_ONCE : REQUEST_MSG); } #ifdef WITH_RAFT From 05a8987004766f266322ab5fd9b8f8520bafafb1 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Tue, 12 May 2020 00:03:18 +0200 Subject: [PATCH 03/15] some merge fixes --- r2p2/inc/r2p2/api-internal.h | 12 --- r2p2/inc/r2p2/api.h | 1 - r2p2/linux-backend.c | 189 ++++++++++++++++++----------------- r2p2/r2p2-common.c | 92 ++++++++--------- 4 files changed, 140 insertions(+), 154 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index e97ebd0..133388c 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -132,18 +132,6 @@ struct r2p2_server_pair { // Add here fields for garbage collection, e.g. last received }; -enum { - EO_NEW = 0, - EO_IN_PROGRESS, - EO_COMPLETED, - EO_STALE -}; - -struct r2p2_eo_client_info { - uint16_t next_seq; -// uint16_t extended_rid; // exclusive -}; - static inline int is_response(struct r2p2_header *h) { return ((h->type_policy & 0xF0) == (RESPONSE_MSG << 4)) || diff --git a/r2p2/inc/r2p2/api.h b/r2p2/inc/r2p2/api.h index 9e19063..42bfca0 100644 --- a/r2p2/inc/r2p2/api.h +++ b/r2p2/inc/r2p2/api.h @@ -49,7 +49,6 @@ enum { enum { ERR_NO_SOCKET=1, ERR_DROP_MSG, - ERR_FULL_EXCT_ONCE_BUFFER }; struct __attribute__((packed)) r2p2_ctx { diff --git a/r2p2/linux-backend.c b/r2p2/linux-backend.c index 4ababb1..80fa64c 100644 --- a/r2p2/linux-backend.c +++ b/r2p2/linux-backend.c @@ -109,9 +109,9 @@ int r2p2_init_per_core(int core_id, int core_count) } #if DEBUG - event.events = EPOLLIN; + event.events = EPOLLIN; event.data.fd = 0; - ret = epoll_ctl(efd, EPOLL_CTL_ADD, 0, &event); + ret = epoll_ctl(efd, EPOLL_CTL_ADD, 0, &event); if (ret) return -1; #endif @@ -196,21 +196,21 @@ int r2p2_init_per_core(int core_id, int core_count) tp.count = 0; tp.idx = 0; // Create the free timers - for (i = 0; i < TIMERPOOL_SIZE; i++) { - tfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK); - - tp.timers[i].fd = tfd; - tp.timers[i].taken = 0; - tp.timers[i].data = NULL; - - event.events = EPOLLIN; - event.data.ptr = (void *)&(tp.timers[i]); - ret = epoll_ctl(efd, EPOLL_CTL_ADD, tfd, &event); - if (ret) { - perror("epoll_ctl"); - return -1; - } - } + for (i = 0; i < TIMERPOOL_SIZE; i++) { + tfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK); + + tp.timers[i].fd = tfd; + tp.timers[i].taken = 0; + tp.timers[i].data = NULL; + + event.events = EPOLLIN; + event.data.ptr = (void *)&(tp.timers[i]); + ret = epoll_ctl(efd, EPOLL_CTL_ADD, tfd, &event); + if (ret) { + perror("epoll_ctl"); + return -1; + } + } return 0; } @@ -238,20 +238,20 @@ static struct r2p2_socket *get_socket(void) static struct free_timer *get_timer(void) { - struct free_timer *res; - uint32_t idx; + struct free_timer *res; + uint32_t idx; - if (tp.count >= TIMERPOOL_SIZE) - return NULL; + if (tp.count >= TIMERPOOL_SIZE) + return NULL; - while (tp.timers[tp.idx++ & (TIMERPOOL_SIZE - 1)].taken) - ; - idx = (tp.idx - 1) & (TIMERPOOL_SIZE - 1); - res = &tp.timers[idx]; - tp.timers[idx].taken = 1; - tp.count++; + while (tp.timers[tp.idx++ & (TIMERPOOL_SIZE - 1)].taken) + ; + idx = (tp.idx - 1) & (TIMERPOOL_SIZE - 1); + res = &tp.timers[idx]; + tp.timers[idx].taken = 1; + tp.count++; - return res; + return res; } static int __disarm_timer(int timerfd) @@ -274,7 +274,7 @@ static void free_socket(struct r2p2_socket *s) static void linux_on_client_pair_free(void *data) { - printf("Free client pair\n"); + printf("Free client pair\n"); struct r2p2_socket *sock = (struct r2p2_socket *)data; __disarm_timer(sock->tfd); free_socket(sock); @@ -288,30 +288,33 @@ static void handle_timer_for_socket(struct r2p2_socket *s) timer_triggered(s->cp); } -static void handle_free_timer(struct free_timer *ft) { - assert(ft && ft->data && ft->taken); - __disarm_timer(ft->fd); - sp_timer_triggered((struct r2p2_server_pair *)ft->data); +static void handle_free_timer(struct free_timer *ft) +{ + assert(ft && ft->data && ft->taken); + __disarm_timer(ft->fd); + sp_timer_triggered((struct r2p2_server_pair *)ft->data); } -void sp_get_timer(struct r2p2_server_pair *sp) { - struct free_timer *t; - t = get_timer(); - if (t == NULL) { - perror("Could not allocate free timer"); - } - t->data = sp; - sp->eo_info->timer = t; +void sp_get_timer(struct r2p2_server_pair *sp) +{ + struct free_timer *t; + t = get_timer(); + if (t == NULL) { + perror("Could not allocate free timer"); + } + t->data = sp; + sp->eo_info->timer = t; } -void sp_free_timer(struct r2p2_server_pair *sp) { - struct free_timer *t = sp->eo_info->timer; - if (t) { - t->taken = 0; - t->data = NULL; - __disarm_timer(t->fd); - sp->eo_info->timer = NULL; - } +void sp_free_timer(struct r2p2_server_pair *sp) +{ + struct free_timer *t = sp->eo_info->timer; + if (t) { + t->taken = 0; + t->data = NULL; + __disarm_timer(t->fd); + sp->eo_info->timer = NULL; + } } /* @@ -434,29 +437,28 @@ void r2p2_poll(void) for (i = 0; i < ready; i++) { #if DEBUG - if (events[i].data.fd == 0) { - int c; - while ((c = getchar()) != '\n' && c != EOF) { } - __debug_dump(); - continue; - } + if (events[i].data.fd == 0) { + int c; + while ((c = getchar()) != '\n' && c != EOF) { } + __debug_dump(); + continue; + } #endif - event_arg = (struct r2p2_socket *)events[i].data.ptr; - assert(event_arg); + event_arg = (struct r2p2_socket *)events[i].data.ptr; + assert(event_arg); if (events[i].events & EPOLLIN) { - is_socket_timer_event = - (unsigned long) event_arg % sizeof(struct r2p2_socket); - is_free_timer_event = - (void *) tp.timers <= events[i].data.ptr && - events[i].data.ptr < (void *) (tp.timers + TIMERPOOL_SIZE); - - if (is_free_timer_event) { - handle_free_timer((struct free_timer *)events[i].data.ptr); - - } else if (is_socket_timer_event) { - assert((unsigned long)event_arg % sizeof(struct r2p2_socket) == - 4); + is_socket_timer_event = + (unsigned long)event_arg % sizeof(struct r2p2_socket); + is_free_timer_event = + (void *)tp.timers <= events[i].data.ptr && + events[i].data.ptr < (void *)(tp.timers + TIMERPOOL_SIZE); + + if (is_free_timer_event) { + handle_free_timer((struct free_timer *)events[i].data.ptr); + + } else if (is_socket_timer_event) { + assert((unsigned long)event_arg % sizeof(struct r2p2_socket) == 4); s = container_of(event_arg, struct r2p2_socket, tfd); handle_timer_for_socket(s); } else { @@ -576,40 +578,41 @@ int disarm_timer(void *timer) return __disarm_timer(tfd); } -static void __restart_timer(int tfd, long timeout) { - struct itimerspec ts; +static void __restart_timer(int tfd, long timeout) +{ + struct itimerspec ts; - ts.it_interval.tv_sec = 0; - ts.it_interval.tv_nsec = 0; - ts.it_value.tv_sec = timeout / 1000000; - ts.it_value.tv_nsec = (timeout % 1000000) * 1000; + ts.it_interval.tv_sec = 0; + ts.it_interval.tv_nsec = 0; + ts.it_value.tv_sec = timeout / 1000000; + ts.it_value.tv_nsec = (timeout % 1000000) * 1000; - if (timerfd_settime(tfd, 0, &ts, NULL) < 0) { - perror("Error resetting timer"); - assert(0); - } + if (timerfd_settime(tfd, 0, &ts, NULL) < 0) { + perror("Error resetting timer"); + assert(0); + } } - int cp_restart_timer(struct r2p2_client_pair *cp, long timeout) { - struct r2p2_socket *s; + struct r2p2_socket *s; - s = (struct r2p2_socket *)cp->impl_data; - assert(s->taken && s->cp == cp); - __restart_timer(s->tfd, timeout); - cp->timer = (void *)(long)s->tfd; - return 0; + s = (struct r2p2_socket *)cp->impl_data; + assert(s->taken && s->cp == cp); + __restart_timer(s->tfd, timeout); + cp->timer = (void *)(long)s->tfd; + return 0; } -int sp_restart_timer(struct r2p2_server_pair *sp, long timeout) { - struct free_timer *timer; - assert(sp && sp->eo_info && sp->eo_info->timer); +int sp_restart_timer(struct r2p2_server_pair *sp, long timeout) +{ + struct free_timer *timer; + assert(sp && sp->eo_info && sp->eo_info->timer); - timer = (struct free_timer *)sp->eo_info->timer; - assert(timer->data == sp && timer->taken); - __restart_timer(timer->fd, timeout); - return 0; + timer = (struct free_timer *)sp->eo_info->timer; + assert(timer->data == sp && timer->taken); + __restart_timer(timer->fd, timeout); + return 0; } void router_notify(uint32_t ip, uint16_t port, uint16_t rid) diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 5e048b6..ef16baf 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -455,16 +455,16 @@ static void handle_response(generic_buffer gb, int len, cp->reply.sender = *source; - switch(get_msg_type(r2p2h)) { - case RESPONSE_EXCT_ONCE: - // todo: handle already received response - assert(cp->eo_info); - send_eo_ack(cp); - // no break, continue like regular response FIXME + switch (get_msg_type(r2p2h)) { + case RESPONSE_EXCT_ONCE: + // todo: handle already received response + assert(cp->eo_info); + send_eo_ack(cp); + // no break, continue like regular response case RAFT_REP: -#ifndef WITH_RAFT - assert(0); -#endif +//#ifndef WITH_RAFT +// assert(0); +//#endif case RESPONSE_MSG: assert(cp->state == R2P2_W_RESPONSE); set_buffer_payload_size(gb, len); @@ -539,7 +539,7 @@ static void handle_request(generic_buffer gb, int len, struct r2p2_header *r2p2h, struct r2p2_host_tuple *source) { - printf("handle_request start: "); __debug_dump(); + printf("handle_request start: "); __debug_dump(); struct r2p2_server_pair *sp; uint16_t req_id; char ack_payload[] = "ACK"; @@ -603,8 +603,7 @@ static void handle_request(generic_buffer gb, int len, // send ACK ack.iov_base = ack_payload; ack.iov_len = 3; - r2p2_prepare_msg(&ack_msg, &ack, 1, ACK_MSG, FIXED_ROUTE, - req_id); + r2p2_prepare_msg(&ack_msg, &ack, 1, ACK_MSG, FIXED_ROUTE, req_id); buf_list_send(ack_msg.head_buffer, source, NULL); #ifdef LINUX free_buffer(ack_msg.head_buffer); @@ -715,7 +714,7 @@ void handle_incoming_pck(generic_buffer gb, int len, handle_response(gb, len, r2p2h, source, local_host); #endif else if (is_ack_exct_once(r2p2h)) - handle_ack_eo(gb, len, r2p2h, source); + handle_ack_eo(gb, len, r2p2h, source); else handle_request(gb, len, r2p2h, source); } @@ -739,33 +738,32 @@ int r2p2_backend_init_per_core(void) void timer_triggered(struct r2p2_client_pair *cp) { - struct fixed_obj *fo = get_object_meta(cp); - if (!fo->taken) - return; - - if (cp->eo_info) { - if (cp->eo_info->req_resent < EO_MAX_RETRY_REQUEST) { - printf("EO timeout, retry\n"); - cp_restart_timer(cp, cp->ctx->timeout); - buf_list_send(cp->request.head_buffer, cp->ctx->destination, cp->impl_data); - } else { - cp->ctx->timeout_cb(cp->ctx->arg); - // Flush the data - remove_from_pending_client_pairs(cp); - free_client_pair(cp); - return; - } - cp->eo_info->req_resent++; + struct fixed_obj *fo = get_object_meta(cp); + if (!fo->taken) + return; + + if (cp->eo_info) { + if (cp->eo_info->req_resent < EO_MAX_RETRY_REQUEST) { + printf("EO timeout, retry\n"); + cp_restart_timer(cp, cp->ctx->timeout); + buf_list_send(cp->request.head_buffer, cp->ctx->destination, + cp->impl_data); + } else { + cp->ctx->timeout_cb(cp->ctx->arg); + // Flush the data + remove_from_pending_client_pairs(cp); + free_client_pair(cp); + return; + } + cp->eo_info->req_resent++; } else { - assert(cp->ctx->timeout_cb); - cp->ctx->timeout_cb(cp->ctx->arg); - //printf("Timer triggered: received packets %d expected %d\n", - // cp->reply_received_packets, cp->reply_expected_packets); + assert(cp->ctx->timeout_cb); + cp->ctx->timeout_cb(cp->ctx->arg); - remove_from_pending_client_pairs(cp); - free_client_pair(cp); - } + remove_from_pending_client_pairs(cp); + free_client_pair(cp); + } } /* @@ -778,7 +776,6 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, struct r2p2_header *r2p2h; sp = (struct r2p2_server_pair *)handle; - int exct_once = sp->eo_info != NULL; r2p2h = (struct r2p2_header *)get_buffer_payload(sp->request.head_buffer); if (is_replicated_req(r2p2h)) { #ifndef WITH_RAFT @@ -798,9 +795,8 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, sp->request.req_id); } else { - r2p2_prepare_msg(&sp->reply, iov, iovcnt, - exct_once ? RESPONSE_EXCT_ONCE : RESPONSE_MSG, - FIXED_ROUTE, sp->request.req_id); + r2p2_prepare_msg(&sp->reply, iov, iovcnt, rep_type, FIXED_ROUTE, + sp->request.req_id); buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); @@ -809,7 +805,7 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, router_notify(sp->request.sender.ip, sp->request.sender.port, sp->request.req_id); - if (!exct_once) { + if (rep_type == REQUEST_EXCT_ONCE) { // remove_from_pending_server_pairs(sp); free_server_pair(sp); } else { @@ -821,7 +817,10 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, void r2p2_send_response(long handle, struct iovec *iov, int iovcnt) { - return __r2p2_send_response(handle, iov, iovcnt, RESPONSE_MSG); + int req_type = ((struct r2p2_server_pair *)handle)->eo_info == NULL + ? RESPONSE_MSG + : RESPONSE_EXCT_ONCE; + return __r2p2_send_response(handle, iov, iovcnt, req_type); } #ifdef WITH_RAFT @@ -863,11 +862,8 @@ static inline void __r2p2_send_req(struct iovec *iov, int iovcnt, printf("send exct once request. rid=%d\n", rid); } - r2p2_prepare_msg(&cp->request, iov, iovcnt, req_type, - ctx->routing_policy, rid); - cp->state = cp->request.head_buffer == cp->request.tail_buffer - ? R2P2_W_RESPONSE - : R2P2_W_ACK; + r2p2_prepare_msg(&cp->request, iov, iovcnt, req_type, ctx->routing_policy, + rid); add_to_pending_client_pairs(cp); From d1f1052fbd82643705760dad2b2e76948c5f60d8 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Tue, 12 May 2020 13:51:01 +0200 Subject: [PATCH 04/15] format --- r2p2/inc/r2p2/api-internal.h | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 133388c..58d7584 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -90,8 +90,8 @@ struct __attribute__((__packed__)) r2p2_msg { }; struct r2p2_cp_exct_once_info { - // extended rid - uint16_t req_resent; + // extended rid + uint16_t req_resent; }; struct r2p2_client_pair { @@ -110,13 +110,12 @@ struct r2p2_client_pair { struct r2p2_cp_exct_once_info *eo_info; }; - struct r2p2_sp_exct_once_info { - // extended rid - uint16_t req_received; - uint16_t req_resent; - uint16_t reply_resent; - void *timer; + // extended rid + uint16_t req_received; + uint16_t req_resent; + uint16_t reply_resent; + void *timer; }; struct r2p2_server_pair { From 203a27dec567119be4ce78790652928f34eacab9 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Tue, 12 May 2020 14:17:58 +0200 Subject: [PATCH 05/15] receives --- r2p2/linux-backend.c | 8 ++++---- r2p2/r2p2-common.c | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/r2p2/linux-backend.c b/r2p2/linux-backend.c index 80fa64c..d5ab710 100644 --- a/r2p2/linux-backend.c +++ b/r2p2/linux-backend.c @@ -438,10 +438,10 @@ void r2p2_poll(void) #if DEBUG if (events[i].data.fd == 0) { - int c; - while ((c = getchar()) != '\n' && c != EOF) { } - __debug_dump(); - continue; + int c; + while ((c = getchar()) != '\n' && c != EOF) {} + __debug_dump(); + continue; } #endif diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index ef16baf..06b8de8 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -692,10 +692,10 @@ void handle_incoming_pck(generic_buffer gb, int len, printf("I received %d\n", len); assert((unsigned)len >= MIN_HEADER_SIZE); - buf = get_buffer_payload(gb); - r2p2h = (struct r2p2_header *)buf; - printf("\nReceived packet from %d:%d, seq=%d, len=%d\n", source->ip, source->port, r2p2h->rid, len); -// assert(r2p2h->header_size == (get_msg_type(r2p2h) == REQUEST_EXCT_ONCE ? sizeof(struct r2p2_header) : MIN_HEADER_SIZE)); + buf = get_buffer_payload(gb); + r2p2h = (struct r2p2_header *)buf; + printf("\nReceived packet from %d:%d, seq=%d, len=%d\n", source->ip, source->port, r2p2h->rid, len); + // assert(r2p2h->header_size == (get_msg_type(r2p2h) == REQUEST_EXCT_ONCE ? sizeof(struct r2p2_header) : MIN_HEADER_SIZE)); // Fix endianness r2p2h->rid = ntohs(r2p2h->rid); From bcb460e923aeaa0fcd339dc3de88c91005a5f873 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Tue, 12 May 2020 14:37:02 +0200 Subject: [PATCH 06/15] seems to work --- r2p2/inc/r2p2/api-internal.h | 2 +- r2p2/r2p2-common.c | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 58d7584..35a543d 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -40,7 +40,7 @@ #define MAGIC 0xCC #define SHOULD_REPLY 0x01 -#define EXCT_ONCE_FLAG 0x02 +#define EXCT_ONCE_FLAG 0x04 #define EO_MAX_RETRY_REQUEST 5 #define EO_MAX_RETRY_REPLY 5 #define EO_TO_REPLY 2500000 diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 06b8de8..f3a54d0 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -608,10 +608,10 @@ static void handle_request(generic_buffer gb, int len, #ifdef LINUX free_buffer(ack_msg.head_buffer); #endif - } else if (exct_once) { - // add to pending request - add_to_pending_server_pairs(sp); } + } else if (exct_once) { + // add to pending request + add_to_pending_server_pairs(sp); } } else { // find in pending msgs From dd8e20c15eeffa7ae7349f2df1de01a7361d8e65 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Tue, 12 May 2020 14:58:18 +0200 Subject: [PATCH 07/15] fix bug --- r2p2/r2p2-common.c | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index f3a54d0..4dcfe12 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -154,6 +154,8 @@ static struct r2p2_server_pair *__alloc_server_pair(int with_eo_info) bzero(sp, sizeof(struct r2p2_server_pair)); + printf("alloc sp with eo:%d\n", with_eo_info); + if (with_eo_info) { sp->eo_info = malloc(sizeof(struct r2p2_sp_exct_once_info)); // TODO: use alloc_object assert(sp->eo_info); @@ -204,12 +206,14 @@ static void add_to_pending_client_pairs(struct r2p2_client_pair *cp) static void add_to_pending_server_pairs(struct r2p2_server_pair *sp) { + printf("add_to_pending_server_pairs\n"); struct fixed_obj *fo = get_object_meta(sp); add_to_list(&pending_server_pairs, fo); } static void remove_from_pending_server_pairs(struct r2p2_server_pair *sp) { + printf("remove_from_pending_server_pairs\n"); struct fixed_obj *fo = get_object_meta(sp); remove_from_list(&pending_server_pairs, fo); } @@ -372,6 +376,7 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, r2p2h->magic = MAGIC; r2p2h->rid = req_id; r2p2h->header_size = header_size; + printf("Send with policy: %d\n", (int) policy); r2p2h->type_policy = (req_type << 4) | (0x0F & policy); r2p2h->p_order = htons(buffer_cnt++); r2p2h->flags = 0; @@ -627,6 +632,7 @@ static void handle_request(generic_buffer gb, int len, free_server_pair(sp); return; } + printf("was_in_pending_sp\n"); was_in_pending_sp = 1; } set_buffer_payload_size(gb, len); @@ -695,6 +701,7 @@ void handle_incoming_pck(generic_buffer gb, int len, buf = get_buffer_payload(gb); r2p2h = (struct r2p2_header *)buf; printf("\nReceived packet from %d:%d, seq=%d, len=%d\n", source->ip, source->port, r2p2h->rid, len); + __debug_dump(); // assert(r2p2h->header_size == (get_msg_type(r2p2h) == REQUEST_EXCT_ONCE ? sizeof(struct r2p2_header) : MIN_HEADER_SIZE)); // Fix endianness @@ -806,11 +813,11 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, sp->request.req_id); if (rep_type == REQUEST_EXCT_ONCE) { - // remove_from_pending_server_pairs(sp); - free_server_pair(sp); - } else { sp->eo_info->reply_resent = 0; sp_restart_timer(sp, EO_TO_REPLY); + } else { + // remove_from_pending_server_pairs(sp); + free_server_pair(sp); } } } From bd812c9402982a1225113d8edebf94375971cb15 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sat, 23 May 2020 14:43:47 +0200 Subject: [PATCH 08/15] remove dynamic header size --- r2p2/inc/r2p2/api-internal.h | 2 -- r2p2/r2p2-common.c | 29 ++++++++++++++--------------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 35a543d..1e63454 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -47,8 +47,6 @@ #define EO_TO_NETWORK_FLUSH 5000000 #define ACK_NOT_RECEIVED UINT16_MAX -#define MIN_HEADER_SIZE (sizeof(struct r2p2_header))// - sizeof(uint16_t)) - enum { REQUEST_MSG = 0, RESPONSE_MSG, diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 4dcfe12..76db93a 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -315,15 +315,12 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, uint8_t req_type, uint8_t policy, uint16_t req_id) { unsigned int iov_idx, buffer_cnt, total_payload, single_packet_msg, - is_first, should_small_first, header_size; + is_first, should_small_first; int i, bufferleft, copied, tocopy; struct r2p2_header *r2p2h; generic_buffer gb, new_gb; char *target, *src; - header_size = MIN_HEADER_SIZE; - - // Compute the total payload msg->req_id = req_id; // Fix endianness for the header req_id = htons(req_id); @@ -355,10 +352,12 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, // Set the last buffer to full size if (gb) { if (is_first && should_small_first) { - set_buffer_payload_size(gb, MIN_PAYLOAD_SIZE + header_size); + set_buffer_payload_size(gb, MIN_PAYLOAD_SIZE + + sizeof(struct r2p2_header)); is_first = 0; } else - set_buffer_payload_size(gb, PAYLOAD_SIZE + header_size); + set_buffer_payload_size(gb, PAYLOAD_SIZE + + sizeof(struct r2p2_header)); } new_gb = get_buffer(); assert(new_gb); @@ -371,16 +370,15 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, bufferleft = PAYLOAD_SIZE; // FIX the header r2p2h = (struct r2p2_header *)target; - bzero(r2p2h, header_size); + bzero(r2p2h, sizeof(struct r2p2_header)); r2p2h->magic = MAGIC; r2p2h->rid = req_id; - r2p2h->header_size = header_size; - printf("Send with policy: %d\n", (int) policy); + r2p2h->header_size = sizeof(struct r2p2_header); r2p2h->type_policy = (req_type << 4) | (0x0F & policy); r2p2h->p_order = htons(buffer_cnt++); r2p2h->flags = 0; - target += header_size; + target += sizeof(struct r2p2_header); } src = iov[iov_idx].iov_base; tocopy = min(bufferleft, (int)(iov[iov_idx].iov_len - copied)); @@ -395,7 +393,8 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, } // Set the len of the last buffer - set_buffer_payload_size(gb, PAYLOAD_SIZE + header_size - bufferleft); + set_buffer_payload_size(gb, PAYLOAD_SIZE + sizeof(struct r2p2_header) - + bufferleft); // Fix the header of the first and last packet r2p2h = (struct r2p2_header *)get_buffer_payload(msg->head_buffer); @@ -517,9 +516,9 @@ static void handle_response(generic_buffer gb, int len, case ACK_MSG: // Send the rest packets assert(cp->state == R2P2_W_ACK); - if (len != (MIN_HEADER_SIZE + 3)) + if (len != (sizeof(struct r2p2_header) + 3)) printf("ACK msg size is %d\n", len); - assert(len == (MIN_HEADER_SIZE + 3)); + assert(len == (sizeof(struct r2p2_header) + 3)); free_buffer(gb); #ifdef LINUX rest_to_send = get_buffer_next(cp->request.head_buffer); @@ -694,9 +693,9 @@ void handle_incoming_pck(generic_buffer gb, int len, return; } #endif - if ((unsigned)len < MIN_HEADER_SIZE) + if ((unsigned)len < sizeof(struct r2p2_header)) printf("I received %d\n", len); - assert((unsigned)len >= MIN_HEADER_SIZE); + assert((unsigned)len >= sizeof(struct r2p2_header)); buf = get_buffer_payload(gb); r2p2h = (struct r2p2_header *)buf; From 9e3e43722e56c653a0712cc5e9b78b3c52b18dac Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sat, 23 May 2020 15:08:43 +0200 Subject: [PATCH 09/15] remove dynamic header size 2 --- r2p2/inc/r2p2/api-internal.h | 5 ----- r2p2/r2p2-common.c | 4 ++-- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 1e63454..73da616 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -170,11 +170,6 @@ static inline uint8_t get_msg_type(struct r2p2_header *h) return (h->type_policy & 0xF0) >> 4; } -static inline unsigned int get_header_size(const char* buf) -{ - return ((struct r2p2_header*) buf)->header_size; -} - static inline int is_replicated_req(struct r2p2_header *h) { return (get_policy(h) == REPLICATED_ROUTE || diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 76db93a..232d1cc 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -274,8 +274,8 @@ static int prepare_to_app_iovec(struct r2p2_msg *msg) buf = get_buffer_payload(gb); assert(buf); len = get_buffer_payload_size(gb); - to_app_iovec[iovcnt].iov_base = buf + get_header_size(buf); - to_app_iovec[iovcnt++].iov_len = len - get_header_size(buf); + to_app_iovec[iovcnt].iov_base = ((struct r2p2_header *)buf) + 1; + to_app_iovec[iovcnt++].iov_len = len - sizeof(struct r2p2_header); gb = get_buffer_next(gb); assert(iovcnt < 0xFF); } From 9fd699997636e3f3d426f6676158bf8fb8483e4c Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sat, 23 May 2020 15:29:40 +0200 Subject: [PATCH 10/15] cleanup --- r2p2/inc/r2p2/api-internal.h | 19 ++-- r2p2/inc/r2p2/r2p2-linux.h | 15 +-- r2p2/linux-backend.c | 40 ++++---- r2p2/r2p2-common.c | 183 ++++++++++++++++------------------- 4 files changed, 118 insertions(+), 139 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 73da616..0b7e810 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -70,8 +70,6 @@ struct __attribute__((__packed__)) r2p2_header { uint8_t flags; uint16_t rid; uint16_t p_order; -// uint16_t extended_rid; - // add session ID? }; struct __attribute__((__packed__)) r2p2_feedback { @@ -88,7 +86,6 @@ struct __attribute__((__packed__)) r2p2_msg { }; struct r2p2_cp_exct_once_info { - // extended rid uint16_t req_resent; }; @@ -109,7 +106,6 @@ struct r2p2_client_pair { }; struct r2p2_sp_exct_once_info { - // extended rid uint16_t req_received; uint16_t req_resent; uint16_t reply_resent; @@ -160,6 +156,11 @@ static inline int is_raft_msg(struct r2p2_header *h) ((h->type_policy & 0xF0) == (RAFT_MSG << 4)); } +static inline int is_ack_exct_once(struct r2p2_header *h) +{ + return ((h->type_policy & 0xF0) == (ACK_EXCT_ONCE << 4)); +} + static inline uint8_t get_policy(struct r2p2_header *h) { return (h->type_policy & 0xF); @@ -220,7 +221,6 @@ void timer_triggered(struct r2p2_client_pair *cp); void sp_timer_triggered(struct r2p2_server_pair *sp); void forward_request(struct r2p2_server_pair *sp); - struct r2p2_server_pair *alloc_server_pair(void); void free_server_pair(struct r2p2_server_pair *sp); void r2p2_msg_add_payload(struct r2p2_msg *msg, generic_buffer gb); @@ -250,14 +250,9 @@ static inline int is_exct_once(struct r2p2_ctx *ctx) return (ctx->routing_policy & EXCT_ONCE_FLAG) != 0; } -static inline int is_ack_exct_once(struct r2p2_header *h) -{ - return ((h->type_policy & 0xF0) == (ACK_EXCT_ONCE << 4)); -} - -void send_eo_ack(struct r2p2_client_pair *cp); +void eo_send_ack(struct r2p2_client_pair *cp); -void handle_ack_eo(generic_buffer gb, int len, struct r2p2_header *r2p2h, +void eo_handle_ack(generic_buffer gb, int len, struct r2p2_header *r2p2h, struct r2p2_host_tuple *source); int eo_try_garbage_collect(struct r2p2_server_pair *sp); diff --git a/r2p2/inc/r2p2/r2p2-linux.h b/r2p2/inc/r2p2/r2p2-linux.h index 21ab191..ecced24 100644 --- a/r2p2/inc/r2p2/r2p2-linux.h +++ b/r2p2/inc/r2p2/r2p2-linux.h @@ -57,14 +57,15 @@ struct socket_pool { struct r2p2_socket sockets[SOCKPOOL_SIZE]; }; -struct free_timer { - int fd; - int taken; - void* data; +// Used for client pairs +struct loose_timer { + int fd; + int taken; + void *data; }; struct timer_pool { - uint32_t count; - uint32_t idx; - struct free_timer timers[TIMERPOOL_SIZE]; + uint32_t count; + uint32_t idx; + struct loose_timer timers[TIMERPOOL_SIZE]; }; diff --git a/r2p2/linux-backend.c b/r2p2/linux-backend.c index d5ab710..483550a 100644 --- a/r2p2/linux-backend.c +++ b/r2p2/linux-backend.c @@ -22,22 +22,16 @@ * SOFTWARE. */ +#include #include #include #include #include #include #include - -# if __APPLE__ -#include "linuxsys4mac.h" -#else -#include #include +#include #include -#include -#endif - #include #include @@ -195,7 +189,7 @@ int r2p2_init_per_core(int core_id, int core_count) tp.count = 0; tp.idx = 0; - // Create the free timers + // Create the loose timers for (i = 0; i < TIMERPOOL_SIZE; i++) { tfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK); @@ -236,9 +230,9 @@ static struct r2p2_socket *get_socket(void) return res; } -static struct free_timer *get_timer(void) +static struct loose_timer *get_timer(void) { - struct free_timer *res; + struct loose_timer *res; uint32_t idx; if (tp.count >= TIMERPOOL_SIZE) @@ -274,7 +268,6 @@ static void free_socket(struct r2p2_socket *s) static void linux_on_client_pair_free(void *data) { - printf("Free client pair\n"); struct r2p2_socket *sock = (struct r2p2_socket *)data; __disarm_timer(sock->tfd); free_socket(sock); @@ -288,7 +281,7 @@ static void handle_timer_for_socket(struct r2p2_socket *s) timer_triggered(s->cp); } -static void handle_free_timer(struct free_timer *ft) +static void handle_loose_timer_triggered(struct loose_timer *ft) { assert(ft && ft->data && ft->taken); __disarm_timer(ft->fd); @@ -297,7 +290,7 @@ static void handle_free_timer(struct free_timer *ft) void sp_get_timer(struct r2p2_server_pair *sp) { - struct free_timer *t; + struct loose_timer *t; t = get_timer(); if (t == NULL) { perror("Could not allocate free timer"); @@ -308,7 +301,7 @@ void sp_get_timer(struct r2p2_server_pair *sp) void sp_free_timer(struct r2p2_server_pair *sp) { - struct free_timer *t = sp->eo_info->timer; + struct loose_timer *t = sp->eo_info->timer; if (t) { t->taken = 0; t->data = NULL; @@ -421,7 +414,7 @@ int r2p2_init(int listen_port) void r2p2_poll(void) { struct epoll_event events[MAX_EVENTS]; - int ready, i, recvlen, is_socket_timer_event, is_free_timer_event; + int ready, i, recvlen, is_socket_timer_event, is_loose_timer_event; struct r2p2_socket *s; generic_buffer gb; void *buf, *event_arg; @@ -450,15 +443,16 @@ void r2p2_poll(void) if (events[i].events & EPOLLIN) { is_socket_timer_event = (unsigned long)event_arg % sizeof(struct r2p2_socket); - is_free_timer_event = + is_loose_timer_event = (void *)tp.timers <= events[i].data.ptr && events[i].data.ptr < (void *)(tp.timers + TIMERPOOL_SIZE); - if (is_free_timer_event) { - handle_free_timer((struct free_timer *)events[i].data.ptr); - + if (is_loose_timer_event) { + handle_loose_timer_triggered( + (struct loose_timer *)events[i].data.ptr); } else if (is_socket_timer_event) { - assert((unsigned long)event_arg % sizeof(struct r2p2_socket) == 4); + assert((unsigned long)event_arg % sizeof(struct r2p2_socket) == + 4); s = container_of(event_arg, struct r2p2_socket, tfd); handle_timer_for_socket(s); } else { @@ -606,10 +600,10 @@ int cp_restart_timer(struct r2p2_client_pair *cp, long timeout) int sp_restart_timer(struct r2p2_server_pair *sp, long timeout) { - struct free_timer *timer; + struct loose_timer *timer; assert(sp && sp->eo_info && sp->eo_info->timer); - timer = (struct free_timer *)sp->eo_info->timer; + timer = (struct loose_timer *)sp->eo_info->timer; assert(timer->data == sp && timer->taken); __restart_timer(timer->fd, timeout); return 0; diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 232d1cc..846ab20 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -62,6 +62,8 @@ static app_flow_control afc_fn = NULL; static __thread struct fixed_mempool *client_pairs; static __thread struct fixed_mempool *server_pairs; +static __thread struct fixed_mempool *client_pairs_eo_info; +static __thread struct fixed_mempool *server_pairs_eo_info; static __thread struct fixed_linked_list pending_client_pairs = {0}; static __thread struct fixed_linked_list pending_server_pairs = {0}; static __thread struct iovec to_app_iovec[0xFF]; @@ -106,8 +108,8 @@ static struct r2p2_client_pair *alloc_client_pair(int with_eo_info) bzero(cp, sizeof(struct r2p2_client_pair)); if (with_eo_info) { - cp->eo_info = malloc(sizeof(struct r2p2_cp_exct_once_info)); // TODO: use alloc_object - assert(cp->eo_info); + cp->eo_info = alloc_object(client_pairs_eo_info); + assert(cp->eo_info); } return cp; @@ -138,8 +140,8 @@ static void free_client_pair(struct r2p2_client_pair *cp) cp->on_free(cp->impl_data); if (cp->eo_info) { - free(cp->eo_info); // TODO: use free_object - cp->eo_info = NULL; + free_object(cp->eo_info); + cp->eo_info = NULL; } free_object(cp); @@ -157,7 +159,7 @@ static struct r2p2_server_pair *__alloc_server_pair(int with_eo_info) printf("alloc sp with eo:%d\n", with_eo_info); if (with_eo_info) { - sp->eo_info = malloc(sizeof(struct r2p2_sp_exct_once_info)); // TODO: use alloc_object + sp->eo_info = alloc_object(server_pairs_eo_info); assert(sp->eo_info); } @@ -189,11 +191,11 @@ void free_server_pair(struct r2p2_server_pair *sp) } #endif - if (sp->eo_info) { - sp_free_timer(sp); - free(sp->eo_info); // TODO: use free_object - sp->eo_info = NULL; - } + if (sp->eo_info) { + sp_free_timer(sp); + free_object(sp->eo_info); + sp->eo_info = NULL; + } free_object(sp); } @@ -206,14 +208,12 @@ static void add_to_pending_client_pairs(struct r2p2_client_pair *cp) static void add_to_pending_server_pairs(struct r2p2_server_pair *sp) { - printf("add_to_pending_server_pairs\n"); struct fixed_obj *fo = get_object_meta(sp); add_to_list(&pending_server_pairs, fo); } static void remove_from_pending_server_pairs(struct r2p2_server_pair *sp) { - printf("remove_from_pending_server_pairs\n"); struct fixed_obj *fo = get_object_meta(sp); remove_from_list(&pending_server_pairs, fo); } @@ -371,7 +371,6 @@ void r2p2_prepare_msg(struct r2p2_msg *msg, struct iovec *iov, int iovcnt, // FIX the header r2p2h = (struct r2p2_header *)target; bzero(r2p2h, sizeof(struct r2p2_header)); - r2p2h->magic = MAGIC; r2p2h->rid = req_id; r2p2h->header_size = sizeof(struct r2p2_header); @@ -459,16 +458,15 @@ static void handle_response(generic_buffer gb, int len, cp->reply.sender = *source; - switch (get_msg_type(r2p2h)) { + switch(get_msg_type(r2p2h)) { case RESPONSE_EXCT_ONCE: // todo: handle already received response assert(cp->eo_info); - send_eo_ack(cp); + eo_send_ack(cp); // no break, continue like regular response +#ifdef WITH_RAFT case RAFT_REP: -//#ifndef WITH_RAFT -// assert(0); -//#endif +#endif case RESPONSE_MSG: assert(cp->state == R2P2_W_RESPONSE); set_buffer_payload_size(gb, len); @@ -543,7 +541,6 @@ static void handle_request(generic_buffer gb, int len, struct r2p2_header *r2p2h, struct r2p2_host_tuple *source) { - printf("handle_request start: "); __debug_dump(); struct r2p2_server_pair *sp; uint16_t req_id; char ack_payload[] = "ACK"; @@ -561,7 +558,6 @@ static void handle_request(generic_buffer gb, int len, assert(sp->eo_info); if (is_first(r2p2h)) { sp->eo_info->req_received++; - // buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); eo_try_garbage_collect(sp); } free_buffer(gb); @@ -631,7 +627,6 @@ static void handle_request(generic_buffer gb, int len, free_server_pair(sp); return; } - printf("was_in_pending_sp\n"); was_in_pending_sp = 1; } set_buffer_payload_size(gb, len); @@ -647,7 +642,7 @@ static void handle_request(generic_buffer gb, int len, return; } - if (was_in_pending_sp) + if (was_in_pending_sp && !exct_once) remove_from_pending_server_pairs(sp); #ifdef ACCELERATED @@ -696,12 +691,8 @@ void handle_incoming_pck(generic_buffer gb, int len, if ((unsigned)len < sizeof(struct r2p2_header)) printf("I received %d\n", len); assert((unsigned)len >= sizeof(struct r2p2_header)); - buf = get_buffer_payload(gb); r2p2h = (struct r2p2_header *)buf; - printf("\nReceived packet from %d:%d, seq=%d, len=%d\n", source->ip, source->port, r2p2h->rid, len); - __debug_dump(); - // assert(r2p2h->header_size == (get_msg_type(r2p2h) == REQUEST_EXCT_ONCE ? sizeof(struct r2p2_header) : MIN_HEADER_SIZE)); // Fix endianness r2p2h->rid = ntohs(r2p2h->rid); @@ -720,7 +711,7 @@ void handle_incoming_pck(generic_buffer gb, int len, handle_response(gb, len, r2p2h, source, local_host); #endif else if (is_ack_exct_once(r2p2h)) - handle_ack_eo(gb, len, r2p2h, source); + eo_handle_ack(gb, len, r2p2h, source); else handle_request(gb, len, r2p2h, source); } @@ -733,6 +724,12 @@ int r2p2_backend_init_per_core(void) assert(client_pairs); server_pairs = create_mempool(POOL_SIZE, sizeof(struct r2p2_server_pair)); assert(server_pairs); + client_pairs_eo_info = + create_mempool(POOL_SIZE, sizeof(struct r2p2_cp_exct_once_info)); + assert(client_pairs_eo_info); + server_pairs_eo_info = + create_mempool(POOL_SIZE, sizeof(struct r2p2_sp_exct_once_info)); + assert(server_pairs_eo_info); srand((unsigned)time(&t)); @@ -748,21 +745,12 @@ void timer_triggered(struct r2p2_client_pair *cp) if (!fo->taken) return; - if (cp->eo_info) { - if (cp->eo_info->req_resent < EO_MAX_RETRY_REQUEST) { - printf("EO timeout, retry\n"); - cp_restart_timer(cp, cp->ctx->timeout); - buf_list_send(cp->request.head_buffer, cp->ctx->destination, - cp->impl_data); - } else { - cp->ctx->timeout_cb(cp->ctx->arg); - // Flush the data - remove_from_pending_client_pairs(cp); - free_client_pair(cp); - return; - } + if (cp->eo_info && cp->eo_info->req_resent < EO_MAX_RETRY_REQUEST) { + printf("EO timeout, retry\n"); + cp_restart_timer(cp, cp->ctx->timeout); + buf_list_send(cp->request.head_buffer, cp->ctx->destination, + cp->impl_data); cp->eo_info->req_resent++; - } else { assert(cp->ctx->timeout_cb); cp->ctx->timeout_cb(cp->ctx->arg); @@ -800,22 +788,19 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, router_notify(sp->request.sender.ip, sp->request.sender.port, sp->request.req_id); } else { - r2p2_prepare_msg(&sp->reply, iov, iovcnt, rep_type, FIXED_ROUTE, sp->request.req_id); - buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); // Notify router not for Raft requests if (!is_raft_msg(r2p2h)) router_notify(sp->request.sender.ip, sp->request.sender.port, - sp->request.req_id); + sp->request.req_id); if (rep_type == REQUEST_EXCT_ONCE) { sp->eo_info->reply_resent = 0; sp_restart_timer(sp, EO_TO_REPLY); } else { - // remove_from_pending_server_pairs(sp); free_server_pair(sp); } } @@ -869,7 +854,7 @@ static inline void __r2p2_send_req(struct iovec *iov, int iovcnt, } r2p2_prepare_msg(&cp->request, iov, iovcnt, req_type, ctx->routing_policy, - rid); + rid); add_to_pending_client_pairs(cp); @@ -985,75 +970,79 @@ int gbuffer_reader_init(struct gbuffer_reader *reader, generic_buffer gb) return 0; } - void use_exct_once(struct r2p2_ctx *ctx) { - ctx->routing_policy |= EXCT_ONCE_FLAG; + ctx->routing_policy |= EXCT_ONCE_FLAG; } -void send_eo_ack(struct r2p2_client_pair *cp) +void eo_send_ack(struct r2p2_client_pair *cp) { - assert(cp->eo_info); - struct iovec ack; - struct r2p2_msg ack_msg = {0}; - - ack.iov_base = &(cp->eo_info->req_resent); - ack.iov_len = sizeof(uint16_t); - r2p2_prepare_msg(&ack_msg, &ack, 1, ACK_EXCT_ONCE, FIXED_ROUTE, - cp->request.req_id); - buf_list_send(ack_msg.head_buffer, &cp->reply.sender, cp->impl_data); + assert(cp->eo_info); + struct iovec ack; + struct r2p2_msg ack_msg = {0}; + + ack.iov_base = &(cp->eo_info->req_resent); + ack.iov_len = sizeof(uint16_t); + r2p2_prepare_msg(&ack_msg, &ack, 1, ACK_EXCT_ONCE, FIXED_ROUTE, + cp->request.req_id); + buf_list_send(ack_msg.head_buffer, &cp->reply.sender, cp->impl_data); #ifdef LINUX - free_buffer(ack_msg.head_buffer); + free_buffer(ack_msg.head_buffer); #endif } -void handle_ack_eo(generic_buffer gb, int len, - struct r2p2_header *r2p2h, - struct r2p2_host_tuple *source) +void eo_handle_ack(generic_buffer gb, int len, struct r2p2_header *r2p2h, + struct r2p2_host_tuple *source) { - struct r2p2_server_pair *sp; - uint16_t nb_retries; - assert(len == r2p2h->header_size + sizeof(uint16_t)); - - nb_retries = *(uint16_t *) (get_buffer_payload(gb) + r2p2h->header_size); - printf("Received ack from %d for %d with %d retries\n", source->port, r2p2h->rid, nb_retries); - - sp = find_in_pending_server_pairs(r2p2h->rid, source); - assert(sp && sp->eo_info); - sp->eo_info->req_resent = nb_retries; - if (!eo_try_garbage_collect(sp)) { - sp_restart_timer(sp, EO_TO_NETWORK_FLUSH); - } + struct r2p2_server_pair *sp; + uint16_t nb_retries; + assert(len == r2p2h->header_size + sizeof(uint16_t)); + + nb_retries = *(uint16_t *)(get_buffer_payload(gb) + r2p2h->header_size); + printf("Received ack from %d for %d with %d retries\n", source->port, + r2p2h->rid, nb_retries); + + sp = find_in_pending_server_pairs(r2p2h->rid, source); + assert(sp && sp->eo_info); + sp->eo_info->req_resent = nb_retries; + if (!eo_try_garbage_collect(sp)) { + sp_restart_timer(sp, EO_TO_NETWORK_FLUSH); + } } int eo_try_garbage_collect(struct r2p2_server_pair *sp) { - assert(sp != NULL || sp->eo_info != NULL); - - if (sp->eo_info->req_received > sp->eo_info->req_resent) { - printf("GC early for req %d, received %d/%d\n", sp->request.req_id, sp->eo_info->req_received, sp->eo_info->req_resent); - remove_from_pending_server_pairs(sp); - free_server_pair(sp); - return 1; - } else return 0; + assert(sp != NULL || sp->eo_info != NULL); + if (sp->eo_info->req_received > sp->eo_info->req_resent) { + printf("GC early for req %d, received %d/%d\n", sp->request.req_id, + sp->eo_info->req_received, sp->eo_info->req_resent); + remove_from_pending_server_pairs(sp); + free_server_pair(sp); + return 1; + } else { + return 0; + } } void sp_timer_triggered(struct r2p2_server_pair *sp) { - int timeout; - assert(sp && sp->eo_info); - - if (sp->eo_info->req_resent == ACK_NOT_RECEIVED && sp->eo_info->reply_resent < EO_MAX_RETRY_REPLY) { - printf("Retransmits based on timeout "); __debug_dump(); - buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); - timeout = ++sp->eo_info->reply_resent == EO_MAX_RETRY_REPLY ? - EO_TO_NETWORK_FLUSH : EO_TO_REPLY; - sp_restart_timer(sp, timeout); - } else { - printf("GC by timeout for req %d, received %d/%d\n", sp->request.req_id, sp->eo_info->req_received, sp->eo_info->req_resent); - remove_from_pending_server_pairs(sp); - free_server_pair(sp); - } + int timeout; + assert(sp && sp->eo_info); + if (sp->eo_info->req_resent == ACK_NOT_RECEIVED && + sp->eo_info->reply_resent < EO_MAX_RETRY_REPLY) { + printf("Retransmits based on timeout "); + __debug_dump(); + buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); + timeout = ++sp->eo_info->reply_resent == EO_MAX_RETRY_REPLY + ? EO_TO_NETWORK_FLUSH + : EO_TO_REPLY; + sp_restart_timer(sp, timeout); + } else { + printf("GC by timeout for req %d, received %d/%d\n", sp->request.req_id, + sp->eo_info->req_received, sp->eo_info->req_resent); + remove_from_pending_server_pairs(sp); + free_server_pair(sp); + } } From 0e5d480722d575d7558259b5221522d496f3db52 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sat, 23 May 2020 15:41:41 +0200 Subject: [PATCH 11/15] typo --- r2p2/r2p2-common.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 846ab20..93e2079 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -797,7 +797,7 @@ static inline void __r2p2_send_response(long handle, struct iovec *iov, router_notify(sp->request.sender.ip, sp->request.sender.port, sp->request.req_id); - if (rep_type == REQUEST_EXCT_ONCE) { + if (rep_type == RESPONSE_EXCT_ONCE) { sp->eo_info->reply_resent = 0; sp_restart_timer(sp, EO_TO_REPLY); } else { From 3ae259dc9c42f511a5cf0a3a792b07b8f5e28b07 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sun, 24 May 2020 15:28:17 +0200 Subject: [PATCH 12/15] clean --- r2p2/r2p2-common.c | 1 - 1 file changed, 1 deletion(-) diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 93e2079..6938065 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -460,7 +460,6 @@ static void handle_response(generic_buffer gb, int len, switch(get_msg_type(r2p2h)) { case RESPONSE_EXCT_ONCE: - // todo: handle already received response assert(cp->eo_info); eo_send_ack(cp); // no break, continue like regular response From a4c68e452197973a3baebb4644bdfb6d8c8fce3d Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sun, 24 May 2020 15:33:08 +0200 Subject: [PATCH 13/15] remove debug --- r2p2/inc/r2p2/api-internal.h | 9 --------- r2p2/linux-backend.c | 18 ------------------ r2p2/r2p2-common.c | 30 ------------------------------ 3 files changed, 57 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index 0b7e810..cab4697 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -251,19 +251,10 @@ static inline int is_exct_once(struct r2p2_ctx *ctx) } void eo_send_ack(struct r2p2_client_pair *cp); - void eo_handle_ack(generic_buffer gb, int len, struct r2p2_header *r2p2h, struct r2p2_host_tuple *source); - int eo_try_garbage_collect(struct r2p2_server_pair *sp); - -#define DEBUG 1 - -#if DEBUG -void __debug_dump(); -#endif - void router_notify(uint32_t ip, uint16_t port, uint16_t rid); static inline void r2p2_prepare_feedback(char *dest, uint32_t ip, uint16_t port, uint16_t rid) diff --git a/r2p2/linux-backend.c b/r2p2/linux-backend.c index 483550a..081355c 100644 --- a/r2p2/linux-backend.c +++ b/r2p2/linux-backend.c @@ -102,14 +102,6 @@ int r2p2_init_per_core(int core_id, int core_count) return -1; } -#if DEBUG - event.events = EPOLLIN; - event.data.fd = 0; - ret = epoll_ctl(efd, EPOLL_CTL_ADD, 0, &event); - if (ret) - return -1; -#endif - // Add the server socket event.events = EPOLLIN; event.data.ptr = (void *)&sock.fd; @@ -428,16 +420,6 @@ void r2p2_poll(void) ready = epoll_wait(efd, events, MAX_EVENTS, 0); for (i = 0; i < ready; i++) { - -#if DEBUG - if (events[i].data.fd == 0) { - int c; - while ((c = getchar()) != '\n' && c != EOF) {} - __debug_dump(); - continue; - } -#endif - event_arg = (struct r2p2_socket *)events[i].data.ptr; assert(event_arg); if (events[i].events & EPOLLIN) { diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index 6938065..c995a89 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -69,35 +69,6 @@ static __thread struct fixed_linked_list pending_server_pairs = {0}; static __thread struct iovec to_app_iovec[0xFF]; static __thread uint16_t rid = 0; -#if DEBUG -static void print_cp(void* __cp) { - struct r2p2_client_pair* cp = (struct r2p2_client_pair*) __cp; - if (cp->eo_info) - printf("{req_id: %d, retries: %d}", cp->request.req_id, cp->eo_info->req_resent); -} - -static void print_sp(void* __sp) { - struct r2p2_server_pair* sp = (struct r2p2_server_pair*) __sp; - if (sp->eo_info) - printf("{req_id: %d, retries: %d, req_received: %d}", sp->request.req_id, sp->eo_info->req_resent, sp->eo_info->req_received); -} - -static void print_linked_list(const char* str, struct fixed_linked_list *ll, void(*print_fun)(void*)) { - printf("%s: ", str); - for (struct fixed_obj *obj = ll->head; obj; obj = obj->next) { - print_fun(obj->elem); - } - printf("\n"); -} - -void __debug_dump() -{ - if (pending_client_pairs.head != NULL) print_linked_list("cp", &pending_client_pairs, print_cp); - else if (pending_server_pairs.head != NULL) print_linked_list("sp", &pending_server_pairs, print_sp); - else printf("empty debug\n"); -} -#endif - static struct r2p2_client_pair *alloc_client_pair(int with_eo_info) { struct r2p2_client_pair *cp; @@ -1032,7 +1003,6 @@ void sp_timer_triggered(struct r2p2_server_pair *sp) if (sp->eo_info->req_resent == ACK_NOT_RECEIVED && sp->eo_info->reply_resent < EO_MAX_RETRY_REPLY) { printf("Retransmits based on timeout "); - __debug_dump(); buf_list_send(sp->reply.head_buffer, &sp->request.sender, NULL); timeout = ++sp->eo_info->reply_resent == EO_MAX_RETRY_REPLY ? EO_TO_NETWORK_FLUSH From f80d940ce8b0bd7233c4a9a286e66527871d13b6 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Sun, 24 May 2020 18:03:18 +0200 Subject: [PATCH 14/15] cleanup --- r2p2/r2p2-common.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index c995a89..cf222ef 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -127,8 +127,6 @@ static struct r2p2_server_pair *__alloc_server_pair(int with_eo_info) bzero(sp, sizeof(struct r2p2_server_pair)); - printf("alloc sp with eo:%d\n", with_eo_info); - if (with_eo_info) { sp->eo_info = alloc_object(server_pairs_eo_info); assert(sp->eo_info); From 6491b412ce21a704bb608aa0a54c1dcad8825cb4 Mon Sep 17 00:00:00 2001 From: Paul Renauld Date: Fri, 5 Jun 2020 21:20:56 +0200 Subject: [PATCH 15/15] add EO_TO_REQUEST --- r2p2/inc/r2p2/api-internal.h | 5 ++++- r2p2/inc/r2p2/r2p2-linux.h | 2 +- r2p2/r2p2-common.c | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/r2p2/inc/r2p2/api-internal.h b/r2p2/inc/r2p2/api-internal.h index cab4697..6772a93 100644 --- a/r2p2/inc/r2p2/api-internal.h +++ b/r2p2/inc/r2p2/api-internal.h @@ -41,11 +41,14 @@ #define SHOULD_REPLY 0x01 #define EXCT_ONCE_FLAG 0x04 +#define ACK_NOT_RECEIVED UINT16_MAX + +// Below macros should be adapted to the system #define EO_MAX_RETRY_REQUEST 5 #define EO_MAX_RETRY_REPLY 5 +#define EO_TO_REQUEST 2500000 #define EO_TO_REPLY 2500000 #define EO_TO_NETWORK_FLUSH 5000000 -#define ACK_NOT_RECEIVED UINT16_MAX enum { REQUEST_MSG = 0, diff --git a/r2p2/inc/r2p2/r2p2-linux.h b/r2p2/inc/r2p2/r2p2-linux.h index ecced24..45f7ea9 100644 --- a/r2p2/inc/r2p2/r2p2-linux.h +++ b/r2p2/inc/r2p2/r2p2-linux.h @@ -57,7 +57,7 @@ struct socket_pool { struct r2p2_socket sockets[SOCKPOOL_SIZE]; }; -// Used for client pairs +// Used for server pairs struct loose_timer { int fd; int taken; diff --git a/r2p2/r2p2-common.c b/r2p2/r2p2-common.c index cf222ef..541d6f2 100644 --- a/r2p2/r2p2-common.c +++ b/r2p2/r2p2-common.c @@ -941,6 +941,7 @@ int gbuffer_reader_init(struct gbuffer_reader *reader, generic_buffer gb) void use_exct_once(struct r2p2_ctx *ctx) { ctx->routing_policy |= EXCT_ONCE_FLAG; + ctx->timeout = EO_TO_REQUEST; } void eo_send_ack(struct r2p2_client_pair *cp)