diff --git a/bp_socket/af_bp.c b/bp_socket/af_bp.c index e4aada2..a244412 100644 --- a/bp_socket/af_bp.c +++ b/bp_socket/af_bp.c @@ -90,7 +90,6 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len) struct sockaddr_bp* addr = (struct sockaddr_bp*)uaddr; int service_id = -1; int node_id = -1; - int err = 0; if (addr_len != sizeof(struct sockaddr_bp)) return -EINVAL; @@ -132,8 +131,7 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len) if (iter_bp->bp_service_id == service_id && iter_bp->bp_node_id == node_id) { read_unlock_bh(&bp_list_lock); - err = -EADDRINUSE; - goto out; + return -EADDRINUSE; } } read_unlock_bh(&bp_list_lock); @@ -146,10 +144,8 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len) sk_add_node(sk, &bp_list); write_unlock_bh(&bp_list_lock); -out: release_sock(sk); - - return err; + return 0; } int bp_release(struct socket* sock) diff --git a/bp_socket/bp_genl.c b/bp_socket/bp_genl.c index 20cf555..2f35d71 100644 --- a/bp_socket/bp_genl.c +++ b/bp_socket/bp_genl.c @@ -162,16 +162,12 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) pr_info("TRIGGER: received message\n"); - if (!info->attrs[BP_GENL_A_SERVICE_ID]) { - pr_err("attribute missing from message\n"); + if (!info->attrs[BP_GENL_A_SERVICE_ID] + || !info->attrs[BP_GENL_A_PAYLOAD]) { + pr_err("deliver_bundle: missing required attributes\n"); return -EINVAL; } service_id = nla_get_u32(info->attrs[BP_GENL_A_SERVICE_ID]); - - if (!info->attrs[BP_GENL_A_PAYLOAD]) { - pr_err("empty message received\n"); - return -EINVAL; - } payload = nla_data(info->attrs[BP_GENL_A_PAYLOAD]); payload_len = nla_len(info->attrs[BP_GENL_A_PAYLOAD]); @@ -187,7 +183,7 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) read_lock_bh(&bp_list_lock); sk_for_each(sk, &bp_list) { - lock_sock(sk); + bh_lock_sock(sk); bp = bp_sk(sk); if (bp->bp_service_id == service_id) { @@ -195,10 +191,10 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info) skb_queue_tail(&bp->queue, new_skb); if (waitqueue_active(&bp->wait_queue)) wake_up_interruptible(&bp->wait_queue); - release_sock(sk); + bh_unlock_sock(sk); break; } - release_sock(sk); + bh_unlock_sock(sk); } read_unlock_bh(&bp_list_lock); diff --git a/daemon/bp_genl_handlers.c b/daemon/bp_genl_handlers.c index e6a957d..23abcd3 100644 --- a/daemon/bp_genl_handlers.c +++ b/daemon/bp_genl_handlers.c @@ -34,7 +34,7 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) { log_info("SEND_BUNDLE: sockid=%lu, EID=%s, payload size=%d", sockid, eid, payload_size); - return bp_send_to_eid(payload, payload_size, eid, eid_size + 1); + return bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size + 1); } int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { @@ -44,7 +44,7 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { } uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]); - log_info("REQUEST_BUNDLE: Bundle request initiated (service ID %u)", service_id); + log_info("REQUEST_BUNDLE: bundle request initiated (service ID %u)", service_id); struct thread_args *args = malloc(sizeof(struct thread_args)); if (!args) { @@ -55,9 +55,10 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { args->service_id = service_id; args->netlink_sock = daemon->genl_bp_sock; args->netlink_family = daemon->genl_bp_family_id; + args->sdr = daemon->sdr; pthread_t thread; - if (pthread_create(&thread, NULL, bp_recv_thread, args) != 0) { + if (pthread_create(&thread, NULL, handle_recv_thread, args) != 0) { log_error("Failed to create thread"); free(args); return -1; @@ -68,19 +69,26 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) { return 0; } -void *bp_recv_thread(void *arg) { +void *handle_recv_thread(void *arg) { struct thread_args *args = (struct thread_args *)arg; - handle_deliver_bundle(args); + char *payload = NULL; + int payload_size; + + payload_size = bp_recv_once(args->sdr, args->service_id, &payload); + if (payload_size < 0) { + log_info("Exit recv thread (service ID %u)", args->service_id); + free(args); + return NULL; + } + + handle_deliver_bundle(payload, payload_size, args); + free(args); return NULL; } -int handle_deliver_bundle(struct thread_args *args) { - char *payload = bp_recv_once(args->service_id); - if (!payload) { - log_error("DELIVER_BUNDLE: No payload received (service ID %u)", args->service_id); - return -1; - } +int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args) { + int err = 0; struct nl_msg *msg = nlmsg_alloc(); if (!msg) { @@ -92,23 +100,26 @@ int handle_deliver_bundle(struct thread_args *args) { void *hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, args->netlink_family, 0, 0, BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION); if (!hdr || nla_put_u32(msg, BP_GENL_A_SERVICE_ID, args->service_id) < 0 || - nla_put(msg, BP_GENL_A_PAYLOAD, strlen(payload) + 1, payload) < 0) { + nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) { log_error("DELIVER_BUNDLE: Failed to construct Netlink reply"); nlmsg_free(msg); free(payload); return -EMSGSIZE; } - int err = nl_send_auto(args->netlink_sock, msg); - nlmsg_free(msg); - free(payload); - + err = nl_send_auto(args->netlink_sock, msg); if (err < 0) { log_error("DELIVER_BUNDLE: Failed to send Netlink message (service ID %u)", args->service_id); + nlmsg_free(msg); + free(payload); return err; } - log_info("DELIVER_BUNDLE: Bundle successfully delivered (service ID %u)", args->service_id); - return 0; + log_info("DELIVER_BUNDLE: received bundle and forwarding to kernel (service ID %u)", + args->service_id); + + nlmsg_free(msg); + free(payload); + return err; } \ No newline at end of file diff --git a/daemon/bp_genl_handlers.h b/daemon/bp_genl_handlers.h index c7d4331..6d6d50e 100644 --- a/daemon/bp_genl_handlers.h +++ b/daemon/bp_genl_handlers.h @@ -1,17 +1,20 @@ #ifndef BP_GENL_HANDLERS_H #define BP_GENL_HANDLERS_H +#include "bp.h" #include "daemon.h" struct thread_args { struct nl_sock *netlink_sock; int netlink_family; unsigned int service_id; + Sdr sdr; }; int handle_send_bundle(Daemon *daemon, struct nlattr **attrs); int handle_request_bundle(Daemon *daemon, struct nlattr **attrs); -int handle_deliver_bundle(struct thread_args *args); -void *bp_recv_thread(void *arg); +int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args); + +void *handle_recv_thread(void *arg); #endif \ No newline at end of file diff --git a/daemon/daemon.c b/daemon/daemon.c index 29d2c52..c0f7de7 100644 --- a/daemon/daemon.c +++ b/daemon/daemon.c @@ -18,7 +18,7 @@ void on_sigpipe(evutil_socket_t fd, short what, void *arg) { event_base_loopexit(base, NULL); } -void on_netlink(int fd, short event, void *arg) { +void on_netlink(evutil_socket_t fd, short event, void *arg) { Daemon *daemon = (Daemon *)arg; nl_recvmsgs_default( daemon->genl_bp_sock); // call the callback registered with genl_bp_sock_recvmsg_cb() @@ -68,6 +68,7 @@ int daemon_start(Daemon *self) { daemon_free(self); return 1; } + self->sdr = bp_get_sdr(); log_info("Successfully attached to ION"); log_info("Daemon started successfully"); @@ -75,6 +76,7 @@ int daemon_start(Daemon *self) { log_info("Daemon terminated"); daemon_free(self); + bp_detach(); return 0; } diff --git a/daemon/daemon.h b/daemon/daemon.h index b4d9233..e6eac17 100644 --- a/daemon/daemon.h +++ b/daemon/daemon.h @@ -1,6 +1,7 @@ #ifndef DAEMON_H #define DAEMON_H +#include "bp.h" #include #include @@ -9,6 +10,7 @@ typedef struct Daemon { char *genl_bp_family_name; int genl_bp_family_id; unsigned int nl_pid; + Sdr sdr; struct event_base *base; struct event *event_on_sigpipe; diff --git a/daemon/ion.c b/daemon/ion.c index 7d822ef..a2ce2b5 100644 --- a/daemon/ion.c +++ b/daemon/ion.c @@ -1,18 +1,29 @@ #include "log.h" #include "sdr.h" #include +#include + +static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER; + +const char *bp_result_text(BpIndResult result) { + switch (result) { + case BpPayloadPresent: + return "BpPayloadPresent"; + case BpReceptionTimedOut: + return "BpReceptionTimedOut"; + case BpReceptionInterrupted: + return "BpReceptionInterrupted"; + case BpEndpointStopped: + return "BpEndpointStopped"; + default: + return "Unknown"; + } +} -int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size) { - Sdr sdr; +int bp_send_to_eid(Sdr sdr, char *payload, int payload_size, char *dest_eid, int eid_size) { Object sdrBuffer; Object zco; - sdr = bp_get_sdr(); - if (sdr == NULL) { - log_error("*** Failed to get sdr."); - return 0; - } - oK(sdr_begin_xn(sdr)); sdrBuffer = sdr_malloc(sdr, payload_size); @@ -31,7 +42,7 @@ int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size) return 0; } - if (bp_send(NULL, destEid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, zco, NULL) <= 0) { + if (bp_send(NULL, dest_eid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, zco, NULL) <= 0) { sdr_end_xn(sdr); log_error("bp_send failed."); return 0; @@ -41,64 +52,62 @@ int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size) return 1; } -char *bp_recv_once(int service_id) { - BpSAP txSap; +int bp_recv_once(Sdr sdr, int service_id, char **payload) { + BpSAP sap; BpDelivery dlv; - Sdr sdr = getIonsdr(); ZcoReader reader; - char *eid = NULL; - char *payload = NULL; + int bundle_len; + int rc = -1; int eid_size; + char eid[64]; int nodeNbr = getOwnNodeNbr(); - vast len; - eid_size = snprintf(NULL, 0, "ipn:%d.%d", nodeNbr, service_id) + 1; - eid = malloc(eid_size); - if (!eid) { - log_error("Failed to allocate EID"); - return NULL; + eid_size = snprintf(eid, sizeof(eid), "ipn:%d.%d", nodeNbr, service_id); + if (eid_size < 0 || eid_size >= sizeof(eid)) { + log_error("Failed to construct EID string."); + return -1; } - snprintf(eid, eid_size, "ipn:%d.%d", nodeNbr, service_id); - if (bp_open(eid, &txSap) < 0 || txSap == NULL) { - log_error("Failed to open source endpoint."); + if (bp_open(eid, &sap) < 0) { + log_error("bp_recv_once: Failed to open BpSAP for node_id=%d service_id=%d", nodeNbr, + service_id); + return -1; + } + + if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) { + log_error("bp_recv_once: Bundle reception failed."); goto out; } - if (bp_receive(txSap, &dlv, BP_BLOCKING) < 0) { - log_error("Bundle reception failed."); + if (dlv.result != BpPayloadPresent || dlv.adu == 0) { + log_error("bp_recv_once: %s", bp_result_text(dlv.result)); goto out; } - if (dlv.result != BpPayloadPresent) { - log_info("bp_recv_once: no payload"); + if (pthread_mutex_lock(&sdrmutex) != 0) { + putErrmsg("Couldn't take sdr mutex.", NULL); goto out; } - if (!sdr_begin_xn(sdr)) goto out; + if (sdr_begin_xn(sdr) == 0) goto out; - int payload_size = zco_source_data_length(sdr, dlv.adu); - payload = malloc(payload_size); - if (!payload) { - log_error("Failed to allocate memory for payload"); - sdr_exit_xn(sdr); - goto out; + bundle_len = zco_source_data_length(sdr, dlv.adu); + *payload = malloc(bundle_len); + if (!*payload) { + log_error("bp_recv_once: Failed to allocate memory for payload."); + goto unlock_sdr; } zco_start_receiving(dlv.adu, &reader); - len = zco_receive_source(sdr, &reader, payload_size, payload); + rc = zco_receive_source(sdr, &reader, bundle_len, *payload); - if (sdr_end_xn(sdr) < 0 || len < 0) { - log_error("Failed to read payload"); - free(payload); - payload = NULL; - goto out; - } + sdr_end_xn(sdr); +unlock_sdr: + pthread_mutex_unlock(&sdrmutex); out: - if (eid) free(eid); bp_release_delivery(&dlv, 0); - bp_close(txSap); + bp_close(sap); - return payload; + return rc; } diff --git a/daemon/ion.h b/daemon/ion.h index 6090c0c..d56a930 100644 --- a/daemon/ion.h +++ b/daemon/ion.h @@ -1,7 +1,9 @@ #ifndef ION_H #define ION_H -int bp_send_to_eid(char *payload, int payload_size, char *eid, int eid_size); -char *bp_recv_once(int service_id); +#include "bp.h" + +int bp_send_to_eid(Sdr sdr, char *payload, int payload_size, char *dest_eid, int eid_size); +int bp_recv_once(Sdr sdr, int service_id, char **payload); #endif \ No newline at end of file diff --git a/daemon/main.c b/daemon/main.c index b9c908f..24093db 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -1,10 +1,18 @@ #include "../include/bp_socket.h" #include "daemon.h" #include "log.h" +#include +#include #define NL_PID 8443 int main(int argc, char *argv[]) { + if (geteuid() != 0) { + log_error("This program must be run as root or with CAP_NET_ADMIN (required by " + "GENL_ADMIN_PERM)."); + return EXIT_FAILURE; + } + Daemon daemon = { .genl_bp_sock = NULL, .genl_bp_family_name = BP_GENL_NAME, diff --git a/tools/bp-demo-receiver.c b/tools/bp-demo-receiver.c index 5f457f5..3f017f0 100644 --- a/tools/bp-demo-receiver.c +++ b/tools/bp-demo-receiver.c @@ -20,9 +20,9 @@ void handle_sigint(int sig) { int main(int argc, char *argv[]) { int sfd; struct sockaddr_bp addr_bp; + struct msghdr msg; + struct iovec iov; char buffer[BUFFER_SIZE]; - struct iovec iov[1]; - struct msghdr *msg; uint32_t node_id; uint32_t service_id; int ret = 0; @@ -34,7 +34,6 @@ int main(int argc, char *argv[]) { signal(SIGINT, handle_sigint); - // Parse arguments node_id = (uint32_t)atoi(argv[1]); service_id = (uint32_t)atoi(argv[2]); @@ -48,7 +47,6 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - // Create the socket sfd = socket(AF_BP, SOCK_DGRAM, 1); if (sfd < 0) { perror("socket creation failed"); @@ -56,48 +54,34 @@ int main(int argc, char *argv[]) { } printf("Socket created.\n"); - // Fill sockaddr_bp memset(&addr_bp, 0, sizeof(addr_bp)); addr_bp.bp_family = AF_BP; addr_bp.bp_scheme = BP_SCHEME_IPN; addr_bp.bp_addr.ipn.node_id = node_id; addr_bp.bp_addr.ipn.service_id = service_id; - // Bind the socket if (bind(sfd, (struct sockaddr *)&addr_bp, sizeof(addr_bp)) == -1) { perror("Failed to bind socket"); ret = EXIT_FAILURE; goto out; } - // Prepare for receiving messages - msg = (struct msghdr *)malloc(sizeof(struct msghdr)); - if (!msg) { - perror("malloc failed"); - ret = EXIT_FAILURE; - goto out; - } - - memset(iov, 0, sizeof(iov)); - iov[0].iov_base = buffer; - iov[0].iov_len = sizeof(buffer); - - memset(msg, 0, sizeof(struct msghdr)); - msg->msg_iov = iov; - msg->msg_iovlen = 1; + iov.iov_base = buffer; + iov.iov_len = sizeof(buffer); + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; printf("Listening for incoming messages...\n"); - ssize_t n = recvmsg(sfd, msg, 0); + ssize_t n = recvmsg(sfd, &msg, 0); if (n < 0) { - perror("Failed to receive message"); - ret = EXIT_FAILURE; - goto out_free; - } else { - printf("Message received (%zd bytes): %s\n", n, buffer); + perror("recvmsg failed"); + ret = EXIT_FAILURE; + goto out; } + + printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer); -out_free: - free(msg); out: close(sfd); printf("Socket closed.\n"); diff --git a/tools/bp-demo-sender.c b/tools/bp-demo-sender.c index e5058da..3c358dc 100644 --- a/tools/bp-demo-sender.c +++ b/tools/bp-demo-sender.c @@ -9,15 +9,16 @@ #define AF_BP 28 int main(int argc, char *argv[]) { - int sockfd, ret; + struct sockaddr_bp dest_addr; + int fd; uint32_t node_id, service_id; + int ret = 0; if (argc < 3) { printf("Usage: %s \n", argv[0]); return EXIT_FAILURE; } - // Parse arguments node_id = (uint32_t)atoi(argv[1]); service_id = (uint32_t)atoi(argv[2]); @@ -31,36 +32,29 @@ int main(int argc, char *argv[]) { return EXIT_FAILURE; } - // Create a socket - sockfd = socket(AF_BP, SOCK_DGRAM, 0); - if (sockfd < 0) { + fd = socket(AF_BP, SOCK_DGRAM, 0); + if (fd < 0) { perror("socket creation failed"); return EXIT_FAILURE; } - // Prepare sockaddr_bp - struct sockaddr_bp dest_addr; - memset(&dest_addr, 0, sizeof(dest_addr)); dest_addr.bp_family = AF_BP; dest_addr.bp_scheme = BP_SCHEME_IPN; dest_addr.bp_addr.ipn.node_id = node_id; dest_addr.bp_addr.ipn.service_id = service_id; - // Message to send - const char *message = "Hello!"; - - ret = sendto(sockfd, message, strlen(message) + 1, 0, + char *message = "Hello!"; + ret = sendto(fd, message, strlen(message) + 1, 0, (struct sockaddr *)&dest_addr, sizeof(dest_addr)); if (ret < 0) { perror("sendto failed"); - close(sockfd); - return EXIT_FAILURE; + ret = EXIT_FAILURE; + goto out; } printf("Message sent successfully: %s\n", message); - // Clean up - close(sockfd); - - return EXIT_SUCCESS; +out: + close(fd); + return ret; }