Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
264 changes: 194 additions & 70 deletions bp_socket/af_bp.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
#include <linux/init.h>
#include <linux/jiffies.h>
#include <linux/kernel.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/socket.h>
#include <linux/timekeeping.h>
#include <linux/types.h>
#include <net/sock.h>
#include <uapi/linux/time.h>

#include "../include/bp_socket.h"
#include "af_bp.h"
Expand Down Expand Up @@ -30,16 +37,13 @@ static struct sock* bp_alloc_socket(struct net* net, int kern)
bp = bp_sk(sk);
skb_queue_head_init(&bp->rx_queue);
init_waitqueue_head(&bp->rx_waitq);
init_waitqueue_head(&bp->tx_waitq);

mutex_init(&bp->tx_mutex);
mutex_init(&bp->rx_mutex);

bp->bp_node_id = 0;
bp->bp_service_id = 0;
bp->rx_canceled = false;
bp->tx_confirmed = false;
bp->tx_error = false;
sk->sk_rcvtimeo
= MAX_SCHEDULE_TIMEOUT; /* Default: no timeout */
}

return sk;
Expand All @@ -61,12 +65,12 @@ struct proto_ops bp_proto_ops = { .family = AF_BP,
.mmap = sock_no_mmap,
.accept = sock_no_accept,
.getname = sock_no_getname,
// .poll = datagram_poll,
// .poll = bp_poll,
.ioctl = sock_no_ioctl,
.listen = sock_no_listen,
.shutdown = sock_no_shutdown,
.setsockopt = sock_common_setsockopt,
.getsockopt = sock_common_getsockopt,
.setsockopt = bp_setsockopt,
.getsockopt = bp_getsockopt,
.sendmsg = bp_sendmsg,
.recvmsg = bp_recvmsg };

Expand Down Expand Up @@ -169,7 +173,11 @@ int bp_bind(struct socket* sock, struct sockaddr* uaddr, int addr_len)

// Notify user-space daemon to open endpoint (bp_open) and prepare
// threads/state
open_endpoint_doit(node_id, service_id, 8443);
ret = open_endpoint_doit(node_id, service_id, 8443);
if (ret < 0) {
pr_err("bp_bind: open_endpoint_doit failed (%d)\n", ret);
goto out;
}

return 0;

Expand Down Expand Up @@ -298,42 +306,9 @@ int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size)
goto err_free;
}

// Wait for confirmation from daemon
pr_info("bp_sendmsg: waiting for confirmation for endpoint "
"ipn:%u.%u\n",
bp->bp_node_id, bp->bp_service_id);

// Reset confirmation flag and wait
mutex_lock(&bp->tx_mutex);
bp->tx_confirmed = false;
bp->tx_error = false;
mutex_unlock(&bp->tx_mutex);

ret = wait_event_interruptible(
bp->tx_waitq, bp->tx_confirmed || signal_pending(current));

if (ret < 0) {
pr_err("bp_sendmsg: interrupted while waiting for "
"confirmation\n");
goto err_free;
}

mutex_lock(&bp->tx_mutex);
if (bp->tx_confirmed) {
if (bp->tx_error) {
pr_err("bp_sendmsg: bundle send failed for "
"endpoint ipn:%u.%u\n",
bp->bp_node_id, bp->bp_service_id);
mutex_unlock(&bp->tx_mutex);
ret = -EIO; // Return error to user space
goto err_free;
} else {
pr_info("bp_sendmsg: confirmation received for "
"endpoint ipn:%u.%u\n",
bp->bp_node_id, bp->bp_service_id);
}
}
mutex_unlock(&bp->tx_mutex);
pr_info("bp_sendmsg: bundle sent for endpoint ipn:%u.%u (size: "
"%zu)\n",
bp->bp_node_id, bp->bp_service_id, size);

kfree(payload);
}
Expand All @@ -352,9 +327,11 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags)
struct bp_sock* bp;
struct sk_buff* skb = NULL;
struct sockaddr_bp* src_addr;
long timeo;
int ret;

