Skip to content

Commit

Permalink
queue deadlock and congestion avoidance
Browse files Browse the repository at this point in the history
 - do not let `_remove_events_with_arg()` block on queue perturbation, otherwise it could deadlock

 - in any case, do not block when adding an LWIP_TCP_POLL event to the queue; it does not make much sense anyway
   because poll events are repetitive

 - `_get_async_event()` will discard LWIP_TCP_POLL events when q gets filled up
   this will work in combination with throttling poll events when adding to the queue
   Poorly designed apps and multiple parallel connections could flood the queue with interleaved poll events
   that can't be properly throttled or coalesced. So we can discard it in the eviction task after coalescing
   It will work in this way:
    - queue is up to 1/4 full - all events are entering the queue and serviced
    - queue is from 1/4 and up to 3/4 full - new poll events are throttled on enqueue with linear probability
    - queue is from 3/4 up to full top - all new poll events are ignored on enqueue and existing poll events
      already in the queue are discarded on eviction with linear probability giving away priority for all other
      events to be serviced. It is expected that on a new poll timer connection polls could reenter the queue
  • Loading branch information
vortigont committed Dec 15, 2024
1 parent 80b2733 commit 045448a
Showing 1 changed file with 40 additions and 16 deletions.
56 changes: 40 additions & 16 deletions src/AsyncTCP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,12 @@ static inline bool _init_async_event_queue() {
return true;
}

static inline bool _send_async_event(lwip_event_packet_t** e) {
return _async_queue && xQueueSend(_async_queue, e, portMAX_DELAY) == pdPASS;
static inline bool _send_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
return _async_queue && xQueueSend(_async_queue, e, wait) == pdPASS;
}

static inline bool _prepend_async_event(lwip_event_packet_t** e) {
return _async_queue && xQueueSendToFront(_async_queue, e, portMAX_DELAY) == pdPASS;
static inline bool _prepend_async_event(lwip_event_packet_t** e, TickType_t wait = portMAX_DELAY) {
return _async_queue && xQueueSendToFront(_async_queue, e, wait) == pdPASS;
}

static inline bool _get_async_event(lwip_event_packet_t** e) {
Expand All @@ -167,6 +167,9 @@ static inline bool _get_async_event(lwip_event_packet_t** e) {
return false;
#endif

if ((*e)->event != LWIP_TCP_POLL)
return true;

/*
Let's try to coalesce two (or more) consecutive poll events into one
  this usually happens with poorly implemented user callbacks that run too long and make poll events stack up in the queue
Expand All @@ -177,16 +180,31 @@ static inline bool _get_async_event(lwip_event_packet_t** e) {
*/
lwip_event_packet_t* next_pkt = NULL;
while (xQueuePeek(_async_queue, &next_pkt, 0) == pdPASS){
if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL && (*e)->event == LWIP_TCP_POLL){
if (next_pkt->arg == (*e)->arg && next_pkt->event == LWIP_TCP_POLL){
if (xQueueReceive(_async_queue, &next_pkt, 0) == pdPASS){
free(next_pkt);
next_pkt = NULL;
log_d("coalescing polls, async callback might be too slow!");
} else
return true;
} else
return true;
log_d("coalescing polls, network congestion or async callbacks might be too slow!");
continue;
}
} else {
/*
      poorly designed apps using AsyncTCP without proper dataflow control could flood the queue with interleaved poll/ack events.
We can try to mitigate it by discarding poll events when queue grows too much.
Let's discard poll events using linear probability curve starting from 3/4 of queue length
Poll events are periodic and connection could get another chance next time
*/
if (uxQueueMessagesWaiting(_async_queue) > (rand() % CONFIG_ASYNC_TCP_QUEUE_SIZE / 4 + CONFIG_ASYNC_TCP_QUEUE_SIZE * 3 / 4)) {
free(next_pkt);
next_pkt = NULL;
log_d("discarding poll due to queue congestion");
// evict next event from a queue
return _get_async_event(e);
}
}
return true;
}
// last resort return
return true;
}

Expand All @@ -206,8 +224,11 @@ static bool _remove_events_with_arg(void* arg) {
if ((int)first_packet->arg == (int)arg) {
free(first_packet);
first_packet = NULL;
// return first packet to the back of the queue
} else if (xQueueSend(_async_queue, &first_packet, portMAX_DELAY) != pdPASS) {

// try to return first packet to the back of the queue
} else if (xQueueSend(_async_queue, &first_packet, 0) != pdPASS) {
// we can't wait here if queue is full, because this call has been done from the only consumer task of this queue
// otherwise it would deadlock, we have to discard the event
return false;
}
}
Expand All @@ -219,7 +240,9 @@ static bool _remove_events_with_arg(void* arg) {
if ((int)packet->arg == (int)arg) {
free(packet);
packet = NULL;
} else if (xQueueSend(_async_queue, &packet, portMAX_DELAY) != pdPASS) {
} else if (xQueueSend(_async_queue, &packet, 0) != pdPASS) {
// we can't wait here if queue is full, because this call has been done from the only consumer task of this queue
// otherwise it would deadlock, we have to discard the event
return false;
}
}
Expand Down Expand Up @@ -362,7 +385,8 @@ static int8_t _tcp_poll(void* arg, struct tcp_pcb* pcb) {
e->event = LWIP_TCP_POLL;
e->arg = arg;
e->poll.pcb = pcb;
if (!_send_async_event(&e)) {
// poll events are not critical 'cause those are repetitive, so we may not wait the queue in any case
if (!_send_async_event(&e, 0)) {
free((void*)(e));
}
return ERR_OK;
Expand Down Expand Up @@ -684,7 +708,7 @@ AsyncClient& AsyncClient::operator=(const AsyncClient& other) {
tcp_recv(_pcb, &_tcp_recv);
tcp_sent(_pcb, &_tcp_sent);
tcp_err(_pcb, &_tcp_error);
tcp_poll(_pcb, &_tcp_poll, 1);
tcp_poll(_pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
}
return *this;
}
Expand Down Expand Up @@ -782,7 +806,7 @@ bool AsyncClient::_connect(ip_addr_t addr, uint16_t port) {
tcp_err(pcb, &_tcp_error);
tcp_recv(pcb, &_tcp_recv);
tcp_sent(pcb, &_tcp_sent);
tcp_poll(pcb, &_tcp_poll, 1);
tcp_poll(pcb, &_tcp_poll, CONFIG_ASYNC_TCP_POLL_TIMER);
TCP_MUTEX_UNLOCK();

esp_err_t err = _tcp_connect(pcb, _closed_slot, &addr, port, (tcp_connected_fn)&_tcp_connected);
Expand Down

0 comments on commit 045448a

Please sign in to comment.