From 9643b6021432b91808332392f3bf42b2439256de Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Fri, 21 Jun 2024 23:28:25 +0200 Subject: [PATCH] Better handling of stream retries on fast disconnects (#1006) Copied from the Slack context (thanks Bohan for the writeup): Summary: we looked into the bad host's log and our xds client code. There is an issue in the reconnect logic. When a reconnect ads stream is closed/rejected for stream limit on an observer A, the stream ready timeout for this stream was not cancelled before it retries with another observer B. So that after the stream with observer B becomes ready, the ready timeout for observer A was executed which cleaned the stream with observer B. This leads to the client enables backup (FS) store and never tries to connect with the observer again. ``` 2024/06/20 10:07:00.204 INFO [XdsClientImpl] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3nOcB56sIf0OjkOIg==] ADS stream started, connected to server: main.indis-registry-observer.prod-lor1.atd.disco.linkedin.com:32123 2024/06/20 10:07:00.367 ERROR [XdsClientImpl] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3nQ9CeWMXpo8cQEFg==] ADS stream closed with status UNAVAILABLE: Network closed for unknown reason. Cause: null 2024/06/20 10:07:00.367 INFO [XdsClientImpl] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3nQ9CeWMXpo8cQEFg==] Retry ADS stream in 859991762 ns 2024/06/20 10:07:01.228 INFO [XdsClientImpl] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3neFIsVPcAaz2Yv3w==] ADS stream started, connected to server: main.indis-registry-observer.prod-lor1.atd.disco.linkedin.com:32123 2024/06/20 10:07:01.240 INFO [XdsClientImpl] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3neRq29/jeDmOPBqw==] Successfully established stream with ADS server: lor1-app104284.prod.linkedin.com 2024/06/20 10:07:01.240 INFO [XdsClientImpl] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3neRe10FiykxUBBow==] ADS stream ready, cancelled timeout task: true 2024/06/20 10:07:01.240 INFO [XdsLoadBalancer] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3neRe10FiykxUBBow==] Enabled primary stores 2024/06/20 10:07:02.202 WARN [XdsClientImpl] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3ns9W3jJPwtvqLIlw==] ADS stream not ready within 2000 milliseconds 2024/06/20 10:07:02.202 INFO [XdsLoadBalancer] [Indis xDS client executor-4-1] [followfeed-query] [AAYbT3ns9W3jJPwtvqLIlw==] Enabled backup stores ``` --- .../com/linkedin/d2/xds/XdsClientImpl.java | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 515d51e93b..23f0259e7b 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -23,9 +23,9 @@ import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; -import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.XdsClientJmx; +import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.xds.GlobCollectionUtils.D2UriIdentifier; import com.linkedin.util.RateLimitedLogger; import com.linkedin.util.clock.SystemClock; @@ -197,8 +197,16 @@ private void startRpcStreamLocal() { } AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(_managedChannel); - _adsStream = new AdsStream(stub); - _readyTimeoutFuture = _executorService.schedule(() -> { + AdsStream stream = new AdsStream(stub); + _adsStream = stream; + _readyTimeoutFuture = _executorService.schedule(() -> + { + // There is a race condition where the task can be executed right as it's being cancelled. This checks whether + // the current state is still pointing to the right stream, and whether it is ready before notifying of an error. + if (_adsStream != stream || stream.isReady()) + { + return; + } _log.warn("ADS stream not ready within {} milliseconds", _readyTimeoutMillis); // notify subscribers about the error and wait for the stream to be ready by keeping it open. notifyStreamError(Status.DEADLINE_EXCEEDED); @@ -996,9 +1004,15 @@ private void close(Exception error) { private void cleanUp() { - if (_adsStream == this) { + if (_adsStream == this) + { _adsStream = null; } + if (_retryRpcStreamFuture != null) + { + _retryRpcStreamFuture.cancel(true); + _retryRpcStreamFuture = null; + } } } }