sk = sock->sk;
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
lock_sock(sk);
bp = bp_sk(sk);

Expand All @@ -364,27 +341,18 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags)
goto out;
}

ret = request_bundle_doit(bp->bp_node_id, bp->bp_service_id, 8443);
if (ret < 0) {
pr_err("bp_recvmsg: request_bundle_doit failed (%d)\n", ret);
goto out;
}

ret = wait_event_interruptible(
bp->rx_waitq, !skb_queue_empty(&bp->rx_queue) || bp->rx_canceled);
if (ret < 0) {
pr_err("bp_recvmsg: interrupted while waiting\n");
ret = wait_event_interruptible_timeout(
bp->rx_waitq, !skb_queue_empty(&bp->rx_queue), timeo);
if (ret == 0) {
pr_err("bp_recvmsg: timeout waiting for message\n");
ret = -EAGAIN;
goto out;
}

mutex_lock(&bp->rx_mutex);
if (bp->rx_canceled) {
pr_info("bp_recvmsg: bundle request canceled\n");
mutex_unlock(&bp->rx_mutex);
ret = -ECANCELED;
if (ret == -ERESTARTSYS) {
pr_err("bp_recvmsg: interrupted by signal\n");
ret = -EINTR;
goto out;
}
mutex_unlock(&bp->rx_mutex);

if (sock_flag(sk, SOCK_DEAD)) {
pr_err("bp_recvmsg: socket closed while waiting\n");
Expand All @@ -395,10 +363,10 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags)
mutex_lock(&bp->rx_mutex);
skb = skb_dequeue(&bp->rx_queue);
if (!skb) {
pr_info("bp_recvmsg: no messages in the queue for service %d\n",
bp->bp_service_id);
pr_info("bp_recvmsg: no messages in the queue for ipn:%u.%u\n",
bp->bp_node_id, bp->bp_service_id);
mutex_unlock(&bp->rx_mutex);
ret = -ENOMSG;
ret = -EAGAIN;
goto out;
}
mutex_unlock(&bp->rx_mutex);
Expand Down Expand Up @@ -427,11 +395,10 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags)
goto out;
}

ret = destroy_bundle_doit(bp->bp_node_id, bp->bp_service_id, 8443);
ret = destroy_bundle_doit(BP_SKB_CB(skb)->adu, 8443);
if (ret < 0) {
pr_err(
"destroy_bundle_doit failed (%d), will retry later", ret);
// enqueue_retry(bp->bp_node_id, bp->bp_service_id);
pr_warn(
"bp_recvmsg: failed to destroy bundle, bundle may leak\n");
}

ret = skb->len;
Expand All @@ -442,3 +409,160 @@ int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags)
release_sock(sk);
return ret;
}

int bp_setsockopt(struct socket* sock, int level, int optname, sockptr_t optval,
unsigned int optlen)
{
struct sock* sk = sock->sk;
struct bp_sock* bp;
int ret = 0;

if (level != SOL_SOCKET)
return sock_common_setsockopt(
sock, level, optname, optval, optlen);

lock_sock(sk);
bp = bp_sk(sk);

switch (optname) {

case SO_RCVTIMEO_OLD: {
struct __kernel_old_timeval tv;
unsigned long t;
if (optlen < sizeof(tv)) {
ret = -EINVAL;
break;
}
if (copy_from_sockptr(&tv, optval, sizeof(tv))) {
ret = -EFAULT;
break;
}

if (tv.tv_sec == 0 && tv.tv_usec == 0) {
t = 0;
} else if (tv.tv_sec < 0 || tv.tv_usec < 0
|| tv.tv_usec >= 1000000) {
ret = -EINVAL;
break;
} else {
u64 usec = (u64)tv.tv_sec * 1000000ULL + tv.tv_usec;
t = usecs_to_jiffies(usec);
}
sk->sk_rcvtimeo = t;
break;
}

case SO_RCVTIMEO_NEW: {
struct __kernel_timespec ts;
unsigned long t;
if (optlen < sizeof(ts)) {
ret = -EINVAL;
break;
}
if (copy_from_sockptr(&ts, optval, sizeof(ts))) {
ret = -EFAULT;
break;
}

if (ts.tv_sec == 0 && ts.tv_nsec == 0) {
t = 0;
} else if (ts.tv_sec < 0 || ts.tv_nsec < 0
|| ts.tv_nsec >= 1000000000L) {
ret = -EINVAL;
break;
} else {
struct timespec64 t64
= { .tv_sec = ts.tv_sec, .tv_nsec = ts.tv_nsec };
t = timespec64_to_jiffies(&t64);
}
sk->sk_rcvtimeo = t;
break;
}

default:
ret = sock_common_setsockopt(
sock, level, optname, optval, optlen);
break;
}

release_sock(sk);
return ret;
}

