Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions bp_socket/af_bp.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len)
struct sockaddr_bp* addr = (struct sockaddr_bp*)uaddr;
int service_id = -1;
int node_id = -1;
int err = 0;

if (addr_len != sizeof(struct sockaddr_bp))
return -EINVAL;
Expand Down Expand Up @@ -132,8 +131,7 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len)
if (iter_bp->bp_service_id == service_id
&& iter_bp->bp_node_id == node_id) {
read_unlock_bh(&bp_list_lock);
err = -EADDRINUSE;
goto out;
return -EADDRINUSE;
}
}
read_unlock_bh(&bp_list_lock);
Expand All @@ -146,10 +144,8 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len)
sk_add_node(sk, &bp_list);
write_unlock_bh(&bp_list_lock);

out:
release_sock(sk);

return err;
return 0;
}

int bp_release(struct socket* sock)
Expand Down
16 changes: 6 additions & 10 deletions bp_socket/bp_genl.c
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,12 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info)

pr_info("TRIGGER: received message\n");

if (!info->attrs[BP_GENL_A_SERVICE_ID]) {
pr_err("attribute missing from message\n");
if (!info->attrs[BP_GENL_A_SERVICE_ID]
|| !info->attrs[BP_GENL_A_PAYLOAD]) {
pr_err("deliver_bundle: missing required attributes\n");
return -EINVAL;
}
service_id = nla_get_u32(info->attrs[BP_GENL_A_SERVICE_ID]);

if (!info->attrs[BP_GENL_A_PAYLOAD]) {
pr_err("empty message received\n");
return -EINVAL;
}
payload = nla_data(info->attrs[BP_GENL_A_PAYLOAD]);
payload_len = nla_len(info->attrs[BP_GENL_A_PAYLOAD]);

Expand All @@ -187,18 +183,18 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info)
read_lock_bh(&bp_list_lock);
sk_for_each(sk, &bp_list)
{
lock_sock(sk);
bh_lock_sock(sk);
bp = bp_sk(sk);

if (bp->bp_service_id == service_id) {

skb_queue_tail(&bp->queue, new_skb);
if (waitqueue_active(&bp->wait_queue))
wake_up_interruptible(&bp->wait_queue);
release_sock(sk);
bh_unlock_sock(sk);
break;
}
release_sock(sk);
bh_unlock_sock(sk);
}
read_unlock_bh(&bp_list_lock);

Expand Down
47 changes: 29 additions & 18 deletions daemon/bp_genl_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) {

log_info("SEND_BUNDLE: sockid=%lu, EID=%s, payload size=%d", sockid, eid, payload_size);

return bp_send_to_eid(payload, payload_size, eid, eid_size + 1);
return bp_send_to_eid(daemon->sdr, payload, payload_size, eid, eid_size + 1);
}

int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) {
Expand All @@ -44,7 +44,7 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) {
}

uint32_t service_id = nla_get_u32(attrs[BP_GENL_A_SERVICE_ID]);
log_info("REQUEST_BUNDLE: Bundle request initiated (service ID %u)", service_id);
log_info("REQUEST_BUNDLE: bundle request initiated (service ID %u)", service_id);

