Skip to content

Commit

Permalink
Cancel xds stream ready timeout when the stream is closed. Correct xd…
Browse files Browse the repository at this point in the history
…s connection status metric. (#1007)

* Cancel xds stream ready timeout when the stream is closed. Correct xds connection status metric.

* minor fix on comment

* return early to reduce if-wrapping

* remove getDelay and fix typo in log
  • Loading branch information
bohhyang authored Jun 25, 2024
1 parent 9643b60 commit bba13f9
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 17 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.57.1] - 2024-06-24
- Cancel xds stream ready timeout when the stream is closed. Correct xds connection status metric.

## [29.57.0] - 2024-06-16
- Add xds client metric for receiving invalid resource

Expand Down Expand Up @@ -5701,7 +5704,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.57.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.57.1...master
[29.57.1]: https://github.com/linkedin/rest.li/compare/v29.57.0...v29.57.1
[29.57.0]: https://github.com/linkedin/rest.li/compare/v29.56.1...v29.57.0
[29.56.1]: https://github.com/linkedin/rest.li/compare/v29.56.0...v29.56.1
[29.56.0]: https://github.com/linkedin/rest.li/compare/v29.55.0...v29.56.0
Expand Down
41 changes: 26 additions & 15 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private void startRpcStreamLocal() {
// to one of the sub-channels (unless an error or complete callback is called).
}, _readyTimeoutMillis, TimeUnit.MILLISECONDS);
_adsStream.start();
_log.info("ADS stream started, connected to server: {}", _managedChannel.authority());
_log.info("Starting ADS stream, connecting to server: {}", _managedChannel.authority());
}

@Override
Expand Down Expand Up @@ -257,21 +257,23 @@ private void readyHandler()
_log.warn("Unexpected state, ready called on null or backed off ADS stream!");
return;
}
// confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again.
if (_adsStream.isReady())
// Confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again.
// Also confirm ready timeout future is not null to avoid notifying multiple times.
if (!_adsStream.isReady() || _readyTimeoutFuture == null)
{
// if the ready timeout future is non-null, a reconnect notification hasn't been sent yet.
if (_readyTimeoutFuture != null)
{
// timeout task will be cancelled only if it hasn't already executed.
boolean cancelledTimeout = _readyTimeoutFuture.cancel(false);
_log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout);
_readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers.
_xdsClientJmx.incrementReconnectionCount();
notifyStreamReconnect();
}
_xdsClientJmx.setIsConnected(true);
return;
}

// timeout task will be cancelled only if it hasn't already executed.
boolean cancelledTimeout = _readyTimeoutFuture.cancel(false);
_log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout);
_readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers.
if (_retryRpcStreamFuture != null)
{
_retryRpcStreamFuture = null;
_xdsClientJmx.incrementReconnectionCount();
}
notifyStreamReconnect();
}

@VisibleForTesting
Expand Down Expand Up @@ -493,6 +495,7 @@ private void notifyStreamError(Status error)
subscriber.onError(error);
}
}
_xdsClientJmx.setIsConnected(false);
}

private void notifyStreamReconnect()
Expand All @@ -504,6 +507,7 @@ private void notifyStreamReconnect()
subscriber.onReconnect();
}
}
_xdsClientJmx.setIsConnected(true);
}

@VisibleForTesting
Expand Down Expand Up @@ -899,7 +903,7 @@ public void onNext(DeltaDiscoveryResponse response)

if (!_responseReceived && responseData.getControlPlaneIdentifier() != null)
{
_log.info("Successfully established stream with ADS server: {}",
_log.info("Successfully received response from ADS server: {}",
responseData.getControlPlaneIdentifier());
}
_responseReceived = true;
Expand Down Expand Up @@ -1008,6 +1012,13 @@ private void cleanUp()
{
_adsStream = null;
}

if (_readyTimeoutFuture != null)
{
_readyTimeoutFuture.cancel(true);
_readyTimeoutFuture = null;
}

if (_retryRpcStreamFuture != null)
{
_retryRpcStreamFuture.cancel(true);
Expand Down
18 changes: 18 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ public void onChanged(XdsClient.NodeUpdate update)
_serviceEventBus.publishInitialize(serviceName, null);
}
}
else
{
// Received xds resource update while service event bus is not set. Notify that the xds source becomes
// available instead.
onReconnect();
}
}

@Override
Expand Down Expand Up @@ -284,6 +290,12 @@ public void onChanged(XdsClient.NodeUpdate update)
_clusterEventBus.publishInitialize(clusterName, null);
}
}
else
{
// Received xds resource update while cluster event bus is not set. Notify that the xds source becomes
// available instead.
onReconnect();
}
}

private void publishClusterData(String clusterName, ClusterProperties properties)
Expand Down Expand Up @@ -542,6 +554,12 @@ private void mergeAndPublishUris(String clusterName)
{
_uriEventBus.publishInitialize(clusterName, mergedUriProperties);
}
else
{
// Received xds resource update while uri event bus is not set. Notify that the xds source becomes
// available instead.
onReconnect();
}

if (_dualReadStateManager != null)
{
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.57.0
version=29.57.1
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit bba13f9

Please sign in to comment.