diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c721059df..42f44879a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.46.7] - 2023-10-10 +- fix xDS client bugs and race conditions + ## [29.46.6] - 2023-10-04 - simplify symlink subscription in xds flow @@ -5548,7 +5551,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.6...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.7...master +[29.46.7]: https://github.com/linkedin/rest.li/compare/v29.46.6...v29.46.7 [29.46.6]: https://github.com/linkedin/rest.li/compare/v29.46.5...v29.46.6 [29.46.5]: https://github.com/linkedin/rest.li/compare/v29.45.1...v29.45.2 [29.46.4]: https://github.com/linkedin/rest.li/compare/v29.46.3...v29.46.4 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java index d01015a76d..1aa24f63d5 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java @@ -199,7 +199,9 @@ public D2Client build() _config.failoutConfigProviderFactory, _config.failoutRedirectStrategy, _config.serviceDiscoveryEventEmitter, - _config.dualReadStateManager + _config.dualReadStateManager, + _config.xdsExecutorService, + _config.xdsStreamReadyTimeout ); final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ? @@ -641,6 +643,19 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat return this; } + /** + * Single-threaded executor service for xDS communication. + */ + public D2ClientBuilder setXdsExecutorService(ScheduledExecutorService xdsExecutorService) { + _config.xdsExecutorService = xdsExecutorService; + return this; + } + + public D2ClientBuilder setXdsStreamReadyTimeout(long xdsStreamReadyTimeout) { + _config.xdsStreamReadyTimeout = xdsStreamReadyTimeout; + return this; + } + private Map createDefaultTransportClientFactories() { final Map clientFactories = new HashMap<>(); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java index b34dc8884e..5645451847 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java @@ -125,6 +125,9 @@ public class D2ClientConfig public ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter = new LogOnlyServiceDiscoveryEventEmitter(); // default to use log-only emitter public DualReadStateManager dualReadStateManager = null; + public ScheduledExecutorService xdsExecutorService = null; + public Long xdsStreamReadyTimeout = null; + public D2ClientConfig() { } @@ -190,7 +193,9 @@ public D2ClientConfig() FailoutConfigProviderFactory failoutConfigProviderFactory, FailoutRedirectStrategy failoutRedirectStrategy, ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter, - DualReadStateManager dualReadStateManager) + DualReadStateManager dualReadStateManager, + ScheduledExecutorService xdsExecutorService, + Long xdsStreamReadyTimeout) { this.zkHosts = zkHosts; this.xdsServer = xdsServer; @@ -254,5 +259,7 @@ public D2ClientConfig() this.failoutRedirectStrategy = failoutRedirectStrategy; this.serviceDiscoveryEventEmitter = serviceDiscoveryEventEmitter; this.dualReadStateManager = dualReadStateManager; + this.xdsExecutorService = xdsExecutorService; + this.xdsStreamReadyTimeout = xdsStreamReadyTimeout; } } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 307af9c75b..43ef338e92 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -28,6 +28,8 @@ import io.grpc.Status; import io.grpc.internal.BackoffPolicy; import io.grpc.internal.ExponentialBackoffPolicy; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import java.util.ArrayList; import java.util.Collection; @@ -39,6 +41,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -49,6 +52,7 @@ public class XdsClientImpl extends XdsClient { private static final Logger _log = LoggerFactory.getLogger(XdsClientImpl.class); + public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L; private final Map _d2NodeSubscribers = new HashMap<>(); private final Map _d2SymlinkNodeSubscribers = new HashMap<>(); @@ -62,9 +66,19 @@ public class XdsClientImpl extends XdsClient private BackoffPolicy _retryBackoffPolicy; private AdsStream _adsStream; private boolean _shutdown; + private ScheduledFuture _retryRpcStreamFuture; + private ScheduledFuture _readyTimeoutFuture; + private final long _readyTimeoutMillis; + @Deprecated public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService) { + this(node, managedChannel, executorService, DEFAULT_READY_TIMEOUT_MILLIS); + } + + public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService, + long readyTimeoutMillis) { + _readyTimeoutMillis = readyTimeoutMillis; _node = node; _managedChannel = managedChannel; _executorService = executorService; @@ -83,11 +97,13 @@ void watchXdsResource(String resourceName, ResourceType type, ResourceWatcher wa subscriber = new ResourceSubscriber(type, resourceName); resourceSubscriberMap.put(resourceName, subscriber); - if (_adsStream == null) + if (_adsStream == null && !isInBackoff()) { - startRpcStream(); + startRpcStreamLocal(); + } + if (_adsStream != null) { + _adsStream.sendDiscoveryRequest(type, Collections.singletonList(resourceName)); } - _adsStream.sendDiscoveryRequest(type, Collections.singletonList(resourceName)); } subscriber.addWatcher(watcher); }); @@ -96,9 +112,32 @@ void watchXdsResource(String resourceName, ResourceType type, ResourceWatcher wa @Override public void startRpcStream() { + _executorService.execute(() -> { + if (!isInBackoff()) { + startRpcStreamLocal(); + } + }); + } + + // Start RPC stream. Must be called from the executor, and only if we're not backed off. + private void startRpcStreamLocal() { + if (_shutdown) { + _log.warn("RPC stream cannot be started after shutdown!"); + return; + } + // Check rpc stream is null to ensure duplicate RPC retry tasks are no-op + if (_adsStream != null) { + _log.warn("Tried to create duplicate RPC stream, ignoring!"); + return; + } AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = AggregatedDiscoveryServiceGrpc.newStub(_managedChannel); _adsStream = new AdsStream(stub); + _readyTimeoutFuture = _executorService.schedule(() -> { + _log.warn("ADS stream not ready within {} milliseconds", _readyTimeoutMillis); + // notify subscribers about the error and wait for the stream to be ready by keeping it open. + notifyStreamError(Status.DEADLINE_EXCEEDED); + }, _readyTimeoutMillis, TimeUnit.MILLISECONDS); _adsStream.start(); _log.info("ADS stream started, connected to server: {}", _managedChannel.authority()); } @@ -122,6 +161,37 @@ String getXdsServerAuthority() return _managedChannel.authority(); } + /** + * The client may be in backoff if there are RPC stream failures, and if it's waiting to establish the stream again. + * NOTE: Must be called from the executor. + * @return {@code true} if the client is in backoff + */ + private boolean isInBackoff() { + return _adsStream == null && _retryRpcStreamFuture != null && !_retryRpcStreamFuture.isDone(); + } + + /** + * Handles ready callbacks from the RPC stream. Must be called from the executor. + */ + private void readyHandler() { + _log.debug("Received ready callback from the ADS stream"); + if (_adsStream == null || isInBackoff()) { + _log.warn("Unexpected state, ready called on null or backed off ADS stream!"); + return; + } + // confirm ready state to neglect spurious callbacks; we'll get another callback whenever it is ready again. + if (_adsStream.isReady()) { + // if the ready timeout future is non-null, a reconnect notification hasn't been sent yet. + if (_readyTimeoutFuture != null) { + // timeout task will be cancelled only if it hasn't already executed. + boolean cancelledTimeout = _readyTimeoutFuture.cancel(false); + _log.info("ADS stream ready, cancelled timeout task: {}", cancelledTimeout); + _readyTimeoutFuture = null; // set it to null to avoid repeat notifications to subscribers. + notifyStreamReconnect(); + } + } + } + private void handleD2NodeResponse(DiscoveryResponseData data) { Map updates = new HashMap<>(); @@ -213,7 +283,7 @@ private void handleResourceUpdate(Map updates, } } - private void handleStreamClosed(Status error) { + private void notifyStreamError(Status error) { for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) { subscriber.onError(error); } @@ -222,7 +292,7 @@ private void handleStreamClosed(Status error) { } } - private void handleStreamRestarted() { + private void notifyStreamReconnect() { for (ResourceSubscriber subscriber : _d2NodeSubscribers.values()) { subscriber.onReconnect(); } @@ -328,10 +398,7 @@ private void onReconnect() final class RpcRetryTask implements Runnable { @Override public void run() { - if (_shutdown) { - return; - } - startRpcStream(); + startRpcStreamLocal(); for (ResourceType type : ResourceType.values()) { if (type == ResourceType.UNKNOWN) { continue; @@ -342,7 +409,6 @@ public void run() { _adsStream.sendDiscoveryRequest(type, resources); } } - handleStreamRestarted(); } } @@ -477,37 +543,45 @@ private AdsStream(@Nonnull AggregatedDiscoveryServiceGrpc.AggregatedDiscoverySer _responseReceived = false; } + public boolean isReady() { + return _requestWriter != null && ((ClientCallStreamObserver) _requestWriter).isReady(); + } + private void start() { - StreamObserver responseReader = new StreamObserver() - { - @Override - public void onNext(DeltaDiscoveryResponse response) - { - _executorService.execute(() -> - { - _log.debug("Received {} response:\n{}", ResourceType.fromTypeUrl(response.getTypeUrl()), response); - DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response); - handleResponse(responseData); - }); - } - - @Override - public void onError(Throwable t) - { - _executorService.execute(() -> handleRpcError(t)); - } - - @Override - public void onCompleted() - { - _executorService.execute(() -> handleRpcCompleted()); - } - }; + StreamObserver responseReader = + new ClientResponseObserver() { + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + requestStream.setOnReadyHandler(() -> _executorService.execute(XdsClientImpl.this::readyHandler)); + } + + @Override + public void onNext(DeltaDiscoveryResponse response) + { + _executorService.execute(() -> + { + _log.debug("Received {} response:\n{}", ResourceType.fromTypeUrl(response.getTypeUrl()), response); + DiscoveryResponseData responseData = DiscoveryResponseData.fromEnvoyProto(response); + handleResponse(responseData); + }); + } + + @Override + public void onError(Throwable t) + { + _executorService.execute(() -> handleRpcError(t)); + } + + @Override + public void onCompleted() + { + _executorService.execute(() -> handleRpcCompleted()); + } + }; _requestWriter = _stub.withWaitForReady().deltaAggregatedResources(responseReader); } - /** * Sends a client-initiated discovery request. */ @@ -576,6 +650,7 @@ private void handleRpcCompleted() handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("ADS stream closed by server")); } + // Must be called from the executor. private void handleRpcStreamClosed(Status error) { if (_closed) @@ -585,7 +660,7 @@ private void handleRpcStreamClosed(Status error) _log.error("ADS stream closed with status {}: {}. Cause: {}", error.getCode(), error.getDescription(), error.getCause()); _closed = true; - handleStreamClosed(error); + notifyStreamError(error); cleanUp(); if (_responseReceived || _retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence @@ -597,7 +672,7 @@ private void handleRpcStreamClosed(Status error) delayNanos = _retryBackoffPolicy.nextBackoffNanos(); } _log.info("Retry ADS stream in {} ns", delayNanos); - _executorService.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS); + _retryRpcStreamFuture = _executorService.schedule(new RpcRetryTask(), delayNanos, TimeUnit.NANOSECONDS); } private void close(Exception error) { diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index 2bba859484..8836a7c628 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -57,6 +57,7 @@ public class XdsToD2PropertiesAdaptor private static final String D2_URI_NODE_PREFIX = "/d2/uris/"; private static final char SYMLINK_NODE_IDENTIFIER = '$'; private static final char PATH_SEPARATOR = '/'; + private static final String NON_EXISTENT_CLUSTER = "NonExistentCluster"; private final XdsClient _xdsClient; private final List _xdsConnectionListeners; @@ -82,7 +83,7 @@ public class XdsToD2PropertiesAdaptor private final Object _symlinkAndActualNodeLock = new Object(); private final ServiceDiscoveryEventEmitter _eventEmitter; - private boolean _isAvailable; + private Boolean _isAvailable; private PropertyEventBus _uriEventBus; private PropertyEventBus _serviceEventBus; private PropertyEventBus _clusterEventBus; @@ -97,7 +98,8 @@ public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualRe _clusterPropertiesJsonSerializer = new ClusterPropertiesJsonSerializer(); _uriPropertiesJsonSerializer = new UriPropertiesJsonSerializer(); _uriPropertiesMerger = new UriPropertiesMerger(); - _isAvailable = false; + // set to null so that the first notification on connection establishment success/failure is always sent + _isAvailable = null; _watchedClusterResources = new ConcurrentHashMap<>(); _watchedSymlinkResources = new ConcurrentHashMap<>(); _watchedServiceResources = new ConcurrentHashMap<>(); @@ -108,7 +110,10 @@ public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualRe public void start() { _xdsClient.startRpcStream(); - notifyAvailabilityChanges(true); + // Watch any resource to get notified of xds connection updates, including initial connection establishment. + // TODO: Note, this is a workaround since the xDS client implementation currently integrates connection + // error/success notifications along with the resource updates. This can be improved in a future refactor. + listenToCluster(NON_EXISTENT_CLUSTER); } public void shutdown() @@ -361,7 +366,7 @@ private void notifyAvailabilityChanges(boolean isAvailable) { synchronized (_xdsConnectionListeners) { - if (_isAvailable != isAvailable) + if (_isAvailable == null || _isAvailable != isAvailable) { _isAvailable = isAvailable; diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index f5834b73c6..3c724ef91a 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -29,6 +29,7 @@ import com.linkedin.r2.util.NamedThreadFactory; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import org.apache.commons.lang3.ObjectUtils; /** @@ -47,11 +48,13 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { d2ClientJmxManager.registerDualReadLoadBalancerJmx(config.dualReadStateManager.getDualReadLoadBalancerJmx()); } - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor( - new NamedThreadFactory("D2 xDS PropertyEventExecutor")); - + ScheduledExecutorService executorService = ObjectUtils.defaultIfNull(config.xdsExecutorService, + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("D2 xDS PropertyEventExecutor"))); + long xdsStreamReadyTimeout = ObjectUtils.defaultIfNull(config.xdsStreamReadyTimeout, + XdsClientImpl.DEFAULT_READY_TIMEOUT_MILLIS); XdsClient xdsClient = new XdsClientImpl(new Node(config.hostName), - new XdsChannelFactory(config.grpcSslContext, config.xdsServer).createChannel(), executorService); + new XdsChannelFactory(config.grpcSslContext, config.xdsServer).createChannel(), executorService, + xdsStreamReadyTimeout); XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, config.dualReadStateManager, config.serviceDiscoveryEventEmitter); diff --git a/gradle.properties b/gradle.properties index f39b5f486c..78dc3350f1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.46.6 +version=29.46.7 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true