Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel xds stream ready timeout when the stream is closed. Correct xds connection status metric. #1007

Merged
merged 4 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.57.1] - 2024-06-24
- Cancel xds stream ready timeout when the stream is closed. Correct xds connection status metric.

## [29.57.0] - 2024-06-16
- Add xds client metric for receiving invalid resource

Expand Down Expand Up @@ -5701,7 +5704,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.57.0...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.57.1...master
[29.57.1]: https://github.com/linkedin/rest.li/compare/v29.57.0...v29.57.1
[29.57.0]: https://github.com/linkedin/rest.li/compare/v29.56.1...v29.57.0
[29.56.1]: https://github.com/linkedin/rest.li/compare/v29.56.0...v29.56.1
[29.56.0]: https://github.com/linkedin/rest.li/compare/v29.55.0...v29.56.0
Expand Down
41 changes: 26 additions & 15 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private void startRpcStreamLocal() {
// to one of the sub-channels (unless an error or complete callback is called).
}, _readyTimeoutMillis, TimeUnit.MILLISECONDS);
_adsStream.start();
_log.info("ADS stream started, connected to server: {}", _managedChannel.authority());
_log.info("Starting ADS stream, connecting to server: {}", _managedChannel.authority());
}

@Override
Expand Down Expand Up @@ -257,21 +257,23 @@ private void readyHandler()
_log.warn("Unexpected state, ready called on null or backed off ADS stream!");
return;
}
// confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again.
if (_adsStream.isReady())
// Confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again.
// Also confirm ready timeout future is not null to avoid notifying multiple times.
if (!_adsStream.isReady() || _readyTimeoutFuture == null)
{
// if the ready timeout future is non-null, a reconnect notification hasn't been sent yet.
if (_readyTimeoutFuture != null)
{
// timeout task will be cancelled only if it hasn't already executed.
boolean cancelledTimeout = _readyTimeoutFuture.cancel(false);
_log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout);
_readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers.
_xdsClientJmx.incrementReconnectionCount();
notifyStreamReconnect();
}
_xdsClientJmx.setIsConnected(true);
return;
}

// timeout task will be cancelled only if it hasn't already executed.
boolean cancelledTimeout = _readyTimeoutFuture.cancel(false);
_log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout);
_readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers.
Comment on lines +261 to +270
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although not too bad, this might be some unneeded refactoring? (don't fix what ain't broken)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's just return-early, as suggested by @brycezhongqing above.

if (_retryRpcStreamFuture != null)
{
_retryRpcStreamFuture = null;
_xdsClientJmx.incrementReconnectionCount();
}
notifyStreamReconnect();
}

@VisibleForTesting
Expand Down Expand Up @@ -493,6 +495,7 @@ private void notifyStreamError(Status error)
subscriber.onError(error);
}
}
_xdsClientJmx.setIsConnected(false);
}

private void notifyStreamReconnect()
Expand All @@ -504,6 +507,7 @@ private void notifyStreamReconnect()
subscriber.onReconnect();
}
}
_xdsClientJmx.setIsConnected(true);
}

@VisibleForTesting
Expand Down Expand Up @@ -899,7 +903,7 @@ public void onNext(DeltaDiscoveryResponse response)

if (!_responseReceived && responseData.getControlPlaneIdentifier() != null)
{
_log.info("Successfully established stream with ADS server: {}",
_log.info("Successfully received response from ADS server: {}",
responseData.getControlPlaneIdentifier());
}
_responseReceived = true;
Expand Down Expand Up @@ -1008,6 +1012,13 @@ private void cleanUp()
{
_adsStream = null;
}

if (_readyTimeoutFuture != null)
{
_readyTimeoutFuture.cancel(true);
_readyTimeoutFuture = null;
}

if (_retryRpcStreamFuture != null)
{
_retryRpcStreamFuture.cancel(true);
Expand Down
18 changes: 18 additions & 0 deletions d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,12 @@ public void onChanged(XdsClient.NodeUpdate update)
_serviceEventBus.publishInitialize(serviceName, null);
}
}
else
{
// Received xds resource update while service event bus is not set. Notify that the xds source becomes
// available instead.
onReconnect();
}
}

@Override
Expand Down Expand Up @@ -284,6 +290,12 @@ public void onChanged(XdsClient.NodeUpdate update)
_clusterEventBus.publishInitialize(clusterName, null);
}
}
else
{
// Received xds resource update while cluster event bus is not set. Notify that the xds source becomes
// available instead.
onReconnect();
}
}

private void publishClusterData(String clusterName, ClusterProperties properties)
Expand Down Expand Up @@ -542,6 +554,12 @@ private void mergeAndPublishUris(String clusterName)
{
_uriEventBus.publishInitialize(clusterName, mergedUriProperties);
}
else
{
// Received xds resource update while uri event bus is not set. Notify that the xds source becomes
// available instead.
onReconnect();
}

if (_dualReadStateManager != null)
{
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.57.0
version=29.57.1
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Loading