diff --git a/src/AsyncTCP.cpp b/src/AsyncTCP.cpp index 00ac960..722da63 100644 --- a/src/AsyncTCP.cpp +++ b/src/AsyncTCP.cpp @@ -145,12 +145,12 @@ static inline bool _init_async_event_queue() { return true; } -static inline bool _send_async_event(lwip_event_packet_t** e) { - return _async_queue && xQueueSend(_async_queue, e, portMAX_DELAY) == pdPASS; +static inline bool _send_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) { + return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS; } -static inline bool _prepend_async_event(lwip_event_packet_t** e) { - return _async_queue && xQueueSendToFront(_async_queue, e, portMAX_DELAY) == pdPASS; +static inline bool _prepend_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) { + return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS; } static inline bool _get_async_event(lwip_event_packet_t** e) { @@ -167,6 +167,9 @@ static inline bool _get_async_event(lwip_event_packet_t** e) { return false; #endif + if ((*e)->event != LWIP_TCP_POLL) + return true; + /* Let's try to coalesce two (or more) consecutive poll events into one this usually happens with poor implemented user-callbacks that are runs too long and makes poll events to stack in the queue @@ -177,16 +180,31 @@ static inline bool _get_async_event(lwip_event_packet_t** e) { */ lwip_event_packet_t* next_pkt = NULL; while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS){ - if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL && (*e)->event == LWIP_TCP_POLL){ + if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL){ if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS){ free(next_pkt); next_pkt = NULL; - log_d("coalescing polls, async callback might be too slow!"); - } else - return true; - } else - return true; + log_d("coalescing polls, network congestion or async callbacks might be too slow!"); + continue; + } + } else { + /* + poor designed apps using asynctcp without proper dataflow control could flood the queue with interleaved pool/ack events. + We can try to mitigate it by discarding poll events when queue grows too much. + Let's discard poll events using linear probability curve starting from 3/4 of queue length + Poll events are periodic and connection could get another chance next time + */ + if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) { + free(next_pkt); + next_pkt = NULL; + log_d("discarding poll due to queue congestion"); + // evict next event from a queue + return _get_async_event(e); + } + } + return true; } + // last resort return return true; } @@ -206,8 +224,11 @@ static bool _remove_events_with_arg(void* arg) { if ((int)first_packet->arg == (int)arg) { free(first_packet); first_packet = NULL; - // return first packet to the back of the queue - } else if (xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS) { + + // try to return first packet to the back of the queue + } else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) { + // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue + // otherwise it would deadlock, we have to discard the event return false; } } @@ -219,7 +240,9 @@ static bool _remove_events_with_arg(void* arg) { if ((int)packet->arg == (int)arg) { free(packet); packet = NULL; - } else if (xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS) { + } else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) { + // we can't wait here if queue is full, because this call has been done from the only consumer task of this queue + // otherwise it would deadlock, we have to discard the event return false; } } @@ -362,7 +385,8 @@ static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) { e->event = LWIP_TCP_POLL; e->arg = arg; e->poll.pcb = pcb; - if (!_send_async_event(&e)) { + // poll events are not critical 'cause those are repetitive, so we may not wait the queue in any case + if (!_send_async_event(&e, 0)) { free((void*)(e)); } return ERR_OK; @@ -684,7 +708,7 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other) { tcp_recv(_pcb, &_tcp_recv); tcp_sent(_pcb, &_tcp_sent); tcp_err(_pcb, &_tcp_error); - tcp_poll(_pcb, &_tcp_poll, 1); + tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); } return *this; } @@ -782,7 +806,7 @@ bool AsyncClient::_connect(ip_addr_t addr, uint16_t port) { tcp_err(pcb, &_tcp_error); tcp_recv(pcb, &_tcp_recv); tcp_sent(pcb, &_tcp_sent); - tcp_poll(pcb, &_tcp_poll, 1); + tcp_poll(pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER); TCP_MUTEX_UNLOCK(); esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected); @@ -1131,10 +1155,6 @@ void AsyncClient::_dns_found(struct ip_addr* ipaddr) { * Public Helper Methods * */ -void AsyncClient::stop() { - close(false); -} - bool AsyncClient::free() { if (!_pcb) { return true; @@ -1145,13 +1165,6 @@ bool AsyncClient::free() { return false; } -size_t AsyncClient::write(const char* data) { - if (data == NULL) { - return 0; - } - return write(data, strlen(data)); -} - size_t AsyncClient::write(const char* data, size_t size, uint8_t apiflags) { size_t will_send = add(data, size, apiflags); if (!will_send || !send()) { diff --git a/src/AsyncTCP.h b/src/AsyncTCP.h index 2df01c7..17e3db9 100644 --- a/src/AsyncTCP.h +++ b/src/AsyncTCP.h @@ -109,34 +109,86 @@ class AsyncClient { bool connect(const IPv6Address& ip, uint16_t port); #endif bool connect(const char* host, uint16_t port); + /** + * @brief close connection + * + * @param now - ignored + */ void close(bool now = false); - void stop(); + // same as close() + void stop(){ close(false); }; int8_t abort(); bool free(); - bool canSend(); // ack is not pending - size_t space(); // space available in the TCP window - size_t add(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); // add for sending - bool send(); // send all data added with the method above - - // write equals add()+send() - size_t write(const char* data); - size_t write(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); // only when canSend() == true + // ack is not pending + bool canSend(); + // TCP buffer space available + size_t space(); + + /** + * @brief add data to be send (but do not send yet) + * @note add() would call lwip's tcp_write() + By default apiflags=ASYNC_WRITE_FLAG_COPY + You could try to use apiflags with this flag unset to pass data by reference and avoid copy to socket buffer, + but looks like it does not work for Arduino's lwip in ESP32/IDF at least + it is enforced in https://github.com/espressif/esp-lwip/blob/0606eed9d8b98a797514fdf6eabb4daf1c8c8cd9/src/core/tcp_out.c#L422C5-L422C30 + if LWIP_NETIF_TX_SINGLE_PBUF is set, and it is set indeed in IDF + https://github.com/espressif/esp-idf/blob/a0f798cfc4bbd624aab52b2c194d219e242d80c1/components/lwip/port/include/lwipopts.h#L744 + * + * @param data + * @param size + * @param apiflags + * @return size_t amount of data that has been copied + */ + size_t add(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); + + /** + * @brief send data previously add()'ed + * + * @return true on success + * @return false on error + */ + bool send(); + + /** + * @brief add and enqueue data for sending + * @note it is same as add() + send() + * @note only make sense when canSend() == true + * + * @param data + * @param size + * @param apiflags + * @return size_t + */ + size_t write(const char* data, size_t size, uint8_t apiflags = ASYNC_WRITE_FLAG_COPY); + + /** + * @brief add and enque data for sending + * @note treats data as null-terminated string + * + * @param data + * @return size_t + */ + size_t write(const char* data){ return data == NULL ? 0 : write(data, strlen(data)); }; uint8_t state(); bool connecting(); bool connected(); bool disconnecting(); bool disconnected(); - bool freeable(); // disconnected or disconnecting + + // disconnected or disconnecting + bool freeable(); uint16_t getMss(); uint32_t getRxTimeout(); - void setRxTimeout(uint32_t timeout); // no RX data timeout for the connection in seconds + // no RX data timeout for the connection in seconds + void setRxTimeout(uint32_t timeout); uint32_t getAckTimeout(); - void setAckTimeout(uint32_t timeout); // no ACK timeout for the last sent packet in milliseconds + // no ACK timeout for the last sent packet in milliseconds + void setAckTimeout(uint32_t timeout); void setNoDelay(bool nodelay); bool getNoDelay(); @@ -165,23 +217,34 @@ class AsyncClient { IPAddress localIP(); uint16_t localPort(); - void onConnect(AcConnectHandler cb, void* arg = 0); // on successful connect - void onDisconnect(AcConnectHandler cb, void* arg = 0); // disconnected - void onAck(AcAckHandler cb, void* arg = 0); // ack received - void onError(AcErrorHandler cb, void* arg = 0); // unsuccessful connect or error - void onData(AcDataHandler cb, void* arg = 0); // data received (called if onPacket is not used) - void onPacket(AcPacketHandler cb, void* arg = 0); // data received - void onTimeout(AcTimeoutHandler cb, void* arg = 0); // ack timeout - void onPoll(AcConnectHandler cb, void* arg = 0); // every 125ms when connected - - void ackPacket(struct pbuf* pb); // ack pbuf from onPacket - size_t ack(size_t len); // ack data that you have not acked using the method below - void ackLater() { _ack_pcb = false; } // will not ack the current packet. Call from onData + // set callback - on successful connect + void onConnect(AcConnectHandler cb, void* arg = 0); + // set callback - disconnected + void onDisconnect(AcConnectHandler cb, void* arg = 0); + // set callback - ack received + void onAck(AcAckHandler cb, void* arg = 0); + // set callback - unsuccessful connect or error + void onError(AcErrorHandler cb, void* arg = 0); + // set callback - data received (called if onPacket is not used) + void onData(AcDataHandler cb, void* arg = 0); + // set callback - data received + void onPacket(AcPacketHandler cb, void* arg = 0); + // set callback - ack timeout + void onTimeout(AcTimeoutHandler cb, void* arg = 0); + // set callback - every 125ms when connected + void onPoll(AcConnectHandler cb, void* arg = 0); + + // ack pbuf from onPacket + void ackPacket(struct pbuf* pb); + // ack data that you have not acked using the method below + size_t ack(size_t len); + // will not ack the current packet. Call from onData + void ackLater() { _ack_pcb = false; } const char* errorToString(int8_t error); const char* stateToString(); - // Do not use any of the functions below! + // internal callbacks - Do NOT call any of the functions below in user code! static int8_t _s_poll(void* arg, struct tcp_pcb* tpcb); static int8_t _s_recv(void* arg, struct tcp_pcb* tpcb, struct pbuf* pb, int8_t err); static int8_t _s_fin(void* arg, struct tcp_pcb* tpcb, int8_t err);