diff --git a/.gitignore b/.gitignore index 534fa6e..734708f 100644 --- a/.gitignore +++ b/.gitignore @@ -59,6 +59,7 @@ demo-app-bp-recv .vagrant sender receiver +bp_client tests/sender tests/receiver daemon/bp_daemon diff --git a/bp_client.c b/bp_client.c new file mode 100644 index 0000000..ac35bec --- /dev/null +++ b/bp_client.c @@ -0,0 +1,204 @@ +#include "include/bp_socket.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define BUFFER_SIZE 1024 +#define AF_BP 28 + +volatile int running = 1; + +struct client_data { + int fd; + struct sockaddr_bp dest_addr; + struct sockaddr_bp src_addr; +}; + +void handle_sigint(int sig) { + printf("\nInterrupt received, shutting down...\n"); + running = 0; +} + +void *send_thread(void *arg) { + struct client_data *data = (struct client_data *)arg; + char send_buffer[BUFFER_SIZE]; + int message_count = 0; + + printf("Send thread started\n"); + + while (running) { + message_count++; + snprintf(send_buffer, sizeof(send_buffer), "Hello from client! Message #%d", + message_count); + + int flags = 0; + flags |= MSG_ACK_REQUESTED; + + int ret = + sendto(data->fd, send_buffer, strlen(send_buffer) + 1, flags, + (struct sockaddr *)&data->dest_addr, sizeof(data->dest_addr)); + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + continue; + } + printf("sendto failed for ipn:%u.%u: %s\n", + data->dest_addr.bp_addr.ipn.node_id, + data->dest_addr.bp_addr.ipn.service_id, strerror(errno)); + break; + } + + printf("Message sent: %s\n", send_buffer); + } + + printf("Send thread exiting\n"); + return NULL; +} + +void *receive_thread(void *arg) { + struct client_data *data = (struct client_data *)arg; + struct msghdr msg; + struct iovec iov; + char buffer[BUFFER_SIZE]; + struct sockaddr_bp src_addr; + + // Set up message structure for receiving + iov.iov_base = buffer; + iov.iov_len = sizeof(buffer); + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + memset(&src_addr, 0, sizeof(src_addr)); + msg.msg_name = &src_addr; + msg.msg_namelen = sizeof(src_addr); + + printf("Receive thread started\n"); + + while (running) { + ssize_t n = recvmsg(data->fd, &msg, 0); + if (n < 0) { + if (errno == EINTR) { + printf("\nInterrupted by signal, exiting...\n"); + break; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Timeout reached, continue loop to check running flag + continue; + } + perror("recvmsg failed"); + break; + } + + printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer); + if (msg.msg_namelen >= sizeof(struct sockaddr_bp)) { + printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id, + src_addr.bp_addr.ipn.service_id); + } + + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + memset(&src_addr, 0, sizeof(src_addr)); + msg.msg_name = &src_addr; + msg.msg_namelen = sizeof(src_addr); + } + + printf("Receive thread exiting\n"); + return NULL; +} + +int main(int argc, char *argv[]) { + struct sockaddr_bp dest_addr, src_addr; + int fd; + uint32_t node_id, service_id; + int ret = 0; + pthread_t send_tid, recv_tid; + struct client_data data; + + if (argc < 3) { + printf("Usage: %s \n", argv[0]); + return EXIT_FAILURE; + } + + signal(SIGINT, handle_sigint); + + node_id = (uint32_t)atoi(argv[1]); + service_id = (uint32_t)atoi(argv[2]); + + if (service_id < 1 || service_id > 255) { + fprintf(stderr, "Invalid service_id (must be in 1-255)\n"); + return EXIT_FAILURE; + } + + if (node_id == 0) { + fprintf(stderr, "Invalid node_id (cannot be 0)\n"); + return EXIT_FAILURE; + } + + fd = socket(AF_BP, SOCK_DGRAM, 0); + if (fd < 0) { + perror("socket creation failed"); + return EXIT_FAILURE; + } + + src_addr.bp_family = AF_BP; + src_addr.bp_scheme = BP_SCHEME_IPN; + src_addr.bp_addr.ipn.node_id = 10; + src_addr.bp_addr.ipn.service_id = 2; + if (bind(fd, (struct sockaddr *)&src_addr, sizeof(src_addr)) == -1) { + perror("Failed to bind socket"); + ret = EXIT_FAILURE; + goto out; + } + + struct timeval timeout; + timeout.tv_sec = 1; // 1 second timeout + timeout.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) { + perror("Failed to set socket timeout"); + ret = EXIT_FAILURE; + goto out; + } + + dest_addr.bp_family = AF_BP; + dest_addr.bp_scheme = BP_SCHEME_IPN; + dest_addr.bp_addr.ipn.node_id = node_id; + dest_addr.bp_addr.ipn.service_id = service_id; + + data.fd = fd; + data.dest_addr = dest_addr; + data.src_addr = src_addr; + + printf("BP Client started - sending messages to ipn:%u.%u\n", node_id, + service_id); + printf("Press Ctrl+C to exit.\n"); + + if (pthread_create(&send_tid, NULL, send_thread, &data) != 0) { + perror("Failed to create send thread"); + ret = EXIT_FAILURE; + goto out; + } + + if (pthread_create(&recv_tid, NULL, receive_thread, &data) != 0) { + perror("Failed to create receive thread"); + running = 0; + pthread_join(send_tid, NULL); + ret = EXIT_FAILURE; + goto out; + } + + pthread_join(send_tid, NULL); + pthread_join(recv_tid, NULL); + +out: + close(fd); + printf("Socket closed.\n"); + return ret; +} diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 34ce876..67939ef 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -285,7 +285,7 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) } if (size > 0) { - payload = kmalloc(size, GFP_KERNEL); + payload = kmalloc(size, GFP_ATOMIC); if (!payload) { pr_err("bp_sendmsg: failed to allocate memory\n"); ret = -ENOMEM; @@ -299,17 +299,14 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) } ret = send_bundle_doit(payload, size, dest_node_id, - dest_service_id, bp->bp_node_id, bp->bp_service_id, 8443); + dest_service_id, bp->bp_node_id, bp->bp_service_id, + msg->msg_flags, 8443); if (ret < 0) { pr_err( "bp_sendmsg: send_bundle_doit failed (%d)\n", ret); goto err_free; } - pr_info("bp_sendmsg: bundle sent for endpoint ipn:%u.%u (size: " - "%zu)\n", - bp->bp_node_id, bp->bp_service_id, size); - kfree(payload); } diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 1794fb5..fd441ee 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -12,6 +12,7 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = { [BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 }, [BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY }, [BP_GENL_A_ADU] = { .type = NLA_U64 }, + [BP_GENL_A_FLAGS] = { .type = NLA_U32 }, }; static struct genl_ops genl_ops[] = { { @@ -41,14 +42,14 @@ struct genl_family genl_fam = { int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, - int port_id) + u_int32_t flags, int port_id) { void* msg_head; struct sk_buff* msg; size_t msg_size; int ret; - msg_size = 4 * nla_total_size(sizeof(u_int32_t)) + msg_size = 5 * nla_total_size(sizeof(u_int32_t)) + nla_total_size(payload_size); msg = genlmsg_new(msg_size, GFP_KERNEL); if (!msg) { @@ -104,6 +105,13 @@ int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, goto err_cancel; } + ret = nla_put_u32(msg, BP_GENL_A_FLAGS, flags); + if (ret) { + pr_err( + "send_bundle: failed to put BP_GENL_A_FLAGS (%d)\n", ret); + goto err_cancel; + } + genlmsg_end(msg, msg_head); return genlmsg_unicast(&init_net, msg, port_id); diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index dfed82a..0d4e3d8 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -9,7 +9,7 @@ int open_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id); int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id); int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, - int port_id); + u_int32_t flags, int port_id); int enqueue_bundle_doit(struct sk_buff* skb, struct genl_info* info); int destroy_bundle_doit(uint64_t adu, int port_id); diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index cf96372..c7ebe1e 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -67,17 +67,15 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; char dest_eid[64]; int written; - pthread_t thread; - struct ion_send_args *args; - struct endpoint_ctx *ctx; - void *payload_copy; + u_int32_t flags; + int ret; if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_SRC_NODE_ID] || - !attrs[BP_GENL_A_SRC_SERVICE_ID]) { + !attrs[BP_GENL_A_SRC_SERVICE_ID] || !attrs[BP_GENL_A_FLAGS]) { log_error( "handle_send_bundle: missing attribute(s) in SEND_BUNDLE command (payload, node ID, " - "service ID)"); + "service ID, flags)"); return -EINVAL; } @@ -87,6 +85,7 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { dest_service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); src_node_id = nla_get_u32(attrs[BP_GENL_A_SRC_NODE_ID]); src_service_id = nla_get_u32(attrs[BP_GENL_A_SRC_SERVICE_ID]); + flags = nla_get_u32(attrs[BP_GENL_A_FLAGS]); written = snprintf(dest_eid, sizeof(dest_eid), "ipn:%u.%u", dest_node_id, dest_service_id); if (written < 0 || written >= (int)sizeof(dest_eid)) { @@ -95,42 +94,16 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { return -EINVAL; } - ctx = endpoint_registry_get(src_node_id, src_service_id); - if (!ctx) { - log_error("[ipn:%u.%u] handle_send_bundle: no endpoint for ipn:%u.%u", src_node_id, - src_service_id, src_node_id, src_service_id); - return -ENODEV; + ret = endpoint_registry_enqueue_send(src_node_id, src_service_id, dest_eid, payload, + payload_size, flags); + if (ret < 0) { + log_error("[ipn:%u.%u] handle_send_bundle: failed to enqueue send (error: %d)", src_node_id, + src_service_id, ret); + return ret; } - payload_copy = malloc(payload_size); - if (!payload_copy) { - log_error("[ipn:%u.%u] handle_send_bundle: failed to allocate payload", src_node_id, - src_service_id); - return -ENOMEM; - } - memcpy(payload_copy, payload, payload_size); - - // Enqueue to send thread using source endpoint SAP - // Launch async send thread - args = malloc(sizeof(struct ion_send_args)); - if (!args) return -ENOMEM; - args->node_id = src_node_id; - args->service_id = src_service_id; - args->dest_eid = strndup(dest_eid, sizeof(dest_eid)); - args->netlink_sock = daemon->genl_bp_sock; - args->netlink_mutex = &daemon->netlink_mutex; - args->netlink_family = daemon->genl_bp_family_id; - args->payload = payload_copy; - args->payload_size = payload_size; - - if (pthread_create(&thread, NULL, ion_send_thread, args) != 0) { - log_error("[ipn:%u.%u] handle_send_bundle: failed to create send thread", src_node_id, - src_service_id); - free(args->dest_eid); - free(args->payload); - free(args); - return -errno; - } + log_info("[ipn:%u.%u] SEND_BUNDLE: bundle queued for sending to EID %s, size %zu (bytes)", + src_node_id, src_service_id, dest_eid, payload_size); return 0; } diff --git a/daemon/endpoint_registry.c b/daemon/endpoint_registry.c index c1e3c85..b0035c1 100644 --- a/daemon/endpoint_registry.c +++ b/daemon/endpoint_registry.c @@ -1,7 +1,9 @@ #include "endpoint_registry.h" #include "log.h" +#include #include #include +#include static pthread_mutex_t endpoint_registry_mutex = PTHREAD_MUTEX_INITIALIZER; static struct endpoint_ctx *endpoint_head = NULL; @@ -30,6 +32,8 @@ int endpoint_registry_add(struct endpoint_ctx *ctx) { ctx->next = endpoint_head; endpoint_head = ctx; + pthread_mutex_init(&ctx->send_queue_mutex, NULL); + pthread_cond_init(&ctx->send_queue_cond, NULL); pthread_mutex_unlock(&endpoint_registry_mutex); return 0; @@ -73,6 +77,8 @@ int endpoint_registry_remove(uint32_t node_id, uint32_t service_id) { prev->next = current->next; } + pthread_mutex_destroy(¤t->send_queue_mutex); + pthread_cond_destroy(¤t->send_queue_cond); free(current); pthread_mutex_unlock(&endpoint_registry_mutex); @@ -107,3 +113,63 @@ bool endpoint_registry_exists(uint32_t node_id, uint32_t service_id) { pthread_mutex_unlock(&endpoint_registry_mutex); return false; } + +int endpoint_registry_enqueue_send(uint32_t node_id, uint32_t service_id, const char *dest_eid, + const void *payload, size_t payload_size, uint32_t flags) { + struct endpoint_ctx *ctx = endpoint_registry_get(node_id, service_id); + if (!ctx) { + log_error("endpoint_registry_enqueue_send: no endpoint for ipn:%u.%u", node_id, service_id); + return -ENODEV; + } + + struct send_queue_item *item = malloc(sizeof(struct send_queue_item)); + if (!item) { + log_error("endpoint_registry_enqueue_send: failed to allocate queue item"); + return -ENOMEM; + } + + item->dest_eid = strdup(dest_eid); + if (!item->dest_eid) { + log_error("endpoint_registry_enqueue_send: failed to duplicate dest_eid"); + free(item); + return -ENOMEM; + } + + item->payload = malloc(payload_size); + if (!item->payload) { + log_error("endpoint_registry_enqueue_send: failed to allocate payload"); + free(item->dest_eid); + free(item); + return -ENOMEM; + } + + memcpy(item->payload, payload, payload_size); + item->payload_size = payload_size; + item->flags = flags; + item->next = NULL; + + pthread_mutex_lock(&ctx->send_queue_mutex); + + if (ctx->send_queue_size >= 1000) { // Limite de queue + pthread_mutex_unlock(&ctx->send_queue_mutex); + log_warn("endpoint_registry_enqueue_send: queue full for ipn:%u.%u", node_id, service_id); + free(item->dest_eid); + free(item->payload); + free(item); + return -EAGAIN; + } + + if (ctx->send_queue_tail) { + ctx->send_queue_tail->next = item; + ctx->send_queue_tail = item; + } else { + ctx->send_queue_head = item; + ctx->send_queue_tail = item; + } + ctx->send_queue_size++; + + pthread_cond_signal(&ctx->send_queue_cond); + pthread_mutex_unlock(&ctx->send_queue_mutex); + + return 0; +} diff --git a/daemon/endpoint_registry.h b/daemon/endpoint_registry.h index 1133dba..83474bd 100644 --- a/daemon/endpoint_registry.h +++ b/daemon/endpoint_registry.h @@ -13,13 +13,30 @@ struct endpoint_ctx { _Atomic int running; pthread_t recv_thread; + pthread_t send_thread; + + struct send_queue_item *send_queue_head; + struct send_queue_item *send_queue_tail; + pthread_mutex_t send_queue_mutex; + pthread_cond_t send_queue_cond; + int send_queue_size; struct endpoint_ctx *next; }; +struct send_queue_item { + char *dest_eid; + void *payload; + size_t payload_size; + uint32_t flags; + struct send_queue_item *next; +}; + int endpoint_registry_add(struct endpoint_ctx *ctx); struct endpoint_ctx *endpoint_registry_get(uint32_t node_id, uint32_t service_id); int endpoint_registry_remove(uint32_t node_id, uint32_t service_id); bool endpoint_registry_exists(uint32_t node_id, uint32_t service_id); +int endpoint_registry_enqueue_send(uint32_t node_id, uint32_t service_id, const char *dest_eid, + const void *payload, size_t payload_size, uint32_t flags); #endif diff --git a/daemon/ion.c b/daemon/ion.c index 0beb350..e81c07b 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,4 +1,5 @@ #include "ion.h" +#include "../include/bp_socket.h" #include "bp_genl.h" #include "endpoint_registry.h" #include "log.h" @@ -22,7 +23,8 @@ static int make_eid(char *buf, size_t bufsize, u_int32_t node_id, u_int32_t serv int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id, struct nl_sock *netlink_sock, pthread_mutex_t *netlink_mutex, int netlink_family) { - struct ion_recv_args *args; + struct ion_recv_args *recv_args; + struct ion_send_args *send_args; struct endpoint_ctx *ctx; char eid[64]; BpSAP sap; @@ -53,31 +55,57 @@ int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id, struct nl_sock *n ctx->service_id = service_id; ctx->sap = sap; atomic_init(&ctx->running, 1); + ctx->send_queue_head = NULL; + ctx->send_queue_tail = NULL; + ctx->send_queue_size = 0; + pthread_mutex_init(&ctx->send_queue_mutex, NULL); + pthread_cond_init(&ctx->send_queue_cond, NULL); err = endpoint_registry_add(ctx); if (err) { __atomic_store_n(&ctx->running, 0, __ATOMIC_RELAXED); + pthread_mutex_destroy(&ctx->send_queue_mutex); + pthread_cond_destroy(&ctx->send_queue_cond); bp_close(sap); free(ctx); return -ENOMEM; } - args = calloc(1, sizeof(struct ion_recv_args)); - if (!args) { - log_error("ion_open_endpoint: failed to allocate thread args"); + recv_args = calloc(1, sizeof(struct ion_recv_args)); + if (!recv_args) { + log_error("ion_open_endpoint: failed to allocate receive thread args"); bp_close(sap); free(ctx); return -ENOMEM; } - args->netlink_sock = netlink_sock; - args->netlink_mutex = netlink_mutex; - args->netlink_family = netlink_family; - args->ctx = ctx; + recv_args->netlink_sock = netlink_sock; + recv_args->netlink_mutex = netlink_mutex; + recv_args->netlink_family = netlink_family; + recv_args->ctx = ctx; - if (pthread_create(&ctx->recv_thread, NULL, ion_receive_thread, args) != 0) { + if (pthread_create(&ctx->recv_thread, NULL, ion_receive_thread, recv_args) != 0) { log_error("ion_open_endpoint: failed to create receive thread: %s", strerror(errno)); bp_close(sap); - free(args); + free(recv_args); + free(ctx); + return -errno; + } + + send_args = calloc(1, sizeof(struct ion_send_args)); + if (!send_args) { + log_error("ion_open_endpoint: failed to allocate send thread args"); + bp_close(sap); + free(recv_args); + free(ctx); + return -ENOMEM; + } + send_args->ctx = ctx; + + if (pthread_create(&ctx->send_thread, NULL, ion_send_thread, send_args) != 0) { + log_error("ion_open_endpoint: failed to create send thread: %s", strerror(errno)); + bp_close(sap); + free(send_args); + free(recv_args); free(ctx); return -errno; } @@ -94,10 +122,30 @@ int ion_close_endpoint(u_int32_t node_id, u_int32_t service_id) { __atomic_store_n(&ctx->running, 0, __ATOMIC_RELAXED); + pthread_mutex_lock(&ctx->send_queue_mutex); + pthread_cond_broadcast(&ctx->send_queue_cond); + pthread_mutex_unlock(&ctx->send_queue_mutex); bp_interrupt(ctx->sap); pthread_join(ctx->recv_thread, NULL); - bp_close(ctx->sap); + pthread_join(ctx->send_thread, NULL); + + pthread_mutex_lock(&ctx->send_queue_mutex); + struct send_queue_item *item = ctx->send_queue_head; + while (item) { + struct send_queue_item *next = item->next; + free(item->dest_eid); + free(item->payload); + free(item); + item = next; + } + ctx->send_queue_head = NULL; + ctx->send_queue_tail = NULL; + ctx->send_queue_size = 0; + pthread_mutex_unlock(&ctx->send_queue_mutex); + pthread_mutex_destroy(&ctx->send_queue_mutex); + pthread_cond_destroy(&ctx->send_queue_cond); + bp_close(ctx->sap); endpoint_registry_remove(node_id, service_id); return 0; @@ -105,95 +153,88 @@ int ion_close_endpoint(u_int32_t node_id, u_int32_t service_id) { void *ion_send_thread(void *arg) { struct ion_send_args *args = arg; - const char *dest_eid = args->dest_eid; - const void *payload = args->payload; - size_t payload_size = args->payload_size; - u_int32_t node_id = args->node_id; - u_int32_t service_id = args->service_id; - struct endpoint_ctx *ctx; + struct endpoint_ctx *ctx = args->ctx; + struct send_queue_item *item; Object sdr_buffer = 0; Object adu = 0; - int ret = 0; + struct bp_send_flags parsed_flags; - if (!dest_eid || !payload || payload_size == 0) { - log_error("ion_send_thread: invalid parameters"); - ret = -EINVAL; - goto cleanup; - } + log_info("ion_send_thread: started for ipn:%u.%u", ctx->node_id, ctx->service_id); - ctx = endpoint_registry_get(node_id, service_id); - if (!ctx) { - log_error("ion_send_thread: no endpoint for ipn:%u.%u", node_id, service_id); - ret = -ENODEV; - goto cleanup; - } + while (__atomic_load_n(&ctx->running, __ATOMIC_RELAXED)) { + pthread_mutex_lock(&ctx->send_queue_mutex); + while (ctx->send_queue_head == NULL && __atomic_load_n(&ctx->running, __ATOMIC_RELAXED)) { + pthread_cond_wait(&ctx->send_queue_cond, &ctx->send_queue_mutex); + } - if (!ctx->sap) { - log_error("ion_send_thread: invalid SAP for ipn:%u.%u", node_id, service_id); - ret = -EINVAL; - goto cleanup; - } + if (!__atomic_load_n(&ctx->running, __ATOMIC_RELAXED)) { + pthread_mutex_unlock(&ctx->send_queue_mutex); + break; + } - if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("ion_send_thread: sdr mutex lock failed"); - ret = -EAGAIN; - goto cleanup; - } + item = ctx->send_queue_head; + ctx->send_queue_head = item->next; + if (ctx->send_queue_head == NULL) { + ctx->send_queue_tail = NULL; + } + ctx->send_queue_size--; + pthread_mutex_unlock(&ctx->send_queue_mutex); - if (sdr_begin_xn(sdr) == 0) { - pthread_mutex_unlock(&sdrmutex); - log_error("ion_send_thread: sdr_begin_xn failed"); - ret = -EIO; - goto cleanup; - } + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("ion_send_thread: sdr mutex lock failed"); + goto cleanup_item; + } - sdr_buffer = sdr_malloc(sdr, payload_size); - if (sdr_buffer == 0) { - pthread_mutex_unlock(&sdrmutex); - log_error("ion_send_thread: no space for payload"); - ret = -ENOSPC; - goto cleanup; - } + if (sdr_begin_xn(sdr) == 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: sdr_begin_xn failed"); + goto cleanup_item; + } - sdr_write(sdr, sdr_buffer, (char *)payload, payload_size); + sdr_buffer = sdr_malloc(sdr, item->payload_size); + if (sdr_buffer == 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: no space for payload"); + goto cleanup_item; + } - adu = zco_create(sdr, ZcoSdrSource, sdr_buffer, 0, (vast)payload_size, ZcoOutbound); - if (adu <= 0) { - pthread_mutex_unlock(&sdrmutex); - log_error("ion_send_thread: zco_create failed"); - ret = -ENOMEM; - goto cleanup; - } + sdr_write(sdr, sdr_buffer, (char *)item->payload, item->payload_size); - if (sdr_end_xn(sdr) < 0) { - pthread_mutex_unlock(&sdrmutex); - log_error("ion_send_thread: sdr_end_xn failed"); - ret = -EIO; - goto cleanup; - } + adu = zco_create(sdr, ZcoSdrSource, sdr_buffer, 0, (vast)item->payload_size, ZcoOutbound); + if (adu <= 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: zco_create failed"); + goto cleanup_item; + } - pthread_mutex_unlock(&sdrmutex); + if (sdr_end_xn(sdr) < 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: sdr_end_xn failed"); + goto cleanup_item; + } - if (bp_send(ctx->sap, (char *)dest_eid, NULL, 86400, BP_STD_PRIORITY, NoCustodyRequested, 0, 0, - NULL, adu, NULL) <= 0) { - log_error("ion_send_thread: bp_send failed"); - ret = -EIO; - goto cleanup; - } + pthread_mutex_unlock(&sdrmutex); - log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %zu (bytes)", node_id, - service_id, args->dest_eid, args->payload_size); + parsed_flags = bp_parse_flags(item->flags); + if (bp_send(ctx->sap, (char *)item->dest_eid, NULL, 86400, parsed_flags.class_of_service, + parsed_flags.custody_switch, parsed_flags.srr_flags, parsed_flags.ack_requested, + NULL, adu, NULL) <= 0) { + log_error("ion_send_thread: bp_send failed for %s", item->dest_eid); + goto cleanup_item; + } - free(args->dest_eid); - free(args->payload); - free(args); - return (void *)(intptr_t)0; + log_info("ion_send_thread: bundle sent to %s (size: %zu)", item->dest_eid, + item->payload_size); -cleanup: - free(args->dest_eid); - free(args->payload); + cleanup_item: + free(item->dest_eid); + free(item->payload); + free(item); + } + + log_info("ion_send_thread: exiting for ipn:%u.%u", ctx->node_id, ctx->service_id); free(args); - return (void *)(intptr_t)ret; + return NULL; } const char *bp_result_text(BpIndResult result) { diff --git a/daemon/ion.h b/daemon/ion.h index ecba04c..1c952a2 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -1,6 +1,7 @@ #ifndef ION_H #define ION_H +#include "../include/bp_socket.h" #include "bp.h" #include #include @@ -15,16 +16,47 @@ struct ion_recv_args { }; struct ion_send_args { - struct nl_sock *netlink_sock; - pthread_mutex_t *netlink_mutex; - int netlink_family; - u_int32_t node_id; - u_int32_t service_id; - char *dest_eid; - void *payload; - size_t payload_size; + struct endpoint_ctx *ctx; +}; + +struct bp_send_flags { + bool ack_requested; + unsigned char srr_flags; + int class_of_service; + BpCustodySwitch custody_switch; }; +// Helper function to parse flags into structured format +static inline struct bp_send_flags bp_parse_flags(u_int32_t flags) { + struct bp_send_flags result = { + .ack_requested = (flags & MSG_ACK_REQUESTED) != 0, + .srr_flags = 0, + .class_of_service = BP_STD_PRIORITY, + .custody_switch = NoCustodyRequested, + }; + + // Status reporting flags + if (flags & MSG_RECEIVED_RPT) result.srr_flags |= BP_RECEIVED_RPT; + if (flags & MSG_CUSTODY_RPT) result.srr_flags |= BP_CUSTODY_RPT; + if (flags & MSG_FORWARDED_RPT) result.srr_flags |= BP_FORWARDED_RPT; + if (flags & MSG_DELIVERED_RPT) result.srr_flags |= BP_DELIVERED_RPT; + if (flags & MSG_DELETED_RPT) result.srr_flags |= BP_DELETED_RPT; + + // Priority flags (mutually exclusive) + if (flags & MSG_BP_BULK_PRIORITY) + result.class_of_service = BP_BULK_PRIORITY; + else if (flags & MSG_BP_EXPEDITED_PRIORITY) + result.class_of_service = BP_EXPEDITED_PRIORITY; + + // Custody flags (mutually exclusive) + if (flags & MSG_SOURCE_CUSTODY_REQUIRED) + result.custody_switch = SourceCustodyRequired; + else if (flags & MSG_SOURCE_CUSTODY_OPTIONAL) + result.custody_switch = SourceCustodyOptional; + + return result; +} + int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id, struct nl_sock *netlink_sock, pthread_mutex_t *netlink_mutex, int netlink_family); int ion_close_endpoint(u_int32_t node_id, u_int32_t service_id); diff --git a/include/bp_socket.h b/include/bp_socket.h index c87e9bd..e06f942 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -23,11 +23,32 @@ enum bp_genl_attrs { BP_GENL_A_DEST_SERVICE_ID, BP_GENL_A_PAYLOAD, BP_GENL_A_ADU, + BP_GENL_A_FLAGS, __BP_GENL_A_MAX, }; - #define BP_GENL_A_MAX (__BP_GENL_A_MAX - 1) +/* Bundle Protocol socket flags */ +/* ackRequested flag */ +#define MSG_ACK_REQUESTED 0x00000001 + +/* Status reporting flags (srrFlags) - can be combined with OR */ +#define MSG_RECEIVED_RPT 0x00000002 +#define MSG_CUSTODY_RPT 0x00000004 +#define MSG_FORWARDED_RPT 0x00000008 +#define MSG_DELIVERED_RPT 0x00000010 +#define MSG_DELETED_RPT 0x00000020 + +/* Priority flags (classOfService) - mutually exclusive */ +#define MSG_BP_BULK_PRIORITY 0x00000100 +#define MSG_BP_STD_PRIORITY 0x00000200 +#define MSG_BP_EXPEDITED_PRIORITY 0x00000400 + +/* Custody flags (custodySwitch) - mutually exclusive */ +#define MSG_SOURCE_CUSTODY_REQUIRED 0x00001000 +#define MSG_SOURCE_CUSTODY_OPTIONAL 0x00002000 +#define MSG_NO_CUSTODY_REQUIRED 0x00004000 + /* Commands */ enum bp_genl_cmds { BP_GENL_CMD_UNSPEC, diff --git a/sender.c b/sender.c index bd1a849..9cc334d 100644 --- a/sender.c +++ b/sender.c @@ -55,7 +55,14 @@ int main(int argc, char *argv[]) { dest_addr.bp_addr.ipn.service_id = service_id; char *message = "Hello!"; - ret = sendto(fd, message, strlen(message) + 1, 0, + int flags = 0; + flags |= MSG_ACK_REQUESTED; + // flags |= MSG_RECEIVED_RPT; + // flags |= MSG_DELIVERED_RPT; + // flags |= MSG_BP_EXPEDITED_PRIORITY; + // flags |= MSG_SOURCE_CUSTODY_OPTIONAL; + + ret = sendto(fd, message, strlen(message) + 1, flags, (struct sockaddr *)&dest_addr, sizeof(dest_addr)); if (ret < 0) { perror("sendto failed"); diff --git a/tests/Makefile b/tests/Makefile deleted file mode 100644 index 2d40ba0..0000000 --- a/tests/Makefile +++ /dev/null @@ -1,20 +0,0 @@ -CC = gcc -CFLAGS = -Wall -g -LDFLAGS = -lbp -lici -lm - -all: sender receiver - -sender: sender.c - $(CC) $(CFLAGS) -o sender sender.c $(LDFLAGS) - -receiver: receiver.c - $(CC) $(CFLAGS) -o receiver receiver.c $(LDFLAGS) - -clean: - rm -f sender receiver - -format: - clang-format -i sender.c receiver.c - -test: - ./inject_bundles.sh diff --git a/tests/inject_bundles.sh b/tests/inject_bundles.sh deleted file mode 100755 index d077df1..0000000 --- a/tests/inject_bundles.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash - -# EID cible (modifie si besoin) -TARGET_EID="ipn:10.42" - -# Fonction d'envoi -send() { - label="$1" - file="$2" - echo "[*] Sending payload: $label" - bpsource "$TARGET_EID" < "$file" -} - -# Répertoire temporaire -TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT - -# 1) Texte ASCII simple -echo "Hello world" > "$TMPDIR/ascii.txt" -send "ascii" "$TMPDIR/ascii.txt" - -# 2) Texte UTF-8 multibyte -echo "こんにちは世界" > "$TMPDIR/utf8.txt" -send "utf8" "$TMPDIR/utf8.txt" - -# 3) JSON -echo '{"key": "value"}' > "$TMPDIR/json.txt" -send "json" "$TMPDIR/json.txt" - -# 4) Binaire incrémental (0x00 à 0xFF) -for i in $(seq 0 255); do - printf "\\x$(printf %02x $i)" -done > "$TMPDIR/incremental.bin" -send "incremental" "$TMPDIR/incremental.bin" - -# 5) Binaire uniforme (0xAA) -head -c 1024 < /dev/zero | tr '\0' '\xAA' > "$TMPDIR/uniform.bin" -send "uniform" "$TMPDIR/uniform.bin" - -# 6) Données aléatoires (4 Ko) -head -c 4096 < /dev/urandom > "$TMPDIR/random.bin" -send "random" "$TMPDIR/random.bin" - -# 7) Gros fichier (1 Mo) -head -c $((1 * 1024 * 1024)) < /dev/zero > "$TMPDIR/big.bin" -send "big" "$TMPDIR/big.bin" - -echo "[+] All bundles sent successfully." diff --git a/tests/receiver.c b/tests/receiver.c deleted file mode 100644 index d987acb..0000000 --- a/tests/receiver.c +++ /dev/null @@ -1,49 +0,0 @@ -#include "../include/bp_socket.h" -#include -#include -#include -#include -#include -#include - -#define AF_BP 28 -#define BUFFER_SIZE (1024) - -void receive_and_check(int sfd) { - char buf[BUFFER_SIZE]; - ssize_t n = recv(sfd, buf, sizeof(buf), 0); - if (n < 0) { - fprintf(stderr, "recv failed: %s\n", strerror(errno)); - abort(); - } - - printf("[+] Received %zd bytes\n", n); -} - -int main() { - int sfd = socket(AF_BP, SOCK_DGRAM, 1); - if (sfd < 0) { - perror("socket"); - return 1; - } - - struct sockaddr_bp addr = {0}; - addr.bp_family = AF_BP; - addr.bp_scheme = BP_SCHEME_IPN; - addr.bp_addr.ipn.node_id = 1; - addr.bp_addr.ipn.service_id = 42; - - if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { - perror("bind"); - return 1; - } - - printf("Waiting for incoming bundles...\n"); - - while (1) { - receive_and_check(sfd); - } - - close(sfd); - return 0; -} diff --git a/tests/sender.c b/tests/sender.c deleted file mode 100644 index f6001bd..0000000 --- a/tests/sender.c +++ /dev/null @@ -1,112 +0,0 @@ -#include "../include/bp_socket.h" -#include -#include -#include -#include -#include -#include - -#define AF_BP 28 - -struct test_payload { - const char *label; - const void *data; - size_t len; - ssize_t expected_size; - int expected_errno; -}; - -void send_payload(int sfd, uint32_t node_id, uint32_t service_id, - const void *data, size_t len, const char *label, - ssize_t expected_size, int expected_errno) { - struct sockaddr_bp addr = {0}; - addr.bp_family = AF_BP; - addr.bp_scheme = BP_SCHEME_IPN; - addr.bp_addr.ipn.node_id = node_id; - addr.bp_addr.ipn.service_id = service_id; - - errno = 0; - ssize_t n = sendto(sfd, data, len, 0, (struct sockaddr *)&addr, sizeof(addr)); - - if (n < 0) { - if (expected_errno == 0) { - fprintf(stderr, "[%s] Unexpected error: %s\n", label, strerror(errno)); - abort(); - } - if (errno != expected_errno) { - fprintf( - stderr, - "[%s] Assertion failed: expected errno %d (%s), got errno %d (%s)\n", - label, expected_errno, strerror(expected_errno), errno, - strerror(errno)); - abort(); - } - } else { - if (expected_errno != 0) { - fprintf(stderr, - "[%s] Expected failure with errno %d (%s), but sendto succeeded " - "with %zd bytes\n", - label, expected_errno, strerror(expected_errno), n); - abort(); - } - if (n != expected_size) { - fprintf(stderr, "[%s] Assertion failed: expected %zd bytes, got %zd\n", - label, expected_size, n); - abort(); - } - } -} - -int main() { - int sfd = socket(AF_BP, SOCK_DGRAM, 1); - if (sfd < 0) { - perror("socket"); - return 1; - } - - const char *ascii = "Hello world"; - const char *utf8 = "こんにちは世界"; - const char *json = "{\"key\":\"value\"}"; - - unsigned char incremental[256]; - for (int i = 0; i < 256; i++) - incremental[i] = i; - - unsigned char uniform[1024]; - memset(uniform, 0xAA, sizeof(uniform)); - - unsigned char random[4096]; - for (int i = 0; i < 4096; i++) - random[i] = rand() % 256; - - char *big = malloc(10 * 1024 * 1024); - memset(big, 'X', 10 * 1024 * 1024); - - struct test_payload payloads[] = { - {"empty", "", 0, 0, 0}, - {"ascii", ascii, strlen(ascii), strlen(ascii), 0}, - {"utf8", utf8, strlen(utf8), strlen(utf8), 0}, - {"json", json, strlen(json), strlen(json), 0}, - {"incremental", incremental, sizeof(incremental), sizeof(incremental), 0}, - {"uniform", uniform, sizeof(uniform), sizeof(uniform), 0}, - {"random", random, sizeof(random), sizeof(random), 0}, - {"small", "small data", 11, 11, 0}, - {"medium", "medium data that is larger than small but smaller than big", - 60, 60, 0}, - {"large", - "a large payload that is significantly larger than medium but still " - "manageable", - 1000, 1000, 0}, - {"big", big, 10 * 1024 * 1024, -1, EMSGSIZE}, - }; - - for (size_t i = 0; i < sizeof(payloads) / sizeof(payloads[0]); i++) { - send_payload(sfd, 10, 42, payloads[i].data, payloads[i].len, - payloads[i].label, payloads[i].expected_size, - payloads[i].expected_errno); - } - - free(big); - close(sfd); - return 0; -}