diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index e4aada2..c8e3e94 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -29,7 +29,10 @@ static struct sock* bp_alloc_socket(struct net* net, int kern) bp = bp_sk(sk); skb_queue_head_init(&bp->queue); init_waitqueue_head(&bp->wait_queue); - + bp->bp_node_id = 0; + bp->bp_service_id = 0; + bp->recv_aborted = false; + bp->has_bound = false; out: return sk; } @@ -132,8 +135,7 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len) if (iter_bp->bp_service_id == service_id && iter_bp->bp_node_id == node_id) { read_unlock_bh(&bp_list_lock); - err = -EADDRINUSE; - goto out; + return -EADDRINUSE; } } read_unlock_bh(&bp_list_lock); @@ -142,14 +144,21 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len) bp = bp_sk(sk); bp->bp_service_id = service_id; bp->bp_node_id = node_id; + + err = open_endpoint_doit(bp->bp_node_id, bp->bp_service_id, 8443); + if (err < 0) { + pr_err("bp_bind: open_endpoint_doit failed (%d)\n", err); + release_sock(sk); + return err; + } + write_lock_bh(&bp_list_lock); sk_add_node(sk, &bp_list); write_unlock_bh(&bp_list_lock); -out: + bp->has_bound = true; release_sock(sk); - - return err; + return 0; } int bp_release(struct socket* sock) @@ -164,6 +173,14 @@ int bp_release(struct socket* sock) sock_orphan(sk); bp = bp_sk(sk); + if (bp->has_bound && bp->recv_aborted) { + abort_endpoint_doit(bp->bp_node_id, bp->bp_service_id, 8443); + } else { + if (bp->has_bound) + close_endpoint_doit( + bp->bp_node_id, bp->bp_service_id, 8443); + } + write_lock_bh(&bp_list_lock); sk_del_node_init(sk); write_unlock_bh(&bp_list_lock); @@ -247,13 +264,12 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) ret = send_bundle_doit( sockid, (char*)payload, size, node_id, service_id, 8443); - kfree(payload); - if (ret < 0) { pr_err("bp_sendmsg: send_bundle_doit failed (%d)\n", ret); return ret; } + kfree(payload); return size; } @@ -274,12 +290,7 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) bp->wait_queue, !skb_queue_empty(&bp->queue)); if (ret < 0) { pr_err("bp_recvmsg: interrupted while waiting\n"); - goto out_unlock; - } - - if (sock_flag(sk, SOCK_DEAD)) { - pr_err("bp_recvmsg: socket closed while waiting\n"); - ret = -ECONNRESET; + bp->recv_aborted = true; goto out_unlock; } diff --git a/bp_socket/af_bp.h b/bp_socket/af_bp.h index 842f080..4b3bd18 100644 --- a/bp_socket/af_bp.h +++ b/bp_socket/af_bp.h @@ -17,6 +17,8 @@ struct bp_sock { u_int8_t bp_service_id; struct sk_buff_head queue; wait_queue_head_t wait_queue; + bool has_bound; + bool recv_aborted; }; int bp_bind(struct socket* sock, struct sockaddr* addr, int addr_len); diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 20cf555..f676b98 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -43,6 +43,142 @@ int fail_doit(struct sk_buff* skb, struct genl_info* info) return -1; } +int open_endpoint_doit(u32 node_id, u32 service_id, int port_id) +{ + int ret = 0; + void* hdr; + struct sk_buff* msg; + int msg_size; + + /* Taille du message avec deux attributs */ + msg_size = nla_total_size(sizeof(u32)) + nla_total_size(sizeof(u32)); + msg = genlmsg_new(msg_size + GENL_HDRLEN, GFP_KERNEL); + if (!msg) { + pr_err("bp_open_request: failed to allocate message buffer\n"); + return -ENOMEM; + } + + hdr = genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_OPEN_ENDPOINT); + if (!hdr) { + pr_err("bp_open_request: failed to create genetlink header\n"); + nlmsg_free(msg); + return -EMSGSIZE; + } + + ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + if (ret) { + pr_err("bp_open_request: failed to add NODE_ID\n"); + goto fail; + } + + ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + if (ret) { + pr_err("bp_open_request: failed to add SERVICE_ID\n"); + goto fail; + } + + genlmsg_end(msg, hdr); + ret = genlmsg_unicast(&init_net, msg, port_id); + if (ret != 0) { + pr_err("bp_open_request: genlmsg_unicast failed (%d)\n", ret); + } + return ret; + +fail: + genlmsg_cancel(msg, hdr); + nlmsg_free(msg); + return ret; +} + +int close_endpoint_doit(u32 node_id, u32 service_id, int port_id) +{ + int ret = 0; + void* hdr; + struct sk_buff* msg; + int msg_size; + + /* Compute total size of Netlink attributes */ + msg_size = nla_total_size(sizeof(u32)) + nla_total_size(sizeof(u32)); + /* Allocate a new buffer */ + msg = genlmsg_new(msg_size + GENL_HDRLEN, GFP_KERNEL); + if (!msg) { + pr_err("bp_close_request: failed to allocate message buffer\n"); + return -ENOMEM; + } + /* Generic Netlink header */ + hdr = genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_CLOSE_ENDPOINT); + if (!hdr) { + pr_err("bp_close_request: failed to create genetlink header\n"); + nlmsg_free(msg); + return -EMSGSIZE; + } + /* And the message */ + ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + if (ret) { + pr_err("bp_close_request: failed to put NODE_ID (%d)\n", ret); + goto fail; + } + ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + if (ret) { + pr_err( + "bp_close_request: failed to put SERVICE_ID (%d)\n", ret); + goto fail; + } + genlmsg_end(msg, hdr); + ret = genlmsg_unicast(&init_net, msg, port_id); + if (ret != 0) { + pr_err("bp_close_request: genlmsg_unicast failed (%d)\n", ret); + } + return ret; +fail: + genlmsg_cancel(msg, hdr); + nlmsg_free(msg); + return ret; +} + +int abort_endpoint_doit(u32 node_id, u32 service_id, int port_id) +{ + int ret = 0; + void* hdr; + struct sk_buff* msg; + int msg_size; + + msg_size = 2 * nla_total_size(sizeof(u32)); + msg = genlmsg_new(msg_size + GENL_HDRLEN, GFP_KERNEL); + if (!msg) { + pr_err("cancel_bundle: failed to allocate message buffer\n"); + return -ENOMEM; + } + + hdr = genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_ABORT_ENDPOINT); + if (!hdr) { + pr_err("cancel_bundle: failed to create genetlink header\n"); + nlmsg_free(msg); + return -EMSGSIZE; + } + + if ((ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id))) { + pr_err("cancel_bundle: failed to put service_id\n"); + genlmsg_cancel(msg, hdr); + nlmsg_free(msg); + return ret; + } + + if ((ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id))) { + pr_err("cancel_bundle: failed to put node_id\n"); + genlmsg_cancel(msg, hdr); + nlmsg_free(msg); + return ret; + } + + genlmsg_end(msg, hdr); + ret = genlmsg_unicast(&init_net, msg, port_id); + if (ret != 0) { + pr_err("cancel_bundle: unicast failed (%d)\n", ret); + } + return ret; +} + int send_bundle_doit(u64 sockid, const char* payload, int payload_size, u32 node_id, u32 service_id, int port_id) { @@ -160,23 +296,16 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) size_t payload_len; struct sk_buff* new_skb; - pr_info("TRIGGER: received message\n"); - - if (!info->attrs[BP_GENL_A_SERVICE_ID]) { - pr_err("attribute missing from message\n"); + if (!info->attrs[BP_GENL_A_SERVICE_ID] + || !info->attrs[BP_GENL_A_PAYLOAD]) { + pr_err("deliver_bundle: missing required attributes\n"); return -EINVAL; } - service_id = nla_get_u32(info->attrs[BP_GENL_A_SERVICE_ID]); - if (!info->attrs[BP_GENL_A_PAYLOAD]) { - pr_err("empty message received\n"); - return -EINVAL; - } + service_id = nla_get_u32(info->attrs[BP_GENL_A_SERVICE_ID]); payload = nla_data(info->attrs[BP_GENL_A_PAYLOAD]); payload_len = nla_len(info->attrs[BP_GENL_A_PAYLOAD]); - pr_info("Message for service %d: %s\n", service_id, payload); - new_skb = alloc_skb(payload_len, GFP_KERNEL); if (!new_skb) { pr_err("Failed to allocate sk_buff for payload\n"); @@ -187,7 +316,7 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) read_lock_bh(&bp_list_lock); sk_for_each(sk, &bp_list) { - lock_sock(sk); + bh_lock_sock(sk); bp = bp_sk(sk); if (bp->bp_service_id == service_id) { @@ -195,10 +324,10 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) skb_queue_tail(&bp->queue, new_skb); if (waitqueue_active(&bp->wait_queue)) wake_up_interruptible(&bp->wait_queue); - release_sock(sk); + bh_unlock_sock(sk); break; } - release_sock(sk); + bh_unlock_sock(sk); } read_unlock_bh(&bp_list_lock); diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index 8d9f9c0..1b17218 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -6,6 +6,9 @@ extern struct genl_family genl_fam; int fail_doit(struct sk_buff* skb, struct genl_info* info); +int open_endpoint_doit(u32 node_id, u32 service_id, int port_id); +int close_endpoint_doit(u32 node_id, u32 service_id, int port_id); +int abort_endpoint_doit(u32 node_id, u32 service_id, int port_id); int send_bundle_doit(u64 sockid, const char* payload, int payload_size, u32 node_id, u32 service_id, int port_id); int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); diff --git a/daemon/bp_genl.c b/daemon/bp_genl.c index b52109e..c7990e7 100644 --- a/daemon/bp_genl.c +++ b/daemon/bp_genl.c @@ -66,6 +66,12 @@ int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { } switch (genlhdr->cmd) { + case BP_GENL_CMD_OPEN_ENDPOINT: + return handle_open_endpoint(daemon, attrs); + case BP_GENL_CMD_CLOSE_ENDPOINT: + return handle_close_endpoint(daemon, attrs); + case BP_GENL_CMD_ABORT_ENDPOINT: + return handle_abort_endpoint(daemon, attrs); case BP_GENL_CMD_SEND_BUNDLE: return handle_send_bundle(daemon, attrs); case BP_GENL_CMD_REQUEST_BUNDLE: diff --git a/daemon/bp_genl.h b/daemon/bp_genl.h index d7b3d6b..e8e9e42 100644 --- a/daemon/bp_genl.h +++ b/daemon/bp_genl.h @@ -6,8 +6,6 @@ struct nl_sock *genl_bp_sock_init(Daemon *daemon); void genl_bp_sock_close(Daemon *daemon); -int genl_bp_sock_sendmsg(Daemon *self, void *payload, size_t len); -int genl_bp_sock_recvmsg(Daemon *self, void *payload, size_t len); int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg); #endif diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index e6a957d..77b0a2e 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -12,6 +12,69 @@ #include "ion.h" #include "log.h" +int handle_open_endpoint(Daemon *daemon, struct nlattr **attrs) { + if (!attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + log_error("Missing attribute(s) in OPEN_ENDPOINT"); + return NL_SKIP; + } + + uint32_t node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); + uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + + // int ret = bp_open_and_register(node_id, service_id); + // if (ret < 0) { + // log_error("Failed to open BP endpoint for node %u, service %u: %s", node_id, service_id, + // strerror(-ret)); + // return ret; + // } + + log_info("OPEN_ENDPOINT: opening BpSAP (service ID %u)", service_id); + + return 0; +} + +int handle_close_endpoint(Daemon *daemon, struct nlattr **attrs) { + if (!attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + log_error("Missing attribute(s) in CLOSE_ENDPOINT"); + return NL_SKIP; + } + + uint32_t node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); + uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + + // int ret = bp_close_and_unregister(node_id, service_id); + // if (ret < 0) { + // log_error("Failed to close BP endpoint for node %u, service %u", node_id, service_id); + // return ret; + // } + + log_info("CLOSE_ENDPOINT: closing BpSAP (service ID %u)", service_id); + + return 0; +} + +int handle_abort_endpoint(Daemon *, struct nlattr **attrs) { + if (!attrs[BP_GENL_A_SERVICE_ID] || !attrs[BP_GENL_A_NODE_ID]) { + log_error("Missing attribute(s) in ABORT_ENDPOINT"); + return NL_SKIP; + } + + uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + uint32_t node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); + + // bp_cancel_recv_once(node_id, service_id); + + // int ret = bp_close_and_unregister(node_id, service_id); + // if (ret < 0) { + // log_error("Failed to close BP endpoint for node %u, service %u", node_id, service_id); + // return ret; + // } + + log_info("ABORT_ENDPOINT: abort request bundle and closing BpSAP (service ID %u)", service_id); + + return 0; +} + int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { if (!attrs[BP_GENL_A_SOCKID] || !attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { @@ -44,7 +107,7 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { } uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); - log_info("REQUEST_BUNDLE: Bundle request initiated (service ID %u)", service_id); + log_info("REQUEST_BUNDLE: bundle request initiated (service ID %u)", service_id); struct thread_args *args = malloc(sizeof(struct thread_args)); if (!args) { @@ -55,9 +118,10 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { args->service_id = service_id; args->netlink_sock = daemon->genl_bp_sock; args->netlink_family = daemon->genl_bp_family_id; + args->sdr = daemon->sdr; pthread_t thread; - if (pthread_create(&thread, NULL, bp_recv_thread, args) != 0) { + if (pthread_create(&thread, NULL, handle_recv_thread, args) != 0) { log_error("Failed to create thread"); free(args); return -1; @@ -68,19 +132,26 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { return 0; } -void *bp_recv_thread(void *arg) { +void *handle_recv_thread(void *arg) { struct thread_args *args = (struct thread_args *)arg; - handle_deliver_bundle(args); + char *payload = NULL; + int payload_size; + + payload_size = bp_recv_once(args->sdr, args->service_id, &payload); + if (payload_size < 1) { + log_info("Exit recv thread (service ID %u)", args->service_id); + free(args); + return NULL; + } + + handle_deliver_bundle(payload, payload_size, args); + free(args); return NULL; } -int handle_deliver_bundle(struct thread_args *args) { - char *payload = bp_recv_once(args->service_id); - if (!payload) { - log_error("DELIVER_BUNDLE: No payload received (service ID %u)", args->service_id); - return -1; - } +int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args) { + int err = 0; struct nl_msg *msg = nlmsg_alloc(); if (!msg) { @@ -92,23 +163,27 @@ int handle_deliver_bundle(struct thread_args *args) { void *hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, args->netlink_family, 0, 0, BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); if (!hdr || nla_put_u32(msg, BP_GENL_A_SERVICE_ID, args->service_id) < 0 || - nla_put(msg, BP_GENL_A_PAYLOAD, strlen(payload) + 1, payload) < 0) { + nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) { log_error("DELIVER_BUNDLE: Failed to construct Netlink reply"); nlmsg_free(msg); free(payload); return -EMSGSIZE; } - int err = nl_send_auto(args->netlink_sock, msg); - nlmsg_free(msg); - free(payload); - + err = nl_send_auto(args->netlink_sock, msg); if (err < 0) { log_error("DELIVER_BUNDLE: Failed to send Netlink message (service ID %u)", args->service_id); + nlmsg_free(msg); + free(payload); return err; } - log_info("DELIVER_BUNDLE: Bundle successfully delivered (service ID %u)", args->service_id); - return 0; + log_info("DELIVER_BUNDLE: received bundle and forwarding to kernel (service ID %u)", + args->service_id); + + nlmsg_free(msg); + free(payload); + + return err; } \ No newline at end of file diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index c7d4331..4bf82f6 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -7,11 +7,16 @@ struct thread_args { struct nl_sock *netlink_sock; int netlink_family; unsigned int service_id; + Sdr sdr; }; +int handle_open_endpoint(Daemon *daemon, struct nlattr **attrs); +int handle_close_endpoint(Daemon *daemon, struct nlattr **attrs); +int handle_abort_endpoint(Daemon *daemon, struct nlattr **attrs); int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); -int handle_deliver_bundle(struct thread_args *args); -void *bp_recv_thread(void *arg); +int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args); + +void *handle_recv_thread(void *arg); #endif \ No newline at end of file diff --git a/daemon/bp_sap_registry.c b/daemon/bp_sap_registry.c new file mode 100644 index 0000000..bc5deb7 --- /dev/null +++ b/daemon/bp_sap_registry.c @@ -0,0 +1,75 @@ +#include "bp_sap_registry.h" +#include "log.h" +#include +#include +#include +#include + +typedef struct SapNode { + uint32_t node_id; + uint32_t service_id; + BpSAP sap; + struct SapNode *next; +} SapNode; + +static struct SapNode *head = NULL; +static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + +void bp_sap_registry_add(uint32_t node_id, uint32_t service_id, BpSAP sap) { + SapNode *node = malloc(sizeof(SapNode)); + node->node_id = node_id; + node->service_id = service_id; + node->sap = sap; + + pthread_mutex_lock(&mutex); + node->next = head; + head = node; + pthread_mutex_unlock(&mutex); +} + +BpSAP bp_sap_registry_find(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&mutex); + + SapNode *current = head; + while (current) { + if (current->node_id == node_id && current->service_id == service_id) { + pthread_mutex_unlock(&mutex); + return current->sap; + } + current = current->next; + } + + pthread_mutex_unlock(&mutex); + return NULL; +} + +void bp_sap_registry_remove(uint32_t node_id, uint32_t service_id) { + pthread_mutex_lock(&mutex); + + SapNode **p = &head; + while (*p) { + if ((*p)->node_id == node_id && (*p)->service_id == service_id) { + SapNode *to_delete = *p; + *p = (*p)->next; + free(to_delete); + break; + } + p = &(*p)->next; + } + + pthread_mutex_unlock(&mutex); +} + +void bp_sap_registry_clear(void) { + pthread_mutex_lock(&mutex); + + SapNode *current = head; + while (current) { + SapNode *to_delete = current; + current = current->next; + free(to_delete); + } + head = NULL; + + pthread_mutex_unlock(&mutex); +} diff --git a/daemon/bp_sap_registry.h b/daemon/bp_sap_registry.h new file mode 100644 index 0000000..505cd4a --- /dev/null +++ b/daemon/bp_sap_registry.h @@ -0,0 +1,12 @@ +#ifndef BP_SAP_REGISTRY_H +#define BP_SAP_REGISTRY_H + +#include +#include + +void bp_sap_registry_add(uint32_t node_id, uint32_t service_id, BpSAP sap); +BpSAP bp_sap_registry_find(uint32_t node_id, uint32_t service_id); +void bp_sap_registry_remove(uint32_t node_id, uint32_t service_id); +void bp_sap_registry_clear(void); + +#endif \ No newline at end of file diff --git a/daemon/daemon.c b/daemon/daemon.c index 29d2c52..c0f7de7 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -18,7 +18,7 @@ void on_sigpipe(evutil_socket_t fd, short what, void *arg) { event_base_loopexit(base, NULL); } -void on_netlink(int fd, short event, void *arg) { +void on_netlink(evutil_socket_t fd, short event, void *arg) { Daemon *daemon = (Daemon *)arg; nl_recvmsgs_default( daemon->genl_bp_sock); // call the callback registered with genl_bp_sock_recvmsg_cb() @@ -68,6 +68,7 @@ int daemon_start(Daemon *self) { daemon_free(self); return 1; } + self->sdr = bp_get_sdr(); log_info("Successfully attached to ION"); log_info("Daemon started successfully"); @@ -75,6 +76,7 @@ int daemon_start(Daemon *self) { log_info("Daemon terminated"); daemon_free(self); + bp_detach(); return 0; } diff --git a/daemon/daemon.h b/daemon/daemon.h index b4d9233..7739658 100644 --- a/daemon/daemon.h +++ b/daemon/daemon.h @@ -1,6 +1,7 @@ #ifndef DAEMON_H #define DAEMON_H +#include "sdrxn.h" #include #include @@ -9,6 +10,7 @@ typedef struct Daemon { char *genl_bp_family_name; int genl_bp_family_id; unsigned int nl_pid; + Sdr sdr; struct event_base *base; struct event *event_on_sigpipe; diff --git a/daemon/ion.c b/daemon/ion.c index 7d822ef..3da1e6d 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,6 +1,52 @@ +#include "ion.h" +#include "bp_sap_registry.h" #include "log.h" #include "sdr.h" #include +#include + +static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER; + +int bp_open_and_register(uint32_t node_id, uint32_t service_id) { + BpSAP sap; + char eid[64]; + int eid_size; + Sdr sdr = getIonsdr(); + + if (sdr == NULL) { + log_error("Failed to get SDR."); + return -1; + } + + eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id); + if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { + log_error("Failed to construct EID string."); + return -1; + } + + memset(&sap, 0, sizeof(BpSAP)); + if (bp_open(eid, &sap) < 0) { + log_error("Failed to open source endpoint."); + return -1; + } + + bp_sap_registry_add(node_id, service_id, sap); + + return 0; +} + +int bp_close_and_unregister(uint32_t node_id, uint32_t service_id) { + BpSAP sap = bp_sap_registry_find(node_id, service_id); + if (sap == NULL) { + log_error("bp_close_and_unregister: BpSAP is NULL"); + return -1; + } + + bp_close(sap); + bp_sap_registry_remove(node_id, service_id); + + return 0; +} int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size) { Sdr sdr; @@ -41,64 +87,79 @@ int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size) return 1; } -char *bp_recv_once(int service_id) { - BpSAP txSap; +int bp_recv_once(Sdr sdr, int service_id, char **payload) { + BpSAP sap; BpDelivery dlv; - Sdr sdr = getIonsdr(); ZcoReader reader; - char *eid = NULL; - char *payload = NULL; - int eid_size; + int bundleLen; + int rc = -1; int nodeNbr = getOwnNodeNbr(); - vast len; + char eid[64]; + int eid_size; - eid_size = snprintf(NULL, 0, "ipn:%d.%d", nodeNbr, service_id) + 1; - eid = malloc(eid_size); - if (!eid) { - log_error("Failed to allocate EID"); - return NULL; + eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", nodeNbr, service_id); + if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { + log_error("Failed to construct EID string."); + return rc; } - snprintf(eid, eid_size, "ipn:%d.%d", nodeNbr, service_id); - if (bp_open(eid, &txSap) < 0 || txSap == NULL) { - log_error("Failed to open source endpoint."); - goto out; + if (bp_open(eid, &sap) < 0) { + log_error("bp_recv_once: Failed to open BpSAP for node_id=%d service_id=%d", nodeNbr, + service_id); + return rc; } - if (bp_receive(txSap, &dlv, BP_BLOCKING) < 0) { - log_error("Bundle reception failed."); + if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { + log_error("bp_recv_once: Bundle reception failed."); goto out; } - if (dlv.result != BpPayloadPresent) { - log_info("bp_recv_once: no payload"); + if (dlv.result != BpPayloadPresent || dlv.adu == 0) { + bp_release_delivery(&dlv, 1); goto out; } - if (!sdr_begin_xn(sdr)) goto out; - - int payload_size = zco_source_data_length(sdr, dlv.adu); - payload = malloc(payload_size); - if (!payload) { - log_error("Failed to allocate memory for payload"); - sdr_exit_xn(sdr); + if (pthread_mutex_lock(&sdrmutex) != 0) { + putErrmsg("Couldn't take sdr mutex.", NULL); goto out; } - zco_start_receiving(dlv.adu, &reader); - len = zco_receive_source(sdr, &reader, payload_size, payload); - - if (sdr_end_xn(sdr) < 0 || len < 0) { - log_error("Failed to read payload"); - free(payload); - payload = NULL; + bundleLen = zco_source_data_length(sdr, dlv.adu); + log_info("bp_recv_once: Received bundle length: %d", bundleLen); + *payload = malloc(bundleLen); + if (!*payload) { + log_error("bp_recv_once: Failed to allocate memory for payload."); + sdr_exit_xn(sdr); + bp_release_delivery(&dlv, 1); goto out; } + zco_start_receiving(dlv.adu, &reader); + CHKZERO(sdr_begin_xn(sdr)); + rc = zco_receive_source(sdr, &reader, bundleLen, *payload); + log_info("bp_recv_once: Received %d bytes from ZCO.", rc); + if (sdr_end_xn(sdr) < 0 || rc < 0) putErrmsg("Can't receive payload.", NULL); + pthread_mutex_unlock(&sdrmutex); + bp_release_delivery(&dlv, 1); + + // sap = bp_sap_registry_find(nodeNbr, service_id); + // if (sap == NULL) { + // log_error("bp_recv_once: BpSAP is NULL for node_id=%d service_id=%d", nodeNbr, + // service_id); return -1; + // } + // log_info("BpSAP content: %p", sap); out: - if (eid) free(eid); - bp_release_delivery(&dlv, 0); - bp_close(txSap); + bp_close(sap); + return rc; +} + +void bp_cancel_recv_once(uint32_t node_id, uint32_t service_id) { + BpSAP sap = bp_sap_registry_find(node_id, service_id); + Sdr sdr = getIonsdr(); + if (sap == NULL) { + log_error("bp_interrupt: BpSAP is NULL"); + return; + } - return payload; + bp_interrupt(sap); } diff --git a/daemon/ion.h b/daemon/ion.h index 6090c0c..4366eff 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -1,7 +1,12 @@ #ifndef ION_H #define ION_H +#include + +int bp_open_and_register(uint32_t node_id, uint32_t service_id); +int bp_close_and_unregister(uint32_t node_id, uint32_t service_id); int bp_send_to_eid(char *payload, int payload_size, char *eid, int eid_size); -char *bp_recv_once(int service_id); +int bp_recv_once(Sdr sdr, int service_id, char **payload); +void bp_cancel_recv_once(uint32_t node_id, uint32_t service_id); #endif \ No newline at end of file diff --git a/daemon/main.c b/daemon/main.c index b9c908f..24093db 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1,10 +1,18 @@ #include "../include/bp_socket.h" #include "daemon.h" #include "log.h" +#include +#include #define NL_PID 8443 int main(int argc, char *argv[]) { + if (geteuid() != 0) { + log_error("This program must be run as root or with CAP_NET_ADMIN (required by " + "GENL_ADMIN_PERM)."); + return EXIT_FAILURE; + } + Daemon daemon = { .genl_bp_sock = NULL, .genl_bp_family_name = BP_GENL_NAME, diff --git a/include/bp_socket.h b/include/bp_socket.h index c193b1e..ba8ef0c 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -29,6 +29,9 @@ enum bp_genl_attrs { /* Commands */ enum bp_genl_cmds { BP_GENL_CMD_UNSPEC, + BP_GENL_CMD_OPEN_ENDPOINT, + BP_GENL_CMD_CLOSE_ENDPOINT, + BP_GENL_CMD_ABORT_ENDPOINT, BP_GENL_CMD_SEND_BUNDLE, BP_GENL_CMD_REQUEST_BUNDLE, BP_GENL_CMD_DELIVER_BUNDLE,