Skip to content

Commit

Permalink
close work?
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemeowx2 committed Sep 14, 2018
1 parent 701a1b7 commit 3cb5eae
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 38 deletions.
4 changes: 3 additions & 1 deletion lwip/custom/lwipopts.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@

#define MEMP_NUM_TCP_PCB_LISTEN 16
#define MEMP_NUM_TCP_PCB 1024
#define TCP_MSS 1460
#ifndef TCP_MSS
#define TCP_MSS 512
#endif
#define TCP_SND_BUF 16384
#define TCP_SND_QUEUELEN (4 * (TCP_SND_BUF)/(TCP_MSS))

Expand Down
72 changes: 55 additions & 17 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@
typedef struct {
uv_tcp_t dtcp;
uvl_tcp_t stcp;
int dclosed;
int sclosed;
} conn_t;
static struct packet_ctx *g_gateway_send_packet_ctx;

struct uvl_pipe_req {
conn_t *conn;
uv_buf_t buf;
};

// lwip TCP listener
struct tcp_pcb *listener;

Expand Down Expand Up @@ -78,42 +85,68 @@ void addr_from_lwip(void *ip, const ip_addr_t *ip_addr)
}
}

void conn_free(conn_t *conn)
{
if (conn->sclosed && conn->dclosed) {
free(conn);
}
}

void close_cb(uvl_tcp_t *client)
{
puts("close_cb");
conn_t *conn = client->data;
conn->sclosed = 1;
conn_free(conn);
}

void p_close_cb(uv_handle_t *handle)
{
puts("p_close_cb");
conn_t *conn = handle->data;
conn->dclosed = 1;
conn_free(conn);
}

static void conn_kill(conn_t *conn)
{
assert(conn);
// uvl_tcp_close(&conn->stcp, close_cb);
// uv_close((uv_handle_t *)&conn->dtcp, p_close_cb);
if (!conn->sclosed) {
uvl_read_stop(&conn->stcp);
uvl_tcp_close(&conn->stcp, close_cb);
}
if (!conn->dclosed) {
uv_read_stop((uv_stream_t *)&conn->dtcp);
uv_close((uv_handle_t *)&conn->dtcp, p_close_cb);
}
}

void p_write_cb(uv_write_t *req, int status)
{
conn_t *conn = req->data;
struct uvl_pipe_req *r = req->data;
conn_t *conn = r->conn;
if (status != 0) {
printf("p_write_cb %d %s\n", status, uv_strerror(status));
}

free(r->buf.base);
free(req);

assert(uvl_read_start(&conn->stcp, alloc_cb, read_cb) == 0);
}

void write_cb(uvl_write_t *req, int status)
{
conn_t *conn = req->data;
struct uvl_pipe_req *r = req->data;
conn_t *conn = r->conn;
if (status) {
printf("write_cb %d\n", status);
}

free(r->buf.base);
free(req);

assert(uv_read_start((uv_stream_t *)&conn->dtcp, p_alloc_cb, p_read_cb) == 0);
uv_read_start((uv_stream_t *)&conn->dtcp, p_alloc_cb, p_read_cb);
}

void p_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
Expand All @@ -129,14 +162,15 @@ void p_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
return;
}

uvl_write_t *req = malloc(sizeof(uvl_write_t) + sizeof(uv_buf_t));
uv_buf_t *b = (uv_buf_t *)((uint8_t *)req + sizeof(*req));
uvl_write_t *req = malloc(sizeof(uvl_write_t) + sizeof(struct uvl_pipe_req));
struct uvl_pipe_req *r = (struct uvl_pipe_req *)((uint8_t *)req + sizeof(*req));

b->base = buf->base;
b->len = nread;
req->data = conn;
r->conn = conn;
r->buf.base = buf->base;
r->buf.len = nread;
req->data = r;

uvl_write(req, &conn->stcp, b, 1, write_cb);
uvl_write(req, &conn->stcp, &r->buf, 1, write_cb);
}

