diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 9bf9d7b..4071b31 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -30,8 +30,16 @@ static struct sock* bp_alloc_socket(struct net* net, int kern) bp = bp_sk(sk); skb_queue_head_init(&bp->rx_queue); init_waitqueue_head(&bp->rx_waitq); + init_waitqueue_head(&bp->tx_waitq); + + mutex_init(&bp->tx_mutex); + mutex_init(&bp->rx_mutex); + bp->bp_node_id = 0; bp->bp_service_id = 0; + bp->rx_canceled = false; + bp->tx_confirmed = false; + bp->tx_error = false; } return sk; @@ -159,6 +167,10 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len) write_unlock_bh(&bp_list_lock); release_sock(sk); + // Notify user-space daemon to open endpoint (bp_open) and prepare + // threads/state + open_endpoint_doit(node_id, service_id, 8443); + return 0; out: @@ -180,6 +192,13 @@ int bp_release(struct socket* sock) write_unlock_bh(&bp_list_lock); skb_queue_purge(&bp->rx_queue); + // Notify user-space daemon to close endpoint (bp_close) and + // cleanup + if (bp->bp_node_id && bp->bp_service_id) { + close_endpoint_doit( + bp->bp_node_id, bp->bp_service_id, 8443); + } + sock->sk = NULL; release_sock(sk); sock_put(sk); @@ -279,6 +298,43 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) goto err_free; } + // Wait for confirmation from daemon + pr_info("bp_sendmsg: waiting for confirmation for endpoint " + "ipn:%u.%u\n", + bp->bp_node_id, bp->bp_service_id); + + // Reset confirmation flag and wait + mutex_lock(&bp->tx_mutex); + bp->tx_confirmed = false; + bp->tx_error = false; + mutex_unlock(&bp->tx_mutex); + + ret = wait_event_interruptible( + bp->tx_waitq, bp->tx_confirmed || signal_pending(current)); + + if (ret < 0) { + pr_err("bp_sendmsg: interrupted while waiting for " + "confirmation\n"); + goto err_free; + } + + mutex_lock(&bp->tx_mutex); + if (bp->tx_confirmed) { + if (bp->tx_error) { + pr_err("bp_sendmsg: bundle send failed for " + "endpoint ipn:%u.%u\n", + bp->bp_node_id, bp->bp_service_id); + mutex_unlock(&bp->tx_mutex); + ret = -EIO; // Return error to user space + goto err_free; + } else { + pr_info("bp_sendmsg: confirmation received for " + "endpoint ipn:%u.%u\n", + bp->bp_node_id, bp->bp_service_id); + } + } + mutex_unlock(&bp->tx_mutex); + kfree(payload); } @@ -294,7 +350,7 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) { struct sock* sk; struct bp_sock* bp; - struct sk_buff* skb; + struct sk_buff* skb = NULL; struct sockaddr_bp* src_addr; int ret; @@ -321,11 +377,14 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } + mutex_lock(&bp->rx_mutex); if (bp->rx_canceled) { pr_info("bp_recvmsg: bundle request canceled\n"); + mutex_unlock(&bp->rx_mutex); ret = -ECANCELED; goto out; } + mutex_unlock(&bp->rx_mutex); if (sock_flag(sk, SOCK_DEAD)) { pr_err("bp_recvmsg: socket closed while waiting\n"); @@ -333,13 +392,16 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } + mutex_lock(&bp->rx_mutex); skb = skb_dequeue(&bp->rx_queue); if (!skb) { pr_info("bp_recvmsg: no messages in the queue for service %d\n", bp->bp_service_id); + mutex_unlock(&bp->rx_mutex); ret = -ENOMSG; goto out; } + mutex_unlock(&bp->rx_mutex); if (skb->len > size) { pr_err("bp_recvmsg: buffer too small for message (required=%u, " diff --git a/bp_socket/af_bp.h b/bp_socket/af_bp.h index d6b4446..4f8688a 100644 --- a/bp_socket/af_bp.h +++ b/bp_socket/af_bp.h @@ -20,10 +20,19 @@ extern const struct net_proto_family bp_family_ops; struct bp_sock { struct sock sk; u_int32_t bp_node_id; - u_int8_t bp_service_id; + u_int32_t bp_service_id; + + // Reception + struct mutex rx_mutex; struct sk_buff_head rx_queue; wait_queue_head_t rx_waitq; bool rx_canceled; + + // Transmission + struct mutex tx_mutex; + wait_queue_head_t tx_waitq; + bool tx_confirmed; + bool tx_error; }; int bp_bind(struct socket* sock, struct sockaddr* addr, int addr_len); diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 78c028f..c9e5d8f 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -1,6 +1,7 @@ #include "bp_genl.h" #include "../include/bp_socket.h" #include "af_bp.h" +#include #include static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = { @@ -10,6 +11,7 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = { [BP_GENL_A_DEST_NODE_ID] = { .type = NLA_U32 }, [BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 }, [BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY }, + [BP_GENL_A_ERROR_CODE] = { .type = NLA_U32 }, }; static struct genl_ops genl_ops[] = { { @@ -25,6 +27,20 @@ static struct genl_ops genl_ops[] = { { .policy = nla_policy, .doit = cancel_bundle_request_doit, .dumpit = NULL, + }, + { + .cmd = BP_GENL_CMD_SEND_BUNDLE_CONFIRMATION, + .flags = GENL_ADMIN_PERM, + .policy = nla_policy, + .doit = send_bundle_confirmation_doit, + .dumpit = NULL, + }, + { + .cmd = BP_GENL_CMD_SEND_BUNDLE_FAILURE, + .flags = GENL_ADMIN_PERM, + .policy = nla_policy, + .doit = send_bundle_failure_doit, + .dumpit = NULL, } }; /* Multicast groups for our family */ @@ -196,7 +212,9 @@ int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info) && bp->bp_service_id == dest_service_id) { if (waitqueue_active(&bp->rx_waitq)) { + mutex_lock(&bp->rx_mutex); bp->rx_canceled = true; + mutex_unlock(&bp->rx_mutex); wake_up_interruptible(&bp->rx_waitq); } bh_unlock_sock(sk); @@ -256,7 +274,9 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) if (bp->bp_node_id == dest_node_id && bp->bp_service_id == dest_service_id) { + mutex_lock(&bp->rx_mutex); skb_queue_tail(&bp->rx_queue, new_skb); + mutex_unlock(&bp->rx_mutex); new_skb_queued = true; if (waitqueue_active(&bp->rx_waitq)) { wake_up_interruptible(&bp->rx_waitq); @@ -326,6 +346,192 @@ int destroy_bundle_doit( genlmsg_end(msg, msg_head); return genlmsg_unicast(&init_net, msg, port_id); +err_cancel: + genlmsg_cancel(msg, msg_head); +err_free: + nlmsg_free(msg); +out: + return ret; +} + +int open_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id) +{ + void* msg_head; + struct sk_buff* msg; + size_t msg_size; + int ret; + + msg_size = 2 * nla_total_size(sizeof(u_int32_t)); + msg = genlmsg_new(msg_size, GFP_KERNEL); + if (!msg) { + pr_err("open_endpoint: failed to allocate message buffer\n"); + ret = -ENOMEM; + goto out; + } + + msg_head + = genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_OPEN_ENDPOINT); + if (!msg_head) { + pr_err("open_endpoint: failed to create genetlink header\n"); + ret = -EMSGSIZE; + goto err_free; + } + + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id); + if (ret) { + pr_err("open_endpoint: failed to put BP_GENL_A_DEST_NODE_ID " + "(%d)\n", + ret); + goto err_cancel; + } + + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id); + if (ret) { + pr_err("open_endpoint: failed to put BP_GENL_A_DEST_SERVICE_ID " + "(%d)\n", + ret); + goto err_cancel; + } + + genlmsg_end(msg, msg_head); + return genlmsg_unicast(&init_net, msg, port_id); + +err_cancel: + genlmsg_cancel(msg, msg_head); +err_free: + nlmsg_free(msg); +out: + return ret; +} + +int send_bundle_confirmation_doit(struct sk_buff* skb, struct genl_info* info) +{ + u_int32_t src_node_id, src_service_id; + struct sock* sk; + struct bp_sock* bp; + + if (!info->attrs[BP_GENL_A_SRC_NODE_ID] + || !info->attrs[BP_GENL_A_SRC_SERVICE_ID]) { + pr_err("send_bundle_confirmation_doit: missing attribute(s)\n"); + return -EINVAL; + } + + src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]); + src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]); + + pr_info("send_bundle_confirmation_doit: received confirmation for " + "ipn:%u.%u\n", + src_node_id, src_service_id); + + // Find the socket waiting for confirmation + read_lock_bh(&bp_list_lock); + sk_for_each(sk, &bp_list) + { + bp = bp_sk(sk); + if (bp->bp_node_id == src_node_id + && bp->bp_service_id == src_service_id) { + // Found the socket, wake it up + mutex_lock(&bp->tx_mutex); + bp->tx_confirmed = true; + mutex_unlock(&bp->tx_mutex); + wake_up(&bp->tx_waitq); + pr_info( + "send_bundle_confirmation_doit: woke up socket for " + "ipn:%u.%u\n", + src_node_id, src_service_id); + break; + } + } + read_unlock_bh(&bp_list_lock); + + return 0; +} + +int send_bundle_failure_doit(struct sk_buff* skb, struct genl_info* info) +{ + u_int32_t src_node_id, src_service_id; + struct sock* sk; + struct bp_sock* bp; + + if (!info->attrs[BP_GENL_A_SRC_NODE_ID] + || !info->attrs[BP_GENL_A_SRC_SERVICE_ID]) { + pr_err("send_bundle_failure_doit: missing attribute(s)\n"); + return -EINVAL; + } + + src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]); + src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]); + + pr_info("send_bundle_failure_doit: received failure notification for " + "ipn:%u.%u\n", + src_node_id, src_service_id); + + // Find the socket waiting for confirmation and mark it as failed + read_lock_bh(&bp_list_lock); + sk_for_each(sk, &bp_list) + { + bp = bp_sk(sk); + if (bp->bp_node_id == src_node_id + && bp->bp_service_id == src_service_id) { + // Found the socket, mark as failed and wake it up + mutex_lock(&bp->tx_mutex); + bp->tx_confirmed = true; + bp->tx_error = true; + mutex_unlock(&bp->tx_mutex); + wake_up(&bp->tx_waitq); + pr_info("send_bundle_failure_doit: woke up socket for " + "ipn:%u.%u with failure\n", + src_node_id, src_service_id); + break; + } + } + read_unlock_bh(&bp_list_lock); + + return 0; +} + +int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id) +{ + void* msg_head; + struct sk_buff* msg; + size_t msg_size; + int ret; + + msg_size = 2 * nla_total_size(sizeof(u_int32_t)); + msg = genlmsg_new(msg_size, GFP_KERNEL); + if (!msg) { + pr_err("close_endpoint: failed to allocate message buffer\n"); + ret = -ENOMEM; + goto out; + } + + msg_head + = genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_CLOSE_ENDPOINT); + if (!msg_head) { + pr_err("close_endpoint: failed to create genetlink header\n"); + ret = -EMSGSIZE; + goto err_free; + } + + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id); + if (ret) { + pr_err("close_endpoint: failed to put BP_GENL_A_DEST_NODE_ID " + "(%d)\n", + ret); + goto err_cancel; + } + + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id); + if (ret) { + pr_err("close_endpoint: failed to put " + "BP_GENL_A_DEST_SERVICE_ID (%d)\n", + ret); + goto err_cancel; + } + + genlmsg_end(msg, msg_head); + return genlmsg_unicast(&init_net, msg, port_id); + err_cancel: genlmsg_cancel(msg, msg_head); err_free: diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index 3dc1153..b21e983 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -5,9 +5,13 @@ extern struct genl_family genl_fam; +int open_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id); +int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id); int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, int port_id); +int send_bundle_confirmation_doit(struct sk_buff* skb, struct genl_info* info); +int send_bundle_failure_doit(struct sk_buff* skb, struct genl_info* info); int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); int request_bundle_doit( u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); diff --git a/daemon/Makefile b/daemon/Makefile index 2b7fd35..f7bf80c 100644 --- a/daemon/Makefile +++ b/daemon/Makefile @@ -1,6 +1,6 @@ CC := gcc -CFLAGS := -Wall -Wno-deprecated-declarations -g -LDFLAGS := -lpthread -lbp -lici -lm \ +CFLAGS := -Wall -Wextra -Wpedantic -Wconversion -Wshadow -Wformat=2 -Wvla -fstack-protector-strong -D_FORTIFY_SOURCE=3 -fPIE -g -Wno-deprecated-declarations +LDFLAGS := -Wl,-z,relro,-z,now -pie -lpthread -lbp -lici -lm \ $(shell pkg-config --libs libevent libnl-genl-3.0) INCLUDES := $(shell pkg-config --cflags libnl-3.0) @@ -15,7 +15,7 @@ SRC_FILES := $(wildcard *.c *.h) all: $(EXEC) -release: CFLAGS := -Wall -Wno-deprecated-declarations -O3 -DNO_LOG +release: CFLAGS := -Wall -Wextra -Wpedantic -Wconversion -Wshadow -Wformat=2 -Wvla -fstack-protector-strong -D_FORTIFY_SOURCE=3 -fPIE -O3 -DNO_LOG -Wno-deprecated-declarations release: all $(EXEC): $(OBJECTS) diff --git a/daemon/adu_ref.c b/daemon/adu_ref.c deleted file mode 100644 index 902067d..0000000 --- a/daemon/adu_ref.c +++ /dev/null @@ -1,79 +0,0 @@ -#include "adu_ref.h" -#include "log.h" -#include "sdr.h" -#include -#include - -static struct adu_reference *adu_refs = NULL; -static pthread_mutex_t adu_refs_mutex = PTHREAD_MUTEX_INITIALIZER; - -int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, - u_int32_t src_node_id, u_int32_t src_service_id) { - struct adu_reference *ref; - - if (pthread_mutex_lock(&adu_refs_mutex) != 0) { - log_error("add_adu: Failed to lock SDR mutex."); - return -1; - } - - ref = malloc(sizeof(struct adu_reference)); - if (!ref) { - log_error("add_adu: Failed to allocate memory for bundle reference."); - pthread_mutex_unlock(&adu_refs_mutex); - return -ENOMEM; - } - - ref->adu = adu; - ref->dest_node_id = dest_node_id; - ref->dest_service_id = dest_service_id; - ref->src_node_id = src_node_id; - ref->src_service_id = src_service_id; - ref->next = adu_refs; - adu_refs = ref; - - pthread_mutex_unlock(&adu_refs_mutex); - return 0; -} - -struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { - struct adu_reference *ref; - - for (ref = adu_refs; ref != NULL; ref = ref->next) { - if (ref->dest_node_id == dest_node_id && ref->dest_service_id == dest_service_id) { - return ref; - } - } - return NULL; -} - -Object remove_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { - struct adu_reference *prev = NULL; - struct adu_reference *current = adu_refs; - Object adu = 0; - - if (pthread_mutex_lock(&adu_refs_mutex) != 0) { - log_error("remove_adu_ref: Failed to lock adu_refs mutex."); - return -EAGAIN; - } - - while (current) { - if (current->dest_node_id == dest_node_id && current->dest_service_id == dest_service_id) { - adu = current->adu; - if (prev) { - prev->next = current->next; - } else { - adu_refs = current->next; - } - - free(current); - pthread_mutex_unlock(&adu_refs_mutex); - return adu; - } - prev = current; - current = current->next; - } - - pthread_mutex_unlock(&adu_refs_mutex); - log_warn("remove_adu_ref: no bundle found (ipn:%u.%u)", dest_node_id, dest_service_id); - return adu; -} \ No newline at end of file diff --git a/daemon/adu_ref.h b/daemon/adu_ref.h deleted file mode 100644 index 8fb42ab..0000000 --- a/daemon/adu_ref.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef ADU_REF_H -#define ADU_REF_H - -#include "bp.h" - -struct adu_reference { - Object adu; - u_int32_t dest_node_id; - u_int32_t dest_service_id; - u_int32_t src_node_id; - u_int32_t src_service_id; - struct adu_reference *next; -}; - -int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, - u_int32_t src_node_id, u_int32_t src_service_id); -struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); -Object remove_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); - -#endif \ No newline at end of file diff --git a/daemon/adu_registry.c b/daemon/adu_registry.c new file mode 100644 index 0000000..f28a98c --- /dev/null +++ b/daemon/adu_registry.c @@ -0,0 +1,91 @@ +#include "adu_registry.h" +#include "log.h" +#include +#include + +static pthread_mutex_t adu_mutex = PTHREAD_MUTEX_INITIALIZER; +static adu_node_t *adu_head = NULL; + +static adu_node_t *find_node(uint32_t node_id, uint32_t service_id, adu_node_t **prev) { + adu_node_t *p = adu_head; + adu_node_t *q = NULL; + while (p) { + if (p->key.node_id == node_id && p->key.service_id == service_id) { + if (prev) *prev = q; + return p; + } + q = p; + p = p->next; + } + if (prev) *prev = NULL; + return NULL; +} + +bool adu_registry_contains(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&adu_mutex); + bool exists = find_node(node_id, service_id, NULL) != NULL; + pthread_mutex_unlock(&adu_mutex); + return exists; +} + +int adu_registry_add(Object adu, uint32_t dest_node_id, uint32_t dest_service_id, + uint32_t src_node_id, uint32_t src_service_id) { + int ret = 0; + adu_node_t *node; + + pthread_mutex_lock(&adu_mutex); + if (find_node(dest_node_id, dest_service_id, NULL)) { + pthread_mutex_unlock(&adu_mutex); + return 0; // Already exists + } + pthread_mutex_unlock(&adu_mutex); + + node = (adu_node_t *)calloc(1, sizeof(adu_node_t)); + if (!node) { + log_error("adu_registry_add: Failed to allocate memory for bundle reference"); + return -ENOMEM; + } + + node->key.node_id = dest_node_id; + node->key.service_id = dest_service_id; + node->src_node_id = src_node_id; + node->src_service_id = src_service_id; + node->adu = adu; + + pthread_mutex_lock(&adu_mutex); + node->next = adu_head; + adu_head = node; + pthread_mutex_unlock(&adu_mutex); + + return ret; +} + +adu_node_t *adu_registry_get(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&adu_mutex); + adu_node_t *node = find_node(node_id, service_id, NULL); + pthread_mutex_unlock(&adu_mutex); + return node; +} + +Object adu_registry_remove(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&adu_mutex); + adu_node_t *prev = NULL; + adu_node_t *node = find_node(node_id, service_id, &prev); + Object adu = 0; + + if (!node) { + pthread_mutex_unlock(&adu_mutex); + log_warn("adu_registry_remove: no bundle found (ipn:%u.%u)", node_id, service_id); + return 0; + } + + adu = node->adu; + if (prev) + prev->next = node->next; + else + adu_head = node->next; + + pthread_mutex_unlock(&adu_mutex); + free(node); + return adu; +} diff --git a/daemon/adu_registry.h b/daemon/adu_registry.h new file mode 100644 index 0000000..e575bda --- /dev/null +++ b/daemon/adu_registry.h @@ -0,0 +1,27 @@ +#ifndef ADU_REGISTRY_H +#define ADU_REGISTRY_H + +#include "bp.h" +#include +#include + +typedef struct adu_key { + uint32_t node_id; + uint32_t service_id; +} adu_key_t; + +typedef struct adu_node { + adu_key_t key; + uint32_t src_node_id; + uint32_t src_service_id; + Object adu; + struct adu_node *next; +} adu_node_t; + +bool adu_registry_contains(uint32_t node_id, uint32_t service_id); +int adu_registry_add(Object adu, uint32_t dest_node_id, uint32_t dest_service_id, + uint32_t src_node_id, uint32_t src_service_id); +adu_node_t *adu_registry_get(uint32_t node_id, uint32_t service_id); +Object adu_registry_remove(uint32_t node_id, uint32_t service_id); + +#endif diff --git a/daemon/bp_genl.c b/daemon/bp_genl.c index 51158bc..de610aa 100644 --- a/daemon/bp_genl.c +++ b/daemon/bp_genl.c @@ -67,7 +67,7 @@ int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { err = nla_parse(attrs, BP_GENL_A_MAX, genlmsg_attrdata(genlhdr, 0), genlmsg_attrlen(genlhdr, 0), NULL); if (err < 0) { - log_error("Failed to parse Netlink attributes: %s", strerror(-err)); + log_error("Failed to parse Netlink attributes: %s", nl_geterror(err)); return NL_SKIP; } @@ -76,6 +76,10 @@ int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { return handle_send_bundle(daemon, attrs); case BP_GENL_CMD_REQUEST_BUNDLE: return handle_request_bundle(daemon, attrs); + case BP_GENL_CMD_OPEN_ENDPOINT: + return handle_open_endpoint(daemon, attrs); + case BP_GENL_CMD_CLOSE_ENDPOINT: + return handle_close_endpoint(daemon, attrs); // case BP_GENL_CMD_DELIVER_BUNDLE: // return handle_deliver_bundle_reply(daemon, attrs); case BP_GENL_CMD_DESTROY_BUNDLE: diff --git a/daemon/bp_genl.h b/daemon/bp_genl.h index d7b3d6b..e8e9e42 100644 --- a/daemon/bp_genl.h +++ b/daemon/bp_genl.h @@ -6,8 +6,6 @@ struct nl_sock *genl_bp_sock_init(Daemon *daemon); void genl_bp_sock_close(Daemon *daemon); -int genl_bp_sock_sendmsg(Daemon *self, void *payload, size_t len); -int genl_bp_sock_recvmsg(Daemon *self, void *payload, size_t len); int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg); #endif diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index dbaa63c..80cd320 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -8,19 +8,64 @@ #include #include "../include/bp_socket.h" -#include "adu_ref.h" +#include "adu_registry.h" #include "bp.h" #include "bp_genl_handlers.h" #include "daemon.h" #include "ion.h" #include "log.h" +#include + +int handle_open_endpoint(Daemon *daemon, struct nlattr **attrs) { + u_int32_t node_id, service_id; + (void)daemon; + + if (!attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { + log_error("handle_open_endpoint: missing attribute(s)"); + return -EINVAL; + } + node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); + + log_info("[ipn:%u.%u] OPEN_ENDPOINT: opening endpoint", node_id, service_id); + int ret = ion_open_endpoint(node_id, service_id); + if (ret == 0) { + log_info("[ipn:%u.%u] OPEN_ENDPOINT: endpoint opened successfully", node_id, service_id); + } else { + log_error("[ipn:%u.%u] OPEN_ENDPOINT: failed to open endpoint (error %d)", node_id, + service_id, ret); + } + return ret; +} + +int handle_close_endpoint(Daemon *daemon, struct nlattr **attrs) { + u_int32_t node_id, service_id; + (void)daemon; + + if (!attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { + log_error("handle_close_endpoint: missing attribute(s)"); + return -EINVAL; + } + node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); + + log_info("[ipn:%u.%u] CLOSE_ENDPOINT: closing endpoint", node_id, service_id); + int ret = ion_close_endpoint(node_id, service_id); + if (ret == 0) { + log_info("[ipn:%u.%u] CLOSE_ENDPOINT: endpoint closed successfully", node_id, service_id); + } else { + log_error("[ipn:%u.%u] CLOSE_ENDPOINT: failed to close endpoint (error %d)", node_id, + service_id, ret); + } + return ret; +} int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { + (void)daemon; void *payload; size_t payload_size; u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; char dest_eid[64]; - int err = 0; int written; if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] || @@ -33,7 +78,7 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { } payload = nla_data(attrs[BP_GENL_A_PAYLOAD]); - payload_size = nla_len(attrs[BP_GENL_A_PAYLOAD]); + payload_size = (size_t)nla_len(attrs[BP_GENL_A_PAYLOAD]); dest_node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); dest_service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); src_node_id = nla_get_u32(attrs[BP_GENL_A_SRC_NODE_ID]); @@ -46,22 +91,41 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { return -EINVAL; } - err = bp_send_to_eid(daemon->sdr, payload, payload_size, dest_eid); - if (err < 0) { - log_error("[ipn:%u.%u] handle_send_bundle: bp_send_to_eid failed with error %d", - dest_node_id, dest_service_id, err); - return err; + // Enqueue to send thread using source endpoint SAP + // Launch async send thread + pthread_t send_thread; + struct ion_send_args *send_args = malloc(sizeof(struct ion_send_args)); + if (!send_args) return -ENOMEM; + send_args->src_node_id = src_node_id; + send_args->src_service_id = src_service_id; + send_args->dest_eid = strndup(dest_eid, sizeof(dest_eid)); + send_args->payload = malloc(payload_size); + send_args->netlink_sock = daemon->genl_bp_sock; + send_args->netlink_family = daemon->genl_bp_family_id; + if (!send_args->dest_eid || !send_args->payload) { + free(send_args->dest_eid); + free(send_args->payload); + free(send_args); + return -ENOMEM; } - - log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %d (bytes)", src_node_id, - src_service_id, dest_eid, payload_size); + memcpy(send_args->payload, payload, payload_size); + send_args->payload_size = payload_size; + if (pthread_create(&send_thread, NULL, ion_send_thread, send_args) != 0) { + log_error("[ipn:%u.%u] handle_send_bundle: failed to create send thread", src_node_id, + src_service_id); + free(send_args->dest_eid); + free(send_args->payload); + free(send_args); + return -errno; + } + pthread_detach(send_thread); return 0; } int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { pthread_t thread; - struct thread_args *args; + struct ion_recv_args *args; u_int32_t node_id, service_id; if (!attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_DEST_NODE_ID]) { @@ -74,22 +138,21 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - args = malloc(sizeof(struct thread_args)); + args = malloc(sizeof(struct ion_recv_args)); if (!args) { - log_error("[ipn:%u.%u] handle_send_bundle: failed to allocate thread args", args->node_id, - args->service_id); + log_error("handle_request_bundle: failed to allocate thread args", node_id, service_id); return -ENOMEM; } args->node_id = node_id; args->service_id = service_id; args->netlink_sock = daemon->genl_bp_sock; args->netlink_family = daemon->genl_bp_family_id; - args->sdr = daemon->sdr; log_info("[ipn:%u.%u] REQUEST_BUNDLE: bundle request initiated", node_id, service_id); - if (pthread_create(&thread, NULL, (void *(*)(void *))handle_recv_thread, args) != 0) { + if (pthread_create(&thread, NULL, ion_receive_thread, args) != 0) { log_error("[ipn:%u.%u] handle_request_bundle: failed to create receive thread: %s", node_id, service_id, strerror(errno)); + free(args); return -errno; } @@ -98,174 +161,8 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { return 0; } -void *handle_recv_thread(struct thread_args *args) { - int err; - struct reply_bundle reply; - - reply = bp_recv_once(args->sdr, args->node_id, - args->service_id); // Blocking invocation to receive a bundle - if (!reply.is_present) { - err = handle_cancel_bundle_request(args->netlink_family, args->netlink_sock, args->node_id, - args->service_id); - if (err < 0) { - log_error("[ipn:%u.%u] handle_cancel_bundle_request failed with error %d", - args->node_id, args->service_id, err); - goto out; - } - - log_info("[ipn:%u.%u] CANCEL_BUNDLE_REQUEST: bundle request cancelled", args->node_id, - args->service_id); - } else { - err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, reply.payload, - reply.payload_size, reply.src_node_id, reply.src_service_id, - args->node_id, args->service_id); - if (err < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", args->node_id, - args->service_id, err); - goto out; - } - - log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", args->node_id, - args->service_id); - } - -out: - if (reply.payload) free(reply.payload); - free(args); - return NULL; -} - -int handle_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, - u_int32_t node_id, u_int32_t service_id) { - struct nl_msg *msg = NULL; - void *hdr; - int ret; - - msg = nlmsg_alloc(); - if (!msg) { - log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to allocate Netlink msg", - node_id, service_id); - ret = -ENOMEM; - goto out; - } - - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, - BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_VERSION); - if (!hdr) { - log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to create Netlink header", - node_id, service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id) < 0) { - log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to add NODE_ID attribute", - node_id, service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id) < 0) { - log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to add SERVICE_ID attribute", - node_id, service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - ret = nl_send_sync(netlink_sock, msg); - if (ret < 0) { - log_error("[ipn:%u.%u] handle_cancel_bundle_request: bundle request not cancelled", node_id, - service_id); - ret = -errno; - goto out; - } - - return 0; - -err_free_msg: - nlmsg_free(msg); -out: - return ret; -} - -int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, - int payload_size, u_int32_t src_node_id, u_int32_t src_service_id, - u_int32_t dest_node_id, u_int32_t dest_service_id) { - struct nl_msg *msg = NULL; - void *hdr; - int ret; - - msg = nlmsg_alloc(); - if (!msg) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to allocate Netlink msg", dest_node_id, - dest_service_id); - ret = -ENOMEM; - goto out; - } - - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, - BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); - if (!hdr) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to create Netlink header", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id) < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add NODE_ID attribute", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id) < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SERVICE_ID attribute", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SRC_NODE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SRC_SERVICE_ID attribute", - src_node_id, src_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - if (nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add PAYLOAD attribute", - dest_node_id, dest_service_id); - ret = -EMSGSIZE; - goto err_free_msg; - } - - ret = nl_send_sync(netlink_sock, msg); - if (ret < 0) { - log_warn("[ipn:%u.%u] DELIVER_BUNDLE: bundle not delivered to kernel, keeping reference in " - "memory (no active BP socket " - "client)", - dest_node_id, dest_service_id); - ret = -ENODEV; - goto out; - } - - return 0; - -err_free_msg: - nlmsg_free(msg); -out: - return ret; -} - int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { + (void)daemon; u_int32_t node_id, service_id; Object adu; int ret = 0; @@ -281,16 +178,16 @@ int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - adu = remove_adu_ref(daemon->sdr, node_id, service_id); + adu = adu_registry_remove(node_id, service_id); if (adu == 0) { log_error("[ipn:%u.%u] handle_destroy_bundle: failed to destroy bundle: %s", node_id, service_id, strerror(-ret)); goto out; } - ret = destroy_bundle(daemon->sdr, adu); + ret = ion_destroy_bundle(adu); if (ret < 0) { - log_error("[ipn:%u.%u] handle_destroy_bundle: destroy_bundle failed with error %d", node_id, - service_id, ret); + log_error("[ipn:%u.%u] handle_destroy_bundle: ion_destroy_bundle failed with error %d", + node_id, service_id, ret); goto out; } diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index e0b10a7..08d3d9b 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -9,16 +9,12 @@ struct thread_args { int netlink_family; u_int32_t node_id; u_int32_t service_id; - Sdr sdr; }; int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); -int handle_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, - u_int32_t node_id, u_int32_t service_id); -int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, - int payload_size, u_int32_t src_node_id, u_int32_t src_service_id, - u_int32_t dest_node_id, u_int32_t dest_service_id); +int handle_open_endpoint(Daemon *daemon, struct nlattr **attrs); +int handle_close_endpoint(Daemon *daemon, struct nlattr **attrs); int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs); void *handle_recv_thread(struct thread_args *arg); diff --git a/daemon/daemon.c b/daemon/daemon.c index 01884e0..fe8cda1 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -1,24 +1,34 @@ #include "daemon.h" #include "bp_genl.h" +#include "ion.h" #include "log.h" #include #include #include #include -void on_sigint(evutil_socket_t fd, short what, void *arg) { +void on_sigint(evutil_socket_t fd, short event, void *arg) { + (void)fd; + (void)event; + struct event_base *base = arg; log_info("SIGINT received, exiting..."); event_base_loopexit(base, NULL); } -void on_sigpipe(evutil_socket_t fd, short what, void *arg) { +void on_sigpipe(evutil_socket_t fd, short event, void *arg) { + (void)fd; + (void)event; + struct event_base *base = arg; - log_info("SIGINT received, exiting..."); + log_info("SIGPIPE received, exiting..."); event_base_loopexit(base, NULL); } void on_netlink(evutil_socket_t fd, short event, void *arg) { + (void)fd; + (void)event; + Daemon *daemon = (Daemon *)arg; nl_recvmsgs_default( daemon->genl_bp_sock); // call the callback registered with genl_bp_sock_recvmsg_cb() @@ -29,10 +39,19 @@ int daemon_run(Daemon *self) { int ret; self->base = event_base_new(); + if (!self->base) { + log_error("Failed to create libevent base"); + return -ENOMEM; + } log_info("Using libevent version %s with %s behind the scenes", (char *)event_get_version(), (char *)event_base_get_method(self->base)); self->event_on_sigint = evsignal_new(self->base, SIGINT, on_sigint, self->base); + if (!self->event_on_sigint) { + log_error("Couldn't create SIGINT event"); + daemon_free(self); + return -ENOMEM; + } ret = event_add(self->event_on_sigint, NULL); if (ret < 0) { log_error("Couldn't add SIGINT event"); @@ -41,6 +60,11 @@ int daemon_run(Daemon *self) { } self->event_on_sigpipe = evsignal_new(self->base, SIGPIPE, on_sigpipe, self->base); + if (!self->event_on_sigpipe) { + log_error("Couldn't create SIGPIPE event"); + daemon_free(self); + return -ENOMEM; + } ret = event_add(self->event_on_sigpipe, NULL); if (ret < 0) { log_error("Couldn't add SIGPIPE event"); @@ -58,6 +82,11 @@ int daemon_run(Daemon *self) { fd = nl_socket_get_fd(self->genl_bp_sock); self->event_on_nl_sock = event_new(self->base, fd, EV_READ | EV_PERSIST, on_netlink, self); + if (!self->event_on_nl_sock) { + log_error("Couldn't create Netlink event"); + daemon_free(self); + return -ENOMEM; + } ret = event_add(self->event_on_nl_sock, NULL); if (ret < 0) { log_error("Couldn't add Netlink event"); @@ -79,7 +108,7 @@ int daemon_run(Daemon *self) { daemon_free(self); return -EAGAIN; } - self->sdr = bp_get_sdr(); + sdr = bp_get_sdr(); log_info("Successfully attached to ION"); log_info("Daemon started successfully"); diff --git a/daemon/daemon.h b/daemon/daemon.h index 1bc4048..f8162b9 100644 --- a/daemon/daemon.h +++ b/daemon/daemon.h @@ -7,10 +7,9 @@ typedef struct Daemon { struct nl_sock *genl_bp_sock; - char *genl_bp_family_name; + const char *genl_bp_family_name; int genl_bp_family_id; unsigned int nl_pid; - Sdr sdr; struct event_base *base; struct event *event_on_sigpipe; diff --git a/daemon/ion.c b/daemon/ion.c index 87bf013..4257076 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,11 +1,175 @@ #include "ion.h" -#include "adu_ref.h" +#include "adu_registry.h" #include "log.h" +#include "nl_utils.h" +#include "sap_registry.h" #include "sdr.h" #include +#include #include +#include +#include +#include static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER; +Sdr sdr; + +static int make_eid(char *buf, size_t bufsize, u_int32_t node_id, u_int32_t service_id) { + int n = snprintf(buf, bufsize, "ipn:%u.%u", node_id, service_id); + if (n < 0 || n >= (int)bufsize) return -1; + return 0; +} + +int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id) { + char eid[64]; + BpSAP sap; + + if (make_eid(eid, sizeof(eid), node_id, service_id) < 0) { + log_error("ion_open_endpoint: EID too long"); + return -EINVAL; + } + + if (sap_registry_contains(node_id, service_id)) { + return 0; // already open + } + + if (bp_open(eid, &sap) < 0) { + log_error("ion_open_endpoint: bp_open failed for %s", eid); + return -EIO; + } + + if (sap_registry_add(node_id, service_id, sap) < 0) { + bp_close(sap); + return -ENOMEM; + } + return 0; +} + +int ion_close_endpoint(u_int32_t node_id, u_int32_t service_id) { + BpSAP sap = sap_registry_get(node_id, service_id); + if (sap) { + if (sap_registry_has_active_receive(node_id, service_id)) { + log_info("[ipn:%u.%u] CLOSE_ENDPOINT: interrupting active reception", node_id, + service_id); + bp_interrupt(sap); + } else { + log_info("[ipn:%u.%u] CLOSE_ENDPOINT: no active reception, closing directly", node_id, + service_id); + } + + bp_close(sap); + } + sap_registry_remove(node_id, service_id); + return 0; +} + +void *ion_send_thread(void *arg) { + struct ion_send_args *args = arg; + const char *dest_eid = args->dest_eid; + const void *payload = args->payload; + size_t payload_size = args->payload_size; + u_int32_t src_node_id = args->src_node_id; + u_int32_t src_service_id = args->src_service_id; + BpSAP sap = NULL; + Object sdr_buffer = 0; + Object adu = 0; + int ret = 0; + + if (!dest_eid || !payload || payload_size == 0) { + log_error("ion_send_thread: invalid parameters"); + ret = -EINVAL; + goto cleanup_and_notify; + } + + sap = sap_registry_get(src_node_id, src_service_id); + if (!sap) { + log_error("ion_send_thread: no SAP for ipn:%u.%u", src_node_id, src_service_id); + ret = -ENODEV; + goto cleanup_and_notify; + } + + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("ion_send_thread: sdr mutex lock failed"); + ret = -EAGAIN; + goto cleanup_and_notify; + } + + if (sdr_begin_xn(sdr) == 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: sdr_begin_xn failed"); + ret = -EIO; + goto cleanup_and_notify; + } + + sdr_buffer = sdr_malloc(sdr, payload_size); + if (sdr_buffer == 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: no space for payload"); + ret = -ENOSPC; + goto cleanup_and_notify; + } + + sdr_write(sdr, sdr_buffer, (char *)payload, payload_size); + + adu = zco_create(sdr, ZcoSdrSource, sdr_buffer, 0, (vast)payload_size, ZcoOutbound); + if (adu <= 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: zco_create failed"); + ret = -ENOMEM; + goto cleanup_and_notify; + } + + if (sdr_end_xn(sdr) < 0) { + pthread_mutex_unlock(&sdrmutex); + log_error("ion_send_thread: sdr_end_xn failed"); + ret = -EIO; + goto cleanup_and_notify; + } + + pthread_mutex_unlock(&sdrmutex); + + if (bp_send(sap, (char *)dest_eid, NULL, 86400, BP_STD_PRIORITY, NoCustodyRequested, 0, 0, NULL, + adu, NULL) <= 0) { + log_error("ion_send_thread: bp_send failed"); + ret = -EIO; + goto cleanup_and_notify; + } + + log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %zu (bytes)", args->src_node_id, + args->src_service_id, args->dest_eid, args->payload_size); + + if (nl_send_bundle_confirmation(args->netlink_family, args->netlink_sock, args->src_node_id, + args->src_service_id) < 0) { + log_error("[ipn:%u.%u] SEND_BUNDLE: failed to send confirmation to kernel", + args->src_node_id, args->src_service_id); + } else { + log_info("[ipn:%u.%u] SEND_BUNDLE: confirmation sent to kernel", args->src_node_id, + args->src_service_id); + } + + free(args->dest_eid); + free(args->payload); + free(args); + return (void *)(intptr_t)0; + +cleanup_and_notify: { + int nl_ret = nl_send_bundle_failure(args->netlink_family, args->netlink_sock, args->src_node_id, + args->src_service_id, ret); + if (nl_ret < 0) { + log_error( + "[ipn:%u.%u] SEND_BUNDLE: failed to send failure notification to kernel (err: %d)", + args->src_node_id, args->src_service_id, nl_ret); + } else { + log_info("[ipn:%u.%u] SEND_BUNDLE: failure notification sent to kernel", args->src_node_id, + args->src_service_id); + } +} + + free(args->dest_eid); + free(args->payload); + free(args); + return (void *)(intptr_t)ret; +} const char *bp_result_text(BpIndResult result) { switch (result) { @@ -22,14 +186,14 @@ const char *bp_result_text(BpIndResult result) { } } -int destroy_bundle(Sdr sdr, Object adu) { +int ion_destroy_bundle(Object adu) { if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("destroy_bundle: Failed to lock SDR mutex."); + log_error("ion_destroy_bundle: Failed to lock SDR mutex."); return -EAGAIN; } if (sdr_begin_xn(sdr) == 0) { - log_error("destroy_bundle: sdr_begin_xn failed."); + log_error("ion_destroy_bundle: sdr_begin_xn failed."); pthread_mutex_unlock(&sdrmutex); return -EIO; } @@ -41,182 +205,190 @@ int destroy_bundle(Sdr sdr, Object adu) { return 0; } -int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid) { - Object sdr_buffer = 0; - Object adu; - int ret = 0; - - if (sdr_begin_xn(sdr) == 0) { - log_error("bp_send_to_eid: sdr_begin_xn failed."); - return -EIO; - } - - sdr_buffer = sdr_malloc(sdr, payload_size); - if (sdr_buffer == 0) { - log_error("sdr_malloc failed."); - ret = -ENOMEM; - goto out; - } - - sdr_write(sdr, sdr_buffer, payload, payload_size); - - adu = zco_create(sdr, ZcoSdrSource, sdr_buffer, 0, payload_size, ZcoOutbound); - if (adu <= 0) { - log_error("zco_create failed."); - sdr_free(sdr, sdr_buffer); - ret = -ENOMEM; - goto out; - } - - if (bp_send(NULL, dest_eid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, adu, NULL) <= 0) { - log_error("bp_send failed."); - sdr_free(sdr, sdr_buffer); - ret = -EIO; - goto out; - } +void *ion_receive_thread(void *arg) { + struct ion_recv_args *args = arg; + const u_int32_t dest_node_id = args->node_id; + const u_int32_t dest_service_id = args->service_id; -out: - sdr_end_xn(sdr); - return ret; -} + sap_registry_mark_receive_active(dest_node_id, dest_service_id); -struct reply_bundle bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { BpSAP sap; BpDelivery dlv; ZcoReader reader; - struct adu_reference *adu_ref; + adu_node_t *adu_ref; u_int32_t own_node_id; void *payload = NULL; - size_t payload_size; - int eid_size; - char eid[64]; - struct reply_bundle reply = {0}; - u_int32_t src_node_id, src_service_id; + size_t payload_size = 0; + u_int32_t src_node_id = 0, src_service_id = 0; + int err; - reply.is_present = false; - reply.payload = NULL; - - own_node_id = getOwnNodeNbr(); + { + uvast own = getOwnNodeNbr(); + if (own > (uvast)0xFFFFFFFFu) { + log_error("ion_receive_thread: own node ID out of 32-bit range: %llu", + (unsigned long long)own); + goto cancel; + } + own_node_id = (u_int32_t)own; + } if (dest_node_id != own_node_id) { - log_error("bp_recv_once: node ID mismatch. Expected %u, got %u", own_node_id, dest_node_id); - goto out; + log_error("ion_receive_thread: node ID mismatch. Expected %u, got %u", own_node_id, + dest_node_id); + goto cancel; } - adu_ref = find_adu_ref(sdr, dest_node_id, dest_service_id); + adu_ref = adu_registry_get(dest_node_id, dest_service_id); if (adu_ref != NULL) { - payload_size = zco_source_data_length(sdr, adu_ref->adu); + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("ion_receive_thread: Failed to lock SDR mutex."); + goto cancel; + } + if (sdr_begin_xn(sdr) == 0) { + log_error("ion_receive_thread: sdr_begin_xn failed."); + pthread_mutex_unlock(&sdrmutex); + goto cancel; + } + payload_size = (size_t)zco_source_data_length(sdr, adu_ref->adu); payload = malloc(payload_size); if (!payload) { - log_error("bp_recv_once: Failed to allocate memory for payload."); - goto out; + log_error("ion_receive_thread: Failed to allocate memory for payload."); + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + goto cancel; } zco_start_receiving(adu_ref->adu, &reader); - if (zco_receive_source(sdr, &reader, payload_size, payload) < 0) { - log_error("bp_recv_once: zco_receive_source failed."); - goto out; + if (zco_receive_source(sdr, &reader, (vast)payload_size, payload) < 0) { + log_error("ion_receive_thread: zco_receive_source failed."); + free(payload); + payload = NULL; + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + goto cancel; } - src_node_id = adu_ref->src_node_id; src_service_id = adu_ref->src_service_id; + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); } else { - eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", dest_node_id, dest_service_id); - if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { - log_error("bp_recv_once: failed to construct EID string."); - goto out; + sap = sap_registry_get(dest_node_id, dest_service_id); + if (!sap) { + log_error("ion_receive_thread: no SAP for ipn:%u.%u", dest_node_id, dest_service_id); + goto cancel; } - if (bp_open(eid, &sap) < 0) { - log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", dest_node_id, - dest_service_id); - goto out; - } + while (1) { + if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { + log_error("ion_receive_thread: bundle reception failed."); + goto cancel; + } - if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { - log_error("bp_recv_once: bundle reception failed."); - bp_close(sap); - goto out; - } + if (dlv.result == BpReceptionInterrupted) { + if (!sap_registry_contains(dest_node_id, dest_service_id)) { + log_info("ion_receive_thread: endpoint closing, stopping"); + bp_release_delivery(&dlv, 0); + goto cancel; + } + log_info("ion_receive_thread: reception interrupted, continuing to wait"); + bp_release_delivery(&dlv, 0); + continue; + } - if (dlv.result != BpPayloadPresent || dlv.adu == 0) { - log_error("bp_recv_once: %s", bp_result_text(dlv.result)); - bp_release_delivery(&dlv, 0); - bp_close(sap); - goto out; - } + if (dlv.adu == 0) { + log_info("ion_receive_thread: no ADU, continuing to wait"); + bp_release_delivery(&dlv, 0); + continue; + } + + if (dlv.result == BpEndpointStopped) { + log_info("ion_receive_thread: endpoint stopped"); + bp_release_delivery(&dlv, 0); + goto cancel; + } + break; + } if (sscanf(dlv.bundleSourceEid, "ipn:%u.%u", &src_node_id, &src_service_id) != 2) { - log_error("bp_recv_once: failed to parse bundleSourceEid: %s", dlv.bundleSourceEid); + log_error("ion_receive_thread: failed to parse bundleSourceEid: %s", + dlv.bundleSourceEid); bp_release_delivery(&dlv, 0); - bp_close(sap); - goto out; + goto cancel; } - - if (add_adu(sdr, dlv.adu, dest_node_id, dest_service_id, src_node_id, src_service_id) < 0) { - log_error("bp_recv_once: failed to add bundle reference."); + if (adu_registry_add(dlv.adu, dest_node_id, dest_service_id, src_node_id, src_service_id) < + 0) { + log_error("ion_receive_thread: failed to add bundle reference."); bp_release_delivery(&dlv, 0); - bp_close(sap); - goto out; + goto cancel; } - if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("bp_recv_once: Failed to lock SDR mutex."); + log_error("ion_receive_thread: Failed to lock SDR mutex."); bp_release_delivery(&dlv, 0); - bp_close(sap); - goto out; + goto cancel; } - if (sdr_begin_xn(sdr) == 0) { - log_error("bp_recv_once: sdr_begin_xn failed."); + log_error("ion_receive_thread: sdr_begin_xn failed."); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - bp_close(sap); - goto out; + goto cancel; } - - payload_size = zco_source_data_length(sdr, dlv.adu); + payload_size = (size_t)zco_source_data_length(sdr, dlv.adu); payload = malloc(payload_size); if (!payload) { - log_error("bp_recv_once: Failed to allocate memory for payload."); + log_error("ion_receive_thread: Failed to allocate memory for payload."); sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - bp_close(sap); - goto out; + goto cancel; } - zco_start_receiving(dlv.adu, &reader); - if (zco_receive_source(sdr, &reader, payload_size, payload) < 0) { - log_error("bp_recv_once: zco_receive_source failed."); + if (zco_receive_source(sdr, &reader, (vast)payload_size, payload) < 0) { + log_error("ion_receive_thread: zco_receive_source failed."); free(payload); payload = NULL; sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - bp_close(sap); - goto out; + goto cancel; } - sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); bp_release_delivery(&dlv, 0); - bp_close(sap); } - if (payload == NULL) { - log_info("bp_recv_once: no payload received for node_id=%u service_id=%u", dest_node_id, + if (!payload) { + log_info("ion_receive_thread: no payload received for node_id=%u service_id=%u", + dest_node_id, dest_service_id); + goto cancel; + } + + err = nl_send_deliver_bundle(args->netlink_family, args->netlink_sock, payload, payload_size, + src_node_id, src_service_id, dest_node_id, dest_service_id); + if (err < 0) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed with error %d", dest_node_id, + dest_service_id, err); + } else { + log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", dest_node_id, dest_service_id); - goto out; } + sap_registry_mark_receive_inactive(dest_node_id, dest_service_id); + + free(payload); + free(args); + return NULL; - reply.is_present = true; - reply.payload = payload; - reply.payload_size = payload_size; - reply.src_node_id = src_node_id; - reply.src_service_id = src_service_id; +cancel: + // Mark thread as inactive before cleanup + sap_registry_mark_receive_inactive(dest_node_id, dest_service_id); -out: - if (payload && !reply.is_present) { - free(payload); + err = nl_send_cancel_bundle_request(args->netlink_family, args->netlink_sock, dest_node_id, + dest_service_id); + if (err < 0) { + log_error("[ipn:%u.%u] nl_send_cancel_bundle_request failed with error %d", dest_node_id, + dest_service_id, err); + } else { + log_info("[ipn:%u.%u] CANCEL_BUNDLE_REQUEST: bundle request cancelled", dest_node_id, + dest_service_id); } - return reply; + if (payload) free(payload); + free(args); + return NULL; } diff --git a/daemon/ion.h b/daemon/ion.h index f3ee93d..9c17aa0 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -4,16 +4,29 @@ #include "bp.h" #include -struct reply_bundle { - bool is_present; - void *payload; - size_t payload_size; +extern Sdr sdr; + +struct ion_recv_args { + struct nl_sock *netlink_sock; + int netlink_family; + u_int32_t node_id; + u_int32_t service_id; +}; + +struct ion_send_args { + struct nl_sock *netlink_sock; + int netlink_family; u_int32_t src_node_id; u_int32_t src_service_id; + char *dest_eid; + void *payload; + size_t payload_size; }; -int destroy_bundle(Sdr sdr, Object adu); -int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid); -struct reply_bundle bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); +int ion_open_endpoint(u_int32_t node_id, u_int32_t service_id); +int ion_close_endpoint(u_int32_t node_id, u_int32_t service_id); +int ion_destroy_bundle(Object adu); +void *ion_receive_thread(void *arg); +void *ion_send_thread(void *arg); #endif \ No newline at end of file diff --git a/daemon/main.c b/daemon/main.c index 2c8e123..76c9656 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -7,6 +7,9 @@ #define NL_PID 8443 int main(int argc, char *argv[]) { + (void)argc; + (void)argv; + if (geteuid() != 0) { log_error("This program must be run as root or with CAP_NET_ADMIN (required by " "GENL_ADMIN_PERM)."); diff --git a/daemon/nl_utils.c b/daemon/nl_utils.c new file mode 100644 index 0000000..d9fc0fc --- /dev/null +++ b/daemon/nl_utils.c @@ -0,0 +1,258 @@ +#include "nl_utils.h" +#include +#include +#include +#include +#include +#include + +#include "../include/bp_socket.h" +#include "log.h" + +int nl_send_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, + uint32_t node_id, uint32_t service_id) { + struct nl_msg *msg = NULL; + void *hdr; + int ret; + + msg = nlmsg_alloc(); + if (!msg) { + log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to allocate Netlink msg", + node_id, service_id); + ret = -ENOMEM; + goto out; + } + + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, + BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to create Netlink header", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id) < 0) { + log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to add NODE_ID attribute", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id) < 0) { + log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: failed to add SERVICE_ID attribute", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + ret = nl_send_sync(netlink_sock, msg); + if (ret < 0) { + log_error("[ipn:%u.%u] nl_send_cancel_bundle_request: bundle request not cancelled", + node_id, service_id); + ret = -errno; + goto out; + } + + return 0; + +err_free_msg: + nlmsg_free(msg); +out: + return ret; +} + +int nl_send_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, + size_t payload_size, uint32_t src_node_id, uint32_t src_service_id, + uint32_t dest_node_id, uint32_t dest_service_id) { + if (payload_size > (size_t)INT_MAX) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: payload too large", dest_node_id, + dest_service_id); + return -EMSGSIZE; + } + struct nl_msg *msg = NULL; + void *hdr; + int ret; + + msg = nlmsg_alloc(); + if (!msg) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to allocate Netlink msg", + dest_node_id, dest_service_id); + ret = -ENOMEM; + goto out; + } + + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, + BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to create Netlink header", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id) < 0) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add NODE_ID attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id) < 0) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add SERVICE_ID attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add SRC_NODE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add SRC_SERVICE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put(msg, BP_GENL_A_PAYLOAD, (int)payload_size, payload) < 0) { + log_error("[ipn:%u.%u] nl_send_deliver_bundle: failed to add PAYLOAD attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + ret = nl_send_sync(netlink_sock, msg); + if (ret < 0) { + log_warn("[ipn:%u.%u] DELIVER_BUNDLE: bundle not delivered to kernel, keeping reference in " + "memory (no active BP socket " + "client)", + dest_node_id, dest_service_id); + ret = -ENODEV; + goto out; + } + + return 0; + +err_free_msg: + nlmsg_free(msg); +out: + return ret; +} + +int nl_send_bundle_confirmation(int netlink_family, struct nl_sock *netlink_sock, + uint32_t src_node_id, uint32_t src_service_id) { + struct nl_msg *msg = NULL; + void *hdr; + int ret; + + msg = nlmsg_alloc(); + if (!msg) { + log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to allocate Netlink msg", + src_node_id, src_service_id); + ret = -ENOMEM; + goto out; + } + + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, + BP_GENL_CMD_SEND_BUNDLE_CONFIRMATION, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to create Netlink header", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { + log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to add SRC_NODE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { + log_error("[ipn:%u.%u] nl_send_bundle_confirmation: failed to add SRC_SERVICE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + ret = nl_send_sync(netlink_sock, msg); + if (ret < 0) { + log_error("[ipn:%u.%u] nl_send_bundle_confirmation: confirmation not sent", src_node_id, + src_service_id); + ret = -errno; + goto out; + } + + return 0; + +err_free_msg: + nlmsg_free(msg); +out: + return ret; +} + +int nl_send_bundle_failure(int netlink_family, struct nl_sock *netlink_sock, uint32_t src_node_id, + uint32_t src_service_id, int error_code) { + struct nl_msg *msg = NULL; + void *hdr; + int ret; + + msg = nlmsg_alloc(); + if (!msg) { + log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to allocate Netlink msg", src_node_id, + src_service_id); + ret = -ENOMEM; + goto out; + } + + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, + BP_GENL_CMD_SEND_BUNDLE_FAILURE, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to create Netlink header", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { + log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to add SRC_NODE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { + log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to add SRC_SERVICE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_ERROR_CODE, (uint32_t)error_code) < 0) { + log_error("[ipn:%u.%u] nl_send_bundle_failure: failed to add ERROR_CODE attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + ret = nl_send_sync(netlink_sock, msg); + if (ret < 0) { + log_error("[ipn:%u.%u] nl_send_bundle_failure: failure notification not sent", src_node_id, + src_service_id); + ret = -errno; + goto out; + } + + return 0; + +err_free_msg: + nlmsg_free(msg); +out: + return ret; +} diff --git a/daemon/nl_utils.h b/daemon/nl_utils.h new file mode 100644 index 0000000..f778a1e --- /dev/null +++ b/daemon/nl_utils.h @@ -0,0 +1,18 @@ +#ifndef NL_UTILS_H +#define NL_UTILS_H + +#include +#include +#include + +int nl_send_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, + uint32_t node_id, uint32_t service_id); +int nl_send_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, + size_t payload_size, uint32_t src_node_id, uint32_t src_service_id, + uint32_t dest_node_id, uint32_t dest_service_id); +int nl_send_bundle_confirmation(int netlink_family, struct nl_sock *netlink_sock, + uint32_t src_node_id, uint32_t src_service_id); +int nl_send_bundle_failure(int netlink_family, struct nl_sock *netlink_sock, uint32_t src_node_id, + uint32_t src_service_id, int error_code); + +#endif diff --git a/daemon/sap_registry.c b/daemon/sap_registry.c new file mode 100644 index 0000000..30278fa --- /dev/null +++ b/daemon/sap_registry.c @@ -0,0 +1,104 @@ +#include "sap_registry.h" +#include +#include + +static pthread_mutex_t sap_mutex = PTHREAD_MUTEX_INITIALIZER; +static sap_node_t *sap_head = NULL; + +static sap_node_t *find_node(uint32_t node_id, uint32_t service_id, sap_node_t **prev) { + sap_node_t *p = sap_head; + sap_node_t *q = NULL; + while (p) { + if (p->key.node_id == node_id && p->key.service_id == service_id) { + if (prev) *prev = q; + return p; + } + q = p; + p = p->next; + } + if (prev) *prev = NULL; + return NULL; +} + +bool sap_registry_contains(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&sap_mutex); + bool exists = find_node(node_id, service_id, NULL) != NULL; + pthread_mutex_unlock(&sap_mutex); + return exists; +} + +int sap_registry_add(uint32_t node_id, uint32_t service_id, BpSAP sap) { + int ret = 0; + sap_node_t *node; + pthread_mutex_lock(&sap_mutex); + if (find_node(node_id, service_id, NULL)) { + pthread_mutex_unlock(&sap_mutex); + return 0; + } + pthread_mutex_unlock(&sap_mutex); + + node = (sap_node_t *)calloc(1, sizeof(sap_node_t)); + if (!node) return -ENOMEM; + node->key.node_id = node_id; + node->key.service_id = service_id; + node->sap = sap; + node->has_active_receive = false; + + pthread_mutex_lock(&sap_mutex); + node->next = sap_head; + sap_head = node; + pthread_mutex_unlock(&sap_mutex); + return ret; +} + +BpSAP sap_registry_get(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&sap_mutex); + sap_node_t *node = find_node(node_id, service_id, NULL); + BpSAP sap = node ? node->sap : NULL; + pthread_mutex_unlock(&sap_mutex); + return sap; +} + +int sap_registry_remove(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&sap_mutex); + sap_node_t *prev = NULL; + sap_node_t *node = find_node(node_id, service_id, &prev); + if (!node) { + pthread_mutex_unlock(&sap_mutex); + return 0; + } + if (prev) + prev->next = node->next; + else + sap_head = node->next; + pthread_mutex_unlock(&sap_mutex); + free(node); + return 0; +} + +// Helper functions for receive thread tracking +void sap_registry_mark_receive_active(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&sap_mutex); + sap_node_t *node = find_node(node_id, service_id, NULL); + if (node) { + node->has_active_receive = true; + } + pthread_mutex_unlock(&sap_mutex); +} + +void sap_registry_mark_receive_inactive(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&sap_mutex); + sap_node_t *node = find_node(node_id, service_id, NULL); + if (node) { + node->has_active_receive = false; + } + pthread_mutex_unlock(&sap_mutex); +} + +bool sap_registry_has_active_receive(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&sap_mutex); + sap_node_t *node = find_node(node_id, service_id, NULL); + bool has_active = node ? node->has_active_receive : false; + pthread_mutex_unlock(&sap_mutex); + return has_active; +} diff --git a/daemon/sap_registry.h b/daemon/sap_registry.h new file mode 100644 index 0000000..b7f001d --- /dev/null +++ b/daemon/sap_registry.h @@ -0,0 +1,30 @@ +#ifndef SAP_REGISTRY_H +#define SAP_REGISTRY_H + +#include "bp.h" +#include +#include + +typedef struct sap_key { + uint32_t node_id; + uint32_t service_id; +} sap_key_t; + +typedef struct sap_node { + sap_key_t key; + BpSAP sap; + bool has_active_receive; + struct sap_node *next; +} sap_node_t; + +bool sap_registry_contains(uint32_t node_id, uint32_t service_id); +int sap_registry_add(uint32_t node_id, uint32_t service_id, BpSAP sap); +BpSAP sap_registry_get(uint32_t node_id, uint32_t service_id); +int sap_registry_remove(uint32_t node_id, uint32_t service_id); + +// Helper functions for receive thread tracking +void sap_registry_mark_receive_active(uint32_t node_id, uint32_t service_id); +void sap_registry_mark_receive_inactive(uint32_t node_id, uint32_t service_id); +bool sap_registry_has_active_receive(uint32_t node_id, uint32_t service_id); + +#endif diff --git a/include/bp_socket.h b/include/bp_socket.h index c962b9f..51368f4 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -22,6 +22,7 @@ enum bp_genl_attrs { BP_GENL_A_DEST_NODE_ID, BP_GENL_A_DEST_SERVICE_ID, BP_GENL_A_PAYLOAD, + BP_GENL_A_ERROR_CODE, __BP_GENL_A_MAX, }; @@ -31,10 +32,14 @@ enum bp_genl_attrs { enum bp_genl_cmds { BP_GENL_CMD_UNSPEC, BP_GENL_CMD_SEND_BUNDLE, + BP_GENL_CMD_SEND_BUNDLE_CONFIRMATION, + BP_GENL_CMD_SEND_BUNDLE_FAILURE, BP_GENL_CMD_REQUEST_BUNDLE, BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_CMD_DESTROY_BUNDLE, + BP_GENL_CMD_OPEN_ENDPOINT, + BP_GENL_CMD_CLOSE_ENDPOINT, __BP_GENL_CMD_MAX, };