Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 63 additions & 1 deletion bp_socket/af_bp.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,16 @@ static struct sock* bp_alloc_socket(struct net* net, int kern)
bp = bp_sk(sk);
skb_queue_head_init(&bp->rx_queue);
init_waitqueue_head(&bp->rx_waitq);
init_waitqueue_head(&bp->tx_waitq);

mutex_init(&bp->tx_mutex);
mutex_init(&bp->rx_mutex);

bp->bp_node_id = 0;
bp->bp_service_id = 0;
bp->rx_canceled = false;
bp->tx_confirmed = false;
bp->tx_error = false;
}

return sk;
Expand Down Expand Up @@ -159,6 +167,10 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len)
write_unlock_bh(&bp_list_lock);
release_sock(sk);

// Notify user-space daemon to open endpoint (bp_open) and prepare
// threads/state
open_endpoint_doit(node_id, service_id, 8443);

return 0;

out:
Expand All @@ -180,6 +192,13 @@ int bp_release(struct socket* sock)
write_unlock_bh(&bp_list_lock);
skb_queue_purge(&bp->rx_queue);

// Notify user-space daemon to close endpoint (bp_close) and
// cleanup
if (bp->bp_node_id && bp->bp_service_id) {
close_endpoint_doit(
bp->bp_node_id, bp->bp_service_id, 8443);
}

sock->sk = NULL;
release_sock(sk);
sock_put(sk);
Expand Down Expand Up @@ -279,6 +298,43 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size)
goto err_free;
}

// Wait for confirmation from daemon
pr_info("bp_sendmsg: waiting for confirmation for endpoint "
"ipn:%u.%u\n",
bp->bp_node_id, bp->bp_service_id);

// Reset confirmation flag and wait
mutex_lock(&bp->tx_mutex);
bp->tx_confirmed = false;
bp->tx_error = false;
mutex_unlock(&bp->tx_mutex);

ret = wait_event_interruptible(
bp->tx_waitq, bp->tx_confirmed || signal_pending(current));

if (ret < 0) {
pr_err("bp_sendmsg: interrupted while waiting for "
"confirmation\n");
goto err_free;
}

mutex_lock(&bp->tx_mutex);
if (bp->tx_confirmed) {
if (bp->tx_error) {
pr_err("bp_sendmsg: bundle send failed for "
"endpoint ipn:%u.%u\n",
bp->bp_node_id, bp->bp_service_id);
mutex_unlock(&bp->tx_mutex);
ret = -EIO; // Return error to user space
goto err_free;
} else {
pr_info("bp_sendmsg: confirmation received for "
"endpoint ipn:%u.%u\n",
bp->bp_node_id, bp->bp_service_id);
}
}
mutex_unlock(&bp->tx_mutex);

kfree(payload);
}

Expand All @@ -294,7 +350,7 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags)
{
struct sock* sk;
struct bp_sock* bp;
struct sk_buff* skb;
struct sk_buff* skb = NULL;
struct sockaddr_bp* src_addr;
int ret;

Expand All @@ -321,25 +377,31 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags)
goto out;
}

mutex_lock(&bp->rx_mutex);
if (bp->rx_canceled) {
pr_info("bp_recvmsg: bundle request canceled\n");
mutex_unlock(&bp->rx_mutex);
ret = -ECANCELED;
goto out;
}
mutex_unlock(&bp->rx_mutex);

if (sock_flag(sk, SOCK_DEAD)) {
pr_err("bp_recvmsg: socket closed while waiting\n");
ret = -ESHUTDOWN;
goto out;
}

mutex_lock(&bp->rx_mutex);
skb = skb_dequeue(&bp->rx_queue);
if (!skb) {
pr_info("bp_recvmsg: no messages in the queue for service %d\n",
bp->bp_service_id);
mutex_unlock(&bp->rx_mutex);
ret = -ENOMSG;
goto out;
}
mutex_unlock(&bp->rx_mutex);