void read_cb(uvl_tcp_t *handle, ssize_t nread, const uv_buf_t *buf)
Expand All @@ -149,14 +183,16 @@ void read_cb(uvl_tcp_t *handle, ssize_t nread, const uv_buf_t *buf)
}
uvl_read_stop(handle);

uv_write_t *req = malloc(sizeof(uv_write_t) + sizeof(uv_buf_t));
uv_buf_t *b = (uv_buf_t *)((uint8_t *)req + sizeof(*req));

b->base = buf->base;
b->len = nread;
req->data = conn;
uv_write_t *req = malloc(sizeof(uv_write_t) + sizeof(struct uvl_pipe_req));
struct uvl_pipe_req *r = (struct uvl_pipe_req *)((uint8_t *)req + sizeof(*req));

r->conn = conn;
r->buf.base = buf->base;
r->buf.len = nread;
req->data = r;

uv_write(req, (uv_stream_t *)&conn->dtcp, b, 1, p_write_cb);
uv_write(req, (uv_stream_t *)&conn->dtcp, &r->buf, 1, p_write_cb);
}

void p_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t* buf)
Expand Down Expand Up @@ -197,6 +233,8 @@ void on_connect(uvl_t *handle, int status)
conn->stcp.data = conn;
conn->dtcp.data = conn;
req->data = conn;
conn->sclosed = 0;
conn->dclosed = 0;

assert(uvl_tcp_init(handle->loop, client) == 0);
assert(uvl_accept(handle, client) == 0);
Expand Down
75 changes: 56 additions & 19 deletions src/uv_lwip.c
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#include <lwip/init.h>

#include "uv_lwip.h"
#include <base/llog.h>
#include <string.h>
#include <lwip/init.h>
#include <lwip/ip.h>
#include <lwip/ip_addr.h>
#include <lwip/priv/tcp_priv.h>
#include <lwip/tcp.h>
#include <lwip/ip4_frag.h>
#include <lwip/nd6.h>
#include <lwip/ip6_frag.h>
#include <string.h>

#define _DEBUG
// #define _DEBUG
#define UVL_TCP_RECV_BUF_LEN TCP_WND
#define UVL_TCP_SEND_BUF_LEN 8192

Expand Down Expand Up @@ -54,7 +55,7 @@ static void uvl_async_tcp_read_cb(uv_async_t *req)
struct uvl_tcp_buf *buf = client->buf;
uv_buf_t b;
int nnread = 0;
ASSERT(buf->recv_used < sizeof(buf->recv_buf))
ASSERT(buf->recv_used <= sizeof(buf->recv_buf))

if (client->read_cb) {
if (buf->recv_used > 0) {
Expand All @@ -74,12 +75,31 @@ static void uvl_async_tcp_read_cb(uv_async_t *req)
}
}

static void uvl_cancel_reqs(uvl_tcp_t *client)
{
uvl_write_t *req = client->cur_write;

while (req)
{
req->write_cb(req, UV_ECANCELED);

req = req->next;
}

client->cur_write = NULL;
client->tail_write = NULL;
}

// TODO: complete this function
static void uvl_async_tcp_write_cb(uv_async_t *write_req)
{
uvl_tcp_t *client = (uvl_tcp_t *)write_req->data;

LLOG(LLOG_DEBUG, "%p uvl_async_tcp_write_cb", client);
if (client->closed) {
uvl_cancel_reqs(client);
return;
}

uvl_imp_write_to_tcp(client);
}
Expand Down Expand Up @@ -150,12 +170,15 @@ static void uvl_imp_write_to_tcp(uvl_tcp_t *client)
break;
}
req = req->next;
#ifdef _DEBUG
i++;
#endif
}

LLOG(LLOG_DEBUG, "%p block %d / %d", client, i, total);
while (req) {
#ifdef _DEBUG
LLOG(LLOG_DEBUG, "%p block %d / %d", client, i, total);
#endif
int ret = uvl_imp_write_buf_to_tcp(client, req);
if (ret == 0) {
break;
Expand All @@ -165,7 +188,6 @@ static void uvl_imp_write_to_tcp(uvl_tcp_t *client)
}
if (req->sent_bufs == req->send_nbufs) {
req = req->next;
i++;
}
}

