diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index e9839e72..3a29790b 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -2,6 +2,7 @@ // Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov #include "AsyncTCP.h" +#include "AsyncTCPSimpleIntrusiveList.h" #ifndef LIBRETINY #include @@ -52,6 +53,13 @@ extern "C" { // Required for: // https://github.com/espressif/arduino-esp32/blob/3.0.3/libraries/Network/src/NetworkInterface.cpp#L37-L47 +#if CONFIG_ASYNC_TCP_USE_WDT +#include "esp_task_wdt.h" +#define ASYNC_TCP_MAX_TASK_SLEEP (pdMS_TO_TICKS(1000 * CONFIG_ESP_TASK_WDT_TIMEOUT_S) / 4) +#else +#define ASYNC_TCP_MAX_TASK_SLEEP portMAX_DELAY +#endif + // https://github.com/espressif/arduino-esp32/issues/10526 namespace { #ifdef CONFIG_LWIP_TCPIP_CORE_LOCKING @@ -96,13 +104,13 @@ typedef enum { LWIP_TCP_FIN, LWIP_TCP_ERROR, LWIP_TCP_POLL, - LWIP_TCP_CLEAR, LWIP_TCP_ACCEPT, LWIP_TCP_CONNECTED, LWIP_TCP_DNS } lwip_tcp_event_t; -typedef struct { +struct lwip_tcp_event_packet_t { + lwip_tcp_event_packet_t *next; lwip_tcp_event_t event; AsyncClient *client; union { @@ -137,7 +145,9 @@ typedef struct { ip_addr_t addr; } dns; }; -} lwip_tcp_event_packet_t; + + inline lwip_tcp_event_packet_t(lwip_tcp_event_t _event, AsyncClient *_client) : next(nullptr), event(_event), client(_client){}; +}; // Detail class for interacting with AsyncClient internals, but without exposing the API class AsyncTCP_detail { @@ -153,7 +163,28 @@ class AsyncTCP_detail { static int8_t __attribute__((visibility("internal"))) tcp_accept(void *arg, tcp_pcb *pcb, int8_t err); }; -static QueueHandle_t _async_queue = NULL; +// Guard class for the global queue +namespace { + +static SemaphoreHandle_t _async_queue_mutex = nullptr; + +class queue_mutex_guard { + bool holds_mutex; + +public: + inline queue_mutex_guard() : holds_mutex(xSemaphoreTake(_async_queue_mutex, portMAX_DELAY)){}; + inline ~queue_mutex_guard() { + if (holds_mutex) { + xSemaphoreGive(_async_queue_mutex); + } + }; + inline 
explicit operator bool() const { + return holds_mutex; + }; +}; +} // anonymous namespace + +static SimpleIntrusiveList _async_queue; static TaskHandle_t _async_service_task_handle = NULL; static SemaphoreHandle_t _slots_lock = NULL; @@ -169,43 +200,32 @@ static uint32_t _closed_index = []() { return 1; }(); -static inline bool _init_async_event_queue() { - if (!_async_queue) { - _async_queue = xQueueCreate(CONFIG_ASYNC_TCP_QUEUE_SIZE, sizeof(lwip_tcp_event_packet_t *)); - if (!_async_queue) { - return false; - } +static void _free_event(lwip_tcp_event_packet_t *evpkt) { + if ((evpkt->event == LWIP_TCP_RECV) && (evpkt->recv.pb != nullptr)) { + pbuf_free(evpkt->recv.pb); } - return true; + delete evpkt; } -static inline bool _send_async_event(lwip_tcp_event_packet_t **e, TickType_t wait = portMAX_DELAY) { - return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS; +static inline void _send_async_event(lwip_tcp_event_packet_t *e) { + assert(e != nullptr); + _async_queue.push_back(e); + xTaskNotifyGive(_async_service_task_handle); } -static inline bool _prepend_async_event(lwip_tcp_event_packet_t **e, TickType_t wait = portMAX_DELAY) { - return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS; +static inline void _prepend_async_event(lwip_tcp_event_packet_t *e) { + assert(e != nullptr); + _async_queue.push_front(e); + xTaskNotifyGive(_async_service_task_handle); } -static inline bool _get_async_event(lwip_tcp_event_packet_t **e) { - while (true) { - if (!_async_queue) { - break; - } - -#if CONFIG_ASYNC_TCP_USE_WDT - // need to return periodically to feed the dog - if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS) { - break; - } -#else - if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS) { - break; - } -#endif +static inline lwip_tcp_event_packet_t *_get_async_event() { + queue_mutex_guard guard; + while (1) { + lwip_tcp_event_packet_t *e = _async_queue.pop_front(); - if ((*e)->event != LWIP_TCP_POLL) { - return 
true; + if ((!e) || (e->event != LWIP_TCP_POLL)) { + return e; } /* @@ -216,20 +236,11 @@ static inline bool _get_async_event(lwip_tcp_event_packet_t **e) { It won't be effective if user would run multiple simultaneous long running callbacks due to message interleaving. todo: implement some kind of fair dequeuing or (better) simply punish user for a bad designed callbacks by resetting hog connections */ - lwip_tcp_event_packet_t *next_pkt = NULL; - while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) { + for (lwip_tcp_event_packet_t *next_pkt = _async_queue.begin(); next_pkt && (next_pkt->client == e->client) && (next_pkt->event == LWIP_TCP_POLL); + next_pkt = _async_queue.begin()) { // if the next event that will come is a poll event for the same connection, we can discard it and continue - if (next_pkt->client == (*e)->client && next_pkt->event == LWIP_TCP_POLL) { - if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) { - free(next_pkt); - next_pkt = NULL; - log_d("coalescing polls, network congestion or async callbacks might be too slow!"); - continue; - } - } - - // quit while loop if next incoming event can't be discarded (not a poll event) - break; + _free_event(_async_queue.pop_front()); + log_d("coalescing polls, network congestion or async callbacks might be too slow!"); } /* @@ -241,73 +252,42 @@ static inline bool _get_async_event(lwip_tcp_event_packet_t **e) { Let's discard poll events processing using linear-increasing probability curve when queue size grows over 3/4 Poll events are periodic and connection could get another chance next time */ - if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) { - free(*e); - *e = NULL; + if (_async_queue.size() > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) { + _free_event(e); log_d("discarding poll due to queue congestion"); - continue; // continue main loop to dequeue next event which we know is 
not a poll event + continue; } - return true; // queue not nearly full, caller can process the poll event - } - return false; -} -static bool _remove_events_for_client(AsyncClient *client) { - if (!_async_queue) { - return false; + return e; } +} - lwip_tcp_event_packet_t *first_packet = NULL; - lwip_tcp_event_packet_t *packet = NULL; - - // figure out which is the first non-matching packet so we can keep the order - while (!first_packet) { - if (xQueueReceive(_async_queue, &first_packet, 0) != pdPASS) { - return false; - } - // discard packet if matching - if ((uintptr_t)first_packet->client == (uintptr_t)client) { - free(first_packet); - first_packet = NULL; - } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) { - // try to return first packet to the back of the queue - // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue - // otherwise it would deadlock, we have to discard the event - free(first_packet); - first_packet = NULL; - return false; - } +static void _remove_events_for_client(AsyncClient *client) { + lwip_tcp_event_packet_t *removed_event_chain; + { + queue_mutex_guard guard; + removed_event_chain = _async_queue.remove_if([=](lwip_tcp_event_packet_t &pkt) { + return pkt.client == client; + }); } - while (xQueuePeek(_async_queue, &packet, 0) == pdPASS && packet != first_packet) { - if (xQueueReceive(_async_queue, &packet, 0) != pdPASS) { - return false; - } - if ((uintptr_t)packet->client == (uintptr_t)client) { - // remove matching event - free(packet); - packet = NULL; - // otherwise try to requeue it - } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) { - // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue - // otherwise it would deadlock, we have to discard the event - free(packet); - packet = NULL; - return false; - } + size_t count = 0; + while (removed_event_chain) { + ++count; + auto t = 
removed_event_chain; + removed_event_chain = t->next; + _free_event(t); } - return true; -} +}; void AsyncTCP_detail::handle_async_event(lwip_tcp_event_packet_t *e) { if (e->client == NULL) { // do nothing when arg is NULL // ets_printf("event arg == NULL: 0x%08x\n", e->recv.pcb); - } else if (e->event == LWIP_TCP_CLEAR) { - _remove_events_for_client(e->client); } else if (e->event == LWIP_TCP_RECV) { // ets_printf("-R: 0x%08x\n", e->recv.pcb); e->client->_recv(e->recv.pcb, e->recv.pb, e->recv.err); + e->recv.pb = nullptr; // given to client } else if (e->event == LWIP_TCP_FIN) { // ets_printf("-F: 0x%08x\n", e->fin.pcb); e->client->_fin(e->fin.pcb, e->fin.err); @@ -330,7 +310,7 @@ void AsyncTCP_detail::handle_async_event(lwip_tcp_event_packet_t *e) { // ets_printf("D: 0x%08x %s = %s\n", e->client, e->dns.name, ipaddr_ntoa(&e->dns.addr)); e->client->_dns_found(&e->dns.addr); } - free((void *)(e)); + _free_event(e); } static void _async_service_task(void *pvParameters) { @@ -339,11 +319,17 @@ static void _async_service_task(void *pvParameters) { log_w("Failed to add async task to WDT"); } #endif - lwip_tcp_event_packet_t *packet = NULL; for (;;) { - if (_get_async_event(&packet)) { + while (auto packet = _get_async_event()) { AsyncTCP_detail::handle_async_event(packet); +#if CONFIG_ASYNC_TCP_USE_WDT + esp_task_wdt_reset(); +#endif } + // queue is empty + // DEBUG_PRINTF("Async task waiting 0x%08",(intptr_t)_async_queue_head); + ulTaskNotifyTake(pdTRUE, ASYNC_TCP_MAX_TASK_SLEEP); + // DEBUG_PRINTF("Async task woke = %d 0x%08x",q, (intptr_t)_async_queue_head); #if CONFIG_ASYNC_TCP_USE_WDT esp_task_wdt_reset(); #endif @@ -354,6 +340,7 @@ static void _async_service_task(void *pvParameters) { vTaskDelete(NULL); _async_service_task_handle = NULL; } + /* static void _stop_async_task(){ if(_async_service_task_handle){ @@ -379,9 +366,13 @@ static bool customTaskCreateUniversal( } static bool _start_async_task() { - if (!_init_async_event_queue()) { - return false; + if 
(!_async_queue_mutex) { + _async_queue_mutex = xSemaphoreCreateMutex(); + if (!_async_queue_mutex) { + return false; + } } + if (!_async_service_task_handle) { customTaskCreateUniversal( _async_service_task, "async_tcp", CONFIG_ASYNC_TCP_STACK_SIZE, NULL, CONFIG_ASYNC_TCP_PRIORITY, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE @@ -397,74 +388,74 @@ static bool _start_async_task() { * LwIP Callbacks * */ -static int8_t _tcp_clear_events(AsyncClient *client) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); - if (!e) { - log_e("Failed to allocate event packet"); - return ERR_MEM; - } - e->event = LWIP_TCP_CLEAR; - e->client = client; - if (!_prepend_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; +static void _bind_tcp_callbacks(tcp_pcb *pcb, AsyncClient *client) { + tcp_arg(pcb, client); + tcp_recv(pcb, &AsyncTCP_detail::tcp_recv); + tcp_sent(pcb, &AsyncTCP_detail::tcp_sent); + tcp_err(pcb, &AsyncTCP_detail::tcp_error); + tcp_poll(pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); +} + +static void _reset_tcp_callbacks(tcp_pcb *pcb, AsyncClient *client) { + tcp_arg(pcb, NULL); + tcp_sent(pcb, NULL); + tcp_recv(pcb, NULL); + tcp_err(pcb, NULL); + tcp_poll(pcb, NULL, 0); + if (client) { + _remove_events_for_client(client); } - return ERR_OK; } static int8_t _tcp_connected(void *arg, tcp_pcb *pcb, int8_t err) { // ets_printf("+C: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_CONNECTED, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->event = LWIP_TCP_CONNECTED; - e->client = reinterpret_cast(arg); e->connected.pcb = pcb; e->connected.err = err; - if (!_prepend_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + queue_mutex_guard 
guard; + _send_async_event(e); return ERR_OK; } int8_t AsyncTCP_detail::tcp_poll(void *arg, struct tcp_pcb *pcb) { // throttle polling events queueing when event queue is getting filled up, let it handle _onack's - // log_d("qs:%u", uxQueueMessagesWaiting(_async_queue)); - if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) { - log_d("throttling"); - return ERR_OK; + { + queue_mutex_guard guard; + // log_d("qs:%u", _async_queue.size()); + if (_async_queue.size() > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) { + log_d("throttling"); + return ERR_OK; + } } // ets_printf("+P: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_POLL, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->event = LWIP_TCP_POLL; - e->client = reinterpret_cast(arg); e->poll.pcb = pcb; - // poll events are not critical 'cause those are repetitive, so we may not wait the queue in any case - if (!_send_async_event(&e, 0)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } int8_t AsyncTCP_detail::tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_RECV, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->client = reinterpret_cast(arg); if (pb) { // ets_printf("+R: 0x%08x\n", pcb); - e->event = LWIP_TCP_RECV; e->recv.pcb = pcb; e->recv.pb = pb; e->recv.err = err; @@ -474,30 +465,27 @@ int8_t AsyncTCP_detail::tcp_recv(void *arg, 
struct tcp_pcb *pcb, struct pbuf *pb e->fin.pcb = pcb; e->fin.err = err; // close the PCB in LwIP thread - reinterpret_cast(arg)->_lwip_fin(e->fin.pcb, e->fin.err); - } - if (!_send_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; + client->_lwip_fin(e->fin.pcb, e->fin.err); } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } int8_t AsyncTCP_detail::tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) { // ets_printf("+S: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_SENT, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->event = LWIP_TCP_SENT; - e->client = reinterpret_cast(arg); e->sent.pcb = pcb; e->sent.len = len; - if (!_send_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } @@ -505,49 +493,42 @@ void AsyncTCP_detail::tcp_error(void *arg, int8_t err) { // ets_printf("+E: 0x%08x\n", arg); AsyncClient *client = reinterpret_cast(arg); if (client && client->_pcb) { - tcp_arg(client->_pcb, NULL); - if (client->_pcb->state == LISTEN) { - ::tcp_sent(client->_pcb, NULL); - ::tcp_recv(client->_pcb, NULL); - ::tcp_err(client->_pcb, NULL); - ::tcp_poll(client->_pcb, NULL, 0); - } + _reset_tcp_callbacks(client->_pcb, client); client->_pcb = nullptr; client->_free_closed_slot(); } // enqueue event to be processed in the async task for the user callback - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_ERROR, client}; if (!e) { log_e("Failed to allocate event packet"); return; } - e->event = LWIP_TCP_ERROR; - e->client = client; e->error.err = err; - if (!_send_async_event(&e)) { - ::free((void 
*)(e)); - } + + queue_mutex_guard guard; + _send_async_event(e); } static void _tcp_dns_found(const char *name, struct ip_addr *ipaddr, void *arg) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg); + auto client = reinterpret_cast(arg); + + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_DNS, client}; if (!e) { log_e("Failed to allocate event packet"); return; } - // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg); - e->event = LWIP_TCP_DNS; - e->client = reinterpret_cast(arg); + e->dns.name = name; if (ipaddr) { memcpy(&e->dns.addr, ipaddr, sizeof(struct ip_addr)); } else { memset(&e->dns.addr, 0, sizeof(e->dns.addr)); } - if (!_send_async_event(&e)) { - free((void *)(e)); - } + + queue_mutex_guard guard; + _send_async_event(e); } /* @@ -562,6 +543,7 @@ typedef struct { int8_t closed_slot; int8_t err; union { + AsyncClient *close; struct { const char *data; size_t size; @@ -652,18 +634,25 @@ static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; if (msg->closed_slot == INVALID_CLOSED_SLOT || !_closed_slots[msg->closed_slot]) { + // Unlike the other calls, this is not a direct wrapper of the LwIP function; + // we perform the AsyncClient teardown interlocked safely with the LwIP task. 
+ _reset_tcp_callbacks(msg->pcb, msg->close); msg->err = tcp_close(msg->pcb); + if (msg->err != ERR_OK) { + tcp_abort(msg->pcb); + } } return msg->err; } -static esp_err_t _tcp_close(tcp_pcb *pcb, int8_t closed_slot) { +static esp_err_t _tcp_close(tcp_pcb *pcb, int8_t closed_slot, AsyncClient *client) { if (!pcb) { return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; + msg.close = client; tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data *)&msg); return msg.err; } @@ -711,6 +700,12 @@ static esp_err_t _tcp_connect(tcp_pcb *pcb, int8_t closed_slot, ip_addr_t *addr, static err_t _tcp_bind_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = tcp_bind(msg->pcb, msg->bind.addr, msg->bind.port); + if (msg->err != ERR_OK) { + // Close the pcb on behalf of the server without an extra round-trip through the LwIP lock + if (tcp_close(msg->pcb) != ERR_OK) { + tcp_abort(msg->pcb); + } + } return msg->err; } @@ -758,11 +753,7 @@ AsyncClient::AsyncClient(tcp_pcb *pcb) _closed_slot = INVALID_CLOSED_SLOT; if (_pcb) { _rx_last_packet = millis(); - tcp_arg(_pcb, this); - tcp_recv(_pcb, &AsyncTCP_detail::tcp_recv); - tcp_sent(_pcb, &AsyncTCP_detail::tcp_sent); - tcp_err(_pcb, &AsyncTCP_detail::tcp_error); - tcp_poll(_pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); + _bind_tcp_callbacks(_pcb, this); if (!_allocate_closed_slot()) { _close(); } @@ -859,11 +850,7 @@ bool AsyncClient::connect(ip_addr_t addr, uint16_t port) { log_e("pcb == NULL"); return false; } - tcp_arg(pcb, this); - tcp_err(pcb, &AsyncTCP_detail::tcp_error); - tcp_recv(pcb, &AsyncTCP_detail::tcp_recv); - tcp_sent(pcb, &AsyncTCP_detail::tcp_sent); - tcp_poll(pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); + _bind_tcp_callbacks(pcb, this); } esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected); @@ -1009,19 +996,7 @@ int8_t AsyncClient::_close() { // 
ets_printf("X: 0x%08x\n", (uint32_t)this); int8_t err = ERR_OK; if (_pcb) { - { - tcp_core_guard tcg; - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); - } - _tcp_clear_events(this); - err = _tcp_close(_pcb, _closed_slot); - if (err != ERR_OK) { - err = abort(); - } + _tcp_close(_pcb, _closed_slot, this); _free_closed_slot(); _pcb = NULL; if (_discard_cb) { @@ -1095,13 +1070,7 @@ int8_t AsyncClient::_lwip_fin(tcp_pcb *pcb, int8_t err) { log_d("0x%08" PRIx32 " != 0x%08" PRIx32, (uint32_t)pcb, (uint32_t)_pcb); return ERR_OK; } - tcp_arg(_pcb, NULL); - if (_pcb->state == LISTEN) { - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); - } + _reset_tcp_callbacks(_pcb, this); if (tcp_close(_pcb) != ERR_OK) { tcp_abort(_pcb); } @@ -1112,7 +1081,6 @@ int8_t AsyncClient::_lwip_fin(tcp_pcb *pcb, int8_t err) { // In Async Thread int8_t AsyncClient::_fin(tcp_pcb *pcb, int8_t err) { - _tcp_clear_events(this); if (_discard_cb) { _discard_cb(_discard_cb_arg, this); } @@ -1587,7 +1555,7 @@ void AsyncServer::begin() { err = _tcp_bind(_pcb, &_addr, _port); if (err != ERR_OK) { - _tcp_close(_pcb, -1); + // pcb was closed by _tcp_bind _pcb = NULL; log_e("bind error: %d", err); return; @@ -1628,15 +1596,13 @@ int8_t AsyncTCP_detail::tcp_accept(void *arg, tcp_pcb *pcb, int8_t err) { if (c && c->pcb()) { c->setNoDelay(server->_noDelay); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_ACCEPT, c}; if (e) { - e->event = LWIP_TCP_ACCEPT; e->accept.server = server; - e->client = c; - if (_prepend_async_event(&e)) { - return ERR_OK; // success - } - free((void *)(e)); + + queue_mutex_guard guard; + _prepend_async_event(e); + return ERR_OK; // success } // Couldn't allocate accept event diff --git a/src/AsyncTCPSimpleIntrusiveList.h 
// --- new file: src/AsyncTCPSimpleIntrusiveList.h (reconstructed from mangled diff) ---
// Simple intrusive list class
#pragma once

#include <cstddef>      // size_t
#include <type_traits>  // std::is_same
#include <utility>      // std::declval

// Singly-linked intrusive FIFO list: nodes carry their own `T *next` link, so
// the list itself never allocates. O(1) push_back/push_front/pop_front.
// NOT thread-safe; callers must synchronize externally (AsyncTCP wraps all
// access in a mutex guard).
template <typename T> class SimpleIntrusiveList {
  static_assert(std::is_same<decltype(std::declval<T>().next), T *>::value, "Template type must have public 'T* next' member");

public:
  typedef T value_type;
  typedef value_type *value_ptr_type;
  typedef value_ptr_type *value_ptr_ptr_type;

  // Static utility methods

  // Number of nodes in a raw nullptr-terminated chain.
  static size_t list_size(value_ptr_type chain) {
    size_t count = 0;
    for (auto c = chain; c != nullptr; c = c->next) {
      ++count;
    }
    return count;
  }

  // Deletes every node of a raw chain. Assumes nodes were allocated with "new".
  static void delete_list(value_ptr_type chain) {
    while (chain) {
      auto t = chain;
      chain = chain->next;
      delete t;
    }
  }

public:
  // Object methods

  // FIX: _size was left uninitialized — garbage for any non-static instance,
  // breaking size() and every congestion heuristic built on it.
  SimpleIntrusiveList() : _head(nullptr), _tail(&_head), _size(0) {}
  ~SimpleIntrusiveList() {
    clear();
  }

  // Noncopyable, nonmovable: _tail points into this object when the list is empty.
  SimpleIntrusiveList(const SimpleIntrusiveList &) = delete;
  SimpleIntrusiveList(SimpleIntrusiveList &&) = delete;
  SimpleIntrusiveList &operator=(const SimpleIntrusiveList &) = delete;
  SimpleIntrusiveList &operator=(SimpleIntrusiveList &&) = delete;

  // Appends obj at the tail; the list takes ownership of the node's `next` link.
  inline void push_back(value_ptr_type obj) {
    if (obj) {
      // FIX: clear any stale link first — otherwise `_tail = &obj->next` would
      // leave a dangling chain attached and break the tail invariant.
      obj->next = nullptr;
      *_tail = obj;
      _tail = &obj->next;
      ++_size;
    }
  }

  // Prepends obj at the head.
  inline void push_front(value_ptr_type obj) {
    if (obj) {
      if (_head == nullptr) {
        _tail = &obj->next;  // first element: tail must address its next slot
      }
      obj->next = _head;
      _head = obj;
      ++_size;
    }
  }

  // Detaches and returns the first node, or nullptr when empty. Caller owns it.
  inline value_ptr_type pop_front() {
    auto rv = _head;
    if (_head) {
      if (_tail == &_head->next) {
        _tail = &_head;  // popping the only element; reset tail to the head slot
      }
      _head = _head->next;
      --_size;
    }
    return rv;
  }

  inline void clear() {
    // Assumes all elements were allocated with "new"
    delete_list(_head);
    _head = nullptr;
    _tail = &_head;
    _size = 0;
  }

  inline size_t size() const {
    return _size;
  }

  // Unlinks every node for which `condition(node)` is true and returns them as
  // a raw chain (in reverse list order, since each match is prepended).
  // Caller owns the returned chain (free with delete_list or equivalent).
  template <typename function_type> inline value_ptr_type remove_if(const function_type &condition) {
    value_ptr_type removed = nullptr;
    value_ptr_ptr_type current_ptr = &_head;
    while (*current_ptr != nullptr) {
      value_ptr_type current = *current_ptr;
      if (condition(*current)) {
        // Remove this item from the list by moving the next item in
        *current_ptr = current->next;
        // If we were the last item, reset tail
        if (current->next == nullptr) {
          _tail = current_ptr;
        }
        --_size;
        // Prepend this item to the removed list
        current->next = removed;
        removed = current;
        // do not advance current_ptr
      } else {
        // advance current_ptr
        current_ptr = &(*current_ptr)->next;
      }
    }

    // Return the removed entries
    return removed;
  }

  // First node (or nullptr); the list retains ownership.
  inline value_ptr_type begin() const {
    return _head;
  }

  // Debug invariant check: _tail must address the last node's `next` slot
  // (or the head slot when empty).
  bool validate_tail() const {
    if (_head == nullptr) {
      return (_tail == &_head);
    }
    auto p = _head;
    while (p->next != nullptr) {
      p = p->next;
    }
    return _tail == &p->next;
  }

private:
  // Data members
  value_ptr_type _head;     // first node, or nullptr when empty
  value_ptr_ptr_type _tail; // address of the terminating `next` slot
  size_t _size;             // cached element count, kept in sync by all mutators

};  // class SimpleIntrusiveList