diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index 49ea184..794fc7e 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -265,8 +265,8 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size) goto err_free; } - ret = send_bundle_doit((uintptr_t)sock->sk->sk_socket, payload, - size, node_id, service_id, 8443); + ret = send_bundle_doit( + payload, size, node_id, service_id, 8443); if (ret < 0) { pr_err( "bp_sendmsg: send_bundle_doit failed (%d)\n", ret); @@ -335,6 +335,13 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags) goto out; } + ret = destroy_bundle_doit(bp->bp_node_id, bp->bp_service_id, 8443); + if (ret < 0) { + pr_err( + "destroy_bundle_doit failed (%d), will retry later", ret); + // enqueue_retry(bp->bp_node_id, bp->bp_service_id); + } + ret = skb->len; out: diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index be97f6c..485a81b 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -36,8 +36,8 @@ struct genl_family genl_fam = { .n_mcgrps = ARRAY_SIZE(genl_mcgrps), }; -int send_bundle_doit(u64 sockid, void* payload, int payload_size, - u_int32_t node_id, u_int32_t service_id, int port_id) +int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, + u_int32_t service_id, int port_id) { void* msg_head; struct sk_buff* msg; @@ -62,12 +62,6 @@ int send_bundle_doit(u64 sockid, void* payload, int payload_size, goto err_free; } - ret = nla_put_u64_64bit(msg, BP_GENL_A_SOCKID, sockid, 0); - if (ret) { - pr_err("send_bundle: failed to put SOCKID (%d)\n", ret); - goto err_cancel; - } - ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); if (ret) { pr_err("send_bundle: failed to put NODE_ID (%d)\n", ret); @@ -104,7 +98,7 @@ int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) size_t msg_size; int ret; - msg_size = nla_total_size(sizeof(u32)); + msg_size = 2 * nla_total_size(sizeof(u_int32_t)); msg = genlmsg_new(msg_size, GFP_KERNEL); if (!msg) { pr_err("request_bundle: failed to allocate message buffer\n"); @@ -120,6 +114,12 @@ int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) goto err_free; } + ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + if (ret) { + pr_err("request_bundle: failed to put NODE_ID (%d)\n", ret); + goto err_cancel; + } + ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); if (ret) { pr_err("request_bundle: failed to put SERVICE_ID (%d)\n", ret); @@ -143,17 +143,19 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) struct bp_sock* bp; struct sk_buff* new_skb; bool new_skb_queued = false; - u_int32_t service_id; + u_int32_t node_id, service_id; void* payload; size_t payload_len; int ret; - if (!info->attrs[BP_GENL_A_SERVICE_ID] + if (!info->attrs[BP_GENL_A_NODE_ID] + || !info->attrs[BP_GENL_A_SERVICE_ID] || !info->attrs[BP_GENL_A_PAYLOAD]) { pr_err("deliver_bundle: missing required attributes\n"); ret = -EINVAL; goto out; } + node_id = nla_get_u32(info->attrs[BP_GENL_A_NODE_ID]); service_id = nla_get_u32(info->attrs[BP_GENL_A_SERVICE_ID]); payload = nla_data(info->attrs[BP_GENL_A_PAYLOAD]); payload_len = nla_len(info->attrs[BP_GENL_A_PAYLOAD]); @@ -172,7 +174,8 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) bh_lock_sock(sk); bp = bp_sk(sk); - if (bp->bp_service_id == service_id) { + if (bp->bp_node_id == node_id + && bp->bp_service_id == service_id) { skb_queue_tail(&bp->queue, new_skb); new_skb_queued = true; @@ -199,3 +202,49 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) out: return ret; } + +int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id) +{ + void* msg_head; + struct sk_buff* msg; + size_t msg_size; + int ret; + + msg_size = 2 * nla_total_size(sizeof(u_int32_t)); + msg = genlmsg_new(msg_size, GFP_KERNEL); + if (!msg) { + pr_err("destroy_bundle: failed to allocate message buffer\n"); + ret = -ENOMEM; + goto out; + } + + msg_head + = genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_DESTROY_BUNDLE); + if (!msg_head) { + pr_err("destroy_bundle: failed to create genetlink header\n"); + ret = -EMSGSIZE; + goto err_free; + } + + ret = nla_put_u32(msg, BP_GENL_A_NODE_ID, node_id); + if (ret) { + pr_err("destroy_bundle: failed to put NODE_ID (%d)\n", ret); + goto err_cancel; + } + + ret = nla_put_u32(msg, BP_GENL_A_SERVICE_ID, service_id); + if (ret) { + pr_err("destroy_bundle: failed to put SERVICE_ID (%d)\n", ret); + goto err_cancel; + } + + genlmsg_end(msg, msg_head); + return genlmsg_unicast(&init_net, msg, port_id); + +err_cancel: + genlmsg_cancel(msg, msg_head); +err_free: + nlmsg_free(msg); +out: + return ret; +} \ No newline at end of file diff --git a/bp_socket/bp_genl.h b/bp_socket/bp_genl.h index 157bf44..d3cfb1d 100644 --- a/bp_socket/bp_genl.h +++ b/bp_socket/bp_genl.h @@ -5,10 +5,10 @@ extern struct genl_family genl_fam; -int fail_doit(struct sk_buff* skb, struct genl_info* info); -int send_bundle_doit(u64 sockid, void* payload, int payload_size, - u_int32_t node_id, u_int32_t service_id, int port_id); +int send_bundle_doit(void* payload, int payload_size, u_int32_t node_id, + u_int32_t service_id, int port_id); int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info); int request_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id); +int destroy_bundle_doit(u_int32_t node_id, u_int32_t service_id, int port_id); #endif \ No newline at end of file diff --git a/daemon/bp_genl.c b/daemon/bp_genl.c index b52109e..51158bc 100644 --- a/daemon/bp_genl.c +++ b/daemon/bp_genl.c @@ -13,9 +13,13 @@ #include "log.h" struct nl_sock *genl_bp_sock_init(Daemon *daemon) { - struct nl_sock *sk = nl_socket_alloc(); + struct nl_sock *sk; + int family_id; + int err; + + sk = nl_socket_alloc(); if (!sk) { - log_error("Failed to allocate Netlink socket"); + log_error("Failed to allocate Netlink socket: %s", nl_geterror(-ENOMEM)); return NULL; } @@ -24,16 +28,16 @@ struct nl_sock *genl_bp_sock_init(Daemon *daemon) { nl_socket_modify_cb(sk, NL_CB_VALID, NL_CB_CUSTOM, genl_bp_sock_recvmsg_cb, daemon); nl_socket_set_peer_port(sk, 0); // Send to kernel - int err = genl_connect(sk); + err = genl_connect(sk); if (err < 0) { - log_error("genl_connect() failed: %s", nl_geterror(err)); + log_error("Failed to connect to Generic Netlink: %s", nl_geterror(err)); nl_socket_free(sk); return NULL; } - int family_id = genl_ctrl_resolve(sk, daemon->genl_bp_family_name); + family_id = genl_ctrl_resolve(sk, daemon->genl_bp_family_name); if (family_id < 0) { - log_error("Failed to resolve family '%s': %s", daemon->genl_bp_family_name, + log_error("Failed to resolve Generic Netlink family '%s': %s", daemon->genl_bp_family_name, nl_geterror(family_id)); nl_socket_free(sk); return NULL; @@ -47,9 +51,10 @@ void genl_bp_sock_close(Daemon *daemon) { if (!daemon->genl_bp_sock) return; nl_socket_free(daemon->genl_bp_sock); - log_info("Netlink socket closed"); - + daemon->genl_bp_sock = NULL; daemon->genl_bp_family_id = -1; + + log_info("Generic Netlink socket closed"); } int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { @@ -57,11 +62,12 @@ int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { struct nlmsghdr *nlh = nlmsg_hdr(msg); struct genlmsghdr *genlhdr = nlmsg_data(nlh); struct nlattr *attrs[BP_GENL_A_MAX + 1]; + int err; - int err = nla_parse(attrs, BP_GENL_A_MAX, genlmsg_attrdata(genlhdr, 0), - genlmsg_attrlen(genlhdr, 0), NULL); - if (err) { - log_error("Failed to parse message: %s", strerror(-err)); + err = nla_parse(attrs, BP_GENL_A_MAX, genlmsg_attrdata(genlhdr, 0), genlmsg_attrlen(genlhdr, 0), + NULL); + if (err < 0) { + log_error("Failed to parse Netlink attributes: %s", strerror(-err)); return NL_SKIP; } @@ -70,8 +76,12 @@ int genl_bp_sock_recvmsg_cb(struct nl_msg *msg, void *arg) { return handle_send_bundle(daemon, attrs); case BP_GENL_CMD_REQUEST_BUNDLE: return handle_request_bundle(daemon, attrs); + // case BP_GENL_CMD_DELIVER_BUNDLE: + // return handle_deliver_bundle_reply(daemon, attrs); + case BP_GENL_CMD_DESTROY_BUNDLE: + return handle_destroy_bundle(daemon, attrs); default: - log_error("Unknown GENL command: %d", genlhdr->cmd); + log_error("Unknown Generic Netlink command: %d", genlhdr->cmd); return NL_SKIP; } } diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index 23abcd3..e5860c5 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -3,65 +3,81 @@ #include #include #include +#include #include #include #include "../include/bp_socket.h" +#include "bp.h" #include "bp_genl_handlers.h" #include "daemon.h" #include "ion.h" #include "log.h" int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { - if (!attrs[BP_GENL_A_SOCKID] || !attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_NODE_ID] || - !attrs[BP_GENL_A_SERVICE_ID]) { - log_error("Missing attribute(s) in SEND_BUNDLE"); - return NL_SKIP; + void *payload; + int payload_size; + u_int32_t node_id, service_id; + char eid[64]; + int eid_size; + + if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + log_error( + "handle_send_bundle: missing attribute(s) in SEND_BUNDLE command (payload, node ID, " + "service ID)"); + return -EINVAL; } - unsigned long sockid = nla_get_u64(attrs[BP_GENL_A_SOCKID]); - char *payload = nla_data(attrs[BP_GENL_A_PAYLOAD]); - int payload_size = nla_len(attrs[BP_GENL_A_PAYLOAD]); - uint32_t node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); - uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + payload = nla_data(attrs[BP_GENL_A_PAYLOAD]); + payload_size = nla_len(attrs[BP_GENL_A_PAYLOAD]); + node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); - char eid[64]; - int eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id); + eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id) + 1; if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { - log_error("Failed to construct EID string"); + log_error("[ipn:%u.%u] handle_send_bundle: failed to construct EID string", node_id, + service_id); return -EINVAL; } - log_info("SEND_BUNDLE: sockid=%lu, EID=%s, payload size=%d", sockid, eid, payload_size); + log_info("[ipn:%u.%u] SEND_BUNDLE: sending bundle to EID %s, size %d (bytes)", eid, + payload_size, node_id, service_id); - return bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size + 1); + return bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size); } int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { - if (!attrs[BP_GENL_A_SERVICE_ID]) { - log_error("Missing BP_GENL_A_SERVICE_ID in REQUEST_BUNDLE"); - return NL_SKIP; + pthread_t thread; + struct thread_args *args; + u_int32_t node_id, service_id; + + if (!attrs[BP_GENL_A_SERVICE_ID] || !attrs[BP_GENL_A_NODE_ID]) { + log_error("handle_request_bundle: missing attribute(s) in REQUEST_BUNDLE " + "command (service " + "ID, node ID)"); + return -EINVAL; } - uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); - log_info("REQUEST_BUNDLE: bundle request initiated (service ID %u)", service_id); + node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); - struct thread_args *args = malloc(sizeof(struct thread_args)); + args = malloc(sizeof(struct thread_args)); if (!args) { - log_error("Failed to allocate thread arguments"); + log_error("[ipn:%u.%u] handle_send_bundle: failed to allocate thread args", args->node_id, + args->service_id); return -ENOMEM; } - + args->node_id = node_id; args->service_id = service_id; args->netlink_sock = daemon->genl_bp_sock; args->netlink_family = daemon->genl_bp_family_id; args->sdr = daemon->sdr; - pthread_t thread; - if (pthread_create(&thread, NULL, handle_recv_thread, args) != 0) { - log_error("Failed to create thread"); - free(args); - return -1; + log_info("[ipn:%u.%u] REQUEST_BUNDLE: bundle request initiated", node_id, service_id); + if (pthread_create(&thread, NULL, (void *(*)(void *))handle_recv_thread, args) != 0) { + log_error("[ipn:%u.%u] handle_request_bundle: failed to create receive thread: %s", node_id, + service_id, strerror(errno)); + return -errno; } pthread_detach(thread); @@ -69,57 +85,131 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { return 0; } -void *handle_recv_thread(void *arg) { - struct thread_args *args = (struct thread_args *)arg; - char *payload = NULL; - int payload_size; +void *handle_recv_thread(struct thread_args *args) { + void *payload = NULL; + size_t payload_size; + int err; + bool bundle_present; + Object adu; + + adu = find_adu(args->sdr, args->node_id, args->service_id); + bundle_present = adu != 0; + + payload = bp_recv_once(args->sdr, args->node_id, args->service_id, &payload_size); + if (!payload) { + log_error("[ipn:%u.%u] handle_recv_thread: failed to receive bundle", args->node_id, + args->service_id); + goto out; + } - payload_size = bp_recv_once(args->sdr, args->service_id, &payload); - if (payload_size < 0) { - log_info("Exit recv thread (service ID %u)", args->service_id); - free(args); - return NULL; + if (!bundle_present) { + log_info("[ipn:%u.%u] REQUEST_BUNDLE: bundle received, size %zu bytes", args->node_id, + args->service_id, payload_size); + } else { + log_warn("[ipn:%u.%u] REQUEST_BUNDLE: bundle reference already present in memory, size %zu " + "bytes", + args->node_id, args->service_id, payload_size); } - handle_deliver_bundle(payload, payload_size, args); + err = handle_deliver_bundle(payload, payload_size, args); + if (err < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed with error %d", err, args->node_id, + args->service_id); + } +out: + if (payload) free(payload); free(args); return NULL; } -int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args) { - int err = 0; +int handle_deliver_bundle(void *payload, int payload_size, struct thread_args *args) { + struct nl_msg *msg = NULL; + void *hdr; + int ret; - struct nl_msg *msg = nlmsg_alloc(); + msg = nlmsg_alloc(); if (!msg) { - log_error("DELIVER_BUNDLE: Failed to allocate Netlink msg"); - free(payload); - return -ENOMEM; + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to allocate Netlink msg", + args->node_id, args->service_id); + ret = -ENOMEM; + goto out; } - void *hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, args->netlink_family, 0, 0, - BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); - if (!hdr || nla_put_u32(msg, BP_GENL_A_SERVICE_ID, args->service_id) < 0 || - nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) { - log_error("DELIVER_BUNDLE: Failed to construct Netlink reply"); - nlmsg_free(msg); - free(payload); - return -EMSGSIZE; + hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, args->netlink_family, 0, 0, + BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); + if (!hdr) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to create Netlink header", + args->node_id, args->service_id); + ret = -EMSGSIZE; + goto err_free_msg; } - err = nl_send_auto(args->netlink_sock, msg); - if (err < 0) { - log_error("DELIVER_BUNDLE: Failed to send Netlink message (service ID %u)", - args->service_id); - nlmsg_free(msg); - free(payload); - return err; + if (nla_put_u32(msg, BP_GENL_A_SERVICE_ID, args->service_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add SERVICE_ID attribute", + args->node_id, args->service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put_u32(msg, BP_GENL_A_NODE_ID, args->node_id) < 0) { + log_error("[ipn:%u.%u] handle_deliver_bundle: failed to add NODE_ID attribute", + args->node_id, args->service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + if (nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) { + log_error("[ipn:%u.%u] [ipn:%u.%u] handle_deliver_bundle: failed to add PAYLOAD attribute", + args->node_id, args->service_id); + ret = -EMSGSIZE; + goto err_free_msg; + } + + ret = nl_send_sync(args->netlink_sock, msg); + if (ret < 0) { + log_warn("[ipn:%u.%u] DELIVER_BUNDLE: bundle not delivered to kernel, keeping reference in " + "memory (no active BP socket " + "client)", + args->node_id, args->service_id); + ret = 0; // Do not return an error, just log it + goto out; } - log_info("DELIVER_BUNDLE: received bundle and forwarding to kernel (service ID %u)", - args->service_id); + log_info("[ipn:%u.%u] DELIVER_BUNDLE: bundle sent to kernel", args->node_id, args->service_id); + + return 0; +err_free_msg: nlmsg_free(msg); - free(payload); - return err; +out: + return ret; +} + +int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs) { + u_int32_t node_id, service_id; + int ret = 0; + + if (!attrs[BP_GENL_A_NODE_ID] || !attrs[BP_GENL_A_SERVICE_ID]) { + log_error("handle_destroy_bundle: missing attribute(s) in DESTROY_BUNDLE " + "command (node ID, " + "service ID)"); + ret = -EINVAL; + goto out; + } + + node_id = nla_get_u32(attrs[BP_GENL_A_NODE_ID]); + service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); + + ret = destroy_adu(daemon->sdr, node_id, service_id); + if (ret < 0) { + log_error("[ipn:%u.%u] handle_destroy_bundle: failed to destroy bundle: %s", node_id, + service_id, strerror(-ret)); + goto out; + } + + log_info("[ipn:%u.%u] DESTROY_BUNDLE: bundle destroy from ION", node_id, service_id); + +out: + return ret; } \ No newline at end of file diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index 6d6d50e..f959ea4 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -7,14 +7,16 @@ struct thread_args { struct nl_sock *netlink_sock; int netlink_family; - unsigned int service_id; + u_int32_t node_id; + u_int32_t service_id; Sdr sdr; }; int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); -int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args); +int handle_deliver_bundle(void *payload, int payload_size, struct thread_args *args); +int handle_destroy_bundle(Daemon *daemon, struct nlattr **attrs); -void *handle_recv_thread(void *arg); +void *handle_recv_thread(struct thread_args *arg); #endif \ No newline at end of file diff --git a/daemon/daemon.c b/daemon/daemon.c index c0f7de7..01884e0 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -24,7 +24,8 @@ void on_netlink(evutil_socket_t fd, short event, void *arg) { daemon->genl_bp_sock); // call the callback registered with genl_bp_sock_recvmsg_cb() } -int daemon_start(Daemon *self) { +int daemon_run(Daemon *self) { + int fd; int ret; self->base = event_base_new(); @@ -32,41 +33,51 @@ int daemon_start(Daemon *self) { (char *)event_base_get_method(self->base)); self->event_on_sigint = evsignal_new(self->base, SIGINT, on_sigint, self->base); - event_add(self->event_on_sigint, NULL); + ret = event_add(self->event_on_sigint, NULL); + if (ret < 0) { + log_error("Couldn't add SIGINT event"); + daemon_free(self); + return ret; + } self->event_on_sigpipe = evsignal_new(self->base, SIGPIPE, on_sigpipe, self->base); - event_add(self->event_on_sigpipe, NULL); + ret = event_add(self->event_on_sigpipe, NULL); + if (ret < 0) { + log_error("Couldn't add SIGPIPE event"); + daemon_free(self); + return ret; + } self->genl_bp_sock = genl_bp_sock_init(self); if (!self->genl_bp_sock) { log_error("Failed to initialize Generic Netlink socket"); daemon_free(self); - return 1; + return -ENOMEM; } log_info("Generic Netlink: GENL_BP open socket"); - int genl_bp_sock_fd = nl_socket_get_fd(self->genl_bp_sock); - self->event_on_nl_sock = - event_new(self->base, genl_bp_sock_fd, EV_READ | EV_PERSIST, on_netlink, self); - if (event_add(self->event_on_nl_sock, NULL) == -1) { + fd = nl_socket_get_fd(self->genl_bp_sock); + self->event_on_nl_sock = event_new(self->base, fd, EV_READ | EV_PERSIST, on_netlink, self); + ret = event_add(self->event_on_nl_sock, NULL); + if (ret < 0) { log_error("Couldn't add Netlink event"); daemon_free(self); - return 1; + return ret; } - ret = evutil_make_socket_nonblocking(genl_bp_sock_fd); - if (ret == -1) { + ret = evutil_make_socket_nonblocking(fd); + if (ret < 0) { log_error("Failed in evutil_make_socket_nonblocking: %s", evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR())); daemon_free(self); - return 1; + return ret; } log_info("Attempting to attach to ION..."); if (bp_attach() < 0) { log_error("Can't attach to BP"); daemon_free(self); - return 1; + return -EAGAIN; } self->sdr = bp_get_sdr(); log_info("Successfully attached to ION"); diff --git a/daemon/daemon.h b/daemon/daemon.h index e6eac17..1bc4048 100644 --- a/daemon/daemon.h +++ b/daemon/daemon.h @@ -22,7 +22,7 @@ void on_sigint(evutil_socket_t fd, short what, void *arg); void on_sigpipe(evutil_socket_t fd, short what, void *arg); void on_netlink(evutil_socket_t fd, short what, void *arg); -int daemon_start(Daemon *self); +int daemon_run(Daemon *self); void daemon_free(Daemon *self); #endif diff --git a/daemon/ion.c b/daemon/ion.c index a2ce2b5..e04dfe8 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,8 +1,11 @@ +#include "ion.h" #include "log.h" #include "sdr.h" #include #include +static struct adu_reference *adu_refs = NULL; +static pthread_mutex_t adu_refs_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER; const char *bp_result_text(BpIndResult result) { @@ -20,94 +23,219 @@ const char *bp_result_text(BpIndResult result) { } } -int bp_send_to_eid(Sdr sdr, char *payload, int payload_size, char *dest_eid, int eid_size) { - Object sdrBuffer; - Object zco; +int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id) { + struct adu_reference *ref; - oK(sdr_begin_xn(sdr)); + if (pthread_mutex_lock(&adu_refs_mutex) != 0) { + log_error("add_adu: Failed to lock SDR mutex."); + return -1; + } - sdrBuffer = sdr_malloc(sdr, payload_size); - if (sdrBuffer == 0) { - sdr_end_xn(sdr); + ref = malloc(sizeof(struct adu_reference)); + if (!ref) { + log_error("add_adu: Failed to allocate memory for bundle reference."); + pthread_mutex_unlock(&adu_refs_mutex); + return -ENOMEM; + } + + ref->adu = adu; + ref->node_id = node_id; + ref->service_id = service_id; + ref->next = adu_refs; + adu_refs = ref; + + pthread_mutex_unlock(&adu_refs_mutex); + return 0; +} + +Object find_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { + struct adu_reference *ref; + + for (ref = adu_refs; ref != NULL; ref = ref->next) { + if (ref->node_id == node_id && ref->service_id == service_id) { + return ref->adu; + } + } + return 0; +} + +int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id) { + struct adu_reference *prev = NULL; + struct adu_reference *current = adu_refs; + + if (pthread_mutex_lock(&adu_refs_mutex) != 0) { + log_error("destroy_adu: Failed to lock adu_refs mutex."); + return -EAGAIN; + } + + while (current) { + if (current->node_id == node_id && current->service_id == service_id) { + if (prev) { + prev->next = current->next; + } else { + adu_refs = current->next; + } + + if (pthread_mutex_lock(&sdrmutex) != 0) { + log_error("destroy_adu: Failed to lock SDR mutex."); + free(current); + pthread_mutex_unlock(&adu_refs_mutex); + return -EAGAIN; + } + + zco_destroy(sdr, current->adu); + free(current); + pthread_mutex_unlock(&sdrmutex); + pthread_mutex_unlock(&adu_refs_mutex); + return 0; + } + prev = current; + current = current->next; + } + + pthread_mutex_unlock(&adu_refs_mutex); + log_warn("destroy_adu: no bundle found (ipn:%u.%u)", node_id, service_id); + return -ENOENT; +} + +int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int eid_size) { + Object sdr_buffer = 0; + Object adu; + int ret = 0; + + if (sdr_begin_xn(sdr) == 0) { + log_error("bp_send_to_eid: sdr_begin_xn failed."); + return -EIO; + } + + sdr_buffer = sdr_malloc(sdr, payload_size); + if (sdr_buffer == 0) { log_error("sdr_malloc failed."); - return 0; + ret = -ENOMEM; + goto out; } - sdr_write(sdr, sdrBuffer, payload, payload_size); + sdr_write(sdr, sdr_buffer, payload, payload_size); - zco = zco_create(sdr, ZcoSdrSource, sdrBuffer, 0, payload_size, ZcoOutbound); - if (zco == 0 || zco == (Object)ERROR) { - sdr_end_xn(sdr); + adu = zco_create(sdr, ZcoSdrSource, sdr_buffer, 0, payload_size, ZcoOutbound); + if (adu <= 0) { log_error("zco_create failed."); - return 0; + ret = -ENOMEM; + goto out; } - if (bp_send(NULL, dest_eid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, zco, NULL) <= 0) { - sdr_end_xn(sdr); + if (bp_send(NULL, dest_eid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, adu, NULL) <= 0) { log_error("bp_send failed."); - return 0; + ret = -EIO; + goto out; } +out: + if (sdr_buffer != 0) sdr_free(sdr, sdr_buffer); sdr_end_xn(sdr); - return 1; + return ret; } -int bp_recv_once(Sdr sdr, int service_id, char **payload) { +void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *payload_size) { BpSAP sap; BpDelivery dlv; ZcoReader reader; - int bundle_len; - int rc = -1; + Object adu; + u_int32_t own_node_id; + void *payload = NULL; int eid_size; char eid[64]; - int nodeNbr = getOwnNodeNbr(); - eid_size = snprintf(eid, sizeof(eid), "ipn:%d.%d", nodeNbr, service_id); - if (eid_size < 0 || eid_size >= sizeof(eid)) { - log_error("Failed to construct EID string."); - return -1; + own_node_id = getOwnNodeNbr(); + if (node_id != own_node_id) { + log_error("bp_recv_once: node ID mismatch. Expected %u, got %u", own_node_id, node_id); + return NULL; + } + + adu = find_adu(sdr, node_id, service_id); + if (adu != 0) { + *payload_size = zco_source_data_length(sdr, adu); + payload = malloc(*payload_size); + if (!payload) { + log_error("bp_recv_once: Failed to allocate memory for payload."); + return NULL; + } + zco_start_receiving(adu, &reader); + if (zco_receive_source(sdr, &reader, *payload_size, payload) < 0) { + log_error("bp_recv_once: zco_receive_source failed."); + free(payload); + return NULL; + } + return payload; + } + + eid_size = snprintf(eid, sizeof(eid), "ipn:%u.%u", node_id, service_id); + if (eid_size < 0 || eid_size >= (int)sizeof(eid)) { + log_error("bp_recv_once: failed to construct EID string."); + return NULL; } if (bp_open(eid, &sap) < 0) { - log_error("bp_recv_once: Failed to open BpSAP for node_id=%d service_id=%d", nodeNbr, + log_error("bp_recv_once: failed to open BpSAP (node_id=%u service_id=%u)", node_id, service_id); - return -1; + return NULL; } if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { - log_error("bp_recv_once: Bundle reception failed."); - goto out; + log_error("bp_recv_once: bundle reception failed."); + goto close_sap; } if (dlv.result != BpPayloadPresent || dlv.adu == 0) { log_error("bp_recv_once: %s", bp_result_text(dlv.result)); - goto out; + goto release_dlv; + } + + if (add_adu(sdr, dlv.adu, node_id, service_id) < 0) { + log_error("bp_recv_once: failed to add bundle reference."); + goto release_dlv; } if (pthread_mutex_lock(&sdrmutex) != 0) { - putErrmsg("Couldn't take sdr mutex.", NULL); - goto out; + log_error("bp_recv_once: Failed to lock SDR mutex."); + goto release_dlv; } - if (sdr_begin_xn(sdr) == 0) goto out; + if (sdr_begin_xn(sdr) == 0) { + log_error("bp_recv_once: sdr_begin_xn failed."); + pthread_mutex_unlock(&sdrmutex); + goto release_dlv; + } - bundle_len = zco_source_data_length(sdr, dlv.adu); - *payload = malloc(bundle_len); - if (!*payload) { + *payload_size = zco_source_data_length(sdr, dlv.adu); + payload = malloc(*payload_size); + if (!payload) { log_error("bp_recv_once: Failed to allocate memory for payload."); - goto unlock_sdr; + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + goto release_dlv; } zco_start_receiving(dlv.adu, &reader); - rc = zco_receive_source(sdr, &reader, bundle_len, *payload); + if (zco_receive_source(sdr, &reader, *payload_size, payload) < 0) { + log_error("bp_recv_once: zco_receive_source failed."); + free(payload); + payload = NULL; + sdr_end_xn(sdr); + pthread_mutex_unlock(&sdrmutex); + goto release_dlv; + } sdr_end_xn(sdr); -unlock_sdr: pthread_mutex_unlock(&sdrmutex); - -out: bp_release_delivery(&dlv, 0); bp_close(sap); - return rc; + return payload; + +release_dlv: + bp_release_delivery(&dlv, 0); +close_sap: + bp_close(sap); + return NULL; } diff --git a/daemon/ion.h b/daemon/ion.h index d56a930..05310e4 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -3,7 +3,18 @@ #include "bp.h" -int bp_send_to_eid(Sdr sdr, char *payload, int payload_size, char *dest_eid, int eid_size); -int bp_recv_once(Sdr sdr, int service_id, char **payload); +struct adu_reference { + Object adu; + u_int32_t node_id; + u_int32_t service_id; + struct adu_reference *next; +}; + +int add_adu(Sdr sdr, Object adu, u_int32_t node_id, u_int32_t service_id); +Object find_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id); +int destroy_adu(Sdr sdr, u_int32_t node_id, u_int32_t service_id); + +int bp_send_to_eid(Sdr sdr, void *payload, int payload_size, char *dest_eid, int eid_size); +void *bp_recv_once(Sdr sdr, u_int32_t node_id, u_int32_t service_id, size_t *payload_size); #endif \ No newline at end of file diff --git a/daemon/main.c b/daemon/main.c index 24093db..2c8e123 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -25,6 +25,7 @@ int main(int argc, char *argv[]) { .event_on_nl_sock = NULL, }; - int ret = daemon_start(&daemon); - return ret; + if (daemon_run(&daemon) < 0) return EXIT_FAILURE; + + return EXIT_SUCCESS; } diff --git a/include/bp_socket.h b/include/bp_socket.h index c193b1e..c42f6e4 100644 --- a/include/bp_socket.h +++ b/include/bp_socket.h @@ -32,6 +32,7 @@ enum bp_genl_cmds { BP_GENL_CMD_SEND_BUNDLE, BP_GENL_CMD_REQUEST_BUNDLE, BP_GENL_CMD_DELIVER_BUNDLE, + BP_GENL_CMD_DESTROY_BUNDLE, __BP_GENL_CMD_MAX, };