diff --git a/CHANGELOG.md b/CHANGELOG.md index d4017ac048..68a56540c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.57.1] - 2024-06-24 +- Cancel xds stream ready timeout when the stream is closed. Correct xds connection status metric. + ## [29.57.0] - 2024-06-16 - Add xds client metric for receiving invalid resource @@ -5701,7 +5704,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.57.0...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.57.1...master +[29.57.1]: https://github.com/linkedin/rest.li/compare/v29.57.0...v29.57.1 [29.57.0]: https://github.com/linkedin/rest.li/compare/v29.56.1...v29.57.0 [29.56.1]: https://github.com/linkedin/rest.li/compare/v29.56.0...v29.56.1 [29.56.0]: https://github.com/linkedin/rest.li/compare/v29.55.0...v29.56.0 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 23f0259e7b..ee4e25231f 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -214,7 +214,7 @@ private void startRpcStreamLocal() { // to one of the sub-channels (unless an error or complete callback is called). }, _readyTimeoutMillis, TimeUnit.MILLISECONDS); _adsStream.start(); - _log.info("ADS stream started, connected to server: {}", _managedChannel.authority()); + _log.info("Starting ADS stream, connecting to server: {}", _managedChannel.authority()); } @Override @@ -257,21 +257,23 @@ private void readyHandler() _log.warn("Unexpected state, ready called on null or backed off ADS stream!"); return; } - // confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again. - if (_adsStream.isReady()) + // Confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again. + // Also confirm ready timeout future is not null to avoid notifying multiple times. + if (!_adsStream.isReady() || _readyTimeoutFuture == null) { - // if the ready timeout future is non-null, a reconnect notification hasn't been sent yet. - if (_readyTimeoutFuture != null) - { - // timeout task will be cancelled only if it hasn't already executed. - boolean cancelledTimeout = _readyTimeoutFuture.cancel(false); - _log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout); - _readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers. - _xdsClientJmx.incrementReconnectionCount(); - notifyStreamReconnect(); - } - _xdsClientJmx.setIsConnected(true); + return; + } + + // timeout task will be cancelled only if it hasn't already executed. + boolean cancelledTimeout = _readyTimeoutFuture.cancel(false); + _log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout); + _readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers. + if (_retryRpcStreamFuture != null) + { + _retryRpcStreamFuture = null; + _xdsClientJmx.incrementReconnectionCount(); } + notifyStreamReconnect(); } @VisibleForTesting @@ -493,6 +495,7 @@ private void notifyStreamError(Status error) subscriber.onError(error); } } + _xdsClientJmx.setIsConnected(false); } private void notifyStreamReconnect() @@ -504,6 +507,7 @@ private void notifyStreamReconnect() subscriber.onReconnect(); } } + _xdsClientJmx.setIsConnected(true); } @VisibleForTesting @@ -899,7 +903,7 @@ public void onNext(DeltaDiscoveryResponse response) if (!_responseReceived && responseData.getControlPlaneIdentifier() != null) { - _log.info("Successfully established stream with ADS server: {}", + _log.info("Successfully received response from ADS server: {}", responseData.getControlPlaneIdentifier()); } _responseReceived = true; @@ -1008,6 +1012,13 @@ private void cleanUp() { _adsStream = null; } + + if (_readyTimeoutFuture != null) + { + _readyTimeoutFuture.cancel(true); + _readyTimeoutFuture = null; + } + if (_retryRpcStreamFuture != null) { _retryRpcStreamFuture.cancel(true); diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index dc33d133dc..a8b9293135 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -228,6 +228,12 @@ public void onChanged(XdsClient.NodeUpdate update) _serviceEventBus.publishInitialize(serviceName, null); } } + else + { + // Received xds resource update while service event bus is not set. Notify that the xds source becomes + // available instead. + onReconnect(); + } } @Override @@ -284,6 +290,12 @@ public void onChanged(XdsClient.NodeUpdate update) _clusterEventBus.publishInitialize(clusterName, null); } } + else + { + // Received xds resource update while cluster event bus is not set. Notify that the xds source becomes + // available instead. + onReconnect(); + } } private void publishClusterData(String clusterName, ClusterProperties properties) @@ -542,6 +554,12 @@ private void mergeAndPublishUris(String clusterName) { _uriEventBus.publishInitialize(clusterName, mergedUriProperties); } + else + { + // Received xds resource update while uri event bus is not set. Notify that the xds source becomes + // available instead. + onReconnect(); + } if (_dualReadStateManager != null) { diff --git a/gradle.properties b/gradle.properties index 4603347bbc..eb09a518a3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.57.0 +version=29.57.1 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true