From 58e632812cc74643080049ca3b840ac85c2ac284 Mon Sep 17 00:00:00 2001 From: Will Miles Date: Tue, 22 Apr 2025 19:20:00 -0400 Subject: [PATCH 1/7] Use simple intrusive linked list for queue Use a simple intrusive list for the event queue. The ultimate goal here is to arrange that certain kinds of events (errors) can be guaranteed to be queued, as client objects will leak if they are discarded. As a secondary improvement, there are some operations (peeking, remove_if) that can be more efficient as we can hold the queue lock for longer. This commit is a straight replacement and does not attempt any logic changes. --- src/AsyncTCP.cpp | 297 ++++++++++++++---------------- src/AsyncTCPSimpleIntrusiveList.h | 128 +++++++++++++ 2 files changed, 265 insertions(+), 160 deletions(-) create mode 100644 src/AsyncTCPSimpleIntrusiveList.h diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 8689572..21a4103 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -2,6 +2,7 @@ // Copyright 2016-2025 Hristo Gochkov, Mathieu Carbou, Emil Muratov #include "AsyncTCP.h" +#include "AsyncTCPSimpleIntrusiveList.h" #include @@ -39,6 +40,13 @@ extern "C" { // Required for: // https://github.com/espressif/arduino-esp32/blob/3.0.3/libraries/Network/src/NetworkInterface.cpp#L37-L47 +#if CONFIG_ASYNC_TCP_USE_WDT +#include "esp_task_wdt.h" +#define ASYNC_TCP_MAX_TASK_SLEEP (pdMS_TO_TICKS(1000 * CONFIG_ESP_TASK_WDT_TIMEOUT_S) / 4) +#else +#define ASYNC_TCP_MAX_TASK_SLEEP portMAX_DELAY +#endif + // https://github.com/espressif/arduino-esp32/issues/10526 namespace { #ifdef CONFIG_LWIP_TCPIP_CORE_LOCKING @@ -89,7 +97,8 @@ typedef enum { LWIP_TCP_DNS } lwip_tcp_event_t; -typedef struct { +struct lwip_tcp_event_packet_t { + lwip_tcp_event_packet_t *next; lwip_tcp_event_t event; AsyncClient *client; union { @@ -124,7 +133,9 @@ typedef struct { ip_addr_t addr; } dns; }; -} lwip_tcp_event_packet_t; + + inline lwip_tcp_event_packet_t(lwip_tcp_event_t _event, AsyncClient *_client) : next(nullptr), event(_event), client(_client){}; +}; // Detail class for interacting with AsyncClient internals, but without exposing the API class AsyncTCP_detail { @@ -140,7 +151,32 @@ class AsyncTCP_detail { static int8_t __attribute__((visibility("internal"))) tcp_accept(void *arg, tcp_pcb *pcb, int8_t err); }; -static QueueHandle_t _async_queue = NULL; +// Guard class for the global queue +namespace { +class queue_mutex_guard { + // Create-on-first-use idiom for an embedded mutex + static SemaphoreHandle_t _async_queue_mutex() { + static SemaphoreHandle_t mutex = xSemaphoreCreateMutex(); + assert(mutex != nullptr); + return mutex; + }; + + bool holds_mutex; + +public: + inline queue_mutex_guard() : holds_mutex(xSemaphoreTake(_async_queue_mutex(), portMAX_DELAY)){}; + inline ~queue_mutex_guard() { + if (holds_mutex) { + xSemaphoreGive(_async_queue_mutex()); + } + }; + inline explicit operator bool() const { + return holds_mutex; + }; +}; +} // anonymous namespace + +static SimpleIntrusiveList _async_queue; static TaskHandle_t _async_service_task_handle = NULL; static SemaphoreHandle_t _slots_lock = NULL; @@ -156,43 +192,32 @@ static uint32_t _closed_index = []() { return 1; }(); -static inline bool _init_async_event_queue() { - if (!_async_queue) { - _async_queue = xQueueCreate(CONFIG_ASYNC_TCP_QUEUE_SIZE, sizeof(lwip_tcp_event_packet_t *)); - if (!_async_queue) { - return false; - } +static void _free_event(lwip_tcp_event_packet_t *evpkt) { + if ((evpkt->event == LWIP_TCP_RECV) && (evpkt->recv.pb != nullptr)) { + pbuf_free(evpkt->recv.pb); } - return true; + delete evpkt; } -static inline bool _send_async_event(lwip_tcp_event_packet_t **e, TickType_t wait = portMAX_DELAY) { - return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS; +static inline void _send_async_event(lwip_tcp_event_packet_t *e) { + assert(e != nullptr); + _async_queue.push_back(e); + xTaskNotifyGive(_async_service_task_handle); } -static inline bool _prepend_async_event(lwip_tcp_event_packet_t **e, TickType_t wait = portMAX_DELAY) { - return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS; +static inline void _prepend_async_event(lwip_tcp_event_packet_t *e) { + assert(e != nullptr); + _async_queue.push_front(e); + xTaskNotifyGive(_async_service_task_handle); } -static inline bool _get_async_event(lwip_tcp_event_packet_t **e) { - while (true) { - if (!_async_queue) { - break; - } - -#if CONFIG_ASYNC_TCP_USE_WDT - // need to return periodically to feed the dog - if (xQueueReceive(_async_queue, e, pdMS_TO_TICKS(1000)) != pdPASS) { - break; - } -#else - if (xQueueReceive(_async_queue, e, portMAX_DELAY) != pdPASS) { - break; - } -#endif +static inline lwip_tcp_event_packet_t *_get_async_event() { + queue_mutex_guard guard; + while (1) { + lwip_tcp_event_packet_t *e = _async_queue.pop_front(); - if ((*e)->event != LWIP_TCP_POLL) { - return true; + if ((!e) || (e->event != LWIP_TCP_POLL)) { + return e; } /* @@ -203,20 +228,11 @@ static inline bool _get_async_event(lwip_tcp_event_packet_t **e) { It won't be effective if user would run multiple simultaneous long running callbacks due to message interleaving. todo: implement some kind of fair dequeuing or (better) simply punish user for a bad designed callbacks by resetting hog connections */ - lwip_tcp_event_packet_t *next_pkt = NULL; - while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS) { + for (lwip_tcp_event_packet_t *next_pkt = _async_queue.begin(); next_pkt && (next_pkt->client == e->client) && (next_pkt->event == LWIP_TCP_POLL); + next_pkt = _async_queue.begin()) { // if the next event that will come is a poll event for the same connection, we can discard it and continue - if (next_pkt->client == (*e)->client && next_pkt->event == LWIP_TCP_POLL) { - if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS) { - free(next_pkt); - next_pkt = NULL; - log_d("coalescing polls, network congestion or async callbacks might be too slow!"); - continue; - } - } - - // quit while loop if next incoming event can't be discarded (not a poll event) - break; + _free_event(_async_queue.pop_front()); + log_d("coalescing polls, network congestion or async callbacks might be too slow!"); } /* @@ -228,63 +244,33 @@ static inline bool _get_async_event(lwip_tcp_event_packet_t **e) { Let's discard poll events processing using linear-increasing probability curve when queue size grows over 3/4 Poll events are periodic and connection could get another chance next time */ - if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) { - free(*e); - *e = NULL; + if (_async_queue.size() > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) { + _free_event(e); log_d("discarding poll due to queue congestion"); - continue; // continue main loop to dequeue next event which we know is not a poll event + continue; } - return true; // queue not nearly full, caller can process the poll event - } - return false; -} -static bool _remove_events_for_client(AsyncClient *client) { - if (!_async_queue) { - return false; + return e; } +} - lwip_tcp_event_packet_t *first_packet = NULL; - lwip_tcp_event_packet_t *packet = NULL; - - // figure out which is the first non-matching packet so we can keep the order - while (!first_packet) { - if (xQueueReceive(_async_queue, &first_packet, 0) != pdPASS) { - return false; - } - // discard packet if matching - if ((uintptr_t)first_packet->client == (uintptr_t)client) { - free(first_packet); - first_packet = NULL; - } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) { - // try to return first packet to the back of the queue - // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue - // otherwise it would deadlock, we have to discard the event - free(first_packet); - first_packet = NULL; - return false; - } +static void _remove_events_for_client(AsyncClient *client) { + lwip_tcp_event_packet_t *removed_event_chain; + { + queue_mutex_guard guard; + removed_event_chain = _async_queue.remove_if([=](lwip_tcp_event_packet_t &pkt) { + return pkt.client == client; + }); } - while (xQueuePeek(_async_queue, &packet, 0) == pdPASS && packet != first_packet) { - if (xQueueReceive(_async_queue, &packet, 0) != pdPASS) { - return false; - } - if ((uintptr_t)packet->client == (uintptr_t)client) { - // remove matching event - free(packet); - packet = NULL; - // otherwise try to requeue it - } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) { - // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue - // otherwise it would deadlock, we have to discard the event - free(packet); - packet = NULL; - return false; - } + size_t count = 0; + while (removed_event_chain) { + ++count; + auto t = removed_event_chain; + removed_event_chain = t->next; + _free_event(t); } - return true; -} +}; void AsyncTCP_detail::handle_async_event(lwip_tcp_event_packet_t *e) { if (e->client == NULL) { @@ -295,6 +281,7 @@ void AsyncTCP_detail::handle_async_event(lwip_tcp_event_packet_t *e) { } else if (e->event == LWIP_TCP_RECV) { // ets_printf("-R: 0x%08x\n", e->recv.pcb); e->client->_recv(e->recv.pcb, e->recv.pb, e->recv.err); + e->recv.pb = nullptr; // given to client } else if (e->event == LWIP_TCP_FIN) { // ets_printf("-F: 0x%08x\n", e->fin.pcb); e->client->_fin(e->fin.pcb, e->fin.err); @@ -317,7 +304,7 @@ void AsyncTCP_detail::handle_async_event(lwip_tcp_event_packet_t *e) { // ets_printf("D: 0x%08x %s = %s\n", e->client, e->dns.name, ipaddr_ntoa(&e->dns.addr)); e->client->_dns_found(&e->dns.addr); } - free((void *)(e)); + _free_event(e); } static void _async_service_task(void *pvParameters) { @@ -326,11 +313,17 @@ static void _async_service_task(void *pvParameters) { log_w("Failed to add async task to WDT"); } #endif - lwip_tcp_event_packet_t *packet = NULL; for (;;) { - if (_get_async_event(&packet)) { + while (auto packet = _get_async_event()) { AsyncTCP_detail::handle_async_event(packet); +#if CONFIG_ASYNC_TCP_USE_WDT + esp_task_wdt_reset(); +#endif } + // queue is empty + // DEBUG_PRINTF("Async task waiting 0x%08",(intptr_t)_async_queue_head); + ulTaskNotifyTake(pdTRUE, ASYNC_TCP_MAX_TASK_SLEEP); + // DEBUG_PRINTF("Async task woke = %d 0x%08x",q, (intptr_t)_async_queue_head); #if CONFIG_ASYNC_TCP_USE_WDT esp_task_wdt_reset(); #endif @@ -341,6 +334,7 @@ static void _async_service_task(void *pvParameters) { vTaskDelete(NULL); _async_service_task_handle = NULL; } + /* static void _stop_async_task(){ if(_async_service_task_handle){ @@ -366,9 +360,6 @@ static bool customTaskCreateUniversal( } static bool _start_async_task() { - if (!_init_async_event_queue()) { - return false; - } if (!_async_service_task_handle) { customTaskCreateUniversal( _async_service_task, "async_tcp", CONFIG_ASYNC_TCP_STACK_SIZE, NULL, CONFIG_ASYNC_TCP_PRIORITY, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE @@ -385,73 +376,65 @@ static bool _start_async_task() { * */ static int8_t _tcp_clear_events(AsyncClient *client) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_CLEAR, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->event = LWIP_TCP_CLEAR; - e->client = client; - if (!_prepend_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + queue_mutex_guard guard; + _prepend_async_event(e); return ERR_OK; } static int8_t _tcp_connected(void *arg, tcp_pcb *pcb, int8_t err) { // ets_printf("+C: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_CONNECTED, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->event = LWIP_TCP_CONNECTED; - e->client = reinterpret_cast(arg); e->connected.pcb = pcb; e->connected.err = err; - if (!_prepend_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } int8_t AsyncTCP_detail::tcp_poll(void *arg, struct tcp_pcb *pcb) { // throttle polling events queueing when event queue is getting filled up, let it handle _onack's - // log_d("qs:%u", uxQueueMessagesWaiting(_async_queue)); - if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) { - log_d("throttling"); - return ERR_OK; + { + queue_mutex_guard guard; + // log_d("qs:%u", _async_queue.size()); + if (_async_queue.size() > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 2 + CONFIG_ASYNC_TCP_QUEUE_SIZE / 4)) { + log_d("throttling"); + return ERR_OK; + } } // ets_printf("+P: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_POLL, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->event = LWIP_TCP_POLL; - e->client = reinterpret_cast(arg); e->poll.pcb = pcb; - // poll events are not critical 'cause those are repetitive, so we may not wait the queue in any case - if (!_send_async_event(&e, 0)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } int8_t AsyncTCP_detail::tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb, int8_t err) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_RECV, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->client = reinterpret_cast(arg); if (pb) { // ets_printf("+R: 0x%08x\n", pcb); - e->event = LWIP_TCP_RECV; e->recv.pcb = pcb; e->recv.pb = pb; e->recv.err = err; @@ -463,28 +446,25 @@ int8_t AsyncTCP_detail::tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb // close the PCB in LwIP thread reinterpret_cast(arg)->_lwip_fin(e->fin.pcb, e->fin.err); } - if (!_send_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } int8_t AsyncTCP_detail::tcp_sent(void *arg, struct tcp_pcb *pcb, uint16_t len) { // ets_printf("+S: 0x%08x\n", pcb); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + AsyncClient *client = reinterpret_cast(arg); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_SENT, client}; if (!e) { log_e("Failed to allocate event packet"); return ERR_MEM; } - e->event = LWIP_TCP_SENT; - e->client = reinterpret_cast(arg); e->sent.pcb = pcb; e->sent.len = len; - if (!_send_async_event(&e)) { - free((void *)(e)); - return ERR_TIMEOUT; - } + + queue_mutex_guard guard; + _send_async_event(e); return ERR_OK; } @@ -504,37 +484,36 @@ void AsyncTCP_detail::tcp_error(void *arg, int8_t err) { } // enqueue event to be processed in the async task for the user callback - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_ERROR, client}; if (!e) { log_e("Failed to allocate event packet"); return; } - e->event = LWIP_TCP_ERROR; - e->client = client; e->error.err = err; - if (!_send_async_event(&e)) { - ::free((void *)(e)); - } + + queue_mutex_guard guard; + _send_async_event(e); } static void _tcp_dns_found(const char *name, struct ip_addr *ipaddr, void *arg) { - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg); + auto client = reinterpret_cast(arg); + + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_DNS, client}; if (!e) { log_e("Failed to allocate event packet"); return; } - // ets_printf("+DNS: name=%s ipaddr=0x%08x arg=%x\n", name, ipaddr, arg); - e->event = LWIP_TCP_DNS; - e->client = reinterpret_cast(arg); + e->dns.name = name; if (ipaddr) { memcpy(&e->dns.addr, ipaddr, sizeof(struct ip_addr)); } else { memset(&e->dns.addr, 0, sizeof(e->dns.addr)); } - if (!_send_async_event(&e)) { - free((void *)(e)); - } + + queue_mutex_guard guard; + _send_async_event(e); } /* @@ -1578,15 +1557,13 @@ int8_t AsyncTCP_detail::tcp_accept(void *arg, tcp_pcb *pcb, int8_t err) { if (c && c->pcb()) { c->setNoDelay(server->_noDelay); - lwip_tcp_event_packet_t *e = (lwip_tcp_event_packet_t *)malloc(sizeof(lwip_tcp_event_packet_t)); + lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_ACCEPT, c}; if (e) { - e->event = LWIP_TCP_ACCEPT; e->accept.server = server; - e->client = c; - if (_prepend_async_event(&e)) { - return ERR_OK; // success - } - free((void *)(e)); + + queue_mutex_guard guard; + _prepend_async_event(e); + return ERR_OK; // success } // Couldn't allocate accept event diff --git a/src/AsyncTCPSimpleIntrusiveList.h b/src/AsyncTCPSimpleIntrusiveList.h new file mode 100644 index 0000000..71d3106 --- /dev/null +++ b/src/AsyncTCPSimpleIntrusiveList.h @@ -0,0 +1,128 @@ +// Simple intrusive list class +#pragma once + +template class SimpleIntrusiveList { + static_assert(std::is_same().next), T *>::value, "Template type must have public 'T* next' member"); + +public: + typedef T value_type; + typedef value_type *value_ptr_type; + typedef value_ptr_type *value_ptr_ptr_type; + + // Static utility methods + static size_t list_size(value_ptr_type chain) { + size_t count = 0; + for (auto c = chain; c != nullptr; c = c->next) { + ++count; + } + return count; + } + + static void delete_list(value_ptr_type chain) { + while (chain) { + auto t = chain; + chain = chain->next; + delete t; + } + } + +public: + // Object methods + + SimpleIntrusiveList() : _head(nullptr), _tail(&_head) {} + ~SimpleIntrusiveList() { + clear(); + } + + // Noncopyable, nonmovable + SimpleIntrusiveList(const SimpleIntrusiveList &) = delete; + SimpleIntrusiveList(SimpleIntrusiveList &&) = delete; + SimpleIntrusiveList &operator=(const SimpleIntrusiveList &) = delete; + SimpleIntrusiveList &operator=(SimpleIntrusiveList &&) = delete; + + inline void push_back(value_ptr_type obj) { + if (obj) { + *_tail = obj; + _tail = &obj->next; + } + } + + inline void push_front(value_ptr_type obj) { + if (obj) { + if (_head == nullptr) { + _tail = &obj->next; + } + obj->next = _head; + _head = obj; + } + } + + inline value_ptr_type pop_front() { + auto rv = _head; + if (_head) { + if (_tail == &_head->next) { + _tail = &_head; + } + _head = _head->next; + } + return rv; + } + + inline void clear() { + // Assumes all elements were allocated with "new" + delete_list(_head); + _head = nullptr; + _tail = &_head; + } + + inline size_t size() const { + return list_size(_head); + } + + template inline value_ptr_type remove_if(const function_type &condition) { + value_ptr_type removed = nullptr; + value_ptr_ptr_type current_ptr = &_head; + while (*current_ptr != nullptr) { + value_ptr_type current = *current_ptr; + if (condition(*current)) { + // Remove this item from the list by moving the next item in + *current_ptr = current->next; + // If we were the last item, reset tail + if (current->next == nullptr) { + _tail = current_ptr; + } + // Prepend this item to the removed list + current->next = removed; + removed = current; + // do not advance current_ptr + } else { + // advance current_ptr + current_ptr = &(*current_ptr)->next; + } + } + + // Return the removed entries + return removed; + } + + inline value_ptr_type begin() const { + return _head; + } + + bool validate_tail() const { + if (_head == nullptr) { + return (_tail == &_head); + } + auto p = _head; + while (p->next != nullptr) { + p = p->next; + } + return _tail == &p->next; + } + +private: + // Data members + value_ptr_type _head; + value_ptr_ptr_type _tail; + +}; // class simple_intrusive_list From 18cc80dcbe12fe12763cee44ca97f27c231b22f1 Mon Sep 17 00:00:00 2001 From: Will Miles Date: Tue, 22 Apr 2025 19:20:00 -0400 Subject: [PATCH 2/7] Factor common binding code --- src/AsyncTCP.cpp | 50 ++++++++++++++++++++---------------------------- 1 file changed, 21 insertions(+), 29 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 21a4103..d9cc6da 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -375,6 +375,22 @@ static bool _start_async_task() { * LwIP Callbacks * */ +static void _bind_tcp_callbacks(tcp_pcb *pcb, AsyncClient *client) { + tcp_arg(pcb, client); + tcp_recv(pcb, &AsyncTCP_detail::tcp_recv); + tcp_sent(pcb, &AsyncTCP_detail::tcp_sent); + tcp_err(pcb, &AsyncTCP_detail::tcp_error); + tcp_poll(pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); +} + +static void _reset_tcp_callbacks(tcp_pcb *pcb) { + tcp_arg(pcb, NULL); + tcp_sent(pcb, NULL); + tcp_recv(pcb, NULL); + tcp_err(pcb, NULL); + tcp_poll(pcb, NULL, 0); +} + static int8_t _tcp_clear_events(AsyncClient *client) { lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_CLEAR, client}; if (!e) { @@ -472,13 +488,7 @@ void AsyncTCP_detail::tcp_error(void *arg, int8_t err) { // ets_printf("+E: 0x%08x\n", arg); AsyncClient *client = reinterpret_cast(arg); if (client && client->_pcb) { - tcp_arg(client->_pcb, NULL); - if (client->_pcb->state == LISTEN) { - ::tcp_sent(client->_pcb, NULL); - ::tcp_recv(client->_pcb, NULL); - ::tcp_err(client->_pcb, NULL); - ::tcp_poll(client->_pcb, NULL, 0); - } + _reset_tcp_callbacks(client->_pcb); client->_pcb = nullptr; client->_free_closed_slot(); } @@ -724,11 +734,7 @@ AsyncClient::AsyncClient(tcp_pcb *pcb) _closed_slot = INVALID_CLOSED_SLOT; if (_pcb) { _rx_last_packet = millis(); - tcp_arg(_pcb, this); - tcp_recv(_pcb, &AsyncTCP_detail::tcp_recv); - tcp_sent(_pcb, &AsyncTCP_detail::tcp_sent); - tcp_err(_pcb, &AsyncTCP_detail::tcp_error); - tcp_poll(_pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); + _bind_tcp_callbacks(_pcb, this); if (!_allocate_closed_slot()) { _close(); } @@ -821,11 +827,7 @@ bool AsyncClient::connect(ip_addr_t addr, uint16_t port) { log_e("pcb == NULL"); return false; } - tcp_arg(pcb, this); - tcp_err(pcb, &AsyncTCP_detail::tcp_error); - tcp_recv(pcb, &AsyncTCP_detail::tcp_recv); - tcp_sent(pcb, &AsyncTCP_detail::tcp_sent); - tcp_poll(pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); + _bind_tcp_callbacks(pcb, this); } esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected); @@ -968,11 +970,7 @@ int8_t AsyncClient::_close() { if (_pcb) { { tcp_core_guard tcg; - tcp_arg(_pcb, NULL); - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); + _reset_tcp_callbacks(_pcb); } _tcp_clear_events(this); err = _tcp_close(_pcb, _closed_slot); @@ -1052,13 +1050,7 @@ int8_t AsyncClient::_lwip_fin(tcp_pcb *pcb, int8_t err) { log_d("0x%08" PRIx32 " != 0x%08" PRIx32, (uint32_t)pcb, (uint32_t)_pcb); return ERR_OK; } - tcp_arg(_pcb, NULL); - if (_pcb->state == LISTEN) { - tcp_sent(_pcb, NULL); - tcp_recv(_pcb, NULL); - tcp_err(_pcb, NULL); - tcp_poll(_pcb, NULL, 0); - } + _reset_tcp_callbacks(_pcb); if (tcp_close(_pcb) != ERR_OK) { tcp_abort(_pcb); } From 6825e07e1e458c08cf321fc15c4ff0bd034c3e31 Mon Sep 17 00:00:00 2001 From: Will Miles Date: Tue, 22 Apr 2025 19:20:00 -0400 Subject: [PATCH 3/7] Purge queue when unbinding --- src/AsyncTCP.cpp | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index d9cc6da..c50ead2 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -91,7 +91,6 @@ typedef enum { LWIP_TCP_FIN, LWIP_TCP_ERROR, LWIP_TCP_POLL, - LWIP_TCP_CLEAR, LWIP_TCP_ACCEPT, LWIP_TCP_CONNECTED, LWIP_TCP_DNS @@ -276,8 +275,6 @@ void AsyncTCP_detail::handle_async_event(lwip_tcp_event_packet_t *e) { if (e->client == NULL) { // do nothing when arg is NULL // ets_printf("event arg == NULL: 0x%08x\n", e->recv.pcb); - } else if (e->event == LWIP_TCP_CLEAR) { - _remove_events_for_client(e->client); } else if (e->event == LWIP_TCP_RECV) { // ets_printf("-R: 0x%08x\n", e->recv.pcb); e->client->_recv(e->recv.pcb, e->recv.pb, e->recv.err); @@ -383,23 +380,15 @@ static void _bind_tcp_callbacks(tcp_pcb *pcb, AsyncClient *client) { tcp_poll(pcb, &AsyncTCP_detail::tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); } -static void _reset_tcp_callbacks(tcp_pcb *pcb) { +static void _reset_tcp_callbacks(tcp_pcb *pcb, AsyncClient *client) { tcp_arg(pcb, NULL); tcp_sent(pcb, NULL); tcp_recv(pcb, NULL); tcp_err(pcb, NULL); tcp_poll(pcb, NULL, 0); -} - -static int8_t _tcp_clear_events(AsyncClient *client) { - lwip_tcp_event_packet_t *e = new (std::nothrow) lwip_tcp_event_packet_t{LWIP_TCP_CLEAR, client}; - if (!e) { - log_e("Failed to allocate event packet"); - return ERR_MEM; + if (client) { + _remove_events_for_client(client); } - queue_mutex_guard guard; - _prepend_async_event(e); - return ERR_OK; } static int8_t _tcp_connected(void *arg, tcp_pcb *pcb, int8_t err) { @@ -460,7 +449,7 @@ int8_t AsyncTCP_detail::tcp_recv(void *arg, struct tcp_pcb *pcb, struct pbuf *pb e->fin.pcb = pcb; e->fin.err = err; // close the PCB in LwIP thread - reinterpret_cast(arg)->_lwip_fin(e->fin.pcb, e->fin.err); + client->_lwip_fin(e->fin.pcb, e->fin.err); } queue_mutex_guard guard; @@ -488,7 +477,7 @@ void AsyncTCP_detail::tcp_error(void *arg, int8_t err) { // ets_printf("+E: 0x%08x\n", arg); AsyncClient *client = reinterpret_cast(arg); if (client && client->_pcb) { - _reset_tcp_callbacks(client->_pcb); + _reset_tcp_callbacks(client->_pcb, client); client->_pcb = nullptr; client->_free_closed_slot(); } @@ -970,9 +959,8 @@ int8_t AsyncClient::_close() { if (_pcb) { { tcp_core_guard tcg; - _reset_tcp_callbacks(_pcb); + _reset_tcp_callbacks(_pcb, this); } - _tcp_clear_events(this); err = _tcp_close(_pcb, _closed_slot); if (err != ERR_OK) { err = abort(); @@ -1050,7 +1038,7 @@ int8_t AsyncClient::_lwip_fin(tcp_pcb *pcb, int8_t err) { log_d("0x%08" PRIx32 " != 0x%08" PRIx32, (uint32_t)pcb, (uint32_t)_pcb); return ERR_OK; } - _reset_tcp_callbacks(_pcb); + _reset_tcp_callbacks(_pcb, this); if (tcp_close(_pcb) != ERR_OK) { tcp_abort(_pcb); } @@ -1061,7 +1049,6 @@ int8_t AsyncClient::_lwip_fin(tcp_pcb *pcb, int8_t err) { // In Async Thread int8_t AsyncClient::_fin(tcp_pcb *pcb, int8_t err) { - _tcp_clear_events(this); if (_discard_cb) { _discard_cb(_discard_cb_arg, this); } From 27bbd298b260c0b382116f3cd38d5065597c06f6 Mon Sep 17 00:00:00 2001 From: Will Miles Date: Fri, 2 May 2025 20:03:01 -0400 Subject: [PATCH 4/7] _tcp_bind_api: Close automatically on failure This eliminates a round-trip through the LwIP lock and allows _tcp_close_api to specialize for AsyncClient. --- src/AsyncTCP.cpp | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index c50ead2..93f503a 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -676,6 +676,12 @@ static esp_err_t _tcp_connect(tcp_pcb *pcb, int8_t closed_slot, ip_addr_t *addr, static err_t _tcp_bind_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = tcp_bind(msg->pcb, msg->bind.addr, msg->bind.port); + if (msg->err != ERR_OK) { + // Close the pcb on behalf of the server without an extra round-trip through the LwIP lock + if (tcp_close(msg->pcb) != ERR_OK) { + tcp_abort(msg->pcb); + } + } return msg->err; } @@ -1495,7 +1501,7 @@ void AsyncServer::begin() { err = _tcp_bind(_pcb, &_addr, _port); if (err != ERR_OK) { - _tcp_close(_pcb, -1); + // pcb was closed by _tcp_bind _pcb = NULL; log_e("bind error: %d", err); return; From b3b43cd71b05a01b381dae23ae0c96479c9d599e Mon Sep 17 00:00:00 2001 From: Will Miles Date: Fri, 2 May 2025 20:14:09 -0400 Subject: [PATCH 5/7] Complete AsyncClient close on LwIP thread Ensure that _tcp_close completely disconnects a pcb from an AsyncClient - All callbacks are unhooked - All events are purged - abort() called on close() failure This fixes some race conditions with closing, particularly without CONFIG_LWIP_TCPIP_CORE_LOCKING, where an event might be processed for a now-dead client if it arrived after arg was cleared but before the callbacks were disconnected. --- src/AsyncTCP.cpp | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 93f503a..00c537f 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -527,6 +527,7 @@ typedef struct { int8_t closed_slot; int8_t err; union { + AsyncClient *close; struct { const char *data; size_t size; @@ -617,18 +618,25 @@ static err_t _tcp_close_api(struct tcpip_api_call_data *api_call_msg) { tcp_api_call_t *msg = (tcp_api_call_t *)api_call_msg; msg->err = ERR_CONN; if (msg->closed_slot == INVALID_CLOSED_SLOT || !_closed_slots[msg->closed_slot]) { + // Unlike the other calls, this is not a direct wrapper of the LwIP function; + // we perform the AsyncClient teardown interlocked safely with the LwIP task. + _reset_tcp_callbacks(msg->pcb, msg->close); msg->err = tcp_close(msg->pcb); + if (msg->err != ERR_OK) { + tcp_abort(msg->pcb); + } } return msg->err; } -static esp_err_t _tcp_close(tcp_pcb *pcb, int8_t closed_slot) { +static esp_err_t _tcp_close(tcp_pcb *pcb, int8_t closed_slot, AsyncClient *client) { if (!pcb) { return ERR_CONN; } tcp_api_call_t msg; msg.pcb = pcb; msg.closed_slot = closed_slot; + msg.close = client; tcpip_api_call(_tcp_close_api, (struct tcpip_api_call_data *)&msg); return msg.err; } @@ -963,14 +971,7 @@ int8_t AsyncClient::_close() { // ets_printf("X: 0x%08x\n", (uint32_t)this); int8_t err = ERR_OK; if (_pcb) { - { - tcp_core_guard tcg; - _reset_tcp_callbacks(_pcb, this); - } - err = _tcp_close(_pcb, _closed_slot); - if (err != ERR_OK) { - err = abort(); - } + _tcp_close(_pcb, _closed_slot, this); _free_closed_slot(); _pcb = NULL; if (_discard_cb) { From 9262a9e24b094b64b02e257c301ceabe515c1294 Mon Sep 17 00:00:00 2001 From: Will Miles Date: Mon, 26 May 2025 19:22:03 -0400 Subject: [PATCH 6/7] Add size cache to SimpleIntrusiveList Cache the list length to avoid performing expensive lookups during stochastic poll coaelescence. This can be removed later when the size() function is no longer necessary outside of a debug context. --- src/AsyncTCPSimpleIntrusiveList.h | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/AsyncTCPSimpleIntrusiveList.h b/src/AsyncTCPSimpleIntrusiveList.h index 71d3106..589268c 100644 --- a/src/AsyncTCPSimpleIntrusiveList.h +++ b/src/AsyncTCPSimpleIntrusiveList.h @@ -44,6 +44,7 @@ template class SimpleIntrusiveList { if (obj) { *_tail = obj; _tail = &obj->next; + ++_size; } } @@ -54,6 +55,7 @@ template class SimpleIntrusiveList { } obj->next = _head; _head = obj; + ++_size; } } @@ -64,6 +66,7 @@ template class SimpleIntrusiveList { _tail = &_head; } _head = _head->next; + --_size; } return rv; } @@ -73,10 +76,11 @@ template class SimpleIntrusiveList { delete_list(_head); _head = nullptr; _tail = &_head; + _size = 0; } inline size_t size() const { - return list_size(_head); + return _size; } template inline value_ptr_type remove_if(const function_type &condition) { @@ -91,6 +95,7 @@ template class SimpleIntrusiveList { if (current->next == nullptr) { _tail = current_ptr; } + --_size; // Prepend this item to the removed list current->next = removed; removed = current; @@ -124,5 +129,6 @@ template class SimpleIntrusiveList { // Data members value_ptr_type _head; value_ptr_ptr_type _tail; + size_t _size; }; // class simple_intrusive_list From 5f088394a65408f3d2b6f5e6c2dfcec0b2a27bc6 Mon Sep 17 00:00:00 2001 From: Will Miles Date: Mon, 26 May 2025 21:46:32 -0400 Subject: [PATCH 7/7] Remove singleton pattern on queue mutex The creation check in the hot path comes at a measurable performance cost. --- src/AsyncTCP.cpp | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 210231f..905a054 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -152,21 +152,17 @@ class AsyncTCP_detail { // Guard class for the global queue namespace { -class queue_mutex_guard { - // Create-on-first-use idiom for an embedded mutex - static SemaphoreHandle_t _async_queue_mutex() { - static SemaphoreHandle_t mutex = xSemaphoreCreateMutex(); - assert(mutex != nullptr); - return mutex; - }; +static SemaphoreHandle_t _async_queue_mutex = nullptr; + +class queue_mutex_guard { bool holds_mutex; public: - inline queue_mutex_guard() : holds_mutex(xSemaphoreTake(_async_queue_mutex(), portMAX_DELAY)){}; + inline queue_mutex_guard() : holds_mutex(xSemaphoreTake(_async_queue_mutex, portMAX_DELAY)){}; inline ~queue_mutex_guard() { if (holds_mutex) { - xSemaphoreGive(_async_queue_mutex()); + xSemaphoreGive(_async_queue_mutex); } }; inline explicit operator bool() const { @@ -357,6 +353,13 @@ static bool customTaskCreateUniversal( } static bool _start_async_task() { + if (!_async_queue_mutex) { + _async_queue_mutex = xSemaphoreCreateMutex(); + if (!_async_queue_mutex) { + return false; + } + } + if (!_async_service_task_handle) { customTaskCreateUniversal( _async_service_task, "async_tcp", CONFIG_ASYNC_TCP_STACK_SIZE, NULL, CONFIG_ASYNC_TCP_PRIORITY, &_async_service_task_handle, CONFIG_ASYNC_TCP_RUNNING_CORE