diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 4071b31..34ce876 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -1,7 +1,14 @@ +#include +#include #include #include -#include +#include +#include +#include +#include +#include #include +#include #include "../include/bp_socket.h" #include "af_bp.h" @@ -30,16 +37,13 @@ static struct sock* bp_alloc_socket(struct net* net, int kern) bp = bp_sk(sk); skb_queue_head_init(&bp->rx_queue); init_waitqueue_head(&bp->rx_waitq); - init_waitqueue_head(&bp->tx_waitq); - mutex_init(&bp->tx_mutex); mutex_init(&bp->rx_mutex); bp->bp_node_id = 0; bp->bp_service_id = 0; - bp->rx_canceled = false; - bp->tx_confirmed = false; - bp->tx_error = false; + sk->sk_rcvtimeo + = MAX_SCHEDULE_TIMEOUT; /* Default: no timeout */ } return sk; @@ -61,12 +65,12 @@ struct proto_ops bp_proto_ops = { .family = AF_BP, .mmap = sock_no_mmap, .accept = sock_no_accept, .getname = sock_no_getname, - // .poll = datagram_poll, + // .poll = bp_poll, .ioctl = sock_no_ioctl, .listen = sock_no_listen, .shutdown = sock_no_shutdown, - .setsockopt = sock_common_setsockopt, - .getsockopt = sock_common_getsockopt, + .setsockopt = bp_setsockopt, + .getsockopt = bp_getsockopt, .sendmsg = bp_sendmsg, .recvmsg = bp_recvmsg }; @@ -169,7 +173,11 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len) // Notify user-space daemon to open endpoint (bp_open) and prepare // threads/state - open_endpoint_doit(node_id, service_id, 8443); + ret = open_endpoint_doit(node_id, service_id, 8443); + if (ret < 0) { + pr_err("bp_bind: open_endpoint_doit failed (%d)\n", ret); + goto out; + } return 0; @@ -298,42 +306,9 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) goto err_free; } - // Wait for confirmation from daemon - pr_info("bp_sendmsg: waiting for confirmation for endpoint " - "ipn:%u.%u\n", - bp->bp_node_id, bp->bp_service_id); - - // Reset confirmation flag and wait - mutex_lock(&bp->tx_mutex); - bp->tx_confirmed = false; - bp->tx_error = false; - mutex_unlock(&bp->tx_mutex); - - ret = wait_event_interruptible( - bp->tx_waitq, bp->tx_confirmed || signal_pending(current)); - - if (ret < 0) { - pr_err("bp_sendmsg: interrupted while waiting for " - "confirmation\n"); - goto err_free; - } - - mutex_lock(&bp->tx_mutex); - if (bp->tx_confirmed) { - if (bp->tx_error) { - pr_err("bp_sendmsg: bundle send failed for " - "endpoint ipn:%u.%u\n", - bp->bp_node_id, bp->bp_service_id); - mutex_unlock(&bp->tx_mutex); - ret = -EIO; // Return error to user space - goto err_free; - } else { - pr_info("bp_sendmsg: confirmation received for " - "endpoint ipn:%u.%u\n", - bp->bp_node_id, bp->bp_service_id); - } - } - mutex_unlock(&bp->tx_mutex); + pr_info("bp_sendmsg: bundle sent for endpoint ipn:%u.%u (size: " + "%zu)\n", + bp->bp_node_id, bp->bp_service_id, size); kfree(payload); } @@ -352,9 +327,11 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) struct bp_sock* bp; struct sk_buff* skb = NULL; struct sockaddr_bp* src_addr; + long timeo; int ret; sk = sock->sk; + timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); lock_sock(sk); bp = bp_sk(sk); @@ -364,27 +341,18 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } - ret = request_bundle_doit(bp->bp_node_id, bp->bp_service_id, 8443); - if (ret < 0) { - pr_err("bp_recvmsg: request_bundle_doit failed (%d)\n", ret); - goto out; - } - - ret = wait_event_interruptible( - bp->rx_waitq, !skb_queue_empty(&bp->rx_queue) || bp->rx_canceled); - if (ret < 0) { - pr_err("bp_recvmsg: interrupted while waiting\n"); + ret = wait_event_interruptible_timeout( + bp->rx_waitq, !skb_queue_empty(&bp->rx_queue), timeo); + if (ret == 0) { + pr_err("bp_recvmsg: timeout waiting for message\n"); + ret = -EAGAIN; goto out; } - - mutex_lock(&bp->rx_mutex); - if (bp->rx_canceled) { - pr_info("bp_recvmsg: bundle request canceled\n"); - mutex_unlock(&bp->rx_mutex); - ret = -ECANCELED; + if (ret == -ERESTARTSYS) { + pr_err("bp_recvmsg: interrupted by signal\n"); + ret = -EINTR; goto out; } - mutex_unlock(&bp->rx_mutex); if (sock_flag(sk, SOCK_DEAD)) { pr_err("bp_recvmsg: socket closed while waiting\n"); @@ -395,10 +363,10 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) mutex_lock(&bp->rx_mutex); skb = skb_dequeue(&bp->rx_queue); if (!skb) { - pr_info("bp_recvmsg: no messages in the queue for service %d\n", - bp->bp_service_id); + pr_info("bp_recvmsg: no messages in the queue for ipn:%u.%u\n", + bp->bp_node_id, bp->bp_service_id); mutex_unlock(&bp->rx_mutex); - ret = -ENOMSG; + ret = -EAGAIN; goto out; } mutex_unlock(&bp->rx_mutex); @@ -427,11 +395,10 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } - ret = destroy_bundle_doit(bp->bp_node_id, bp->bp_service_id, 8443); + ret = destroy_bundle_doit(BP_SKB_CB(skb)->adu, 8443); if (ret < 0) { - pr_err( - "destroy_bundle_doit failed (%d), will retry later", ret); - // enqueue_retry(bp->bp_node_id, bp->bp_service_id); + pr_warn( + "bp_recvmsg: failed to destroy bundle, bundle may leak\n"); } ret = skb->len; @@ -442,3 +409,160 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) release_sock(sk); return ret; } + +int bp_setsockopt(struct socket* sock, int level, int optname, sockptr_t optval, + unsigned int optlen) +{ + struct sock* sk = sock->sk; + struct bp_sock* bp; + int ret = 0; + + if (level != SOL_SOCKET) + return sock_common_setsockopt( + sock, level, optname, optval, optlen); + + lock_sock(sk); + bp = bp_sk(sk); + + switch (optname) { + + case SO_RCVTIMEO_OLD: { + struct __kernel_old_timeval tv; + unsigned long t; + if (optlen < sizeof(tv)) { + ret = -EINVAL; + break; + } + if (copy_from_sockptr(&tv, optval, sizeof(tv))) { + ret = -EFAULT; + break; + } + + if (tv.tv_sec == 0 && tv.tv_usec == 0) { + t = 0; + } else if (tv.tv_sec < 0 || tv.tv_usec < 0 + || tv.tv_usec >= 1000000) { + ret = -EINVAL; + break; + } else { + u64 usec = (u64)tv.tv_sec * 1000000ULL + tv.tv_usec; + t = usecs_to_jiffies(usec); + } + sk->sk_rcvtimeo = t; + break; + } + + case SO_RCVTIMEO_NEW: { + struct __kernel_timespec ts; + unsigned long t; + if (optlen < sizeof(ts)) { + ret = -EINVAL; + break; + } + if (copy_from_sockptr(&ts, optval, sizeof(ts))) { + ret = -EFAULT; + break; + } + + if (ts.tv_sec == 0 && ts.tv_nsec == 0) { + t = 0; + } else if (ts.tv_sec < 0 || ts.tv_nsec < 0 + || ts.tv_nsec >= 1000000000L) { + ret = -EINVAL; + break; + } else { + struct timespec64 t64 + = { .tv_sec = ts.tv_sec, .tv_nsec = ts.tv_nsec }; + t = timespec64_to_jiffies(&t64); + } + sk->sk_rcvtimeo = t; + break; + } + + default: + ret = sock_common_setsockopt( + sock, level, optname, optval, optlen); + break; + } + + release_sock(sk); + return ret; +} + +int bp_getsockopt(struct socket* sock, int level, int optname, + char __user* optval, int __user* optlen) +{ + struct sock* sk = sock->sk; + struct bp_sock* bp; + int ret = 0, len; + + if (level != SOL_SOCKET) + return sock_common_getsockopt( + sock, level, optname, optval, optlen); + + if (get_user(len, optlen)) + return -EFAULT; + + lock_sock(sk); + bp = bp_sk(sk); + + switch (optname) { + + case SO_RCVTIMEO_OLD: { + struct __kernel_old_timeval tv; + unsigned long t = sk->sk_rcvtimeo; /* ou bp->rcv_timeout */ + + if (len < (int)sizeof(tv)) { + ret = -EINVAL; + break; + } + + if (t == MAX_SCHEDULE_TIMEOUT) { + tv.tv_sec = 0; + tv.tv_usec = 0; + } else { + unsigned long msec = jiffies_to_msecs(t); + tv.tv_sec = msec / 1000; + tv.tv_usec = (msec % 1000) * 1000; + } + + if (copy_to_user(optval, &tv, sizeof(tv)) + || put_user(sizeof(tv), optlen)) + ret = -EFAULT; + break; + } + + case SO_RCVTIMEO_NEW: { + struct __kernel_timespec ts; + unsigned long t = sk->sk_rcvtimeo; + + if (len < (int)sizeof(ts)) { + ret = -EINVAL; + break; + } + + if (t == MAX_SCHEDULE_TIMEOUT) { + ts.tv_sec = 0; + ts.tv_nsec = 0; + } else { + unsigned long msec = jiffies_to_msecs(t); + u64 ns = (u64)msec * 1000000ULL; + ts.tv_sec = div_u64(ns, 1000000000ULL); + ts.tv_nsec = ns % 1000000000ULL; + } + + if (copy_to_user(optval, &ts, sizeof(ts)) + || put_user(sizeof(ts), optlen)) + ret = -EFAULT; + break; + } + + default: + ret = sock_common_getsockopt( + sock, level, optname, optval, optlen); + break; + } + + release_sock(sk); + return ret; +} diff --git a/bp_socket/af_bp.h b/bp_socket/af_bp.h index 4f8688a..2fe7028 100644 --- a/bp_socket/af_bp.h +++ b/bp_socket/af_bp.h @@ -7,6 +7,7 @@ struct bp_skb_cb { u_int32_t src_node_id; u_int32_t src_service_id; + uint64_t adu; }; #define bp_sk(ptr) container_of(ptr, struct bp_sock, sk) @@ -26,13 +27,6 @@ struct bp_sock { struct mutex rx_mutex; struct sk_buff_head rx_queue; wait_queue_head_t rx_waitq; - bool rx_canceled; - - // Transmission - struct mutex tx_mutex; - wait_queue_head_t tx_waitq; - bool tx_confirmed; - bool tx_error; }; int bp_bind(struct socket* sock, struct sockaddr* addr, int addr_len); @@ -40,5 +34,9 @@ int bp_create(struct net* net, struct socket* sock, int protocol, int kern); int bp_release(struct socket* sock); int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size); int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags); +int bp_setsockopt(struct socket* sock, int level, int optname, sockptr_t optval, + unsigned int optlen); +int bp_getsockopt(struct socket* sock, int level, int optname, + char __user* optval, int __user* optlen); #endif diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index c9e5d8f..1794fb5 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -11,37 +11,16 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = { [BP_GENL_A_DEST_NODE_ID] = { .type = NLA_U32 }, [BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 }, [BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY }, - [BP_GENL_A_ERROR_CODE] = { .type = NLA_U32 }, + [BP_GENL_A_ADU] = { .type = NLA_U64 }, }; static struct genl_ops genl_ops[] = { { - .cmd = BP_GENL_CMD_DELIVER_BUNDLE, - .flags = GENL_ADMIN_PERM, - .policy = nla_policy, - .doit = deliver_bundle_doit, - .dumpit = NULL, - }, - { - .cmd = BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, - .flags = GENL_ADMIN_PERM, - .policy = nla_policy, - .doit = cancel_bundle_request_doit, - .dumpit = NULL, - }, - { - .cmd = BP_GENL_CMD_SEND_BUNDLE_CONFIRMATION, - .flags = GENL_ADMIN_PERM, - .policy = nla_policy, - .doit = send_bundle_confirmation_doit, - .dumpit = NULL, - }, - { - .cmd = BP_GENL_CMD_SEND_BUNDLE_FAILURE, - .flags = GENL_ADMIN_PERM, - .policy = nla_policy, - .doit = send_bundle_failure_doit, - .dumpit = NULL, - } }; + .cmd = BP_GENL_CMD_ENQUEUE_BUNDLE, + .flags = GENL_ADMIN_PERM, + .policy = nla_policy, + .doit = enqueue_bundle_doit, + .dumpit = NULL, +} }; /* Multicast groups for our family */ static const struct genl_multicast_group genl_mcgrps[] = { @@ -136,104 +115,14 @@ int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, return ret; } -int request_bundle_doit( - u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id) -{ - void* msg_head; - struct sk_buff* msg; - size_t msg_size; - int ret; - - msg_size = 2 * nla_total_size(sizeof(u_int32_t)); - msg = genlmsg_new(msg_size, GFP_KERNEL); - if (!msg) { - pr_err("request_bundle: failed to allocate message buffer\n"); - ret = -ENOMEM; - goto out; - } - - msg_head - = genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_REQUEST_BUNDLE); - if (!msg_head) { - pr_err("request_bundle: failed to create genetlink header\n"); - ret = -EMSGSIZE; - goto err_free; - } - - ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); - if (ret) { - pr_err("request_bundle: failed to put BP_GENL_A_DEST_NODE_ID " - "(%d)\n", - ret); - goto err_cancel; - } - - ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); - if (ret) { - pr_err("request_bundle: failed to put " - "BP_GENL_A_DEST_SERVICE_ID (%d)\n", - ret); - goto err_cancel; - } - - genlmsg_end(msg, msg_head); - return genlmsg_unicast(&init_net, msg, port_id); - -err_cancel: - genlmsg_cancel(msg, msg_head); -err_free: - nlmsg_free(msg); -out: - return ret; -} - -int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info) -{ - struct sock* sk; - struct bp_sock* bp; - u_int32_t dest_node_id, dest_service_id; - - if (!info->attrs[BP_GENL_A_DEST_NODE_ID] - || !info->attrs[BP_GENL_A_DEST_SERVICE_ID]) { - pr_err("cancel_bundle_request: missing required attributes\n"); - return -EINVAL; - } - - dest_node_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_NODE_ID]); - dest_service_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_SERVICE_ID]); - - read_lock_bh(&bp_list_lock); - sk_for_each(sk, &bp_list) - { - bh_lock_sock(sk); - bp = bp_sk(sk); - - if (bp->bp_node_id == dest_node_id - && bp->bp_service_id == dest_service_id) { - - if (waitqueue_active(&bp->rx_waitq)) { - mutex_lock(&bp->rx_mutex); - bp->rx_canceled = true; - mutex_unlock(&bp->rx_mutex); - wake_up_interruptible(&bp->rx_waitq); - } - bh_unlock_sock(sk); - break; - } - bh_unlock_sock(sk); - } - read_unlock_bh(&bp_list_lock); - - return 0; -} - -int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) +int enqueue_bundle_doit(struct sk_buff* skb, struct genl_info* info) { struct sock* sk; struct bp_sock* bp; struct sk_buff* new_skb; bool new_skb_queued = false; u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; + uint64_t adu; void* payload; size_t payload_len; int ret; @@ -242,8 +131,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) || !info->attrs[BP_GENL_A_DEST_SERVICE_ID] || !info->attrs[BP_GENL_A_SRC_NODE_ID] || !info->attrs[BP_GENL_A_SRC_SERVICE_ID] - || !info->attrs[BP_GENL_A_PAYLOAD]) { - pr_err("deliver_bundle: missing required attributes\n"); + || !info->attrs[BP_GENL_A_PAYLOAD] || !info->attrs[BP_GENL_A_ADU]) { + pr_err("enqueue_bundle: missing required attributes\n"); ret = -EINVAL; goto out; } @@ -254,6 +143,7 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]); payload = nla_data(info->attrs[BP_GENL_A_PAYLOAD]); payload_len = nla_len(info->attrs[BP_GENL_A_PAYLOAD]); + adu = nla_get_u64(info->attrs[BP_GENL_A_ADU]); new_skb = alloc_skb(payload_len, GFP_KERNEL); if (!new_skb) { @@ -264,6 +154,7 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) skb_put_data(new_skb, payload, payload_len); BP_SKB_CB(new_skb)->src_node_id = src_node_id; BP_SKB_CB(new_skb)->src_service_id = src_service_id; + BP_SKB_CB(new_skb)->adu = adu; read_lock_bh(&bp_list_lock); sk_for_each(sk, &bp_list) @@ -289,7 +180,7 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) read_unlock_bh(&bp_list_lock); if (!new_skb_queued) { - pr_err("deliver_bundle: no socket found (ipn:%d.%d)\n", + pr_err("enqueue_bundle: no socket found (ipn:%d.%d)\n", dest_node_id, dest_service_id); ret = -ENODEV; goto err_free; @@ -303,15 +194,14 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) return ret; } -int destroy_bundle_doit( - u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id) +int destroy_bundle_doit(uint64_t adu, int port_id) { void* msg_head; struct sk_buff* msg; size_t msg_size; int ret; - msg_size = 2 * nla_total_size(sizeof(u_int32_t)); + msg_size = nla_total_size(sizeof(uint64_t)); msg = genlmsg_new(msg_size, GFP_KERNEL); if (!msg) { pr_err("destroy_bundle: failed to allocate message buffer\n"); @@ -327,19 +217,10 @@ int destroy_bundle_doit( goto err_free; } - ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); - if (ret) { - pr_err("destroy_bundle: failed to put BP_GENL_A_DEST_NODE_ID " - "(%d)\n", - ret); - goto err_cancel; - } - - ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); + ret = nla_put_u64_64bit(msg, BP_GENL_A_ADU, adu, 0); if (ret) { - pr_err("destroy_bundle: failed to put " - "BP_GENL_A_DEST_SERVICE_ID (%d)\n", - ret); + pr_err( + "destroy_bundle: failed to put BP_GENL_A_ADU (%d)\n", ret); goto err_cancel; } @@ -404,92 +285,6 @@ int open_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id) return ret; } -int send_bundle_confirmation_doit(struct sk_buff* skb, struct genl_info* info) -{ - u_int32_t src_node_id, src_service_id; - struct sock* sk; - struct bp_sock* bp; - - if (!info->attrs[BP_GENL_A_SRC_NODE_ID] - || !info->attrs[BP_GENL_A_SRC_SERVICE_ID]) { - pr_err("send_bundle_confirmation_doit: missing attribute(s)\n"); - return -EINVAL; - } - - src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]); - src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]); - - pr_info("send_bundle_confirmation_doit: received confirmation for " - "ipn:%u.%u\n", - src_node_id, src_service_id); - - // Find the socket waiting for confirmation - read_lock_bh(&bp_list_lock); - sk_for_each(sk, &bp_list) - { - bp = bp_sk(sk); - if (bp->bp_node_id == src_node_id - && bp->bp_service_id == src_service_id) { - // Found the socket, wake it up - mutex_lock(&bp->tx_mutex); - bp->tx_confirmed = true; - mutex_unlock(&bp->tx_mutex); - wake_up(&bp->tx_waitq); - pr_info( - "send_bundle_confirmation_doit: woke up socket for " - "ipn:%u.%u\n", - src_node_id, src_service_id); - break; - } - } - read_unlock_bh(&bp_list_lock); - - return 0; -} - -int send_bundle_failure_doit(struct sk_buff* skb, struct genl_info* info) -{ - u_int32_t src_node_id, src_service_id; - struct sock* sk; - struct bp_sock* bp; - - if (!info->attrs[BP_GENL_A_SRC_NODE_ID] - || !info->attrs[BP_GENL_A_SRC_SERVICE_ID]) { - pr_err("send_bundle_failure_doit: missing attribute(s)\n"); - return -EINVAL; - } - - src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]); - src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]); - - pr_info("send_bundle_failure_doit: received failure notification for " - "ipn:%u.%u\n", - src_node_id, src_service_id); - - // Find the socket waiting for confirmation and mark it as failed - read_lock_bh(&bp_list_lock); - sk_for_each(sk, &bp_list) - { - bp = bp_sk(sk); - if (bp->bp_node_id == src_node_id - && bp->bp_service_id == src_service_id) { - // Found the socket, mark as failed and wake it up - mutex_lock(&bp->tx_mutex); - bp->tx_confirmed = true; - bp->tx_error = true; - mutex_unlock(&bp->tx_mutex); - wake_up(&bp->tx_waitq); - pr_info("send_bundle_failure_doit: woke up socket for " - "ipn:%u.%u with failure\n", - src_node_id, src_service_id); - break; - } - } - read_unlock_bh(&bp_list_lock); - - return 0; -} - int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id) { void* msg_head; diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index b21e983..dfed82a 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -10,13 +10,7 @@ int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id); int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, int port_id); -int send_bundle_confirmation_doit(struct sk_buff* skb, struct genl_info* info); -int send_bundle_failure_doit(struct sk_buff* skb, struct genl_info* info); -int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); -int request_bundle_doit( - u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); -int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info); -int destroy_bundle_doit( - u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); +int enqueue_bundle_doit(struct sk_buff* skb, struct genl_info* info); +int destroy_bundle_doit(uint64_t adu, int port_id); #endif \ No newline at end of file diff --git a/daemon/adu_registry.c b/daemon/adu_registry.c deleted file mode 100644 index f28a98c..0000000 --- a/daemon/adu_registry.c +++ /dev/null @@ -1,91 +0,0 @@ -#include "adu_registry.h" -#include "log.h" -#include -#include - -static pthread_mutex_t adu_mutex = PTHREAD_MUTEX_INITIALIZER; -static adu_node_t *adu_head = NULL; - -static adu_node_t *find_node(uint32_t node_id, uint32_t service_id, adu_node_t **prev) { - adu_node_t *p = adu_head; - adu_node_t *q = NULL; - while (p) { - if (p->key.node_id == node_id && p->key.service_id == service_id) { - if (prev) *prev = q; - return p; - } - q = p; - p = p->next; - } - if (prev) *prev = NULL; - return NULL; -} - -bool adu_registry_contains(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&adu_mutex); - bool exists = find_node(node_id, service_id, NULL) != NULL; - pthread_mutex_unlock(&adu_mutex); - return exists; -} - -int adu_registry_add(Object adu, uint32_t dest_node_id, uint32_t dest_service_id, - uint32_t src_node_id, uint32_t src_service_id) { - int ret = 0; - adu_node_t *node; - - pthread_mutex_lock(&adu_mutex); - if (find_node(dest_node_id, dest_service_id, NULL)) { - pthread_mutex_unlock(&adu_mutex); - return 0; // Already exists - } - pthread_mutex_unlock(&adu_mutex); - - node = (adu_node_t *)calloc(1, sizeof(adu_node_t)); - if (!node) { - log_error("adu_registry_add: Failed to allocate memory for bundle reference"); - return -ENOMEM; - } - - node->key.node_id = dest_node_id; - node->key.service_id = dest_service_id; - node->src_node_id = src_node_id; - node->src_service_id = src_service_id; - node->adu = adu; - - pthread_mutex_lock(&adu_mutex); - node->next = adu_head; - adu_head = node; - pthread_mutex_unlock(&adu_mutex); - - return ret; -} - -adu_node_t *adu_registry_get(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&adu_mutex); - adu_node_t *node = find_node(node_id, service_id, NULL); - pthread_mutex_unlock(&adu_mutex); - return node; -} - -Object adu_registry_remove(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&adu_mutex); - adu_node_t *prev = NULL; - adu_node_t *node = find_node(node_id, service_id, &prev); - Object adu = 0; - - if (!node) { - pthread_mutex_unlock(&adu_mutex); - log_warn("adu_registry_remove: no bundle found (ipn:%u.%u)", node_id, service_id); - return 0; - } - - adu = node->adu; - if (prev) - prev->next = node->next; - else - adu_head = node->next; - - pthread_mutex_unlock(&adu_mutex); - free(node); - return adu; -} diff --git a/daemon/adu_registry.h b/daemon/adu_registry.h deleted file mode 100644 index e575bda..0000000 --- a/daemon/adu_registry.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef ADU_REGISTRY_H -#define ADU_REGISTRY_H - -#include "bp.h" -#include -#include - -typedef struct adu_key { - uint32_t node_id; - uint32_t service_id; -} adu_key_t; - -typedef struct adu_node { - adu_key_t key; - uint32_t src_node_id; - uint32_t src_service_id; - Object adu; - struct adu_node *next; -} adu_node_t; - -bool adu_registry_contains(uint32_t node_id, uint32_t service_id); -int adu_registry_add(Object adu, uint32_t dest_node_id, uint32_t dest_service_id, - uint32_t src_node_id, uint32_t src_service_id); -adu_node_t *adu_registry_get(uint32_t node_id, uint32_t service_id); -Object adu_registry_remove(uint32_t node_id, uint32_t service_id); - -#endif diff --git a/daemon/bp_genl.c b/daemon/bp_genl.c index de610aa..fb6b4d8 100644 --- a/daemon/bp_genl.c +++ b/daemon/bp_genl.c @@ -1,10 +1,15 @@ +#include +#include #include #include #include +#include #include #include +#include #include #include +#include #include "../include/bp_socket.h" #include "bp_genl.h" @@ -12,7 +17,7 @@ #include "daemon.h" #include "log.h" -struct nl_sock *genl_bp_sock_init(Daemon *daemon) { +struct nl_sock *bp_genl_socket_create(Daemon *daemon) { struct nl_sock *sk; int family_id; int err; @@ -25,7 +30,7 @@ struct nl_sock *genl_bp_sock_init(Daemon *daemon) { nl_socket_set_local_port(sk, daemon->nl_pid); nl_socket_disable_seq_check(sk); - nl_socket_modify_cb(sk, NL_CB_VALID, NL_CB_CUSTOM, genl_bp_sock_recvmsg_cb, daemon); + nl_socket_modify_cb(sk, NL_CB_VALID, NL_CB_CUSTOM, bp_genl_message_handler, daemon); nl_socket_set_peer_port(sk, 0); // Send to kernel err = genl_connect(sk); @@ -47,7 +52,7 @@ struct nl_sock *genl_bp_sock_init(Daemon *daemon) { return sk; } -void genl_bp_sock_close(Daemon *daemon) { +void bp_genl_socket_destroy(Daemon *daemon) { if (!daemon->genl_bp_sock) return; nl_socket_free(daemon->genl_bp_sock); @@ -57,7 +62,7 @@ void genl_bp_sock_close(Daemon *daemon) { log_info("Generic Netlink socket closed"); } -int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { +int bp_genl_message_handler(struct nl_msg *msg, void *arg) { Daemon *daemon = (Daemon *)arg; struct nlmsghdr *nlh = nlmsg_hdr(msg); struct genlmsghdr *genlhdr = nlmsg_data(nlh); @@ -74,14 +79,10 @@ int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { switch (genlhdr->cmd) { case BP_GENL_CMD_SEND_BUNDLE: return handle_send_bundle(daemon, attrs); - case BP_GENL_CMD_REQUEST_BUNDLE: - return handle_request_bundle(daemon, attrs); case BP_GENL_CMD_OPEN_ENDPOINT: return handle_open_endpoint(daemon, attrs); case BP_GENL_CMD_CLOSE_ENDPOINT: return handle_close_endpoint(daemon, attrs); - // case BP_GENL_CMD_DELIVER_BUNDLE: - // return handle_deliver_bundle_reply(daemon, attrs); case BP_GENL_CMD_DESTROY_BUNDLE: return handle_destroy_bundle(daemon, attrs); default: @@ -89,3 +90,100 @@ int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { return NL_SKIP; } } + +int bp_genl_enqueue_bundle(int netlink_family, struct nl_sock *netlink_sock, + pthread_mutex_t *netlink_mutex, void *payload, size_t payload_size, + uint32_t src_node_id, uint32_t src_service_id, uint32_t dest_node_id, + uint32_t dest_service_id, uint64_t adu) { + if (payload_size > (size_t)INT_MAX) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: payload too large", dest_node_id, + dest_service_id); + return -EMSGSIZE; + } + struct nl_msg *msg = NULL; + void *hdr; + int ret; + + msg = nlmsg_alloc(); + if (!msg) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to allocate Netlink msg", + dest_node_id, dest_service_id); + ret = -ENOMEM; + goto out; + } + + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, + BP_GENL_CMD_ENQUEUE_BUNDLE, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to create Netlink header", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id) < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to add NODE_ID attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id) < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to add SERVICE_ID attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to add SRC_NODE_ID attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to add SRC_SERVICE_ID attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put(msg, BP_GENL_A_PAYLOAD, (int)payload_size, payload) < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to add PAYLOAD attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u64(msg, BP_GENL_A_ADU, adu) < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed to add ADU attribute", dest_node_id, + dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (pthread_mutex_lock(netlink_mutex) != 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: mutex lock failed", dest_node_id, + dest_service_id); + ret = -EAGAIN; + goto err_free_msg; + } + + ret = nl_send_sync(netlink_sock, msg); + pthread_mutex_unlock(netlink_mutex); + + if (ret < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: enqueue bundle not sent", dest_node_id, + dest_service_id); + ret = -errno; + goto out; + } + + return 0; + +err_free_msg: + nlmsg_free(msg); +out: + return ret; +} diff --git a/daemon/bp_genl.h b/daemon/bp_genl.h index e8e9e42..f796795 100644 --- a/daemon/bp_genl.h +++ b/daemon/bp_genl.h @@ -3,9 +3,17 @@ #include "daemon.h" #include +#include +#include -struct nl_sock *genl_bp_sock_init(Daemon *daemon); -void genl_bp_sock_close(Daemon *daemon); -int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg); +struct nl_sock *bp_genl_socket_create(Daemon *daemon); +void bp_genl_socket_destroy(Daemon *daemon); +int bp_genl_message_handler(struct nl_msg *msg, void *arg); + +// Netlink send functions +int bp_genl_enqueue_bundle(int netlink_family, struct nl_sock *netlink_sock, pthread_mutex_t *mutex, + void *payload, size_t payload_size, uint32_t src_node_id, + uint32_t src_service_id, uint32_t dest_node_id, uint32_t dest_service_id, + uint64_t adu); #endif diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index 80cd320..cf96372 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -8,17 +8,17 @@ #include #include "../include/bp_socket.h" -#include "adu_registry.h" #include "bp.h" #include "bp_genl_handlers.h" #include "daemon.h" +#include "endpoint_registry.h" #include "ion.h" #include "log.h" #include int handle_open_endpoint(Daemon *daemon, struct nlattr **attrs) { u_int32_t node_id, service_id; - (void)daemon; + int ret; if (!attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { log_error("handle_open_endpoint: missing attribute(s)"); @@ -27,8 +27,8 @@ int handle_open_endpoint(Daemon *daemon, struct nlattr **attrs) { node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - log_info("[ipn:%u.%u] OPEN_ENDPOINT: opening endpoint", node_id, service_id); - int ret = ion_open_endpoint(node_id, service_id); + ret = ion_open_endpoint(node_id, service_id, daemon->genl_bp_sock, &daemon->netlink_mutex, + daemon->genl_bp_family_id); if (ret == 0) { log_info("[ipn:%u.%u] OPEN_ENDPOINT: endpoint opened successfully", node_id, service_id); } else { @@ -49,14 +49,14 @@ int handle_close_endpoint(Daemon *daemon, struct nlattr **attrs) { node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - log_info("[ipn:%u.%u] CLOSE_ENDPOINT: closing endpoint", node_id, service_id); int ret = ion_close_endpoint(node_id, service_id); if (ret == 0) { - log_info("[ipn:%u.%u] CLOSE_ENDPOINT: endpoint closed successfully", node_id, service_id); + log_info("[ipn:%u.%u] CLOSE_ENDPOINT: closing endpoint", node_id, service_id); } else { log_error("[ipn:%u.%u] CLOSE_ENDPOINT: failed to close endpoint (error %d)", node_id, service_id, ret); } + return ret; } @@ -67,6 +67,10 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; char dest_eid[64]; int written; + pthread_t thread; + struct ion_send_args *args; + struct endpoint_ctx *ctx; + void *payload_copy; if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_SRC_NODE_ID] || @@ -91,108 +95,65 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { return -EINVAL; } - // Enqueue to send thread using source endpoint SAP - // Launch async send thread - pthread_t send_thread; - struct ion_send_args *send_args = malloc(sizeof(struct ion_send_args)); - if (!send_args) return -ENOMEM; - send_args->src_node_id = src_node_id; - send_args->src_service_id = src_service_id; - send_args->dest_eid = strndup(dest_eid, sizeof(dest_eid)); - send_args->payload = malloc(payload_size); - send_args->netlink_sock = daemon->genl_bp_sock; - send_args->netlink_family = daemon->genl_bp_family_id; - if (!send_args->dest_eid || !send_args->payload) { - free(send_args->dest_eid); - free(send_args->payload); - free(send_args); - return -ENOMEM; - } - memcpy(send_args->payload, payload, payload_size); - send_args->payload_size = payload_size; - if (pthread_create(&send_thread, NULL, ion_send_thread, send_args) != 0) { - log_error("[ipn:%u.%u] handle_send_bundle: failed to create send thread", src_node_id, - src_service_id); - free(send_args->dest_eid); - free(send_args->payload); - free(send_args); - return -errno; - } - pthread_detach(send_thread); - - return 0; -} - -int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { - pthread_t thread; - struct ion_recv_args *args; - u_int32_t node_id, service_id; - - if (!attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_DEST_NODE_ID]) { - log_error("handle_request_bundle: missing attribute(s) in REQUEST_BUNDLE " - "command (service " - "ID, node ID)"); - return -EINVAL; + ctx = endpoint_registry_get(src_node_id, src_service_id); + if (!ctx) { + log_error("[ipn:%u.%u] handle_send_bundle: no endpoint for ipn:%u.%u", src_node_id, + src_service_id, src_node_id, src_service_id); + return -ENODEV; } - node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - - args = malloc(sizeof(struct ion_recv_args)); - if (!args) { - log_error("handle_request_bundle: failed to allocate thread args", node_id, service_id); + payload_copy = malloc(payload_size); + if (!payload_copy) { + log_error("[ipn:%u.%u] handle_send_bundle: failed to allocate payload", src_node_id, + src_service_id); return -ENOMEM; } - args->node_id = node_id; - args->service_id = service_id; + memcpy(payload_copy, payload, payload_size); + + // Enqueue to send thread using source endpoint SAP + // Launch async send thread + args = malloc(sizeof(struct ion_send_args)); + if (!args) return -ENOMEM; + args->node_id = src_node_id; + args->service_id = src_service_id; + args->dest_eid = strndup(dest_eid, sizeof(dest_eid)); args->netlink_sock = daemon->genl_bp_sock; + args->netlink_mutex = &daemon->netlink_mutex; args->netlink_family = daemon->genl_bp_family_id; + args->payload = payload_copy; + args->payload_size = payload_size; - log_info("[ipn:%u.%u] REQUEST_BUNDLE: bundle request initiated", node_id, service_id); - if (pthread_create(&thread, NULL, ion_receive_thread, args) != 0) { - log_error("[ipn:%u.%u] handle_request_bundle: failed to create receive thread: %s", node_id, - service_id, strerror(errno)); + if (pthread_create(&thread, NULL, ion_send_thread, args) != 0) { + log_error("[ipn:%u.%u] handle_send_bundle: failed to create send thread", src_node_id, + src_service_id); + free(args->dest_eid); + free(args->payload); free(args); return -errno; } - pthread_detach(thread); - return 0; } int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { (void)daemon; - u_int32_t node_id, service_id; - Object adu; + uint64_t adu; int ret = 0; - if (!attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { - log_error("handle_destroy_bundle: missing attribute(s) in DESTROY_BUNDLE " - "command (node ID, " - "service ID)"); - ret = -EINVAL; - goto out; + if (!attrs[BP_GENL_A_ADU]) { + log_error("handle_destroy_bundle: missing ADU attribute in DESTROY_BUNDLE command"); + return -EINVAL; } - node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); + adu = nla_get_u64(attrs[BP_GENL_A_ADU]); - adu = adu_registry_remove(node_id, service_id); - if (adu == 0) { - log_error("[ipn:%u.%u] handle_destroy_bundle: failed to destroy bundle: %s", node_id, - service_id, strerror(-ret)); - goto out; - } - ret = ion_destroy_bundle(adu); + ret = ion_destroy_bundle((Object)adu); if (ret < 0) { - log_error("[ipn:%u.%u] handle_destroy_bundle: ion_destroy_bundle failed with error %d", - node_id, service_id, ret); - goto out; + log_error("handle_destroy_bundle: ion_destroy_bundle failed with error %d", ret); + return ret; } - log_info("[ipn:%u.%u] DESTROY_BUNDLE: bundle destroy from ION", node_id, service_id); + log_info("DESTROY_BUNDLE: bundle consumed by a socket (adu: %llu)", (unsigned long long)adu); -out: - return ret; + return 0; } \ No newline at end of file diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index 08d3d9b..0645643 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -11,12 +11,9 @@ struct thread_args { u_int32_t service_id; }; -int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); -int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); int handle_open_endpoint(Daemon *daemon, struct nlattr **attrs); int handle_close_endpoint(Daemon *daemon, struct nlattr **attrs); +int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs); -void *handle_recv_thread(struct thread_args *arg); - #endif \ No newline at end of file diff --git a/daemon/daemon.c b/daemon/daemon.c index fe8cda1..f92edda 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -1,5 +1,6 @@ #include "daemon.h" #include "bp_genl.h" +#include "endpoint_registry.h" #include "ion.h" #include "log.h" #include @@ -72,7 +73,7 @@ int daemon_run(Daemon *self) { return ret; } - self->genl_bp_sock = genl_bp_sock_init(self); + self->genl_bp_sock = bp_genl_socket_create(self); if (!self->genl_bp_sock) { log_error("Failed to initialize Generic Netlink socket"); daemon_free(self); @@ -102,7 +103,6 @@ int daemon_run(Daemon *self) { return ret; } - log_info("Attempting to attach to ION..."); if (bp_attach() < 0) { log_error("Can't attach to BP"); daemon_free(self); @@ -124,13 +124,15 @@ int daemon_run(Daemon *self) { void daemon_free(Daemon *self) { if (!self) return; - genl_bp_sock_close(self); + bp_genl_socket_destroy(self); if (self->event_on_nl_sock) event_free(self->event_on_nl_sock); if (self->event_on_sigpipe) event_free(self->event_on_sigpipe); if (self->event_on_sigint) event_free(self->event_on_sigint); if (self->base) event_base_free(self->base); + pthread_mutex_destroy(&self->netlink_mutex); + #if LIBEVENT_VERSION_NUMBER >= 0x02010000 libevent_global_shutdown(); #endif diff --git a/daemon/daemon.h b/daemon/daemon.h index f8162b9..fbb7540 100644 --- a/daemon/daemon.h +++ b/daemon/daemon.h @@ -4,9 +4,11 @@ #include "bp.h" #include #include +#include typedef struct Daemon { struct nl_sock *genl_bp_sock; + pthread_mutex_t netlink_mutex; const char *genl_bp_family_name; int genl_bp_family_id; unsigned int nl_pid; diff --git a/daemon/endpoint_registry.c b/daemon/endpoint_registry.c new file mode 100644 index 0000000..c1e3c85 --- /dev/null +++ b/daemon/endpoint_registry.c @@ -0,0 +1,109 @@ +#include "endpoint_registry.h" +#include "log.h" +#include +#include + +static pthread_mutex_t endpoint_registry_mutex = PTHREAD_MUTEX_INITIALIZER; +static struct endpoint_ctx *endpoint_head = NULL; + +int endpoint_registry_add(struct endpoint_ctx *ctx) { + if (!ctx) { + log_error("endpoint_registry_add: invalid context pointer"); + return -1; + } + + if (pthread_mutex_lock(&endpoint_registry_mutex) != 0) { + log_error("endpoint_registry_add: failed to lock registry mutex"); + return -1; + } + + struct endpoint_ctx *current = endpoint_head; + while (current != NULL) { + if (current->node_id == ctx->node_id && current->service_id == ctx->service_id) { + pthread_mutex_unlock(&endpoint_registry_mutex); + log_error("endpoint_registry_add: endpoint ipn:%u.%u already exists", ctx->node_id, + ctx->service_id); + return -1; + } + current = current->next; + } + + ctx->next = endpoint_head; + endpoint_head = ctx; + pthread_mutex_unlock(&endpoint_registry_mutex); + + return 0; +} + +struct endpoint_ctx *endpoint_registry_get(uint32_t node_id, uint32_t service_id) { + struct endpoint_ctx *current; + + if (pthread_mutex_lock(&endpoint_registry_mutex) != 0) { + log_error("endpoint_registry_get: failed to lock registry mutex"); + return NULL; + } + + current = endpoint_head; + while (current != NULL) { + if (current->node_id == node_id && current->service_id == service_id) { + pthread_mutex_unlock(&endpoint_registry_mutex); + return current; + } + current = current->next; + } + + pthread_mutex_unlock(&endpoint_registry_mutex); + return NULL; +} + +int endpoint_registry_remove(uint32_t node_id, uint32_t service_id) { + struct endpoint_ctx *current = endpoint_head; + struct endpoint_ctx *prev = NULL; + + if (pthread_mutex_lock(&endpoint_registry_mutex) != 0) { + log_error("endpoint_registry_remove: failed to lock registry mutex"); + return -1; + } + + while (current != NULL) { + if (current->node_id == node_id && current->service_id == service_id) { + if (prev == NULL) { + endpoint_head = current->next; + } else { + prev->next = current->next; + } + + free(current); + pthread_mutex_unlock(&endpoint_registry_mutex); + + return 0; + } + prev = current; + current = current->next; + } + + pthread_mutex_unlock(&endpoint_registry_mutex); + log_error("endpoint_registry_remove: endpoint ipn:%u.%u not found", node_id, service_id); + return -1; +} + +bool endpoint_registry_exists(uint32_t node_id, uint32_t service_id) { + struct endpoint_ctx *current; + + if (pthread_mutex_lock(&endpoint_registry_mutex) != 0) { + log_error("endpoint_registry_exists: failed to lock registry mutex"); + return false; + } + + current = endpoint_head; + while (current != NULL) { + if (current->node_id == node_id && current->service_id == service_id) { + pthread_mutex_unlock(&endpoint_registry_mutex); + return true; + } + current = current->next; + } + + pthread_mutex_unlock(&endpoint_registry_mutex); + return false; +} diff --git a/daemon/endpoint_registry.h b/daemon/endpoint_registry.h new file mode 100644 index 0000000..1133dba --- /dev/null +++ b/daemon/endpoint_registry.h @@ -0,0 +1,25 @@ +#ifndef ENDPOINT_REGISTRY_H +#define ENDPOINT_REGISTRY_H + +#include +#include +#include +#include + +struct endpoint_ctx { + uint32_t node_id; + uint32_t service_id; + void *sap; + + _Atomic int running; + pthread_t recv_thread; + + struct endpoint_ctx *next; +}; + +int endpoint_registry_add(struct endpoint_ctx *ctx); +struct endpoint_ctx *endpoint_registry_get(uint32_t node_id, uint32_t service_id); +int endpoint_registry_remove(uint32_t node_id, uint32_t service_id); +bool endpoint_registry_exists(uint32_t node_id, uint32_t service_id); + +#endif diff --git a/daemon/ion.c b/daemon/ion.c index 4257076..0beb350 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,8 +1,7 @@ #include "ion.h" -#include "adu_registry.h" +#include "bp_genl.h" +#include "endpoint_registry.h" #include "log.h" -#include "nl_utils.h" -#include "sap_registry.h" #include "sdr.h" #include #include @@ -10,6 +9,7 @@ #include #include #include +#include static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER; Sdr sdr; @@ -20,17 +20,22 @@ static int make_eid(char *buf, size_t bufsize, u_int32_t node_id, u_int32_t serv return 0; } -int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id) { +int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id, struct nl_sock *netlink_sock, + pthread_mutex_t *netlink_mutex, int netlink_family) { + struct ion_recv_args *args; + struct endpoint_ctx *ctx; char eid[64]; BpSAP sap; + int err; if (make_eid(eid, sizeof(eid), node_id, service_id) < 0) { log_error("ion_open_endpoint: EID too long"); return -EINVAL; } - if (sap_registry_contains(node_id, service_id)) { - return 0; // already open + if (endpoint_registry_exists(node_id, service_id)) { + log_error("ion_open_endpoint: endpoint ipn:%u.%u already exists", node_id, service_id); + return -EADDRINUSE; } if (bp_open(eid, &sap) < 0) { @@ -38,28 +43,63 @@ int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id) { return -EIO; } - if (sap_registry_add(node_id, service_id, sap) < 0) { + ctx = calloc(1, sizeof(struct endpoint_ctx)); + if (!ctx) { + log_error("ion_open_endpoint: failed to allocate endpoint context"); + bp_close(sap); + return -ENOMEM; + } + ctx->node_id = node_id; + ctx->service_id = service_id; + ctx->sap = sap; + atomic_init(&ctx->running, 1); + + err = endpoint_registry_add(ctx); + if (err) { + __atomic_store_n(&ctx->running, 0, __ATOMIC_RELAXED); + bp_close(sap); + free(ctx); + return -ENOMEM; + } + + args = calloc(1, sizeof(struct ion_recv_args)); + if (!args) { + log_error("ion_open_endpoint: failed to allocate thread args"); bp_close(sap); + free(ctx); return -ENOMEM; } + args->netlink_sock = netlink_sock; + args->netlink_mutex = netlink_mutex; + args->netlink_family = netlink_family; + args->ctx = ctx; + + if (pthread_create(&ctx->recv_thread, NULL, ion_receive_thread, args) != 0) { + log_error("ion_open_endpoint: failed to create receive thread: %s", strerror(errno)); + bp_close(sap); + free(args); + free(ctx); + return -errno; + } + return 0; } int ion_close_endpoint(u_int32_t node_id, u_int32_t service_id) { - BpSAP sap = sap_registry_get(node_id, service_id); - if (sap) { - if (sap_registry_has_active_receive(node_id, service_id)) { - log_info("[ipn:%u.%u] CLOSE_ENDPOINT: interrupting active reception", node_id, - service_id); - bp_interrupt(sap); - } else { - log_info("[ipn:%u.%u] CLOSE_ENDPOINT: no active reception, closing directly", node_id, - service_id); - } - - bp_close(sap); + struct endpoint_ctx *ctx = endpoint_registry_get(node_id, service_id); + if (!ctx) { + log_error("ion_close_endpoint: endpoint ipn:%u.%u not found", node_id, service_id); + return -ENOENT; } - sap_registry_remove(node_id, service_id); + + __atomic_store_n(&ctx->running, 0, __ATOMIC_RELAXED); + + bp_interrupt(ctx->sap); + pthread_join(ctx->recv_thread, NULL); + bp_close(ctx->sap); + + endpoint_registry_remove(node_id, service_id); + return 0; } @@ -68,9 +108,9 @@ void *ion_send_thread(void *arg) { const char *dest_eid = args->dest_eid; const void *payload = args->payload; size_t payload_size = args->payload_size; - u_int32_t src_node_id = args->src_node_id; - u_int32_t src_service_id = args->src_service_id; - BpSAP sap = NULL; + u_int32_t node_id = args->node_id; + u_int32_t service_id = args->service_id; + struct endpoint_ctx *ctx; Object sdr_buffer = 0; Object adu = 0; int ret = 0; @@ -78,27 +118,33 @@ void *ion_send_thread(void *arg) { if (!dest_eid || !payload || payload_size == 0) { log_error("ion_send_thread: invalid parameters"); ret = -EINVAL; - goto cleanup_and_notify; + goto cleanup; } - sap = sap_registry_get(src_node_id, src_service_id); - if (!sap) { - log_error("ion_send_thread: no SAP for ipn:%u.%u", src_node_id, src_service_id); + ctx = endpoint_registry_get(node_id, service_id); + if (!ctx) { + log_error("ion_send_thread: no endpoint for ipn:%u.%u", node_id, service_id); ret = -ENODEV; - goto cleanup_and_notify; + goto cleanup; + } + + if (!ctx->sap) { + log_error("ion_send_thread: invalid SAP for ipn:%u.%u", node_id, service_id); + ret = -EINVAL; + goto cleanup; } if (pthread_mutex_lock(&sdrmutex) != 0) { log_error("ion_send_thread: sdr mutex lock failed"); ret = -EAGAIN; - goto cleanup_and_notify; + goto cleanup; } if (sdr_begin_xn(sdr) == 0) { pthread_mutex_unlock(&sdrmutex); log_error("ion_send_thread: sdr_begin_xn failed"); ret = -EIO; - goto cleanup_and_notify; + goto cleanup; } sdr_buffer = sdr_malloc(sdr, payload_size); @@ -106,7 +152,7 @@ void *ion_send_thread(void *arg) { pthread_mutex_unlock(&sdrmutex); log_error("ion_send_thread: no space for payload"); ret = -ENOSPC; - goto cleanup_and_notify; + goto cleanup; } sdr_write(sdr, sdr_buffer, (char *)payload, payload_size); @@ -116,55 +162,34 @@ void *ion_send_thread(void *arg) { pthread_mutex_unlock(&sdrmutex); log_error("ion_send_thread: zco_create failed"); ret = -ENOMEM; - goto cleanup_and_notify; + goto cleanup; } if (sdr_end_xn(sdr) < 0) { pthread_mutex_unlock(&sdrmutex); log_error("ion_send_thread: sdr_end_xn failed"); ret = -EIO; - goto cleanup_and_notify; + goto cleanup; } pthread_mutex_unlock(&sdrmutex); - if (bp_send(sap, (char *)dest_eid, NULL, 86400, BP_STD_PRIORITY, NoCustodyRequested, 0, 0, NULL, - adu, NULL) <= 0) { + if (bp_send(ctx->sap, (char *)dest_eid, NULL, 86400, BP_STD_PRIORITY, NoCustodyRequested, 0, 0, + NULL, adu, NULL) <= 0) { log_error("ion_send_thread: bp_send failed"); ret = -EIO; - goto cleanup_and_notify; + goto cleanup; } - log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %zu (bytes)", args->src_node_id, - args->src_service_id, args->dest_eid, args->payload_size); - - if (nl_send_bundle_confirmation(args->netlink_family, args->netlink_sock, args->src_node_id, - args->src_service_id) < 0) { - log_error("[ipn:%u.%u] SEND_BUNDLE: failed to send confirmation to kernel", - args->src_node_id, args->src_service_id); - } else { - log_info("[ipn:%u.%u] SEND_BUNDLE: confirmation sent to kernel", args->src_node_id, - args->src_service_id); - } + log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %zu (bytes)", node_id, + service_id, args->dest_eid, args->payload_size); free(args->dest_eid); free(args->payload); free(args); return (void *)(intptr_t)0; -cleanup_and_notify: { - int nl_ret = nl_send_bundle_failure(args->netlink_family, args->netlink_sock, args->src_node_id, - args->src_service_id, ret); - if (nl_ret < 0) { - log_error( - "[ipn:%u.%u] SEND_BUNDLE: failed to send failure notification to kernel (err: %d)", - args->src_node_id, args->src_service_id, nl_ret); - } else { - log_info("[ipn:%u.%u] SEND_BUNDLE: failure notification sent to kernel", args->src_node_id, - args->src_service_id); - } -} - +cleanup: free(args->dest_eid); free(args->payload); free(args); @@ -202,20 +227,19 @@ int ion_destroy_bundle(Object adu) { sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); + return 0; } void *ion_receive_thread(void *arg) { struct ion_recv_args *args = arg; - const u_int32_t dest_node_id = args->node_id; - const u_int32_t dest_service_id = args->service_id; - - sap_registry_mark_receive_active(dest_node_id, dest_service_id); + struct endpoint_ctx *ctx = args->ctx; - BpSAP sap; + const u_int32_t dest_node_id = ctx->node_id; + const u_int32_t dest_service_id = ctx->service_id; + BpSAP sap = ctx->sap; BpDelivery dlv; ZcoReader reader; - adu_node_t *adu_ref; u_int32_t own_node_id; void *payload = NULL; size_t payload_size = 0; @@ -227,108 +251,59 @@ void *ion_receive_thread(void *arg) { if (own > (uvast)0xFFFFFFFFu) { log_error("ion_receive_thread: own node ID out of 32-bit range: %llu", (unsigned long long)own); - goto cancel; + goto out; } own_node_id = (u_int32_t)own; } if (dest_node_id != own_node_id) { log_error("ion_receive_thread: node ID mismatch. Expected %u, got %u", own_node_id, dest_node_id); - goto cancel; + goto out; } - adu_ref = adu_registry_get(dest_node_id, dest_service_id); - if (adu_ref != NULL) { - if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("ion_receive_thread: Failed to lock SDR mutex."); - goto cancel; - } - if (sdr_begin_xn(sdr) == 0) { - log_error("ion_receive_thread: sdr_begin_xn failed."); - pthread_mutex_unlock(&sdrmutex); - goto cancel; - } - payload_size = (size_t)zco_source_data_length(sdr, adu_ref->adu); - payload = malloc(payload_size); - if (!payload) { - log_error("ion_receive_thread: Failed to allocate memory for payload."); - sdr_end_xn(sdr); - pthread_mutex_unlock(&sdrmutex); - goto cancel; - } - zco_start_receiving(adu_ref->adu, &reader); - if (zco_receive_source(sdr, &reader, (vast)payload_size, payload) < 0) { - log_error("ion_receive_thread: zco_receive_source failed."); - free(payload); - payload = NULL; - sdr_end_xn(sdr); - pthread_mutex_unlock(&sdrmutex); - goto cancel; + if (!sap) { + log_error("ion_receive_thread: invalid SAP for ipn:%u.%u", dest_node_id, dest_service_id); + goto out; + } + + while (__atomic_load_n(&ctx->running, __ATOMIC_RELAXED)) { + payload = NULL; + payload_size = 0; + src_node_id = 0; + src_service_id = 0; + + if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { + log_error("ion_receive_thread: bundle reception failed."); + goto out; } - src_node_id = adu_ref->src_node_id; - src_service_id = adu_ref->src_service_id; - sdr_end_xn(sdr); - pthread_mutex_unlock(&sdrmutex); - } else { - sap = sap_registry_get(dest_node_id, dest_service_id); - if (!sap) { - log_error("ion_receive_thread: no SAP for ipn:%u.%u", dest_node_id, dest_service_id); - goto cancel; + + if (dlv.result == BpReceptionInterrupted || dlv.adu == 0) { + bp_release_delivery(&dlv, 0); + continue; } - while (1) { - if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { - log_error("ion_receive_thread: bundle reception failed."); - goto cancel; - } - - if (dlv.result == BpReceptionInterrupted) { - if (!sap_registry_contains(dest_node_id, dest_service_id)) { - log_info("ion_receive_thread: endpoint closing, stopping"); - bp_release_delivery(&dlv, 0); - goto cancel; - } - log_info("ion_receive_thread: reception interrupted, continuing to wait"); - bp_release_delivery(&dlv, 0); - continue; - } - - if (dlv.adu == 0) { - log_info("ion_receive_thread: no ADU, continuing to wait"); - bp_release_delivery(&dlv, 0); - continue; - } - - if (dlv.result == BpEndpointStopped) { - log_info("ion_receive_thread: endpoint stopped"); - bp_release_delivery(&dlv, 0); - goto cancel; - } - - break; + if (dlv.result == BpEndpointStopped) { + bp_release_delivery(&dlv, 0); + goto out; } + if (sscanf(dlv.bundleSourceEid, "ipn:%u.%u", &src_node_id, &src_service_id) != 2) { log_error("ion_receive_thread: failed to parse bundleSourceEid: %s", dlv.bundleSourceEid); bp_release_delivery(&dlv, 0); - goto cancel; - } - if (adu_registry_add(dlv.adu, dest_node_id, dest_service_id, src_node_id, src_service_id) < - 0) { - log_error("ion_receive_thread: failed to add bundle reference."); - bp_release_delivery(&dlv, 0); - goto cancel; + continue; } + if (pthread_mutex_lock(&sdrmutex) != 0) { log_error("ion_receive_thread: Failed to lock SDR mutex."); bp_release_delivery(&dlv, 0); - goto cancel; + continue; } if (sdr_begin_xn(sdr) == 0) { log_error("ion_receive_thread: sdr_begin_xn failed."); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - goto cancel; + continue; } payload_size = (size_t)zco_source_data_length(sdr, dlv.adu); payload = malloc(payload_size); @@ -337,7 +312,7 @@ void *ion_receive_thread(void *arg) { sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - goto cancel; + continue; } zco_start_receiving(dlv.adu, &reader); if (zco_receive_source(sdr, &reader, (vast)payload_size, payload) < 0) { @@ -347,47 +322,34 @@ void *ion_receive_thread(void *arg) { sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - goto cancel; + continue; } sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - } - - if (!payload) { - log_info("ion_receive_thread: no payload received for node_id=%u service_id=%u", - dest_node_id, dest_service_id); - goto cancel; - } - err = nl_send_deliver_bundle(args->netlink_family, args->netlink_sock, payload, payload_size, - src_node_id, src_service_id, dest_node_id, dest_service_id); - if (err < 0) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed with error %d", dest_node_id, - dest_service_id, err); - } else { - log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", dest_node_id, - dest_service_id); - } - sap_registry_mark_receive_inactive(dest_node_id, dest_service_id); + if (!payload) { + log_info("ion_receive_thread: no payload received for node_id=%u service_id=%u", + dest_node_id, dest_service_id); + continue; + } - free(payload); - free(args); - return NULL; + err = bp_genl_enqueue_bundle(args->netlink_family, args->netlink_sock, args->netlink_mutex, + payload, payload_size, src_node_id, src_service_id, + dest_node_id, dest_service_id, dlv.adu); + if (err < 0) { + log_error("[ipn:%u.%u] bp_genl_enqueue_bundle: failed with error %d", dest_node_id, + dest_service_id, err); + } else { + log_info("[ipn:%u.%u] ENQUEUE_BUNDLE: incoming bundle queued in the kernel (adu: %llu)", + dest_node_id, dest_service_id, (unsigned long long)dlv.adu); + } -cancel: - // Mark thread as inactive before cleanup - sap_registry_mark_receive_inactive(dest_node_id, dest_service_id); - - err = nl_send_cancel_bundle_request(args->netlink_family, args->netlink_sock, dest_node_id, - dest_service_id); - if (err < 0) { - log_error("[ipn:%u.%u] nl_send_cancel_bundle_request failed with error %d", dest_node_id, - dest_service_id, err); - } else { - log_info("[ipn:%u.%u] CANCEL_BUNDLE_REQUEST: bundle request cancelled", dest_node_id, - dest_service_id); + free(payload); + payload = NULL; } + +out: if (payload) free(payload); free(args); return NULL; diff --git a/daemon/ion.h b/daemon/ion.h index 9c17aa0..ecba04c 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -2,28 +2,31 @@ #define ION_H #include "bp.h" +#include #include extern Sdr sdr; struct ion_recv_args { struct nl_sock *netlink_sock; + pthread_mutex_t *netlink_mutex; int netlink_family; - u_int32_t node_id; - u_int32_t service_id; + struct endpoint_ctx *ctx; }; struct ion_send_args { struct nl_sock *netlink_sock; + pthread_mutex_t *netlink_mutex; int netlink_family; - u_int32_t src_node_id; - u_int32_t src_service_id; + u_int32_t node_id; + u_int32_t service_id; char *dest_eid; void *payload; size_t payload_size; }; -int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id); +int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id, struct nl_sock *netlink_sock, + pthread_mutex_t *netlink_mutex, int netlink_family); int ion_close_endpoint(u_int32_t node_id, u_int32_t service_id); int ion_destroy_bundle(Object adu); void *ion_receive_thread(void *arg); diff --git a/daemon/main.c b/daemon/main.c index 76c9656..fcb4734 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -18,6 +18,7 @@ int main(int argc, char *argv[]) { Daemon daemon = { .genl_bp_sock = NULL, + .netlink_mutex = PTHREAD_MUTEX_INITIALIZER, .genl_bp_family_name = BP_GENL_NAME, .genl_bp_family_id = -1, .nl_pid = NL_PID, diff --git a/daemon/nl_utils.c b/daemon/nl_utils.c deleted file mode 100644 index d9fc0fc..0000000 --- a/daemon/nl_utils.c +++ /dev/null @@ -1,258 +0,0 @@ -#include "nl_utils.h" -#include -#include -#include -#include -#include -#include - -#include "../include/bp_socket.h" -#include "log.h" - -int nl_send_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, - uint32_t node_id, uint32_t service_id) { - struct nl_msg *msg = NULL; - void *hdr; - int ret; - - msg = nlmsg_alloc(); - if (!msg) { - log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to allocate Netlink msg", - node_id, service_id); - ret = -ENOMEM; - goto out; - } - - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, - BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_VERSION); - if (!hdr) { - log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to create Netlink header", - node_id, service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id) < 0) { - log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to add NODE_ID attribute", - node_id, service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id) < 0) { - log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to add SERVICE_ID attribute", - node_id, service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - ret = nl_send_sync(netlink_sock, msg); - if (ret < 0) { - log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: bundle request not cancelled", - node_id, service_id); - ret = -errno; - goto out; - } - - return 0; - -err_free_msg: - nlmsg_free(msg); -out: - return ret; -} - -int nl_send_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, - size_t payload_size, uint32_t src_node_id, uint32_t src_service_id, - uint32_t dest_node_id, uint32_t dest_service_id) { - if (payload_size > (size_t)INT_MAX) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: payload too large", dest_node_id, - dest_service_id); - return -EMSGSIZE; - } - struct nl_msg *msg = NULL; - void *hdr; - int ret; - - msg = nlmsg_alloc(); - if (!msg) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to allocate Netlink msg", - dest_node_id, dest_service_id); - ret = -ENOMEM; - goto out; - } - - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, - BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); - if (!hdr) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to create Netlink header", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id) < 0) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add NODE_ID attribute", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id) < 0) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add SERVICE_ID attribute", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add SRC_NODE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add SRC_SERVICE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put(msg, BP_GENL_A_PAYLOAD, (int)payload_size, payload) < 0) { - log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add PAYLOAD attribute", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - ret = nl_send_sync(netlink_sock, msg); - if (ret < 0) { - log_warn("[ipn:%u.%u] DELIVER_BUNDLE: bundle not delivered to kernel, keeping reference in " - "memory (no active BP socket " - "client)", - dest_node_id, dest_service_id); - ret = -ENODEV; - goto out; - } - - return 0; - -err_free_msg: - nlmsg_free(msg); -out: - return ret; -} - -int nl_send_bundle_confirmation(int netlink_family, struct nl_sock *netlink_sock, - uint32_t src_node_id, uint32_t src_service_id) { - struct nl_msg *msg = NULL; - void *hdr; - int ret; - - msg = nlmsg_alloc(); - if (!msg) { - log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to allocate Netlink msg", - src_node_id, src_service_id); - ret = -ENOMEM; - goto out; - } - - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, - BP_GENL_CMD_SEND_BUNDLE_CONFIRMATION, BP_GENL_VERSION); - if (!hdr) { - log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to create Netlink header", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { - log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to add SRC_NODE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { - log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to add SRC_SERVICE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - ret = nl_send_sync(netlink_sock, msg); - if (ret < 0) { - log_error("[ipn:%u.%u] nl_send_bundle_confirmation: confirmation not sent", src_node_id, - src_service_id); - ret = -errno; - goto out; - } - - return 0; - -err_free_msg: - nlmsg_free(msg); -out: - return ret; -} - -int nl_send_bundle_failure(int netlink_family, struct nl_sock *netlink_sock, uint32_t src_node_id, - uint32_t src_service_id, int error_code) { - struct nl_msg *msg = NULL; - void *hdr; - int ret; - - msg = nlmsg_alloc(); - if (!msg) { - log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to allocate Netlink msg", src_node_id, - src_service_id); - ret = -ENOMEM; - goto out; - } - - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, - BP_GENL_CMD_SEND_BUNDLE_FAILURE, BP_GENL_VERSION); - if (!hdr) { - log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to create Netlink header", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { - log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to add SRC_NODE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { - log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to add SRC_SERVICE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_ERROR_CODE, (uint32_t)error_code) < 0) { - log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to add ERROR_CODE attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - ret = nl_send_sync(netlink_sock, msg); - if (ret < 0) { - log_error("[ipn:%u.%u] nl_send_bundle_failure: failure notification not sent", src_node_id, - src_service_id); - ret = -errno; - goto out; - } - - return 0; - -err_free_msg: - nlmsg_free(msg); -out: - return ret; -} diff --git a/daemon/nl_utils.h b/daemon/nl_utils.h deleted file mode 100644 index f778a1e..0000000 --- a/daemon/nl_utils.h +++ /dev/null @@ -1,18 +0,0 @@ -#ifndef NL_UTILS_H -#define NL_UTILS_H - -#include -#include -#include - -int nl_send_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, - uint32_t node_id, uint32_t service_id); -int nl_send_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, - size_t payload_size, uint32_t src_node_id, uint32_t src_service_id, - uint32_t dest_node_id, uint32_t dest_service_id); -int nl_send_bundle_confirmation(int netlink_family, struct nl_sock *netlink_sock, - uint32_t src_node_id, uint32_t src_service_id); -int nl_send_bundle_failure(int netlink_family, struct nl_sock *netlink_sock, uint32_t src_node_id, - uint32_t src_service_id, int error_code); - -#endif diff --git a/daemon/sap_registry.c b/daemon/sap_registry.c deleted file mode 100644 index 30278fa..0000000 --- a/daemon/sap_registry.c +++ /dev/null @@ -1,104 +0,0 @@ -#include "sap_registry.h" -#include -#include - -static pthread_mutex_t sap_mutex = PTHREAD_MUTEX_INITIALIZER; -static sap_node_t *sap_head = NULL; - -static sap_node_t *find_node(uint32_t node_id, uint32_t service_id, sap_node_t **prev) { - sap_node_t *p = sap_head; - sap_node_t *q = NULL; - while (p) { - if (p->key.node_id == node_id && p->key.service_id == service_id) { - if (prev) *prev = q; - return p; - } - q = p; - p = p->next; - } - if (prev) *prev = NULL; - return NULL; -} - -bool sap_registry_contains(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&sap_mutex); - bool exists = find_node(node_id, service_id, NULL) != NULL; - pthread_mutex_unlock(&sap_mutex); - return exists; -} - -int sap_registry_add(uint32_t node_id, uint32_t service_id, BpSAP sap) { - int ret = 0; - sap_node_t *node; - pthread_mutex_lock(&sap_mutex); - if (find_node(node_id, service_id, NULL)) { - pthread_mutex_unlock(&sap_mutex); - return 0; - } - pthread_mutex_unlock(&sap_mutex); - - node = (sap_node_t *)calloc(1, sizeof(sap_node_t)); - if (!node) return -ENOMEM; - node->key.node_id = node_id; - node->key.service_id = service_id; - node->sap = sap; - node->has_active_receive = false; - - pthread_mutex_lock(&sap_mutex); - node->next = sap_head; - sap_head = node; - pthread_mutex_unlock(&sap_mutex); - return ret; -} - -BpSAP sap_registry_get(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&sap_mutex); - sap_node_t *node = find_node(node_id, service_id, NULL); - BpSAP sap = node ? node->sap : NULL; - pthread_mutex_unlock(&sap_mutex); - return sap; -} - -int sap_registry_remove(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&sap_mutex); - sap_node_t *prev = NULL; - sap_node_t *node = find_node(node_id, service_id, &prev); - if (!node) { - pthread_mutex_unlock(&sap_mutex); - return 0; - } - if (prev) - prev->next = node->next; - else - sap_head = node->next; - pthread_mutex_unlock(&sap_mutex); - free(node); - return 0; -} - -// Helper functions for receive thread tracking -void sap_registry_mark_receive_active(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&sap_mutex); - sap_node_t *node = find_node(node_id, service_id, NULL); - if (node) { - node->has_active_receive = true; - } - pthread_mutex_unlock(&sap_mutex); -} - -void sap_registry_mark_receive_inactive(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&sap_mutex); - sap_node_t *node = find_node(node_id, service_id, NULL); - if (node) { - node->has_active_receive = false; - } - pthread_mutex_unlock(&sap_mutex); -} - -bool sap_registry_has_active_receive(uint32_t node_id, uint32_t service_id) { - pthread_mutex_lock(&sap_mutex); - sap_node_t *node = find_node(node_id, service_id, NULL); - bool has_active = node ? node->has_active_receive : false; - pthread_mutex_unlock(&sap_mutex); - return has_active; -} diff --git a/daemon/sap_registry.h b/daemon/sap_registry.h deleted file mode 100644 index b7f001d..0000000 --- a/daemon/sap_registry.h +++ /dev/null @@ -1,30 +0,0 @@ -#ifndef SAP_REGISTRY_H -#define SAP_REGISTRY_H - -#include "bp.h" -#include -#include - -typedef struct sap_key { - uint32_t node_id; - uint32_t service_id; -} sap_key_t; - -typedef struct sap_node { - sap_key_t key; - BpSAP sap; - bool has_active_receive; - struct sap_node *next; -} sap_node_t; - -bool sap_registry_contains(uint32_t node_id, uint32_t service_id); -int sap_registry_add(uint32_t node_id, uint32_t service_id, BpSAP sap); -BpSAP sap_registry_get(uint32_t node_id, uint32_t service_id); -int sap_registry_remove(uint32_t node_id, uint32_t service_id); - -// Helper functions for receive thread tracking -void sap_registry_mark_receive_active(uint32_t node_id, uint32_t service_id); -void sap_registry_mark_receive_inactive(uint32_t node_id, uint32_t service_id); -bool sap_registry_has_active_receive(uint32_t node_id, uint32_t service_id); - -#endif diff --git a/include/bp_socket.h b/include/bp_socket.h index 51368f4..c87e9bd 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -22,7 +22,7 @@ enum bp_genl_attrs { BP_GENL_A_DEST_NODE_ID, BP_GENL_A_DEST_SERVICE_ID, BP_GENL_A_PAYLOAD, - BP_GENL_A_ERROR_CODE, + BP_GENL_A_ADU, __BP_GENL_A_MAX, }; @@ -32,11 +32,7 @@ enum bp_genl_attrs { enum bp_genl_cmds { BP_GENL_CMD_UNSPEC, BP_GENL_CMD_SEND_BUNDLE, - BP_GENL_CMD_SEND_BUNDLE_CONFIRMATION, - BP_GENL_CMD_SEND_BUNDLE_FAILURE, - BP_GENL_CMD_REQUEST_BUNDLE, - BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, - BP_GENL_CMD_DELIVER_BUNDLE, + BP_GENL_CMD_ENQUEUE_BUNDLE, BP_GENL_CMD_DESTROY_BUNDLE, BP_GENL_CMD_OPEN_ENDPOINT, BP_GENL_CMD_CLOSE_ENDPOINT, diff --git a/receiver.c b/receiver.c index 737e974..840fab7 100644 --- a/receiver.c +++ b/receiver.c @@ -7,18 +7,21 @@ #include #include #include +#include #include #define BUFFER_SIZE 1024 #define AF_BP 28 // Custom socket family identifier +volatile int running = 1; + void handle_sigint(int sig) { printf("\nInterrupt received, shutting down...\n"); - exit(1); + running = 0; } int main(int argc, char *argv[]) { - int sfd; + int fd; struct sockaddr_bp addr_bp; struct msghdr msg; struct iovec iov; @@ -48,20 +51,30 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - sfd = socket(AF_BP, SOCK_DGRAM, 1); - if (sfd < 0) { + fd = socket(AF_BP, SOCK_DGRAM, 1); + if (fd < 0) { perror("socket creation failed"); return EXIT_FAILURE; } printf("Socket created.\n"); + struct timeval tv; + tv.tv_sec = 3; + tv.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("Failed to set receive timeout"); + ret = EXIT_FAILURE; + goto out; + } + printf("Receive timeout set to 3 seconds.\n"); + memset(&addr_bp, 0, sizeof(addr_bp)); addr_bp.bp_family = AF_BP; addr_bp.bp_scheme = BP_SCHEME_IPN; addr_bp.bp_addr.ipn.node_id = node_id; addr_bp.bp_addr.ipn.service_id = service_id; - if (bind(sfd, (struct sockaddr *)&addr_bp, sizeof(addr_bp)) == -1) { + if (bind(fd, (struct sockaddr *)&addr_bp, sizeof(addr_bp)) == -1) { perror("Failed to bind socket"); ret = EXIT_FAILURE; goto out; @@ -77,23 +90,45 @@ int main(int argc, char *argv[]) { msg.msg_namelen = sizeof(src_addr); printf("Listening for incoming messages...\n"); - ssize_t n = recvmsg(sfd, &msg, 0); - if (n < 0) { - perror("recvmsg failed"); - ret = EXIT_FAILURE; - goto out; - } - - printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer); - if (msg.msg_namelen >= sizeof(struct sockaddr_bp)) { - printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id, - src_addr.bp_addr.ipn.service_id); - } else { - printf("Source address not available\n"); + printf("Press Ctrl+C to exit.\n"); + + while (running) { + ssize_t n = recvmsg(fd, &msg, 0); + if (n < 0) { + if (errno == EINTR) { + // Interrupted by signal, exit gracefully + printf("\nInterrupted by signal, exiting...\n"); + break; + } + if (errno == EAGAIN) { + // Timeout occurred + printf("Timeout waiting for message, continuing...\n"); + continue; + } + perror("recvmsg failed"); + ret = EXIT_FAILURE; + goto out; + } + + printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer); + if (msg.msg_namelen >= sizeof(struct sockaddr_bp)) { + printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id, + src_addr.bp_addr.ipn.service_id); + } else { + printf("Source address not available\n"); + } + + // Reset message structure for next reception + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + memset(&src_addr, 0, sizeof(src_addr)); + msg.msg_name = &src_addr; + msg.msg_namelen = sizeof(src_addr); } out: - close(sfd); + close(fd); printf("Socket closed.\n"); return ret; diff --git a/sender.c b/sender.c index e7d717a..bd1a849 100644 --- a/sender.c +++ b/sender.c @@ -41,7 +41,7 @@ int main(int argc, char *argv[]) { src_addr.bp_family = AF_BP; src_addr.bp_scheme = BP_SCHEME_IPN; src_addr.bp_addr.ipn.node_id = 10; - src_addr.bp_addr.ipn.service_id = 1; + src_addr.bp_addr.ipn.service_id = 2; if (bind(fd, (struct sockaddr *)&src_addr, sizeof(src_addr)) == -1) { perror("Failed to bind socket");