Expand Down Expand Up @@ -199,9 +221,9 @@ static err_t uvl_client_abort (uvl_tcp_t *client)
ASSERT(!client->closed)

// // remove callbacks
// tcp_err(client->pcb, NULL);
// tcp_recv(client->pcb, NULL);
// tcp_sent(client->pcb, NULL);
tcp_err(client->pcb, NULL);
tcp_recv(client->pcb, NULL);
tcp_sent(client->pcb, NULL);

// abort
tcp_abort(client->pcb);
Expand All @@ -215,6 +237,11 @@ static err_t uvl_client_close_func (uvl_tcp_t *client)
{
ASSERT(!client->closed)

tcp_err(client->pcb, NULL);
tcp_recv(client->pcb, NULL);
tcp_sent(client->pcb, NULL);


err_t err = tcp_close(client->pcb);

if (err == ERR_OK) {
Expand Down Expand Up @@ -250,7 +277,7 @@ static err_t uvl_client_recv_func (void *arg, struct tcp_pcb *tpcb, struct pbuf
sizeof(buf->recv_buf),
(sizeof(buf->recv_buf) - buf->recv_used));

return uvl_client_abort(client);
return ERR_MEM;
}

LLOG(LLOG_DEBUG, "[+] recv_buf %d + %d / %d", buf->recv_used, p->tot_len, sizeof(buf->recv_buf));
Expand Down Expand Up @@ -456,12 +483,6 @@ int uvl_write(uvl_write_t *req, uvl_tcp_t *client, const uv_buf_t bufs[], unsign
{
req->client = client;

if (client->closed) {
cb(req, UV_ECANCELED);
return 0;
}
ASSERT(client->pcb)

int i;

req->send_bufs = bufs;
Expand Down Expand Up @@ -517,6 +538,7 @@ int uvl_accept(uvl_t *handle, uvl_tcp_t *client)

client->sent_bytes = 0;
client->recv_bytes = 0;
client->closed_handle = 0;

tcp_arg(newpcb, client);

Expand Down Expand Up @@ -610,15 +632,30 @@ int uvl_tcp_init(uv_loop_t *loop, uvl_tcp_t *client)
return 0;
}

int uvl_tcp_close(uvl_tcp_t *client, uvl_tcp_close_cb close_cb)
static void uvl_tcp_close_handle_cb(uv_handle_t *handle)
{
// ASSERT(client->close_cb == NULL)
uvl_tcp_t *client = handle->data;

client->closed_handle++;
if (client->closed_handle == 2) {
client->close_cb(client);
}
}

int uvl_tcp_close(uvl_tcp_t *client, uvl_tcp_close_cb cb)
{
ASSERT(client->close_cb == NULL || client->close_cb == cb)

uvl_cancel_reqs(client);

uv_close((uv_handle_t *)&client->read_req, uvl_tcp_close_handle_cb);
uv_close((uv_handle_t *)&client->write_req, uvl_tcp_close_handle_cb);

if (!client->closed) {
uvl_client_close_func(client);
}

close_cb(client);
client->close_cb = cb;

return 0;
}
Expand Down
4 changes: 3 additions & 1 deletion src/uv_lwip.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ struct uvl_tcp {
int closed;
uvl_alloc_tcp_cb alloc_cb;
uvl_read_cb read_cb;
uvl_close_cb close_cb;
uvl_tcp_close_cb close_cb;
struct tcp_pcb *pcb;
struct sockaddr_in local_addr;
struct sockaddr_in remote_addr;

uint32_t sent_bytes;
uint32_t recv_bytes;

int closed_handle;
};
struct uvl_write {
UVL_FIELDS
Expand Down

0 comments on commit 3cb5eae

Please sign in to comment.