diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 515d51e93b..23f0259e7b 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -23,9 +23,9 @@ import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; import com.google.rpc.Code; -import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider; import com.linkedin.d2.jmx.XdsClientJmx; +import com.linkedin.d2.jmx.XdsServerMetricsProvider; import com.linkedin.d2.xds.GlobCollectionUtils.D2UriIdentifier; import com.linkedin.util.RateLimitedLogger; import com.linkedin.util.clock.SystemClock; @@ -197,8 +197,16 @@ private void startRpcStreamLocal() { } AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(_managedChannel); - _adsStream = new AdsStream(stub); - _readyTimeoutFuture = _executorService.schedule(() -> { + AdsStream stream = new AdsStream(stub); + _adsStream = stream; + _readyTimeoutFuture = _executorService.schedule(() -> + { + // There is a race condition where the task can be executed right as it's being cancelled. This checks whether + // the current state is still pointing to the right stream, and whether it is ready before notifying of an error. + if (_adsStream != stream || stream.isReady()) + { + return; + } _log.warn("ADS stream not ready within {} milliseconds", _readyTimeoutMillis); // notify subscribers about the error and wait for the stream to be ready by keeping it open. notifyStreamError(Status.DEADLINE_EXCEEDED); @@ -996,9 +1004,15 @@ private void close(Exception error) { private void cleanUp() { - if (_adsStream == this) { + if (_adsStream == this) + { _adsStream = null; } + if (_retryRpcStreamFuture != null) + { + _retryRpcStreamFuture.cancel(true); + _retryRpcStreamFuture = null; + } } } }