Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ demo-app-bp-recv
.vagrant
sender
receiver
bp_client
tests/sender
tests/receiver
daemon/bp_daemon
Expand Down
204 changes: 204 additions & 0 deletions bp_client.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#include "include/bp_socket.h"
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#define BUFFER_SIZE 1024
#define AF_BP 28

volatile int running = 1;

struct client_data {
int fd;
struct sockaddr_bp dest_addr;
struct sockaddr_bp src_addr;
};

void handle_sigint(int sig) {
printf("\nInterrupt received, shutting down...\n");
running = 0;
}

void *send_thread(void *arg) {
struct client_data *data = (struct client_data *)arg;
char send_buffer[BUFFER_SIZE];
int message_count = 0;

printf("Send thread started\n");

while (running) {
message_count++;
snprintf(send_buffer, sizeof(send_buffer), "Hello from client! Message #%d",
message_count);

int flags = 0;
flags |= MSG_ACK_REQUESTED;

int ret =
sendto(data->fd, send_buffer, strlen(send_buffer) + 1, flags,
(struct sockaddr *)&data->dest_addr, sizeof(data->dest_addr));
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
}
printf("sendto failed for ipn:%u.%u: %s\n",
data->dest_addr.bp_addr.ipn.node_id,
data->dest_addr.bp_addr.ipn.service_id, strerror(errno));
break;
}

printf("Message sent: %s\n", send_buffer);
}

printf("Send thread exiting\n");
return NULL;
}

void *receive_thread(void *arg) {
struct client_data *data = (struct client_data *)arg;
struct msghdr msg;
struct iovec iov;
char buffer[BUFFER_SIZE];
struct sockaddr_bp src_addr;

// Set up message structure for receiving
iov.iov_base = buffer;
iov.iov_len = sizeof(buffer);
memset(&msg, 0, sizeof(msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
memset(&src_addr, 0, sizeof(src_addr));
msg.msg_name = &src_addr;
msg.msg_namelen = sizeof(src_addr);

printf("Receive thread started\n");

while (running) {
ssize_t n = recvmsg(data->fd, &msg, 0);
if (n < 0) {
if (errno == EINTR) {
printf("\nInterrupted by signal, exiting...\n");
break;
}
if (errno == EAGAIN || errno == EWOULDBLOCK) {
// Timeout reached, continue loop to check running flag
continue;
}
perror("recvmsg failed");
break;
}

printf("Received message (%zd bytes): %.*s\n", n, (int)n, buffer);
if (msg.msg_namelen >= sizeof(struct sockaddr_bp)) {
printf("Bundle sent by ipn:%u.%u\n", src_addr.bp_addr.ipn.node_id,
src_addr.bp_addr.ipn.service_id);
}

memset(&msg, 0, sizeof(msg));
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
memset(&src_addr, 0, sizeof(src_addr));
msg.msg_name = &src_addr;
msg.msg_namelen = sizeof(src_addr);
}

printf("Receive thread exiting\n");
return NULL;
}

int main(int argc, char *argv[]) {
struct sockaddr_bp dest_addr, src_addr;
int fd;
uint32_t node_id, service_id;
int ret = 0;
pthread_t send_tid, recv_tid;
struct client_data data;

if (argc < 3) {
printf("Usage: %s <node_id> <service_id>\n", argv[0]);
return EXIT_FAILURE;
}

signal(SIGINT, handle_sigint);

node_id = (uint32_t)atoi(argv[1]);
service_id = (uint32_t)atoi(argv[2]);

if (service_id < 1 || service_id > 255) {
fprintf(stderr, "Invalid service_id (must be in 1-255)\n");
return EXIT_FAILURE;
}

if (node_id == 0) {
fprintf(stderr, "Invalid node_id (cannot be 0)\n");
return EXIT_FAILURE;
}

fd = socket(AF_BP, SOCK_DGRAM, 0);
if (fd < 0) {
perror("socket creation failed");
return EXIT_FAILURE;
}

src_addr.bp_family = AF_BP;
src_addr.bp_scheme = BP_SCHEME_IPN;
src_addr.bp_addr.ipn.node_id = 10;
src_addr.bp_addr.ipn.service_id = 2;
if (bind(fd, (struct sockaddr *)&src_addr, sizeof(src_addr)) == -1) {
perror("Failed to bind socket");
ret = EXIT_FAILURE;
goto out;
}

struct timeval timeout;
timeout.tv_sec = 1; // 1 second timeout
timeout.tv_usec = 0;
if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) < 0) {
perror("Failed to set socket timeout");
ret = EXIT_FAILURE;
goto out;
}

dest_addr.bp_family = AF_BP;
dest_addr.bp_scheme = BP_SCHEME_IPN;
dest_addr.bp_addr.ipn.node_id = node_id;
dest_addr.bp_addr.ipn.service_id = service_id;

data.fd = fd;
data.dest_addr = dest_addr;
data.src_addr = src_addr;

printf("BP Client started - sending messages to ipn:%u.%u\n", node_id,
service_id);
printf("Press Ctrl+C to exit.\n");

if (pthread_create(&send_tid, NULL, send_thread, &data) != 0) {
perror("Failed to create send thread");
ret = EXIT_FAILURE;
goto out;
}

if (pthread_create(&recv_tid, NULL, receive_thread, &data) != 0) {
perror("Failed to create receive thread");
running = 0;
pthread_join(send_tid, NULL);
ret = EXIT_FAILURE;
goto out;
}

pthread_join(send_tid, NULL);
pthread_join(recv_tid, NULL);

out:
close(fd);
printf("Socket closed.\n");
return ret;
}
9 changes: 3 additions & 6 deletions bp_socket/af_bp.c
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size)
}