if (skb->len > size) {
pr_err("bp_recvmsg: buffer too small for message (required=%u, "
Expand Down
11 changes: 10 additions & 1 deletion bp_socket/af_bp.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,19 @@ extern const struct net_proto_family bp_family_ops;
struct bp_sock {
struct sock sk;
u_int32_t bp_node_id;
u_int8_t bp_service_id;
u_int32_t bp_service_id;

// Reception
struct mutex rx_mutex;
struct sk_buff_head rx_queue;
wait_queue_head_t rx_waitq;
bool rx_canceled;

// Transmission
struct mutex tx_mutex;
wait_queue_head_t tx_waitq;
bool tx_confirmed;
bool tx_error;
};

int bp_bind(struct socket* sock, struct sockaddr* addr, int addr_len);
Expand Down
206 changes: 206 additions & 0 deletions bp_socket/bp_genl.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "bp_genl.h"
#include "../include/bp_socket.h"
#include "af_bp.h"
#include <linux/sched.h>
#include <net/genetlink.h>

static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = {
Expand All @@ -10,6 +11,7 @@ static const struct nla_policy nla_policy[BP_GENL_A_MAX + 1] = {
[BP_GENL_A_DEST_NODE_ID] = { .type = NLA_U32 },
[BP_GENL_A_DEST_SERVICE_ID] = { .type = NLA_U32 },
[BP_GENL_A_PAYLOAD] = { .type = NLA_BINARY },
[BP_GENL_A_ERROR_CODE] = { .type = NLA_U32 },
};

static struct genl_ops genl_ops[] = { {
Expand All @@ -25,6 +27,20 @@ static struct genl_ops genl_ops[] = { {
.policy = nla_policy,
.doit = cancel_bundle_request_doit,
.dumpit = NULL,
},
{
.cmd = BP_GENL_CMD_SEND_BUNDLE_CONFIRMATION,
.flags = GENL_ADMIN_PERM,
.policy = nla_policy,
.doit = send_bundle_confirmation_doit,
.dumpit = NULL,
},
{
.cmd = BP_GENL_CMD_SEND_BUNDLE_FAILURE,
.flags = GENL_ADMIN_PERM,
.policy = nla_policy,
.doit = send_bundle_failure_doit,
.dumpit = NULL,
} };

/* Multicast groups for our family */
Expand Down Expand Up @@ -196,7 +212,9 @@ int cancel_bundle_request_doit(struct sk_buff* skb, struct genl_info* info)
&& bp->bp_service_id == dest_service_id) {

if (waitqueue_active(&bp->rx_waitq)) {
mutex_lock(&bp->rx_mutex);
bp->rx_canceled = true;
mutex_unlock(&bp->rx_mutex);
wake_up_interruptible(&bp->rx_waitq);
}
bh_unlock_sock(sk);
Expand Down Expand Up @@ -256,7 +274,9 @@ int deliver_bundle_doit(struct sk_buff* skb, struct genl_info* info)
if (bp->bp_node_id == dest_node_id
&& bp->bp_service_id == dest_service_id) {

mutex_lock(&bp->rx_mutex);
skb_queue_tail(&bp->rx_queue, new_skb);
mutex_unlock(&bp->rx_mutex);
new_skb_queued = true;
if (waitqueue_active(&bp->rx_waitq)) {
wake_up_interruptible(&bp->rx_waitq);
Expand Down Expand Up @@ -326,6 +346,192 @@ int destroy_bundle_doit(
genlmsg_end(msg, msg_head);
return genlmsg_unicast(&init_net, msg, port_id);

err_cancel:
genlmsg_cancel(msg, msg_head);
err_free:
nlmsg_free(msg);
out:
return ret;
}

int open_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id)
{
void* msg_head;
struct sk_buff* msg;
size_t msg_size;
int ret;

msg_size = 2 * nla_total_size(sizeof(u_int32_t));
msg = genlmsg_new(msg_size, GFP_KERNEL);
if (!msg) {
pr_err("open_endpoint: failed to allocate message buffer\n");
ret = -ENOMEM;
goto out;
}

msg_head
= genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_OPEN_ENDPOINT);
if (!msg_head) {
pr_err("open_endpoint: failed to create genetlink header\n");
ret = -EMSGSIZE;
goto err_free;
}

ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id);
if (ret) {
pr_err("open_endpoint: failed to put BP_GENL_A_DEST_NODE_ID "
"(%d)\n",
ret);
goto err_cancel;
}

ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id);
if (ret) {
pr_err("open_endpoint: failed to put BP_GENL_A_DEST_SERVICE_ID "
"(%d)\n",
ret);
goto err_cancel;
}

genlmsg_end(msg, msg_head);
return genlmsg_unicast(&init_net, msg, port_id);

err_cancel:
genlmsg_cancel(msg, msg_head);
err_free:
nlmsg_free(msg);
out:
return ret;
}

int send_bundle_confirmation_doit(struct sk_buff* skb, struct genl_info* info)
{
u_int32_t src_node_id, src_service_id;
struct sock* sk;
struct bp_sock* bp;

if (!info->attrs[BP_GENL_A_SRC_NODE_ID]
|| !info->attrs[BP_GENL_A_SRC_SERVICE_ID]) {
pr_err("send_bundle_confirmation_doit: missing attribute(s)\n");
return -EINVAL;
}

src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]);
src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]);