struct thread_args *args = malloc(sizeof(struct thread_args));
if (!args) {
Expand All @@ -55,9 +55,10 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) {
args->service_id = service_id;
args->netlink_sock = daemon->genl_bp_sock;
args->netlink_family = daemon->genl_bp_family_id;
args->sdr = daemon->sdr;

pthread_t thread;
if (pthread_create(&thread, NULL, bp_recv_thread, args) != 0) {
if (pthread_create(&thread, NULL, handle_recv_thread, args) != 0) {
log_error("Failed to create thread");
free(args);
return -1;
Expand All @@ -68,19 +69,26 @@ int handle_request_bundle(Daemon *daemon, struct nlattr **attrs) {
return 0;
}

void *bp_recv_thread(void *arg) {
void *handle_recv_thread(void *arg) {
struct thread_args *args = (struct thread_args *)arg;
handle_deliver_bundle(args);
char *payload = NULL;
int payload_size;

payload_size = bp_recv_once(args->sdr, args->service_id, &payload);
if (payload_size < 0) {
log_info("Exit recv thread (service ID %u)", args->service_id);
free(args);
return NULL;
}

handle_deliver_bundle(payload, payload_size, args);

free(args);
return NULL;
}

int handle_deliver_bundle(struct thread_args *args) {
char *payload = bp_recv_once(args->service_id);
if (!payload) {
log_error("DELIVER_BUNDLE: No payload received (service ID %u)", args->service_id);
return -1;
}
int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args) {
int err = 0;

struct nl_msg *msg = nlmsg_alloc();
if (!msg) {
Expand All @@ -92,23 +100,26 @@ int handle_deliver_bundle(struct thread_args *args) {
void *hdr = genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, args->netlink_family, 0, 0,
BP_GENL_CMD_DELIVER_BUNDLE, BP_GENL_VERSION);
if (!hdr || nla_put_u32(msg, BP_GENL_A_SERVICE_ID, args->service_id) < 0 ||
nla_put(msg, BP_GENL_A_PAYLOAD, strlen(payload) + 1, payload) < 0) {
nla_put(msg, BP_GENL_A_PAYLOAD, payload_size, payload) < 0) {
log_error("DELIVER_BUNDLE: Failed to construct Netlink reply");
nlmsg_free(msg);
free(payload);
return -EMSGSIZE;
}

int err = nl_send_auto(args->netlink_sock, msg);
nlmsg_free(msg);
free(payload);

err = nl_send_auto(args->netlink_sock, msg);
if (err < 0) {
log_error("DELIVER_BUNDLE: Failed to send Netlink message (service ID %u)",
args->service_id);
nlmsg_free(msg);
free(payload);
return err;
}

log_info("DELIVER_BUNDLE: Bundle successfully delivered (service ID %u)", args->service_id);
return 0;
log_info("DELIVER_BUNDLE: received bundle and forwarding to kernel (service ID %u)",
args->service_id);

nlmsg_free(msg);
free(payload);
return err;
}
7 changes: 5 additions & 2 deletions daemon/bp_genl_handlers.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
#ifndef BP_GENL_HANDLERS_H
#define BP_GENL_HANDLERS_H

#include "bp.h"
#include "daemon.h"

struct thread_args {
struct nl_sock *netlink_sock;
int netlink_family;
unsigned int service_id;
Sdr sdr;
};

int handle_send_bundle(Daemon *daemon, struct nlattr **attrs);
int handle_request_bundle(Daemon *daemon, struct nlattr **attrs);
int handle_deliver_bundle(struct thread_args *args);
void *bp_recv_thread(void *arg);
int handle_deliver_bundle(char *payload, int payload_size, struct thread_args *args);

void *handle_recv_thread(void *arg);

#endif
4 changes: 3 additions & 1 deletion daemon/daemon.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ void on_sigpipe(evutil_socket_t fd, short what, void *arg) {
event_base_loopexit(base, NULL);
}

void on_netlink(int fd, short event, void *arg) {
void on_netlink(evutil_socket_t fd, short event, void *arg) {
Daemon *daemon = (Daemon *)arg;
nl_recvmsgs_default(
daemon->genl_bp_sock); // call the callback registered with genl_bp_sock_recvmsg_cb()
Expand Down Expand Up @@ -68,13 +68,15 @@ int daemon_start(Daemon *self) {
daemon_free(self);
return 1;
}
self->sdr = bp_get_sdr();
log_info("Successfully attached to ION");

log_info("Daemon started successfully");
event_base_dispatch(self->base);
log_info("Daemon terminated");

daemon_free(self);
bp_detach();

return 0;
}
Expand Down
2 changes: 2 additions & 0 deletions daemon/daemon.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef DAEMON_H
#define DAEMON_H

#include "bp.h"
#include <event2/event.h>
#include <netlink/socket.h>

Expand All @@ -9,6 +10,7 @@ typedef struct Daemon {
char *genl_bp_family_name;
int genl_bp_family_id;
unsigned int nl_pid;
Sdr sdr;

struct event_base *base;
struct event *event_on_sigpipe;
Expand Down
97 changes: 53 additions & 44 deletions daemon/ion.c
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
#include "log.h"
#include "sdr.h"
#include <bp.h>
#include <pthread.h>

static pthread_mutex_t sdrmutex = PTHREAD_MUTEX_INITIALIZER;

const char *bp_result_text(BpIndResult result) {
switch (result) {
case BpPayloadPresent:
return "BpPayloadPresent";
case BpReceptionTimedOut:
return "BpReceptionTimedOut";
case BpReceptionInterrupted:
return "BpReceptionInterrupted";
case BpEndpointStopped:
return "BpEndpointStopped";
default:
return "Unknown";
}
}

int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size) {
Sdr sdr;
int bp_send_to_eid(Sdr sdr, char *payload, int payload_size, char *dest_eid, int eid_size) {
Object sdrBuffer;
Object zco;

sdr = bp_get_sdr();
if (sdr == NULL) {
log_error("*** Failed to get sdr.");
return 0;
}

oK(sdr_begin_xn(sdr));

sdrBuffer = sdr_malloc(sdr, payload_size);
Expand All @@ -31,7 +42,7 @@ int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size)
return 0;
}

if (bp_send(NULL, destEid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, zco, NULL) <= 0) {
if (bp_send(NULL, dest_eid, NULL, 86400, BP_STD_PRIORITY, 0, 0, 0, NULL, zco, NULL) <= 0) {
sdr_end_xn(sdr);
log_error("bp_send failed.");
return 0;
Expand All @@ -41,64 +52,62 @@ int bp_send_to_eid(char *payload, int payload_size, char *destEid, int eid_size)
return 1;
}

char *bp_recv_once(int service_id) {
BpSAP txSap;
int bp_recv_once(Sdr sdr, int service_id, char **payload) {
BpSAP sap;
BpDelivery dlv;
Sdr sdr = getIonsdr();
ZcoReader reader;
char *eid = NULL;
char *payload = NULL;
int bundle_len;
int rc = -1;
int eid_size;
char eid[64];
int nodeNbr = getOwnNodeNbr();
vast len;

eid_size = snprintf(NULL, 0, "ipn:%d.%d", nodeNbr, service_id) + 1;
eid = malloc(eid_size);
if (!eid) {
log_error("Failed to allocate EID");
return NULL;
eid_size = snprintf(eid, sizeof(eid), "ipn:%d.%d", nodeNbr, service_id);
if (eid_size < 0 || eid_size >= sizeof(eid)) {
log_error("Failed to construct EID string.");
return -1;
}
snprintf(eid, eid_size, "ipn:%d.%d", nodeNbr, service_id);

if (bp_open(eid, &txSap) < 0 || txSap == NULL) {
log_error("Failed to open source endpoint.");
if (bp_open(eid, &sap) < 0) {
log_error("bp_recv_once: Failed to open BpSAP for node_id=%d service_id=%d", nodeNbr,
service_id);
return -1;
}

if (bp_receive(sap, &dlv, BP_BLOCKING) < 0) {
log_error("bp_recv_once: Bundle reception failed.");
goto out;
}

if (bp_receive(txSap, &dlv, BP_BLOCKING) < 0) {
log_error("Bundle reception failed.");
if (dlv.result != BpPayloadPresent || dlv.adu == 0) {
log_error("bp_recv_once: %s", bp_result_text(dlv.result));
goto out;
}

if (dlv.result != BpPayloadPresent) {
log_info("bp_recv_once: no payload");
if (pthread_mutex_lock(&sdrmutex) != 0) {
putErrmsg("Couldn't take sdr mutex.", NULL);
goto out;
}

if (!sdr_begin_xn(sdr)) goto out;
if (sdr_begin_xn(sdr) == 0) goto out;

int payload_size = zco_source_data_length(sdr, dlv.adu);
payload = malloc(payload_size);
if (!payload) {
log_error("Failed to allocate memory for payload");
sdr_exit_xn(sdr);
goto out;
bundle_len = zco_source_data_length(sdr, dlv.adu);
*payload = malloc(bundle_len);
if (!*payload) {
log_error("bp_recv_once: Failed to allocate memory for payload.");
goto unlock_sdr;
}

zco_start_receiving(dlv.adu, &reader);
len = zco_receive_source(sdr, &reader, payload_size, payload);
rc = zco_receive_source(sdr, &reader, bundle_len, *payload);

if (sdr_end_xn(sdr) < 0 || len < 0) {
log_error("Failed to read payload");
free(payload);
payload = NULL;
goto out;
}
sdr_end_xn(sdr);
unlock_sdr:
pthread_mutex_unlock(&sdrmutex);

out:
if (eid) free(eid);
bp_release_delivery(&dlv, 0);
bp_close(txSap);
bp_close(sap);

return payload;
return rc;
}
6 changes: 4 additions & 2 deletions daemon/ion.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#ifndef ION_H
#define ION_H

int bp_send_to_eid(char *payload, int payload_size, char *eid, int eid_size);
char *bp_recv_once(int service_id);
#include "bp.h"

int bp_send_to_eid(Sdr sdr, char *payload, int payload_size, char *dest_eid, int eid_size);
int bp_recv_once(Sdr sdr, int service_id, char **payload);

#endif
Loading