diff --git a/lwip/custom/lwipopts.h b/lwip/custom/lwipopts.h index 51a9c23..8cad13d 100644 --- a/lwip/custom/lwipopts.h +++ b/lwip/custom/lwipopts.h @@ -61,7 +61,9 @@ #define MEMP_NUM_TCP_PCB_LISTEN 16 #define MEMP_NUM_TCP_PCB 1024 -#define TCP_MSS 1460 +#ifndef TCP_MSS +#define TCP_MSS 512 +#endif #define TCP_SND_BUF 16384 #define TCP_SND_QUEUELEN (4 * (TCP_SND_BUF)/(TCP_MSS)) diff --git a/src/gateway.c b/src/gateway.c index 61150b3..23de552 100644 --- a/src/gateway.c +++ b/src/gateway.c @@ -24,9 +24,16 @@ typedef struct { uv_tcp_t dtcp; uvl_tcp_t stcp; + int dclosed; + int sclosed; } conn_t; static struct packet_ctx *g_gateway_send_packet_ctx; +struct uvl_pipe_req { + conn_t *conn; + uv_buf_t buf; +}; + // lwip TCP listener struct tcp_pcb *listener; @@ -78,28 +85,51 @@ void addr_from_lwip(void *ip, const ip_addr_t *ip_addr) } } +void conn_free(conn_t *conn) +{ + if (conn->sclosed && conn->dclosed) { + free(conn); + } +} + void close_cb(uvl_tcp_t *client) { puts("close_cb"); + conn_t *conn = client->data; + conn->sclosed = 1; + conn_free(conn); } + void p_close_cb(uv_handle_t *handle) { puts("p_close_cb"); + conn_t *conn = handle->data; + conn->dclosed = 1; + conn_free(conn); } static void conn_kill(conn_t *conn) { assert(conn); - // uvl_tcp_close(&conn->stcp, close_cb); - // uv_close((uv_handle_t *)&conn->dtcp, p_close_cb); + if (!conn->sclosed) { + uvl_read_stop(&conn->stcp); + uvl_tcp_close(&conn->stcp, close_cb); + } + if (!conn->dclosed) { + uv_read_stop((uv_stream_t *)&conn->dtcp); + uv_close((uv_handle_t *)&conn->dtcp, p_close_cb); + } } void p_write_cb(uv_write_t *req, int status) { - conn_t *conn = req->data; + struct uvl_pipe_req *r = req->data; + conn_t *conn = r->conn; if (status != 0) { printf("p_write_cb %d %s\n", status, uv_strerror(status)); } + + free(r->buf.base); free(req); assert(uvl_read_start(&conn->stcp, alloc_cb, read_cb) == 0); @@ -107,13 +137,16 @@ void p_write_cb(uv_write_t *req, int status) void write_cb(uvl_write_t *req, int status) { - conn_t *conn = req->data; + struct uvl_pipe_req *r = req->data; + conn_t *conn = r->conn; if (status) { printf("write_cb %d\n", status); } + + free(r->buf.base); free(req); - assert(uv_read_start((uv_stream_t *)&conn->dtcp, p_alloc_cb, p_read_cb) == 0); + uv_read_start((uv_stream_t *)&conn->dtcp, p_alloc_cb, p_read_cb); } void p_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) @@ -129,14 +162,15 @@ void p_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) return; } - uvl_write_t *req = malloc(sizeof(uvl_write_t) + sizeof(uv_buf_t)); - uv_buf_t *b = (uv_buf_t *)((uint8_t *)req + sizeof(*req)); + uvl_write_t *req = malloc(sizeof(uvl_write_t) + sizeof(struct uvl_pipe_req)); + struct uvl_pipe_req *r = (struct uvl_pipe_req *)((uint8_t *)req + sizeof(*req)); - b->base = buf->base; - b->len = nread; - req->data = conn; + r->conn = conn; + r->buf.base = buf->base; + r->buf.len = nread; + req->data = r; - uvl_write(req, &conn->stcp, b, 1, write_cb); + uvl_write(req, &conn->stcp, &r->buf, 1, write_cb); } void read_cb(uvl_tcp_t *handle, ssize_t nread, const uv_buf_t *buf) @@ -149,14 +183,16 @@ void read_cb(uvl_tcp_t *handle, ssize_t nread, const uv_buf_t *buf) } uvl_read_stop(handle); - uv_write_t *req = malloc(sizeof(uv_write_t) + sizeof(uv_buf_t)); - uv_buf_t *b = (uv_buf_t *)((uint8_t *)req + sizeof(*req)); - b->base = buf->base; - b->len = nread; - req->data = conn; + uv_write_t *req = malloc(sizeof(uv_write_t) + sizeof(struct uvl_pipe_req)); + struct uvl_pipe_req *r = (struct uvl_pipe_req *)((uint8_t *)req + sizeof(*req)); + + r->conn = conn; + r->buf.base = buf->base; + r->buf.len = nread; + req->data = r; - uv_write(req, (uv_stream_t *)&conn->dtcp, b, 1, p_write_cb); + uv_write(req, (uv_stream_t *)&conn->dtcp, &r->buf, 1, p_write_cb); } void p_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t* buf) @@ -197,6 +233,8 @@ void on_connect(uvl_t *handle, int status) conn->stcp.data = conn; conn->dtcp.data = conn; req->data = conn; + conn->sclosed = 0; + conn->dclosed = 0; assert(uvl_tcp_init(handle->loop, client) == 0); assert(uvl_accept(handle, client) == 0); diff --git a/src/uv_lwip.c b/src/uv_lwip.c index 6a0666b..efd474a 100644 --- a/src/uv_lwip.c +++ b/src/uv_lwip.c @@ -1,6 +1,8 @@ -#include + #include "uv_lwip.h" #include +#include +#include #include #include #include @@ -8,9 +10,8 @@ #include #include #include -#include -#define _DEBUG +// #define _DEBUG #define UVL_TCP_RECV_BUF_LEN TCP_WND #define UVL_TCP_SEND_BUF_LEN 8192 @@ -54,7 +55,7 @@ static void uvl_async_tcp_read_cb(uv_async_t *req) struct uvl_tcp_buf *buf = client->buf; uv_buf_t b; int nnread = 0; - ASSERT(buf->recv_used < sizeof(buf->recv_buf)) + ASSERT(buf->recv_used <= sizeof(buf->recv_buf)) if (client->read_cb) { if (buf->recv_used > 0) { @@ -74,12 +75,31 @@ static void uvl_async_tcp_read_cb(uv_async_t *req) } } +static void uvl_cancel_reqs(uvl_tcp_t *client) +{ + uvl_write_t *req = client->cur_write; + + while (req) + { + req->write_cb(req, UV_ECANCELED); + + req = req->next; + } + + client->cur_write = NULL; + client->tail_write = NULL; +} + // TODO: complete this function static void uvl_async_tcp_write_cb(uv_async_t *write_req) { uvl_tcp_t *client = (uvl_tcp_t *)write_req->data; LLOG(LLOG_DEBUG, "%p uvl_async_tcp_write_cb", client); + if (client->closed) { + uvl_cancel_reqs(client); + return; + } uvl_imp_write_to_tcp(client); } @@ -150,12 +170,15 @@ static void uvl_imp_write_to_tcp(uvl_tcp_t *client) break; } req = req->next; +#ifdef _DEBUG i++; +#endif } - LLOG(LLOG_DEBUG, "%p block %d / %d", client, i, total); while (req) { +#ifdef _DEBUG LLOG(LLOG_DEBUG, "%p block %d / %d", client, i, total); +#endif int ret = uvl_imp_write_buf_to_tcp(client, req); if (ret == 0) { break; @@ -165,7 +188,6 @@ static void uvl_imp_write_to_tcp(uvl_tcp_t *client) } if (req->sent_bufs == req->send_nbufs) { req = req->next; - i++; } } @@ -199,9 +221,9 @@ static err_t uvl_client_abort (uvl_tcp_t *client) ASSERT(!client->closed) // // remove callbacks -// tcp_err(client->pcb, NULL); -// tcp_recv(client->pcb, NULL); -// tcp_sent(client->pcb, NULL); + tcp_err(client->pcb, NULL); + tcp_recv(client->pcb, NULL); + tcp_sent(client->pcb, NULL); // abort tcp_abort(client->pcb); @@ -215,6 +237,11 @@ static err_t uvl_client_close_func (uvl_tcp_t *client) { ASSERT(!client->closed) + tcp_err(client->pcb, NULL); + tcp_recv(client->pcb, NULL); + tcp_sent(client->pcb, NULL); + + err_t err = tcp_close(client->pcb); if (err == ERR_OK) { @@ -250,7 +277,7 @@ static err_t uvl_client_recv_func (void *arg, struct tcp_pcb *tpcb, struct pbuf sizeof(buf->recv_buf), (sizeof(buf->recv_buf) - buf->recv_used)); - return uvl_client_abort(client); + return ERR_MEM; } LLOG(LLOG_DEBUG, "[+] recv_buf %d + %d / %d", buf->recv_used, p->tot_len, sizeof(buf->recv_buf)); @@ -456,12 +483,6 @@ int uvl_write(uvl_write_t *req, uvl_tcp_t *client, const uv_buf_t bufs[], unsign { req->client = client; - if (client->closed) { - cb(req, UV_ECANCELED); - return 0; - } - ASSERT(client->pcb) - int i; req->send_bufs = bufs; @@ -517,6 +538,7 @@ int uvl_accept(uvl_t *handle, uvl_tcp_t *client) client->sent_bytes = 0; client->recv_bytes = 0; + client->closed_handle = 0; tcp_arg(newpcb, client); @@ -610,15 +632,30 @@ int uvl_tcp_init(uv_loop_t *loop, uvl_tcp_t *client) return 0; } -int uvl_tcp_close(uvl_tcp_t *client, uvl_tcp_close_cb close_cb) +static void uvl_tcp_close_handle_cb(uv_handle_t *handle) { - // ASSERT(client->close_cb == NULL) + uvl_tcp_t *client = handle->data; + + client->closed_handle++; + if (client->closed_handle == 2) { + client->close_cb(client); + } +} + +int uvl_tcp_close(uvl_tcp_t *client, uvl_tcp_close_cb cb) +{ + ASSERT(client->close_cb == NULL || client->close_cb == cb) + + uvl_cancel_reqs(client); + + uv_close((uv_handle_t *)&client->read_req, uvl_tcp_close_handle_cb); + uv_close((uv_handle_t *)&client->write_req, uvl_tcp_close_handle_cb); if (!client->closed) { uvl_client_close_func(client); } - close_cb(client); + client->close_cb = cb; return 0; } diff --git a/src/uv_lwip.h b/src/uv_lwip.h index 3ed66dd..a6926e7 100644 --- a/src/uv_lwip.h +++ b/src/uv_lwip.h @@ -52,13 +52,15 @@ struct uvl_tcp { int closed; uvl_alloc_tcp_cb alloc_cb; uvl_read_cb read_cb; - uvl_close_cb close_cb; + uvl_tcp_close_cb close_cb; struct tcp_pcb *pcb; struct sockaddr_in local_addr; struct sockaddr_in remote_addr; uint32_t sent_bytes; uint32_t recv_bytes; + + int closed_handle; }; struct uvl_write { UVL_FIELDS