pr_info("send_bundle_confirmation_doit: received confirmation for "
"ipn:%u.%u\n",
src_node_id, src_service_id);

// Find the socket waiting for confirmation
read_lock_bh(&bp_list_lock);
sk_for_each(sk, &bp_list)
{
bp = bp_sk(sk);
if (bp->bp_node_id == src_node_id
&& bp->bp_service_id == src_service_id) {
// Found the socket, wake it up
mutex_lock(&bp->tx_mutex);
bp->tx_confirmed = true;
mutex_unlock(&bp->tx_mutex);
wake_up(&bp->tx_waitq);
pr_info(
"send_bundle_confirmation_doit: woke up socket for "
"ipn:%u.%u\n",
src_node_id, src_service_id);
break;
}
}
read_unlock_bh(&bp_list_lock);

return 0;
}

int send_bundle_failure_doit(struct sk_buff* skb, struct genl_info* info)
{
u_int32_t src_node_id, src_service_id;
struct sock* sk;
struct bp_sock* bp;

if (!info->attrs[BP_GENL_A_SRC_NODE_ID]
|| !info->attrs[BP_GENL_A_SRC_SERVICE_ID]) {
pr_err("send_bundle_failure_doit: missing attribute(s)\n");
return -EINVAL;
}

src_node_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_NODE_ID]);
src_service_id = nla_get_u32(info->attrs[BP_GENL_A_SRC_SERVICE_ID]);

pr_info("send_bundle_failure_doit: received failure notification for "
"ipn:%u.%u\n",
src_node_id, src_service_id);

// Find the socket waiting for confirmation and mark it as failed
read_lock_bh(&bp_list_lock);
sk_for_each(sk, &bp_list)
{
bp = bp_sk(sk);
if (bp->bp_node_id == src_node_id
&& bp->bp_service_id == src_service_id) {
// Found the socket, mark as failed and wake it up
mutex_lock(&bp->tx_mutex);
bp->tx_confirmed = true;
bp->tx_error = true;
mutex_unlock(&bp->tx_mutex);
wake_up(&bp->tx_waitq);
pr_info("send_bundle_failure_doit: woke up socket for "
"ipn:%u.%u with failure\n",
src_node_id, src_service_id);
break;
}
}
read_unlock_bh(&bp_list_lock);

return 0;
}

int close_endpoint_doit(u_int32_t node_id, u_int32_t service_id, int port_id)
{
void* msg_head;
struct sk_buff* msg;
size_t msg_size;
int ret;

msg_size = 2 * nla_total_size(sizeof(u_int32_t));
msg = genlmsg_new(msg_size, GFP_KERNEL);
if (!msg) {
pr_err("close_endpoint: failed to allocate message buffer\n");
ret = -ENOMEM;
goto out;
}

msg_head
= genlmsg_put(msg, 0, 0, &genl_fam, 0, BP_GENL_CMD_CLOSE_ENDPOINT);
if (!msg_head) {
pr_err("close_endpoint: failed to create genetlink header\n");
ret = -EMSGSIZE;
goto err_free;
}

ret = nla_put_u32(msg, BP_GENL_A_DEST_NODE_ID, node_id);
if (ret) {
pr_err("close_endpoint: failed to put BP_GENL_A_DEST_NODE_ID "
"(%d)\n",
ret);
goto err_cancel;
}

ret = nla_put_u32(msg, BP_GENL_A_DEST_SERVICE_ID, service_id);
if (ret) {
pr_err("close_endpoint: failed to put "
"BP_GENL_A_DEST_SERVICE_ID (%d)\n",
ret);
goto err_cancel;
}

genlmsg_end(msg, msg_head);
return genlmsg_unicast(&init_net, msg, port_id);

err_cancel:
genlmsg_cancel(msg, msg_head);
err_free:
Expand Down
Loading