diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 794fc7e..9bf9d7b 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -28,8 +28,8 @@ static struct sock* bp_alloc_socket(struct net* net, int kern) sock_init_data(NULL, sk); bp = bp_sk(sk); - skb_queue_head_init(&bp->queue); - init_waitqueue_head(&bp->wait_queue); + skb_queue_head_init(&bp->rx_queue); + init_waitqueue_head(&bp->rx_waitq); bp->bp_node_id = 0; bp->bp_service_id = 0; } @@ -178,7 +178,7 @@ int bp_release(struct socket* sock) write_lock_bh(&bp_list_lock); sk_del_node_init(sk); write_unlock_bh(&bp_list_lock); - skb_queue_purge(&bp->queue); + skb_queue_purge(&bp->rx_queue); sock->sk = NULL; release_sock(sk); @@ -192,9 +192,15 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) { struct sockaddr_bp* addr; void* payload; - u_int32_t service_id; - u_int32_t node_id; + u_int32_t dest_node_id, dest_service_id; int ret; + struct bp_sock* bp = bp_sk(sock->sk); + + if (bp->bp_node_id == 0 || bp->bp_service_id == 0) { + pr_err("bp_sendmsg: socket must be bound before sending\n"); + ret = -EADDRNOTAVAIL; + goto out; + } if (!msg->msg_name) { pr_err("bp_sendmsg: no destination address provided\n"); @@ -226,21 +232,21 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) goto out; } - service_id = addr->bp_addr.ipn.service_id; - node_id = addr->bp_addr.ipn.node_id; + dest_node_id = addr->bp_addr.ipn.node_id; + dest_service_id = addr->bp_addr.ipn.service_id; // https://www.rfc-editor.org/rfc/rfc9758.html#name-node-numbers - if (node_id > 0xFFFFFFFF) { + if (dest_node_id > 0xFFFFFFFF) { pr_err("bp_bind: invalid node ID (must be in [0;2^31])\n"); ret = -EINVAL; goto out; } // https://www.rfc-editor.org/rfc/rfc9758.html#name-service-numbers - if (service_id < 1 || service_id > 0xFFFFFFFF) { + if (dest_service_id < 1 || dest_service_id > 0xFFFFFFFF) { pr_err("bp_bind: invalid service ID %d (must be in " "[1;2^31])\n", - service_id); + dest_service_id); ret = -EINVAL; goto out; } @@ -265,8 +271,8 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) goto err_free; } - ret = send_bundle_doit( - payload, size, node_id, service_id, 8443); + ret = send_bundle_doit(payload, size, dest_node_id, + dest_service_id, bp->bp_node_id, bp->bp_service_id, 8443); if (ret < 0) { pr_err( "bp_sendmsg: send_bundle_doit failed (%d)\n", ret); @@ -289,11 +295,19 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) struct sock* sk; struct bp_sock* bp; struct sk_buff* skb; + struct sockaddr_bp* src_addr; int ret; sk = sock->sk; lock_sock(sk); bp = bp_sk(sk); + + if (bp->bp_node_id == 0 || bp->bp_service_id == 0) { + pr_err("bp_recvmsg: socket must be bound before receiving\n"); + ret = -EADDRNOTAVAIL; + goto out; + } + ret = request_bundle_doit(bp->bp_node_id, bp->bp_service_id, 8443); if (ret < 0) { pr_err("bp_recvmsg: request_bundle_doit failed (%d)\n", ret); @@ -301,19 +315,25 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) } ret = wait_event_interruptible( - bp->wait_queue, !skb_queue_empty(&bp->queue)); + bp->rx_waitq, !skb_queue_empty(&bp->rx_queue) || bp->rx_canceled); if (ret < 0) { pr_err("bp_recvmsg: interrupted while waiting\n"); goto out; } + if (bp->rx_canceled) { + pr_info("bp_recvmsg: bundle request canceled\n"); + ret = -ECANCELED; + goto out; + } + if (sock_flag(sk, SOCK_DEAD)) { pr_err("bp_recvmsg: socket closed while waiting\n"); ret = -ESHUTDOWN; goto out; } - skb = skb_dequeue(&bp->queue); + skb = skb_dequeue(&bp->rx_queue); if (!skb) { pr_info("bp_recvmsg: no messages in the queue for service %d\n", bp->bp_service_id); @@ -329,6 +349,16 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } + if (msg->msg_name) { + src_addr = (struct sockaddr_bp*)msg->msg_name; + src_addr->bp_family = AF_BP; + src_addr->bp_scheme = BP_SCHEME_IPN; + src_addr->bp_addr.ipn.node_id = BP_SKB_CB(skb)->src_node_id; + src_addr->bp_addr.ipn.service_id + = BP_SKB_CB(skb)->src_service_id; + msg->msg_namelen = sizeof(struct sockaddr_bp); + } + if (copy_to_iter(skb->data, skb->len, &msg->msg_iter) != skb->len) { pr_err("bp_recvmsg: failed to copy data to user space\n"); ret = -EFAULT; diff --git a/bp_socket/af_bp.h b/bp_socket/af_bp.h index 842f080..d6b4446 100644 --- a/bp_socket/af_bp.h +++ b/bp_socket/af_bp.h @@ -4,7 +4,13 @@ #include #include +struct bp_skb_cb { + u_int32_t src_node_id; + u_int32_t src_service_id; +}; + #define bp_sk(ptr) container_of(ptr, struct bp_sock, sk) +#define BP_SKB_CB(skb) ((struct bp_skb_cb*)((skb)->cb)) extern struct hlist_head bp_list; extern rwlock_t bp_list_lock; @@ -15,8 +21,9 @@ struct bp_sock { struct sock sk; u_int32_t bp_node_id; u_int8_t bp_service_id; - struct sk_buff_head queue; - wait_queue_head_t wait_queue; + struct sk_buff_head rx_queue; + wait_queue_head_t rx_waitq; + bool rx_canceled; }; int bp_bind(struct socket* sock, struct sockaddr* addr, int addr_len); diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 485a81b..78c028f 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -5,19 +5,27 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = { [BP_GENL_A_UNSPEC] = { .type = NLA_UNSPEC }, - [BP_GENL_A_SOCKID] = { .type = NLA_U64 }, - [BP_GENL_A_NODE_ID] = { .type = NLA_U32 }, - [BP_GENL_A_SERVICE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_SRC_NODE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_SRC_SERVICE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_DEST_NODE_ID] = { .type = NLA_U32 }, + [BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 }, [BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY }, }; static struct genl_ops genl_ops[] = { { - .cmd = BP_GENL_CMD_DELIVER_BUNDLE, - .flags = GENL_ADMIN_PERM, - .policy = nla_policy, - .doit = deliver_bundle_doit, - .dumpit = NULL, -} }; + .cmd = BP_GENL_CMD_DELIVER_BUNDLE, + .flags = GENL_ADMIN_PERM, + .policy = nla_policy, + .doit = deliver_bundle_doit, + .dumpit = NULL, + }, + { + .cmd = BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, + .flags = GENL_ADMIN_PERM, + .policy = nla_policy, + .doit = cancel_bundle_request_doit, + .dumpit = NULL, + } }; /* Multicast groups for our family */ static const struct genl_multicast_group genl_mcgrps[] = { @@ -36,17 +44,17 @@ struct genl_family genl_fam = { .n_mcgrps = ARRAY_SIZE(genl_mcgrps), }; -int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, - u_int32_t service_id, int port_id) +int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, + u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, + int port_id) { void* msg_head; struct sk_buff* msg; size_t msg_size; int ret; - msg_size = nla_total_size(sizeof(u64)) - + nla_total_size(sizeof(u_int32_t)) - + nla_total_size(sizeof(u_int32_t)) + nla_total_size(payload_size); + msg_size = 4 * nla_total_size(sizeof(u_int32_t)) + + nla_total_size(payload_size); msg = genlmsg_new(msg_size, GFP_KERNEL); if (!msg) { pr_err("send_bundle: failed to allocate message buffer\n"); @@ -62,21 +70,42 @@ int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, goto err_free; } - ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); + if (ret) { + pr_err( + "send_bundle: failed to put BP_GENL_A_DEST_NODE_ID (%d)\n", + ret); + goto err_cancel; + } + + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); + if (ret) { + pr_err("send_bundle: failed to put BP_GENL_A_DEST_SERVICE_ID " + "(%d)\n", + ret); + goto err_cancel; + } + + ret = nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id); if (ret) { - pr_err("send_bundle: failed to put NODE_ID (%d)\n", ret); + pr_err( + "send_bundle: failed to put BP_GENL_A_SRC_NODE_ID (%d)\n", + ret); goto err_cancel; } - ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + ret = nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id); if (ret) { - pr_err("send_bundle: failed to put SERVICE_ID (%d)\n", ret); + pr_err("send_bundle: failed to put BP_GENL_A_SRC_SERVICE_ID " + "(%d)\n", + ret); goto err_cancel; } ret = nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload); if (ret) { - pr_err("send_bundle: failed to put PAYLOAD (%d)\n", ret); + pr_err( + "send_bundle: failed to put BP_GENL_A_PAYLOAD (%d)\n", ret); goto err_cancel; } @@ -91,7 +120,8 @@ int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, return ret; } -int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) +int request_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id) { void* msg_head; struct sk_buff* msg; @@ -114,15 +144,19 @@ int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) goto err_free; } - ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); if (ret) { - pr_err("request_bundle: failed to put NODE_ID (%d)\n", ret); + pr_err("request_bundle: failed to put BP_GENL_A_DEST_NODE_ID " + "(%d)\n", + ret); goto err_cancel; } - ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); if (ret) { - pr_err("request_bundle: failed to put SERVICE_ID (%d)\n", ret); + pr_err("request_bundle: failed to put " + "BP_GENL_A_DEST_SERVICE_ID (%d)\n", + ret); goto err_cancel; } @@ -137,26 +171,69 @@ int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) return ret; } +int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info) +{ + struct sock* sk; + struct bp_sock* bp; + u_int32_t dest_node_id, dest_service_id; + + if (!info->attrs[BP_GENL_A_DEST_NODE_ID] + || !info->attrs[BP_GENL_A_DEST_SERVICE_ID]) { + pr_err("cancel_bundle_request: missing required attributes\n"); + return -EINVAL; + } + + dest_node_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_NODE_ID]); + dest_service_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_SERVICE_ID]); + + read_lock_bh(&bp_list_lock); + sk_for_each(sk, &bp_list) + { + bh_lock_sock(sk); + bp = bp_sk(sk); + + if (bp->bp_node_id == dest_node_id + && bp->bp_service_id == dest_service_id) { + + if (waitqueue_active(&bp->rx_waitq)) { + bp->rx_canceled = true; + wake_up_interruptible(&bp->rx_waitq); + } + bh_unlock_sock(sk); + break; + } + bh_unlock_sock(sk); + } + read_unlock_bh(&bp_list_lock); + + return 0; +} + int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) { struct sock* sk; struct bp_sock* bp; struct sk_buff* new_skb; bool new_skb_queued = false; - u_int32_t node_id, service_id; + u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; void* payload; size_t payload_len; int ret; - if (!info->attrs[BP_GENL_A_NODE_ID] - || !info->attrs[BP_GENL_A_SERVICE_ID] + if (!info->attrs[BP_GENL_A_DEST_NODE_ID] + || !info->attrs[BP_GENL_A_DEST_SERVICE_ID] + || !info->attrs[BP_GENL_A_SRC_NODE_ID] + || !info->attrs[BP_GENL_A_SRC_SERVICE_ID] || !info->attrs[BP_GENL_A_PAYLOAD]) { pr_err("deliver_bundle: missing required attributes\n"); ret = -EINVAL; goto out; } - node_id = nla_get_u32(info->attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(info->attrs[BP_GENL_A_SERVICE_ID]); + + dest_node_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_NODE_ID]); + dest_service_id = nla_get_u32(info->attrs[BP_GENL_A_DEST_SERVICE_ID]); + src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]); + src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]); payload = nla_data(info->attrs[BP_GENL_A_PAYLOAD]); payload_len = nla_len(info->attrs[BP_GENL_A_PAYLOAD]); @@ -167,6 +244,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) goto out; } skb_put_data(new_skb, payload, payload_len); + BP_SKB_CB(new_skb)->src_node_id = src_node_id; + BP_SKB_CB(new_skb)->src_service_id = src_service_id; read_lock_bh(&bp_list_lock); sk_for_each(sk, &bp_list) @@ -174,13 +253,14 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) bh_lock_sock(sk); bp = bp_sk(sk); - if (bp->bp_node_id == node_id - && bp->bp_service_id == service_id) { + if (bp->bp_node_id == dest_node_id + && bp->bp_service_id == dest_service_id) { - skb_queue_tail(&bp->queue, new_skb); + skb_queue_tail(&bp->rx_queue, new_skb); new_skb_queued = true; - if (waitqueue_active(&bp->wait_queue)) - wake_up_interruptible(&bp->wait_queue); + if (waitqueue_active(&bp->rx_waitq)) { + wake_up_interruptible(&bp->rx_waitq); + } bh_unlock_sock(sk); break; } @@ -189,8 +269,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) read_unlock_bh(&bp_list_lock); if (!new_skb_queued) { - pr_err("deliver_bundle: no socket found for service ID %d\n", - service_id); + pr_err("deliver_bundle: no socket found (ipn:%d.%d)\n", + dest_node_id, dest_service_id); ret = -ENODEV; goto err_free; } @@ -203,7 +283,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) return ret; } -int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) +int destroy_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id) { void* msg_head; struct sk_buff* msg; @@ -226,15 +307,19 @@ int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) goto err_free; } - ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id); if (ret) { - pr_err("destroy_bundle: failed to put NODE_ID (%d)\n", ret); + pr_err("destroy_bundle: failed to put BP_GENL_A_DEST_NODE_ID " + "(%d)\n", + ret); goto err_cancel; } - ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id); if (ret) { - pr_err("destroy_bundle: failed to put SERVICE_ID (%d)\n", ret); + pr_err("destroy_bundle: failed to put " + "BP_GENL_A_DEST_SERVICE_ID (%d)\n", + ret); goto err_cancel; } diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index d3cfb1d..3dc1153 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -5,10 +5,14 @@ extern struct genl_family genl_fam; -int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, - u_int32_t service_id, int port_id); +int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id, + u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id, + int port_id); int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); -int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id); -int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id); +int request_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); +int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info); +int destroy_bundle_doit( + u_int32_t dest_node_id, u_int32_t dest_service_id, int port_id); #endif \ No newline at end of file diff --git a/daemon/adu_ref.c b/daemon/adu_ref.c new file mode 100644 index 0000000..902067d --- /dev/null +++ b/daemon/adu_ref.c @@ -0,0 +1,79 @@ +#include "adu_ref.h" +#include "log.h" +#include "sdr.h" +#include +#include + +static struct adu_reference *adu_refs = NULL; +static pthread_mutex_t adu_refs_mutex = PTHREAD_MUTEX_INITIALIZER; + +int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, + u_int32_t src_node_id, u_int32_t src_service_id) { + struct adu_reference *ref; + + if (pthread_mutex_lock(&adu_refs_mutex) != 0) { + log_error("add_adu: Failed to lock SDR mutex."); + return -1; + } + + ref = malloc(sizeof(struct adu_reference)); + if (!ref) { + log_error("add_adu: Failed to allocate memory for bundle reference."); + pthread_mutex_unlock(&adu_refs_mutex); + return -ENOMEM; + } + + ref->adu = adu; + ref->dest_node_id = dest_node_id; + ref->dest_service_id = dest_service_id; + ref->src_node_id = src_node_id; + ref->src_service_id = src_service_id; + ref->next = adu_refs; + adu_refs = ref; + + pthread_mutex_unlock(&adu_refs_mutex); + return 0; +} + +struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { + struct adu_reference *ref; + + for (ref = adu_refs; ref != NULL; ref = ref->next) { + if (ref->dest_node_id == dest_node_id && ref->dest_service_id == dest_service_id) { + return ref; + } + } + return NULL; +} + +Object remove_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { + struct adu_reference *prev = NULL; + struct adu_reference *current = adu_refs; + Object adu = 0; + + if (pthread_mutex_lock(&adu_refs_mutex) != 0) { + log_error("remove_adu_ref: Failed to lock adu_refs mutex."); + return -EAGAIN; + } + + while (current) { + if (current->dest_node_id == dest_node_id && current->dest_service_id == dest_service_id) { + adu = current->adu; + if (prev) { + prev->next = current->next; + } else { + adu_refs = current->next; + } + + free(current); + pthread_mutex_unlock(&adu_refs_mutex); + return adu; + } + prev = current; + current = current->next; + } + + pthread_mutex_unlock(&adu_refs_mutex); + log_warn("remove_adu_ref: no bundle found (ipn:%u.%u)", dest_node_id, dest_service_id); + return adu; +} \ No newline at end of file diff --git a/daemon/adu_ref.h b/daemon/adu_ref.h new file mode 100644 index 0000000..8fb42ab --- /dev/null +++ b/daemon/adu_ref.h @@ -0,0 +1,20 @@ +#ifndef ADU_REF_H +#define ADU_REF_H + +#include "bp.h" + +struct adu_reference { + Object adu; + u_int32_t dest_node_id; + u_int32_t dest_service_id; + u_int32_t src_node_id; + u_int32_t src_service_id; + struct adu_reference *next; +}; + +int add_adu(Sdr sdr, Object adu, u_int32_t dest_node_id, u_int32_t dest_service_id, + u_int32_t src_node_id, u_int32_t src_service_id); +struct adu_reference *find_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); +Object remove_adu_ref(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); + +#endif \ No newline at end of file diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index e5860c5..dbaa63c 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -8,6 +8,7 @@ #include #include "../include/bp_socket.h" +#include "adu_ref.h" #include "bp.h" #include "bp_genl_handlers.h" #include "daemon.h" @@ -16,12 +17,15 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { void *payload; - int payload_size; - u_int32_t node_id, service_id; - char eid[64]; - int eid_size; - - if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + size_t payload_size; + u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id; + char dest_eid[64]; + int err = 0; + int written; + + if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] || + !attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_SRC_NODE_ID] || + !attrs[BP_GENL_A_SRC_SERVICE_ID]) { log_error( "handle_send_bundle: missing attribute(s) in SEND_BUNDLE command (payload, node ID, " "service ID)"); @@ -30,20 +34,29 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { payload = nla_data(attrs[BP_GENL_A_PAYLOAD]); payload_size = nla_len(attrs[BP_GENL_A_PAYLOAD]); - node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); - - eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id) + 1; - if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { - log_error("[ipn:%u.%u] handle_send_bundle: failed to construct EID string", node_id, - service_id); + dest_node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + dest_service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); + src_node_id = nla_get_u32(attrs[BP_GENL_A_SRC_NODE_ID]); + src_service_id = nla_get_u32(attrs[BP_GENL_A_SRC_SERVICE_ID]); + + written = snprintf(dest_eid, sizeof(dest_eid), "ipn:%u.%u", dest_node_id, dest_service_id); + if (written < 0 || written >= (int)sizeof(dest_eid)) { + log_error("[ipn:%u.%u] handle_send_bundle: failed to construct EID string", src_node_id, + src_service_id); return -EINVAL; } - log_info("[ipn:%u.%u] SEND_BUNDLE: sending bundle to EID %s, size %d (bytes)", eid, - payload_size, node_id, service_id); + err = bp_send_to_eid(daemon->sdr, payload, payload_size, dest_eid); + if (err < 0) { + log_error("[ipn:%u.%u] handle_send_bundle: bp_send_to_eid failed with error %d", + dest_node_id, dest_service_id, err); + return err; + } - return bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size); + log_info("[ipn:%u.%u] SEND_BUNDLE: bundle sent to EID %s, size %d (bytes)", src_node_id, + src_service_id, dest_eid, payload_size); + + return 0; } int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { @@ -51,15 +64,15 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { struct thread_args *args; u_int32_t node_id, service_id; - if (!attrs[BP_GENL_A_SERVICE_ID] || !attrs[BP_GENL_A_NODE_ID]) { + if (!attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_DEST_NODE_ID]) { log_error("handle_request_bundle: missing attribute(s) in REQUEST_BUNDLE " "command (service " "ID, node ID)"); return -EINVAL; } - node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); args = malloc(sizeof(struct thread_args)); if (!args) { @@ -86,98 +99,164 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { } void *handle_recv_thread(struct thread_args *args) { - void *payload = NULL; - size_t payload_size; int err; - bool bundle_present; - Object adu; + struct reply_bundle reply; + + reply = bp_recv_once(args->sdr, args->node_id, + args->service_id); // Blocking invocation to receive a bundle + if (!reply.is_present) { + err = handle_cancel_bundle_request(args->netlink_family, args->netlink_sock, args->node_id, + args->service_id); + if (err < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request failed with error %d", + args->node_id, args->service_id, err); + goto out; + } + + log_info("[ipn:%u.%u] CANCEL_BUNDLE_REQUEST: bundle request cancelled", args->node_id, + args->service_id); + } else { + err = handle_deliver_bundle(args->netlink_family, args->netlink_sock, reply.payload, + reply.payload_size, reply.src_node_id, reply.src_service_id, + args->node_id, args->service_id); + if (err < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", args->node_id, + args->service_id, err); + goto out; + } + + log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", args->node_id, + args->service_id); + } - adu = find_adu(args->sdr, args->node_id, args->service_id); - bundle_present = adu != 0; +out: + if (reply.payload) free(reply.payload); + free(args); + return NULL; +} - payload = bp_recv_once(args->sdr, args->node_id, args->service_id, &payload_size); - if (!payload) { - log_error("[ipn:%u.%u] handle_recv_thread: failed to receive bundle", args->node_id, - args->service_id); +int handle_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, + u_int32_t node_id, u_int32_t service_id) { + struct nl_msg *msg = NULL; + void *hdr; + int ret; + + msg = nlmsg_alloc(); + if (!msg) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to allocate Netlink msg", + node_id, service_id); + ret = -ENOMEM; goto out; } - if (!bundle_present) { - log_info("[ipn:%u.%u] REQUEST_BUNDLE: bundle received, size %zu bytes", args->node_id, - args->service_id, payload_size); - } else { - log_warn("[ipn:%u.%u] REQUEST_BUNDLE: bundle reference already present in memory, size %zu " - "bytes", - args->node_id, args->service_id, payload_size); + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, + BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to create Netlink header", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; } - err = handle_deliver_bundle(payload, payload_size, args); - if (err < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", err, args->node_id, - args->service_id); + if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id) < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to add NODE_ID attribute", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; } + if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id) < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: failed to add SERVICE_ID attribute", + node_id, service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + ret = nl_send_sync(netlink_sock, msg); + if (ret < 0) { + log_error("[ipn:%u.%u] handle_cancel_bundle_request: bundle request not cancelled", node_id, + service_id); + ret = -errno; + goto out; + } + + return 0; + +err_free_msg: + nlmsg_free(msg); out: - if (payload) free(payload); - free(args); - return NULL; + return ret; } -int handle_deliver_bundle(void *payload, int payload_size, struct thread_args *args) { +int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, + int payload_size, u_int32_t src_node_id, u_int32_t src_service_id, + u_int32_t dest_node_id, u_int32_t dest_service_id) { struct nl_msg *msg = NULL; void *hdr; int ret; msg = nlmsg_alloc(); if (!msg) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to allocate Netlink msg", - args->node_id, args->service_id); + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to allocate Netlink msg", dest_node_id, + dest_service_id); ret = -ENOMEM; goto out; } - hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, args->netlink_family, 0, 0, + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, netlink_family, 0, 0, BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); if (!hdr) { log_error("[ipn:%u.%u] handle_deliver_bundle: failed to create Netlink header", - args->node_id, args->service_id); + dest_node_id, dest_service_id); ret = -EMSGSIZE; goto err_free_msg; } - if (nla_put_u32(msg, BP_GENL_A_SERVICE_ID, args->service_id) < 0) { + if (nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, dest_node_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add NODE_ID attribute", + dest_node_id, dest_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, dest_service_id) < 0) { log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SERVICE_ID attribute", - args->node_id, args->service_id); + dest_node_id, dest_service_id); ret = -EMSGSIZE; goto err_free_msg; } - if (nla_put_u32(msg, BP_GENL_A_NODE_ID, args->node_id) < 0) { - log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add NODE_ID attribute", - args->node_id, args->service_id); + if (nla_put_u32(msg, BP_GENL_A_SRC_NODE_ID, src_node_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SRC_NODE_ID attribute", + src_node_id, src_service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_SRC_SERVICE_ID, src_service_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SRC_SERVICE_ID attribute", + src_node_id, src_service_id); ret = -EMSGSIZE; goto err_free_msg; } if (nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) { - log_error("[ipn:%u.%u] [ipn:%u.%u] handle_deliver_bundle: failed to add PAYLOAD attribute", - args->node_id, args->service_id); + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add PAYLOAD attribute", + dest_node_id, dest_service_id); ret = -EMSGSIZE; goto err_free_msg; } - ret = nl_send_sync(args->netlink_sock, msg); + ret = nl_send_sync(netlink_sock, msg); if (ret < 0) { log_warn("[ipn:%u.%u] DELIVER_BUNDLE: bundle not delivered to kernel, keeping reference in " "memory (no active BP socket " "client)", - args->node_id, args->service_id); - ret = 0; // Do not return an error, just log it + dest_node_id, dest_service_id); + ret = -ENODEV; goto out; } - log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", args->node_id, args->service_id); - return 0; err_free_msg: @@ -188,9 +267,10 @@ int handle_deliver_bundle(void *payload, int payload_size, struct thread_args *a int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { u_int32_t node_id, service_id; + Object adu; int ret = 0; - if (!attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + if (!attrs[BP_GENL_A_DEST_NODE_ID] || !attrs[BP_GENL_A_DEST_SERVICE_ID]) { log_error("handle_destroy_bundle: missing attribute(s) in DESTROY_BUNDLE " "command (node ID, " "service ID)"); @@ -198,15 +278,21 @@ int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { goto out; } - node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); - service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + node_id = nla_get_u32(attrs[BP_GENL_A_DEST_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]); - ret = destroy_adu(daemon->sdr, node_id, service_id); - if (ret < 0) { + adu = remove_adu_ref(daemon->sdr, node_id, service_id); + if (adu == 0) { log_error("[ipn:%u.%u] handle_destroy_bundle: failed to destroy bundle: %s", node_id, service_id, strerror(-ret)); goto out; } + ret = destroy_bundle(daemon->sdr, adu); + if (ret < 0) { + log_error("[ipn:%u.%u] handle_destroy_bundle: destroy_bundle failed with error %d", node_id, + service_id, ret); + goto out; + } log_info("[ipn:%u.%u] DESTROY_BUNDLE: bundle destroy from ION", node_id, service_id); diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index f959ea4..e0b10a7 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -14,7 +14,11 @@ struct thread_args { int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); -int handle_deliver_bundle(void *payload, int payload_size, struct thread_args *args); +int handle_cancel_bundle_request(int netlink_family, struct nl_sock *netlink_sock, + u_int32_t node_id, u_int32_t service_id); +int handle_deliver_bundle(int netlink_family, struct nl_sock *netlink_sock, void *payload, + int payload_size, u_int32_t src_node_id, u_int32_t src_service_id, + u_int32_t dest_node_id, u_int32_t dest_service_id); int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs); void *handle_recv_thread(struct thread_args *arg); diff --git a/daemon/ion.c b/daemon/ion.c index e04dfe8..87bf013 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,11 +1,10 @@ #include "ion.h" +#include "adu_ref.h" #include "log.h" #include "sdr.h" #include #include -static struct adu_reference *adu_refs = NULL; -static pthread_mutex_t adu_refs_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER; const char *bp_result_text(BpIndResult result) { @@ -23,82 +22,26 @@ const char *bp_result_text(BpIndResult result) { } } -int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id) { - struct adu_reference *ref; - - if (pthread_mutex_lock(&adu_refs_mutex) != 0) { - log_error("add_adu: Failed to lock SDR mutex."); - return -1; +int destroy_bundle(Sdr sdr, Object adu) { + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("destroy_bundle: Failed to lock SDR mutex."); + return -EAGAIN; } - ref = malloc(sizeof(struct adu_reference)); - if (!ref) { - log_error("add_adu: Failed to allocate memory for bundle reference."); - pthread_mutex_unlock(&adu_refs_mutex); - return -ENOMEM; + if (sdr_begin_xn(sdr) == 0) { + log_error("destroy_bundle: sdr_begin_xn failed."); + pthread_mutex_unlock(&sdrmutex); + return -EIO; } - ref->adu = adu; - ref->node_id = node_id; - ref->service_id = service_id; - ref->next = adu_refs; - adu_refs = ref; - - pthread_mutex_unlock(&adu_refs_mutex); - return 0; -} - -Object find_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { - struct adu_reference *ref; + zco_destroy(sdr, adu); - for (ref = adu_refs; ref != NULL; ref = ref->next) { - if (ref->node_id == node_id && ref->service_id == service_id) { - return ref->adu; - } - } + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); return 0; } -int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { - struct adu_reference *prev = NULL; - struct adu_reference *current = adu_refs; - - if (pthread_mutex_lock(&adu_refs_mutex) != 0) { - log_error("destroy_adu: Failed to lock adu_refs mutex."); - return -EAGAIN; - } - - while (current) { - if (current->node_id == node_id && current->service_id == service_id) { - if (prev) { - prev->next = current->next; - } else { - adu_refs = current->next; - } - - if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("destroy_adu: Failed to lock SDR mutex."); - free(current); - pthread_mutex_unlock(&adu_refs_mutex); - return -EAGAIN; - } - - zco_destroy(sdr, current->adu); - free(current); - pthread_mutex_unlock(&sdrmutex); - pthread_mutex_unlock(&adu_refs_mutex); - return 0; - } - prev = current; - current = current->next; - } - - pthread_mutex_unlock(&adu_refs_mutex); - log_warn("destroy_adu: no bundle found (ipn:%u.%u)", node_id, service_id); - return -ENOENT; -} - -int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int eid_size) { +int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid) { Object sdr_buffer = 0; Object adu; int ret = 0; @@ -120,122 +63,160 @@ int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int adu = zco_create(sdr, ZcoSdrSource, sdr_buffer, 0, payload_size, ZcoOutbound); if (adu <= 0) { log_error("zco_create failed."); + sdr_free(sdr, sdr_buffer); ret = -ENOMEM; goto out; } if (bp_send(NULL, dest_eid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, adu, NULL) <= 0) { log_error("bp_send failed."); + sdr_free(sdr, sdr_buffer); ret = -EIO; goto out; } out: - if (sdr_buffer != 0) sdr_free(sdr, sdr_buffer); sdr_end_xn(sdr); return ret; } -void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *payload_size) { +struct reply_bundle bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id) { BpSAP sap; BpDelivery dlv; ZcoReader reader; - Object adu; + struct adu_reference *adu_ref; u_int32_t own_node_id; void *payload = NULL; + size_t payload_size; int eid_size; char eid[64]; + struct reply_bundle reply = {0}; + u_int32_t src_node_id, src_service_id; + + reply.is_present = false; + reply.payload = NULL; own_node_id = getOwnNodeNbr(); - if (node_id != own_node_id) { - log_error("bp_recv_once: node ID mismatch. Expected %u, got %u", own_node_id, node_id); - return NULL; + if (dest_node_id != own_node_id) { + log_error("bp_recv_once: node ID mismatch. Expected %u, got %u", own_node_id, dest_node_id); + goto out; } - adu = find_adu(sdr, node_id, service_id); - if (adu != 0) { - *payload_size = zco_source_data_length(sdr, adu); - payload = malloc(*payload_size); + adu_ref = find_adu_ref(sdr, dest_node_id, dest_service_id); + if (adu_ref != NULL) { + payload_size = zco_source_data_length(sdr, adu_ref->adu); + payload = malloc(payload_size); if (!payload) { log_error("bp_recv_once: Failed to allocate memory for payload."); - return NULL; + goto out; } - zco_start_receiving(adu, &reader); - if (zco_receive_source(sdr, &reader, *payload_size, payload) < 0) { + zco_start_receiving(adu_ref->adu, &reader); + if (zco_receive_source(sdr, &reader, payload_size, payload) < 0) { log_error("bp_recv_once: zco_receive_source failed."); - free(payload); - return NULL; + goto out; } - return payload; - } - eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id); - if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { - log_error("bp_recv_once: failed to construct EID string."); - return NULL; - } + src_node_id = adu_ref->src_node_id; + src_service_id = adu_ref->src_service_id; + } else { + eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", dest_node_id, dest_service_id); + if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { + log_error("bp_recv_once: failed to construct EID string."); + goto out; + } - if (bp_open(eid, &sap) < 0) { - log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", node_id, - service_id); - return NULL; - } + if (bp_open(eid, &sap) < 0) { + log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", dest_node_id, + dest_service_id); + goto out; + } - if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { - log_error("bp_recv_once: bundle reception failed."); - goto close_sap; - } + if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { + log_error("bp_recv_once: bundle reception failed."); + bp_close(sap); + goto out; + } - if (dlv.result != BpPayloadPresent || dlv.adu == 0) { - log_error("bp_recv_once: %s", bp_result_text(dlv.result)); - goto release_dlv; - } + if (dlv.result != BpPayloadPresent || dlv.adu == 0) { + log_error("bp_recv_once: %s", bp_result_text(dlv.result)); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - if (add_adu(sdr, dlv.adu, node_id, service_id) < 0) { - log_error("bp_recv_once: failed to add bundle reference."); - goto release_dlv; - } + if (sscanf(dlv.bundleSourceEid, "ipn:%u.%u", &src_node_id, &src_service_id) != 2) { + log_error("bp_recv_once: failed to parse bundleSourceEid: %s", dlv.bundleSourceEid); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - if (pthread_mutex_lock(&sdrmutex) != 0) { - log_error("bp_recv_once: Failed to lock SDR mutex."); - goto release_dlv; - } + if (add_adu(sdr, dlv.adu, dest_node_id, dest_service_id, src_node_id, src_service_id) < 0) { + log_error("bp_recv_once: failed to add bundle reference."); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - if (sdr_begin_xn(sdr) == 0) { - log_error("bp_recv_once: sdr_begin_xn failed."); - pthread_mutex_unlock(&sdrmutex); - goto release_dlv; - } + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("bp_recv_once: Failed to lock SDR mutex."); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - *payload_size = zco_source_data_length(sdr, dlv.adu); - payload = malloc(*payload_size); - if (!payload) { - log_error("bp_recv_once: Failed to allocate memory for payload."); - sdr_end_xn(sdr); - pthread_mutex_unlock(&sdrmutex); - goto release_dlv; - } + if (sdr_begin_xn(sdr) == 0) { + log_error("bp_recv_once: sdr_begin_xn failed."); + pthread_mutex_unlock(&sdrmutex); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } + + payload_size = zco_source_data_length(sdr, dlv.adu); + payload = malloc(payload_size); + if (!payload) { + log_error("bp_recv_once: Failed to allocate memory for payload."); + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } + + zco_start_receiving(dlv.adu, &reader); + if (zco_receive_source(sdr, &reader, payload_size, payload) < 0) { + log_error("bp_recv_once: zco_receive_source failed."); + free(payload); + payload = NULL; + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + bp_release_delivery(&dlv, 0); + bp_close(sap); + goto out; + } - zco_start_receiving(dlv.adu, &reader); - if (zco_receive_source(sdr, &reader, *payload_size, payload) < 0) { - log_error("bp_recv_once: zco_receive_source failed."); - free(payload); - payload = NULL; sdr_end_xn(sdr); pthread_mutex_unlock(&sdrmutex); - goto release_dlv; + bp_release_delivery(&dlv, 0); + bp_close(sap); } - sdr_end_xn(sdr); - pthread_mutex_unlock(&sdrmutex); - bp_release_delivery(&dlv, 0); - bp_close(sap); + if (payload == NULL) { + log_info("bp_recv_once: no payload received for node_id=%u service_id=%u", dest_node_id, + dest_service_id); + goto out; + } - return payload; + reply.is_present = true; + reply.payload = payload; + reply.payload_size = payload_size; + reply.src_node_id = src_node_id; + reply.src_service_id = src_service_id; -release_dlv: - bp_release_delivery(&dlv, 0); -close_sap: - bp_close(sap); - return NULL; +out: + if (payload && !reply.is_present) { + free(payload); + } + return reply; } diff --git a/daemon/ion.h b/daemon/ion.h index 05310e4..f3ee93d 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -2,19 +2,18 @@ #define ION_H #include "bp.h" +#include -struct adu_reference { - Object adu; - u_int32_t node_id; - u_int32_t service_id; - struct adu_reference *next; +struct reply_bundle { + bool is_present; + void *payload; + size_t payload_size; + u_int32_t src_node_id; + u_int32_t src_service_id; }; -int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id); -Object find_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id); -int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id); - -int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int eid_size); -void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *payload_size); +int destroy_bundle(Sdr sdr, Object adu); +int bp_send_to_eid(Sdr sdr, void *payload, size_t payload_size, char *dest_eid); +struct reply_bundle bp_recv_once(Sdr sdr, u_int32_t dest_node_id, u_int32_t dest_service_id); #endif \ No newline at end of file diff --git a/include/bp_socket.h b/include/bp_socket.h index c42f6e4..c962b9f 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -17,9 +17,10 @@ /* Generic Netlink attributes */ enum bp_genl_attrs { BP_GENL_A_UNSPEC, - BP_GENL_A_SOCKID, - BP_GENL_A_NODE_ID, - BP_GENL_A_SERVICE_ID, + BP_GENL_A_SRC_NODE_ID, + BP_GENL_A_SRC_SERVICE_ID, + BP_GENL_A_DEST_NODE_ID, + BP_GENL_A_DEST_SERVICE_ID, BP_GENL_A_PAYLOAD, __BP_GENL_A_MAX, }; @@ -31,6 +32,7 @@ enum bp_genl_cmds { BP_GENL_CMD_UNSPEC, BP_GENL_CMD_SEND_BUNDLE, BP_GENL_CMD_REQUEST_BUNDLE, + BP_GENL_CMD_CANCEL_BUNDLE_REQUEST, BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_CMD_DESTROY_BUNDLE, __BP_GENL_CMD_MAX, diff --git a/receiver.c b/receiver.c index 3a6ed42..737e974 100644 --- a/receiver.c +++ b/receiver.c @@ -25,6 +25,7 @@ int main(int argc, char *argv[]) { char buffer[BUFFER_SIZE]; uint32_t node_id; uint32_t service_id; + struct sockaddr_bp src_addr; int ret = 0; if (argc < 3) { @@ -71,16 +72,25 @@ int main(int argc, char *argv[]) { memset(&msg, 0, sizeof(msg)); msg.msg_iov = &iov; msg.msg_iovlen = 1; + memset(&src_addr, 0, sizeof(src_addr)); + msg.msg_name = &src_addr; + msg.msg_namelen = sizeof(src_addr); printf("Listening for incoming messages...\n"); ssize_t n = recvmsg(sfd, &msg, 0); if (n < 0) { - perror("recvmsg failed"); - ret = EXIT_FAILURE; - goto out; + perror("recvmsg failed"); + ret = EXIT_FAILURE; + goto out; } - + printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer); + if (msg.msg_namelen >= sizeof(struct sockaddr_bp)) { + printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id, + src_addr.bp_addr.ipn.service_id); + } else { + printf("Source address not available\n"); + } out: close(sfd); diff --git a/sender.c b/sender.c index 87c250a..e7d717a 100644 --- a/sender.c +++ b/sender.c @@ -9,7 +9,7 @@ #define AF_BP 28 int main(int argc, char *argv[]) { - struct sockaddr_bp dest_addr; + struct sockaddr_bp dest_addr, src_addr; int fd; uint32_t node_id, service_id; int ret = 0; @@ -38,6 +38,17 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } + src_addr.bp_family = AF_BP; + src_addr.bp_scheme = BP_SCHEME_IPN; + src_addr.bp_addr.ipn.node_id = 10; + src_addr.bp_addr.ipn.service_id = 1; + + if (bind(fd, (struct sockaddr *)&src_addr, sizeof(src_addr)) == -1) { + perror("Failed to bind socket"); + ret = EXIT_FAILURE; + goto out; + } + dest_addr.bp_family = AF_BP; dest_addr.bp_scheme = BP_SCHEME_IPN; dest_addr.bp_addr.ipn.node_id = node_id;