Skip to content

Commit

Permalink
tcp works well
Browse files Browse the repository at this point in the history
  • Loading branch information
spacemeowx2 committed Sep 14, 2018
1 parent 3cb5eae commit c7c9514
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 61 deletions.
61 changes: 30 additions & 31 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ typedef struct {
uvl_tcp_t stcp;
int dclosed;
int sclosed;

uvl_write_t uvl_req;
uv_buf_t uvl_buf;
uv_write_t uv_req;
uv_buf_t uv_buf;
} conn_t;
static struct packet_ctx *g_gateway_send_packet_ctx;

struct uvl_pipe_req {
conn_t *conn;
uv_buf_t buf;
};

// lwip TCP listener
struct tcp_pcb *listener;

Expand Down Expand Up @@ -88,7 +88,8 @@ void addr_from_lwip(void *ip, const ip_addr_t *ip_addr)
void conn_free(conn_t *conn)
{
if (conn->sclosed && conn->dclosed) {
free(conn);
LLOG(LLOG_DEBUG, "conn_kill %p done", conn);
// free(conn);
}
}

Expand All @@ -111,6 +112,7 @@ void p_close_cb(uv_handle_t *handle)
static void conn_kill(conn_t *conn)
{
assert(conn);
LLOG(LLOG_DEBUG, "conn_kill %p", conn);
if (!conn->sclosed) {
uvl_read_stop(&conn->stcp);
uvl_tcp_close(&conn->stcp, close_cb);
Expand All @@ -123,30 +125,29 @@ static void conn_kill(conn_t *conn)

void p_write_cb(uv_write_t *req, int status)
{
struct uvl_pipe_req *r = req->data;
conn_t *conn = r->conn;
conn_t *conn = req->data;
if (status != 0) {
printf("p_write_cb %d %s\n", status, uv_strerror(status));
}

free(r->buf.base);
free(req);
free(conn->uv_buf.base);

assert(uvl_read_start(&conn->stcp, alloc_cb, read_cb) == 0);
}

void write_cb(uvl_write_t *req, int status)
{
struct uvl_pipe_req *r = req->data;
conn_t *conn = r->conn;
conn_t *conn = req->data;
if (status) {
printf("write_cb %d\n", status);
}

free(r->buf.base);
free(req);
free(conn->uvl_buf.base);

uv_read_start((uv_stream_t *)&conn->dtcp, p_alloc_cb, p_read_cb);
int ret = uv_read_start((uv_stream_t *)&conn->dtcp, p_alloc_cb, p_read_cb);
if (ret) {
LLOG(LLOG_ERROR, "write_cb uv_read_start %d %s", ret, uv_strerror(ret));
}
}

void p_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
Expand All @@ -156,43 +157,38 @@ void p_read_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
printf("p_read_cb %d %s\n", nread, uv_strerror(nread));
return;
}
uv_read_stop(handle);

conn_t *conn = handle->data;
if (nread <= 0) {
conn_kill(conn);
return;
}

uvl_write_t *req = malloc(sizeof(uvl_write_t) + sizeof(struct uvl_pipe_req));
struct uvl_pipe_req *r = (struct uvl_pipe_req *)((uint8_t *)req + sizeof(*req));
uvl_write_t *req = &conn->uvl_req;

r->conn = conn;
r->buf.base = buf->base;
r->buf.len = nread;
req->data = r;
conn->uvl_buf.base = buf->base;
conn->uvl_buf.len = nread;

uvl_write(req, &conn->stcp, &r->buf, 1, write_cb);
uvl_write(req, &conn->stcp, &conn->uvl_buf, 1, write_cb);
}

void read_cb(uvl_tcp_t *handle, ssize_t nread, const uv_buf_t *buf)
{
LLOG(LLOG_DEBUG, "read_cb %d", nread);
conn_t *conn = handle->data;
if (nread <= 0) {
LLOG(LLOG_DEBUG, "read_cb %d", nread);
conn_kill(conn);
return;
}
uvl_read_stop(handle);

uv_write_t *req = &conn->uv_req;

uv_write_t *req = malloc(sizeof(uv_write_t) + sizeof(struct uvl_pipe_req));
struct uvl_pipe_req *r = (struct uvl_pipe_req *)((uint8_t *)req + sizeof(*req));
conn->uv_buf.base = buf->base;
conn->uv_buf.len = nread;

r->conn = conn;
r->buf.base = buf->base;
r->buf.len = nread;
req->data = r;

uv_write(req, (uv_stream_t *)&conn->dtcp, &r->buf, 1, p_write_cb);
uv_write(req, (uv_stream_t *)&conn->dtcp, &conn->uv_buf, 1, p_write_cb);
}

void p_alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t* buf)
Expand Down Expand Up @@ -236,6 +232,9 @@ void on_connect(uvl_t *handle, int status)
conn->sclosed = 0;
conn->dclosed = 0;