if (size > 0) {
payload = kmalloc(size, GFP_KERNEL);
payload = kmalloc(size, GFP_ATOMIC);
if (!payload) {
pr_err("bp_sendmsg: failed to allocate memory\n");
ret = -ENOMEM;
Expand All @@ -299,17 +299,14 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size)
}

ret = send_bundle_doit(payload, size, dest_node_id,
dest_service_id, bp->bp_node_id, bp->bp_service_id, 8443);
dest_service_id, bp->bp_node_id, bp->bp_service_id,
msg->msg_flags, 8443);
if (ret < 0) {
pr_err(
"bp_sendmsg: send_bundle_doit failed (%d)\n", ret);
goto err_free;
}

pr_info("bp_sendmsg: bundle sent for endpoint ipn:%u.%u (size: "
"%zu)\n",
bp->bp_node_id, bp->bp_service_id, size);

kfree(payload);
}

Expand Down
12 changes: 10 additions & 2 deletions bp_socket/bp_genl.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = {
[BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 },
[BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY },
[BP_GENL_A_ADU] = { .type = NLA_U64 },
[BP_GENL_A_FLAGS] = { .type = NLA_U32 },
};

static struct genl_ops genl_ops[] = { {
Expand Down Expand Up @@ -41,14 +42,14 @@ struct genl_family genl_fam = {

int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id,
u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id,
int port_id)
u_int32_t flags, int port_id)
{
void* msg_head;
struct sk_buff* msg;
size_t msg_size;
int ret;

msg_size = 4 * nla_total_size(sizeof(u_int32_t))
msg_size = 5 * nla_total_size(sizeof(u_int32_t))
+ nla_total_size(payload_size);
msg = genlmsg_new(msg_size, GFP_KERNEL);
if (!msg) {
Expand Down Expand Up @@ -104,6 +105,13 @@ int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id,
goto err_cancel;
}

ret = nla_put_u32(msg, BP_GENL_A_FLAGS, flags);
if (ret) {
pr_err(
"send_bundle: failed to put BP_GENL_A_FLAGS (%d)\n", ret);
goto err_cancel;
}

genlmsg_end(msg, msg_head);
return genlmsg_unicast(&init_net, msg, port_id);

Expand Down
2 changes: 1 addition & 1 deletion bp_socket/bp_genl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ int open_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id);
int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id);
int send_bundle_doit(void* payload, size_t payload_size, u_int32_t dest_node_id,
u_int32_t dest_service_id, u_int32_t src_node_id, u_int32_t src_service_id,
int port_id);
u_int32_t flags, int port_id);
int enqueue_bundle_doit(struct sk_buff* skb, struct genl_info* info);
int destroy_bundle_doit(uint64_t adu, int port_id);

