Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exactly-once semantic #1

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
54 changes: 54 additions & 0 deletions r2p2/inc/r2p2/api-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,25 @@
#define MAGIC 0xCC
#define SHOULD_REPLY 0x01

#define EXCT_ONCE_FLAG 0x04
#define ACK_NOT_RECEIVED UINT16_MAX

// Below macros should be adapted to the system
#define EO_MAX_RETRY_REQUEST 5
#define EO_MAX_RETRY_REPLY 5
#define EO_TO_REQUEST 2500000
#define EO_TO_REPLY 2500000
#define EO_TO_NETWORK_FLUSH 5000000

enum {
REQUEST_MSG = 0,
RESPONSE_MSG,
FEEDBACK_MSG,
ACK_MSG,
DROP_MSG,
REQUEST_EXCT_ONCE,
RESPONSE_EXCT_ONCE,
ACK_EXCT_ONCE,
RAFT_REQ,
RAFT_REP,
RAFT_MSG,
Expand Down Expand Up @@ -75,6 +88,10 @@ struct __attribute__((__packed__)) r2p2_msg {
generic_buffer tail_buffer;
};

struct r2p2_cp_exct_once_info {
uint16_t req_resent;
};

struct r2p2_client_pair {
struct r2p2_msg request;
struct r2p2_msg reply;
Expand All @@ -88,13 +105,22 @@ struct r2p2_client_pair {
void *timer;
void *impl_data; // Used to hold the socket used in linux
void (*on_free)(void *impl_data);
struct r2p2_cp_exct_once_info *eo_info;
};

struct r2p2_sp_exct_once_info {
uint16_t req_received;
uint16_t req_resent;
uint16_t reply_resent;
void *timer;
};

struct r2p2_server_pair {
struct r2p2_msg request;
struct r2p2_msg reply;
uint16_t request_expected_packets;
uint16_t request_received_packets;
struct r2p2_sp_exct_once_info *eo_info;
uint8_t flags;
#ifdef ACCELERATED
long received_at;
Expand All @@ -105,6 +131,7 @@ struct r2p2_server_pair {
static inline int is_response(struct r2p2_header *h)
{
return ((h->type_policy & 0xF0) == (RESPONSE_MSG << 4)) ||
((h->type_policy & 0xF0) == (RESPONSE_EXCT_ONCE << 4)) ||
((h->type_policy & 0xF0) == (ACK_MSG << 4)) ||
((h->type_policy & 0xF0) == (DROP_MSG << 4)) ||
((h->type_policy & 0xF0) == (RAFT_REP << 4));
Expand Down Expand Up @@ -132,6 +159,11 @@ static inline int is_raft_msg(struct r2p2_header *h)
((h->type_policy & 0xF0) == (RAFT_MSG << 4));
}

static inline int is_ack_exct_once(struct r2p2_header *h)
{
return ((h->type_policy & 0xF0) == (ACK_EXCT_ONCE << 4));
}

static inline uint8_t get_policy(struct r2p2_header *h)
{
return (h->type_policy & 0xF);
Expand Down Expand Up @@ -189,6 +221,8 @@ void handle_incoming_pck(generic_buffer gb, int len,
struct r2p2_host_tuple *local_host);
#endif
void timer_triggered(struct r2p2_client_pair *cp);
void sp_timer_triggered(struct r2p2_server_pair *sp);

void forward_request(struct r2p2_server_pair *sp);
struct r2p2_server_pair *alloc_server_pair(void);
void free_server_pair(struct r2p2_server_pair *sp);
Expand All @@ -204,6 +238,26 @@ int prepare_to_send(struct r2p2_client_pair *cp);
int buf_list_send(generic_buffer first_buf, struct r2p2_host_tuple *dest,
void *socket_info);
int disarm_timer(void *timer);

int cp_restart_timer(struct r2p2_client_pair *cp, long timeout);

void sp_get_timer(struct r2p2_server_pair *sp);
int sp_restart_timer(struct r2p2_server_pair *sp, long timeout);
void sp_free_timer(struct r2p2_server_pair *sp);

/*
* Exactly Once specific
*/
static inline int is_exct_once(struct r2p2_ctx *ctx)
{
return (ctx->routing_policy & EXCT_ONCE_FLAG) != 0;
}

void eo_send_ack(struct r2p2_client_pair *cp);
void eo_handle_ack(generic_buffer gb, int len, struct r2p2_header *r2p2h,
struct r2p2_host_tuple *source);
int eo_try_garbage_collect(struct r2p2_server_pair *sp);

void router_notify(uint32_t ip, uint16_t port, uint16_t rid);
static inline void r2p2_prepare_feedback(char *dest, uint32_t ip,
uint16_t port, uint16_t rid)
Expand Down
4 changes: 3 additions & 1 deletion r2p2/inc/r2p2/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ enum {
struct __attribute__((packed)) r2p2_ctx {
success_cb_f success_cb;
error_cb_f error_cb;
timeout_cb_f timeout_cb;
timeout_cb_f timeout_cb; // with EO, triggered only in case of failure, i.e. after enough timeouts
void *arg;
long timeout;
int routing_policy;
Expand All @@ -81,3 +81,5 @@ void r2p2_set_app_flow_control_fn(app_flow_control fn);
void r2p2_send_req(struct iovec *iov, int iovcnt, struct r2p2_ctx *ctx);
void r2p2_send_response(long handle, struct iovec *iov, int iovcnt);
void r2p2_recv_resp_done(long handle);

void use_exct_once(struct r2p2_ctx *ctx);
15 changes: 15 additions & 0 deletions r2p2/inc/r2p2/r2p2-linux.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#define BUFLEN 2048 //(PAYLOAD_SIZE + sizeof(struct r2p2_header) + sizeof(struct
// linux_buf_hdr)) half a page

#define TIMERPOOL_SIZE 128

struct __attribute__((packed)) linux_buf_hdr {
uint32_t payload_size;
struct linux_buf_hdr *next;
Expand All @@ -54,3 +56,16 @@ struct socket_pool {
uint32_t idx;
struct r2p2_socket sockets[SOCKPOOL_SIZE];
};

// Used for server pairs
struct loose_timer {
int fd;
int taken;
void *data;
};

struct timer_pool {
uint32_t count;
uint32_t idx;
struct loose_timer timers[TIMERPOOL_SIZE];
};
117 changes: 114 additions & 3 deletions r2p2/linux-backend.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ struct sockaddr_in router_addr;

static __thread int efd;
static __thread struct socket_pool sp;
static __thread struct timer_pool tp;
static __thread struct fixed_mempool *buf_pool;

#ifdef WITH_TIMESTAMPING
Expand Down Expand Up @@ -178,6 +179,25 @@ int r2p2_init_per_core(int core_id, int core_count)
}
}

tp.count = 0;
tp.idx = 0;
// Create the loose timers
for (i = 0; i < TIMERPOOL_SIZE; i++) {
tfd = timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK);

tp.timers[i].fd = tfd;
tp.timers[i].taken = 0;
tp.timers[i].data = NULL;

event.events = EPOLLIN;
event.data.ptr = (void *)&(tp.timers[i]);
ret = epoll_ctl(efd, EPOLL_CTL_ADD, tfd, &event);
if (ret) {
perror("epoll_ctl");
return -1;
}
}

return 0;
}

Expand All @@ -202,6 +222,24 @@ static struct r2p2_socket *get_socket(void)
return res;
}

static struct loose_timer *get_timer(void)
{
struct loose_timer *res;
uint32_t idx;

if (tp.count >= TIMERPOOL_SIZE)
return NULL;

while (tp.timers[tp.idx++ & (TIMERPOOL_SIZE - 1)].taken)
;
idx = (tp.idx - 1) & (TIMERPOOL_SIZE - 1);
res = &tp.timers[idx];
tp.timers[idx].taken = 1;
tp.count++;

return res;
}

static int __disarm_timer(int timerfd)
{
struct itimerspec ts = {0};
Expand Down Expand Up @@ -235,6 +273,35 @@ static void handle_timer_for_socket(struct r2p2_socket *s)
timer_triggered(s->cp);
}

static void handle_loose_timer_triggered(struct loose_timer *ft)
{
assert(ft && ft->data && ft->taken);
__disarm_timer(ft->fd);
sp_timer_triggered((struct r2p2_server_pair *)ft->data);
}

void sp_get_timer(struct r2p2_server_pair *sp)
{
struct loose_timer *t;
t = get_timer();
if (t == NULL) {
perror("Could not allocate free timer");
}
t->data = sp;
sp->eo_info->timer = t;
}

void sp_free_timer(struct r2p2_server_pair *sp)
{
struct loose_timer *t = sp->eo_info->timer;
if (t) {
t->taken = 0;
t->data = NULL;
__disarm_timer(t->fd);
sp->eo_info->timer = NULL;
}
}

/*
* Generic buffer implementation
*/
Expand Down Expand Up @@ -339,7 +406,7 @@ int r2p2_init(int listen_port)
void r2p2_poll(void)
{
struct epoll_event events[MAX_EVENTS];
int ready, i, recvlen, is_timer_event;
int ready, i, recvlen, is_socket_timer_event, is_loose_timer_event;
struct r2p2_socket *s;
generic_buffer gb;
void *buf, *event_arg;
Expand All @@ -356,9 +423,16 @@ void r2p2_poll(void)
event_arg = (struct r2p2_socket *)events[i].data.ptr;
assert(event_arg);
if (events[i].events & EPOLLIN) {
is_timer_event =
is_socket_timer_event =
(unsigned long)event_arg % sizeof(struct r2p2_socket);
if (is_timer_event) {
is_loose_timer_event =
(void *)tp.timers <= events[i].data.ptr &&
events[i].data.ptr < (void *)(tp.timers + TIMERPOOL_SIZE);

if (is_loose_timer_event) {
handle_loose_timer_triggered(
(struct loose_timer *)events[i].data.ptr);
} else if (is_socket_timer_event) {
assert((unsigned long)event_arg % sizeof(struct r2p2_socket) ==
4);
s = container_of(event_arg, struct r2p2_socket, tfd);
Expand Down Expand Up @@ -480,6 +554,43 @@ int disarm_timer(void *timer)
return __disarm_timer(tfd);
}

static void __restart_timer(int tfd, long timeout)
{
struct itimerspec ts;

ts.it_interval.tv_sec = 0;
ts.it_interval.tv_nsec = 0;
ts.it_value.tv_sec = timeout / 1000000;
ts.it_value.tv_nsec = (timeout % 1000000) * 1000;

if (timerfd_settime(tfd, 0, &ts, NULL) < 0) {
perror("Error resetting timer");
assert(0);
}
}

int cp_restart_timer(struct r2p2_client_pair *cp, long timeout)
{
struct r2p2_socket *s;

s = (struct r2p2_socket *)cp->impl_data;
assert(s->taken && s->cp == cp);
__restart_timer(s->tfd, timeout);
cp->timer = (void *)(long)s->tfd;
return 0;
}

int sp_restart_timer(struct r2p2_server_pair *sp, long timeout)
{
struct loose_timer *timer;
assert(sp && sp->eo_info && sp->eo_info->timer);

timer = (struct loose_timer *)sp->eo_info->timer;
assert(timer->data == sp && timer->taken);
__restart_timer(timer->fd, timeout);
return 0;
}

void router_notify(uint32_t ip, uint16_t port, uint16_t rid)
{
#ifdef WITH_ROUTER
Expand Down
Loading