int bp_getsockopt(struct socket* sock, int level, int optname,
char __user* optval, int __user* optlen)
{
struct sock* sk = sock->sk;
struct bp_sock* bp;
int ret = 0, len;

if (level != SOL_SOCKET)
return sock_common_getsockopt(
sock, level, optname, optval, optlen);

if (get_user(len, optlen))
return -EFAULT;

lock_sock(sk);
bp = bp_sk(sk);

switch (optname) {

case SO_RCVTIMEO_OLD: {
struct __kernel_old_timeval tv;
unsigned long t = sk->sk_rcvtimeo; /* ou bp->rcv_timeout */

if (len < (int)sizeof(tv)) {
ret = -EINVAL;
break;
}

if (t == MAX_SCHEDULE_TIMEOUT) {
tv.tv_sec = 0;
tv.tv_usec = 0;
} else {
unsigned long msec = jiffies_to_msecs(t);
tv.tv_sec = msec / 1000;
tv.tv_usec = (msec % 1000) * 1000;
}

if (copy_to_user(optval, &tv, sizeof(tv))
|| put_user(sizeof(tv), optlen))
ret = -EFAULT;
break;
}

case SO_RCVTIMEO_NEW: {
struct __kernel_timespec ts;
unsigned long t = sk->sk_rcvtimeo;

if (len < (int)sizeof(ts)) {
ret = -EINVAL;
break;
}

if (t == MAX_SCHEDULE_TIMEOUT) {
ts.tv_sec = 0;
ts.tv_nsec = 0;
} else {
unsigned long msec = jiffies_to_msecs(t);
u64 ns = (u64)msec * 1000000ULL;
ts.tv_sec = div_u64(ns, 1000000000ULL);
ts.tv_nsec = ns % 1000000000ULL;
}

if (copy_to_user(optval, &ts, sizeof(ts))
|| put_user(sizeof(ts), optlen))
ret = -EFAULT;
break;
}

default:
ret = sock_common_getsockopt(
sock, level, optname, optval, optlen);
break;
}

release_sock(sk);
return ret;
}
12 changes: 5 additions & 7 deletions bp_socket/af_bp.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
struct bp_skb_cb {
u_int32_t src_node_id;
u_int32_t src_service_id;
uint64_t adu;
};

#define bp_sk(ptr) container_of(ptr, struct bp_sock, sk)
Expand All @@ -26,19 +27,16 @@ struct bp_sock {
struct mutex rx_mutex;
struct sk_buff_head rx_queue;
wait_queue_head_t rx_waitq;
bool rx_canceled;

// Transmission
struct mutex tx_mutex;
wait_queue_head_t tx_waitq;
bool tx_confirmed;
bool tx_error;
};

int bp_bind(struct socket* sock, struct sockaddr* addr, int addr_len);
int bp_create(struct net* net, struct socket* sock, int protocol, int kern);
int bp_release(struct socket* sock);
int bp_sendmsg(struct socket* sock, struct msghdr* msg, size_t size);
int bp_recvmsg(struct socket* sock, struct msghdr* msg, size_t size, int flags);
int bp_setsockopt(struct socket* sock, int level, int optname, sockptr_t optval,
unsigned int optlen);
int bp_getsockopt(struct socket* sock, int level, int optname,
char __user* optval, int __user* optlen);

#endif
Loading