Expand Down
53 changes: 13 additions & 40 deletions daemon/bp_genl_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,15 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) {
u_int32_t dest_node_id, dest_service_id, src_node_id, src_service_id;
char dest_eid[64];
int written;
pthread_t thread;
struct ion_send_args *args;
struct endpoint_ctx *ctx;
void *payload_copy;
u_int32_t flags;
int ret;

if (!attrs[BP_GENL_A_PAYLOAD] || !attrs[BP_GENL_A_DEST_NODE_ID] ||
!attrs[BP_GENL_A_DEST_SERVICE_ID] || !attrs[BP_GENL_A_SRC_NODE_ID] ||
!attrs[BP_GENL_A_SRC_SERVICE_ID]) {
!attrs[BP_GENL_A_SRC_SERVICE_ID] || !attrs[BP_GENL_A_FLAGS]) {
log_error(
"handle_send_bundle: missing attribute(s) in SEND_BUNDLE command (payload, node ID, "
"service ID)");
"service ID, flags)");
return -EINVAL;
}

Expand All @@ -87,6 +85,7 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) {
dest_service_id = nla_get_u32(attrs[BP_GENL_A_DEST_SERVICE_ID]);
src_node_id = nla_get_u32(attrs[BP_GENL_A_SRC_NODE_ID]);
src_service_id = nla_get_u32(attrs[BP_GENL_A_SRC_SERVICE_ID]);
flags = nla_get_u32(attrs[BP_GENL_A_FLAGS]);

written = snprintf(dest_eid, sizeof(dest_eid), "ipn:%u.%u", dest_node_id, dest_service_id);
if (written < 0 || written >= (int)sizeof(dest_eid)) {
Expand All @@ -95,42 +94,16 @@ int handle_send_bundle(Daemon *daemon, struct nlattr **attrs) {
return -EINVAL;
}

ctx = endpoint_registry_get(src_node_id, src_service_id);
if (!ctx) {
log_error("[ipn:%u.%u] handle_send_bundle: no endpoint for ipn:%u.%u", src_node_id,
src_service_id, src_node_id, src_service_id);
return -ENODEV;
ret = endpoint_registry_enqueue_send(src_node_id, src_service_id, dest_eid, payload,
payload_size, flags);
if (ret < 0) {
log_error("[ipn:%u.%u] handle_send_bundle: failed to enqueue send (error: %d)", src_node_id,
src_service_id, ret);
return ret;
}

payload_copy = malloc(payload_size);
if (!payload_copy) {
log_error("[ipn:%u.%u] handle_send_bundle: failed to allocate payload", src_node_id,
src_service_id);
return -ENOMEM;
}
memcpy(payload_copy, payload, payload_size);

// Enqueue to send thread using source endpoint SAP
// Launch async send thread
args = malloc(sizeof(struct ion_send_args));
if (!args) return -ENOMEM;
args->node_id = src_node_id;
args->service_id = src_service_id;
args->dest_eid = strndup(dest_eid, sizeof(dest_eid));
args->netlink_sock = daemon->genl_bp_sock;
args->netlink_mutex = &daemon->netlink_mutex;
args->netlink_family = daemon->genl_bp_family_id;
args->payload = payload_copy;
args->payload_size = payload_size;

if (pthread_create(&thread, NULL, ion_send_thread, args) != 0) {
log_error("[ipn:%u.%u] handle_send_bundle: failed to create send thread", src_node_id,
src_service_id);
free(args->dest_eid);
free(args->payload);
free(args);
return -errno;
}
log_info("[ipn:%u.%u] SEND_BUNDLE: bundle queued for sending to EID %s, size %zu (bytes)",
src_node_id, src_service_id, dest_eid, payload_size);

return 0;
}
Expand Down
Loading