conn->uv_req.data = conn;
conn->uvl_req.data = conn;

assert(uvl_tcp_init(handle->loop, client) == 0);
assert(uvl_accept(handle, client) == 0);

Expand Down
26 changes: 16 additions & 10 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ struct {
char *password;
char *password_file;
} options;
struct lan_play real_lan_play;

uint8_t SEND_BUFFER[BUFFER_SIZE];

Expand Down Expand Up @@ -96,6 +97,10 @@ void init_pcap(struct lan_play *lan_play, void *mac)
break;
}
}
if (d == NULL) {
LLOG(LLOG_ERROR, "failed to find --netif: %s", options.netif);
exit(1);
}
}

printf("Opening %s\n", d->name);
Expand Down Expand Up @@ -303,6 +308,7 @@ void lan_play_libpcap_thread(void *data)
void lan_play_get_packet_async_cb(uv_async_t *async)
{
struct lan_play *lan_play = (struct lan_play *)async->data;
assert(lan_play == &real_lan_play);

get_packet(&lan_play->packet_ctx, lan_play->pkthdr, lan_play->packet);

Expand Down Expand Up @@ -334,10 +340,10 @@ int lan_play_gateway_send_packet(struct packet_ctx *packet_ctx, const void *data
int main(int argc, char **argv)
{
char relay_server_addr[128] = { 0 };
struct lan_play lan_play;
struct lan_play *lan_play = &real_lan_play;
int ret;

lan_play.loop = &lan_play.real_loop;
lan_play->loop = &lan_play->real_loop;

if (parse_arguments(argc, argv) != 0) {
LLOG(LLOG_ERROR, "Failed to parse arguments");
Expand All @@ -361,19 +367,19 @@ int main(int argc, char **argv)
options.relay_server_addr = relay_server_addr;
}

if (parse_addr(options.relay_server_addr, &lan_play.server_addr) != 0) {
if (parse_addr(options.relay_server_addr, &lan_play->server_addr) != 0) {
LLOG(LLOG_ERROR, "Failed to parse and get ip address. --relay-server-addr: %s", options.relay_server_addr);
return -1;
}

assert(uv_loop_init(lan_play.loop) == 0);
assert(uv_async_init(lan_play.loop, &lan_play.get_packet_async, lan_play_get_packet_async_cb) == 0);
assert(uv_sem_init(&lan_play.get_packet_sem, 0) == 0);
lan_play.get_packet_async.data = &lan_play;
assert(uv_loop_init(lan_play->loop) == 0);
assert(uv_async_init(lan_play->loop, &lan_play->get_packet_async, lan_play_get_packet_async_cb) == 0);
assert(uv_sem_init(&lan_play->get_packet_sem, 0) == 0);
lan_play->get_packet_async.data = lan_play;

assert(lan_play_init(&lan_play) == 0);
assert(lan_play_init(lan_play) == 0);

ret = uv_thread_create(&lan_play.libpcap_thread, lan_play_libpcap_thread, &lan_play);
ret = uv_thread_create(&lan_play->libpcap_thread, lan_play_libpcap_thread, lan_play);

return uv_run(lan_play.loop, UV_RUN_DEFAULT);
return uv_run(lan_play->loop, UV_RUN_DEFAULT);
}
3 changes: 3 additions & 0 deletions src/packet.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "lan-play.h"
#include <assert.h>

void *g_debug1 = (void *)0x1234;
void *g_debug2 = (void *)0x12345;

int send_payloads(
struct packet_ctx *self,
const struct payload *payload
Expand Down
26 changes: 18 additions & 8 deletions src/proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
#include "ipv4/ipv4.h"
#include <assert.h>
#include <base/llog.h>

#if 1
#define malloc(size) ({ \
void *__ptr = malloc(size); \
LLOG(LLOG_DEBUG, "[malloc] %p %d %d", __ptr, size, __LINE__); \
__ptr; \
})
#endif
static void proxy_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf)
{
buf->base = malloc(suggested_size);
Expand Down Expand Up @@ -76,8 +82,8 @@ static uv_udp_t *proxy_udp_get(struct proxy *proxy, uint8_t src[4], uint16_t src
item->expire_at = now + PROXY_UDP_TABLE_TTL;
item->proxy = proxy;

uv_udp_init(proxy->loop, item->udp);
uv_udp_recv_start(item->udp, proxy_alloc_cb, proxy_udp_recv_cb);
assert(uv_udp_init(proxy->loop, item->udp) == 0);
assert(uv_udp_recv_start(item->udp, proxy_alloc_cb, proxy_udp_recv_cb) == 0);
item->udp->data = item;

CPY_IPV4(item->src, src);
Expand All @@ -101,16 +107,20 @@ static int direct_udp(struct proxy *proxy, uint8_t src[4], uint16_t srcport, uin
}

uv_udp_send_t *req = (uv_udp_send_t *)malloc(sizeof(uv_udp_send_t));
uv_buf_t *buf = (uv_buf_t *)malloc(sizeof(uv_buf_t));
req->data = malloc(data_len);
memcpy(req->data, data, data_len);

uv_buf_t buf;
struct sockaddr_in addr;

buf->base = (char *)data;
buf->len = data_len;
req->data = buf;
buf.base = (char *)req->data;
buf.len = data_len;

addr.sin_family = AF_INET;
CPY_IPV4(&addr.sin_addr, dst);
addr.sin_port = htons(dstport);
return uv_udp_send(req, udp, buf, 1, (struct sockaddr *)&addr, proxy_udp_send_cb);

return uv_udp_send(req, udp, &buf, 1, (struct sockaddr *)&addr, proxy_udp_send_cb);
}

static proxy_tcp_t *direct_tcp_new(struct proxy *proxy)
Expand Down
38 changes: 26 additions & 12 deletions src/uv_lwip.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,28 @@ static void uvl_async_tcp_read_cb(uv_async_t *req)
static void uvl_cancel_reqs(uvl_tcp_t *client)
{
uvl_write_t *req = client->cur_write;
uvl_write_t *next;

while (req)
{
next = req->next;
req->write_cb(req, UV_ECANCELED);

req = req->next;
req = next;
}

client->cur_write = NULL;
client->tail_write = NULL;

if (client->read_cb) {
client->read_cb = NULL;
}
}

// TODO: complete this function
static void uvl_async_tcp_write_cb(uv_async_t *write_req)
{
uvl_tcp_t *client = (uvl_tcp_t *)write_req->data;

LLOG(LLOG_DEBUG, "%p uvl_async_tcp_write_cb", client);
if (client->closed) {
uvl_cancel_reqs(client);
return;
Expand Down Expand Up @@ -241,7 +245,6 @@ static err_t uvl_client_close_func (uvl_tcp_t *client)
tcp_recv(client->pcb, NULL);
tcp_sent(client->pcb, NULL);


err_t err = tcp_close(client->pcb);

if (err == ERR_OK) {
Expand Down Expand Up @@ -296,6 +299,7 @@ static err_t uvl_client_sent_func (void *arg, struct tcp_pcb *tpcb, u16_t len)
{
uvl_tcp_t *client = (uvl_tcp_t *)arg;
uvl_write_t *req = client->cur_write;
uvl_write_t *next = NULL;

#ifdef _DEBUG
int total = 0;
Expand All @@ -322,19 +326,18 @@ static err_t uvl_client_sent_func (void *arg, struct tcp_pcb *tpcb, u16_t len)
req = client->cur_write;

while (req && (req->total_sent == req->total_len) && (req->pending == 0)) {
next = req->next; // save before it be freed
// should call the callback
req->write_cb(req, 0);

req = req->next;
req = next;
}

client->cur_write = req;

if (client->cur_write == NULL) {
client->tail_write = NULL;
LLOG(LLOG_DEBUG, "%p uvl_client_sent_func cur_write null", client);
} else {
LLOG(LLOG_DEBUG, "%p uvl_client_sent_func uv_async_send", client);
int ret = uv_async_send(&client->write_req);
if (ret) {
LLOG(LLOG_ERROR, "sent_func async_send");
Expand Down Expand Up @@ -508,7 +511,6 @@ int uvl_write(uvl_write_t *req, uvl_tcp_t *client, const uv_buf_t bufs[], unsign
req->total_len += bufs[i].len;
}

LLOG(LLOG_DEBUG, "uvl_write uv_async_send");
return uv_async_send(&client->write_req);
}

Expand Down Expand Up @@ -639,22 +641,34 @@ static void uvl_tcp_close_handle_cb(uv_handle_t *handle)
client->closed_handle++;
if (client->closed_handle == 2) {
client->close_cb(client);
client->close_cb = NULL;

client->loop = NULL;
client->handle = NULL;
free(client->buf);
client->buf = NULL;
client->cur_write = NULL;
client->tail_write = NULL;
client->alloc_cb = NULL;
client->read_cb = NULL;
client->pcb = NULL;
client->closed_handle = 0;
}
}

int uvl_tcp_close(uvl_tcp_t *client, uvl_tcp_close_cb cb)
{
ASSERT(client->close_cb == NULL || client->close_cb == cb)

if (!client->closed) {
uvl_client_close_func(client);
}

uvl_cancel_reqs(client);

uv_close((uv_handle_t *)&client->read_req, uvl_tcp_close_handle_cb);
uv_close((uv_handle_t *)&client->write_req, uvl_tcp_close_handle_cb);

if (!client->closed) {
uvl_client_close_func(client);
}

client->close_cb = cb;

return 0;
Expand Down

0 comments on commit c7c9514

Please sign in to comment.