From 4e391dfab6849a91d04009b4ff35953296f4e55a Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Tue, 24 Sep 2024 17:59:25 -0400 Subject: [PATCH] Fix invalid handling of glob collections for wildcard subscribers (#1022) * Add watchAllResource subscriber * Fix invalid handling of glob collections for wildcard subscribers As the title says, things weren't wired through correctly for glob collections, namely they were being ignored because they were triggering the check for whtehre the cluster is being watched at all. This fixes that (and the fact that the initial subscription did not respect the `useGlobCollections` flag). This change has been unit tested and tested through in restli-resource-explorer, and it works for both glob and non-glob. * CHANGELOG * try to make test less flaky * Un-revert * Fix CL date * CL * Make flaky tests run more --- CHANGELOG.md | 6 +- .../d2/loadbalancer/TestDynamicClient.java | 45 ++-- .../java/com/linkedin/d2/xds/XdsClient.java | 116 +++++++++- .../com/linkedin/d2/xds/XdsClientImpl.java | 216 ++++++++++++++++-- .../clients/TestBackupRequestsClient.java | 25 +- .../linkedin/d2/xds/TestXdsClientImpl.java | 115 +++++++++- gradle.properties | 2 +- .../linkedin/test/util/retry/TenRetries.java | 29 +++ 8 files changed, 500 insertions(+), 54 deletions(-) create mode 100644 test-util/src/main/java/com/linkedin/test/util/retry/TenRetries.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 34fd020d28..4a526f0b45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.58.9] - 2024-09-24 +- Fix invalid handling of glob collections for wildcard subscribers + ## [29.58.8] - 2024-09-23 - Revert Add WildcardResourceSubscriber which could subscribe to all resources, like NODE and URIMap resources. @@ -5734,7 +5737,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.8...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.9...master +[29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9 [29.58.8]: https://github.com/linkedin/rest.li/compare/v29.58.7...v29.58.8 [29.58.7]: https://github.com/linkedin/rest.li/compare/v29.58.6...v29.58.7 [29.58.6]: https://github.com/linkedin/rest.li/compare/v29.58.5...v29.58.6 diff --git a/d2-int-test/src/test/java/com/linkedin/d2/loadbalancer/TestDynamicClient.java b/d2-int-test/src/test/java/com/linkedin/d2/loadbalancer/TestDynamicClient.java index a1f8733e15..fc6cb3f380 100644 --- a/d2-int-test/src/test/java/com/linkedin/d2/loadbalancer/TestDynamicClient.java +++ b/d2-int-test/src/test/java/com/linkedin/d2/loadbalancer/TestDynamicClient.java @@ -19,7 +19,7 @@ import com.linkedin.r2.message.rest.RestRequestBuilder; import com.linkedin.r2.message.rest.RestResponse; import com.linkedin.r2.util.NamedThreadFactory; -import com.linkedin.test.util.retry.ThreeRetries; +import com.linkedin.test.util.retry.TenRetries; import java.io.File; import java.lang.management.ManagementFactory; import java.net.URI; @@ -141,7 +141,7 @@ public void teardown() * the requests sending from the clients should result in an even distribution. The total call count * received by a single server should not deviate by more than 15% of the average. */ - @Test(retryAnalyzer = ThreeRetries.class) + @Test(retryAnalyzer = TenRetries.class) public void testBalancedLoadDistribution() { SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener(); @@ -198,7 +198,7 @@ public void testBalancedLoadDistribution() * After the update event is received by the ZK event subscriber. One request is required to actually trigger the * load balancer state and hash ring changes. */ - @Test(retryAnalyzer = ThreeRetries.class) + @Test(retryAnalyzer = TenRetries.class) public void testD2WeightLessThanOne() { SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener(); @@ -218,11 +218,12 @@ public void testD2WeightLessThanOne() throw new RuntimeException("Failed the test because thread was interrupted"); } - try { + try + { // Change the D2 weight of server:2851 to 0.5 invokeD2ChangeWeightJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""), 0.5); - // Wait 5ms for the change to propagate - Thread.sleep(5); + // Wait 50ms for the change to propagate + Thread.sleep(50); } catch (Exception e) { fail("Failed to invoke d2 weight change jmx", e); } @@ -284,7 +285,7 @@ public void testD2WeightLessThanOne() * And if we further increase the weight to 4.0. The host will receive 4x the traffic of the other hosts * (with a tolerance of 15%). */ - @Test(retryAnalyzer = ThreeRetries.class) + @Test(retryAnalyzer = TenRetries.class) public void testD2WeightGreaterThanOne() { SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener(); @@ -304,11 +305,12 @@ public void testD2WeightGreaterThanOne() throw new RuntimeException("Failed the test because thread was interrupted"); } - try { + try + { // Change the D2 weight of server:2851 to 2.0 invokeD2ChangeWeightJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""), 2); - // Wait 5ms for the change to propagate - Thread.sleep(5); + // Wait 50ms for the change to propagate + Thread.sleep(50); } catch (Exception e) { fail("Failed to invoke d2 weight change jmx", e); } @@ -363,11 +365,12 @@ public void testD2WeightGreaterThanOne() } } - try { + try + { // Change the D2 weight of server:2851 to 4.0 invokeD2ChangeWeightJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\""), 4); - // Wait 5ms for the change to propagate - Thread.sleep(5); + // Wait 50ms for the change to propagate + Thread.sleep(50); } catch (Exception e) { fail("Failed to invoke d2 weight change jmx", e); } @@ -405,7 +408,7 @@ public void testD2WeightGreaterThanOne() * 2. The host start receiving traffic and has a health score > 0.5. * The host will then be kicked out of the recovery program and continue to recover/degrade using normal up/downStep. */ - @Test(retryAnalyzer = ThreeRetries.class) + @Test(retryAnalyzer = TenRetries.class) public void testHostMarkDownAndMarkUp() { SimpleLoadBalancerStateTest.TestListener listener = new SimpleLoadBalancerStateTest.TestListener(); @@ -425,11 +428,12 @@ public void testHostMarkDownAndMarkUp() throw new RuntimeException("Failed the test because thread was interrupted"); } - try { + try + { // Mark down server:2851 invokeMarkDownJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\"")); - // Wait 5ms for the change to propagate - Thread.sleep(5); + // Wait 50ms for the change to propagate + Thread.sleep(50); } catch (Exception e) { fail("Failed to invoke d2 weight change jmx", e); } @@ -459,11 +463,12 @@ public void testHostMarkDownAndMarkUp() throw new RuntimeException("Failed the test because thread was interrupted"); } - try { + try + { // Mark up server:2851 invokeMarkUpJmx(new ObjectName("com.linkedin.d2:type=\"server:2851\"")); - // Wait 5ms for the change to propagate - Thread.sleep(5); + // Wait 50ms for the change to propagate + Thread.sleep(50); } catch (Exception e) { fail("Failed to invoke d2 weight change jmx", e); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index a0c08b04f0..f5040dcb45 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -95,6 +95,91 @@ final void onChanged(ResourceUpdate update) } } + public static abstract class WildcardResourceWatcher + { + private final ResourceType _type; + + /** + * Defining a private constructor means only classes that are defined in this file can extend this class (see + * {@link ResourceWatcher}). + */ + WildcardResourceWatcher(ResourceType type) + { + _type = type; + } + + final ResourceType getType() + { + return _type; + } + + /** + * Called when the resource discovery RPC encounters some transient error. + */ + public abstract void onError(Status error); + + /** + * Called when the resource discovery RPC reestablishes connection. + */ + public abstract void onReconnect(); + + /** + * Called when a resource is added or updated. + * @param resourceName the name of the resource that was added or updated. + * @param update the new data {@link ResourceUpdate} for the resource. + */ + abstract void onChanged(String resourceName, ResourceUpdate update); + + /** + * Called when a resource is removed. + * @param resourceName the name of the resource that was removed. + */ + public abstract void onRemoval(String resourceName); + } + + public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher + { + public WildcardNodeResourceWatcher() + { + super(ResourceType.NODE); + } + + /** + * Called when a node resource is added or updated. + * @param resourceName the resource name of the {@link NodeUpdate} that was added or updated. + * @param update the new data for the {@link NodeUpdate}, including D2 cluster and service information. + */ + public abstract void onChanged(String resourceName, NodeUpdate update); + + @Override + final void onChanged(String resourceName, ResourceUpdate update) + { + onChanged(resourceName, (NodeUpdate) update); + } + } + + public static abstract class WildcardD2URIMapResourceWatcher extends WildcardResourceWatcher + { + public WildcardD2URIMapResourceWatcher() + { + super(ResourceType.D2_URI_MAP); + } + + /** + * Called when a {@link D2URIMapUpdate} resource is added or updated. + * @param resourceName the resource name of the {@link D2URIMapUpdate} map resource that was added or updated. + * like the /d2/uris/clusterName + * @param update the new data for the {@link D2URIMapUpdate} resource + */ + public abstract void onChanged(String resourceName, D2URIMapUpdate update); + + @Override + final void onChanged(String resourceName, ResourceUpdate update) + { + onChanged(resourceName, (D2URIMapUpdate) update); + } + } + public interface ResourceUpdate { boolean isValid(); @@ -109,7 +194,7 @@ public static final class NodeUpdate implements ResourceUpdate _nodeData = nodeData; } - XdsD2.Node getNodeData() + public XdsD2.Node getNodeData() { return _nodeData; } @@ -261,13 +346,32 @@ static ResourceType fromTypeUrl(String typeUrl) * will always notify the given watcher of the current data if it is already present, even if the given watcher was * already subscribed to said resource. However, the subscription will only be added once. */ - abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); + public abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); - abstract void startRpcStream(); + /** + * Subscribes the given {@link WildcardResourceWatcher} to all the resources of the corresponding type. The watcher + * will be notified whenever a resource is added or removed. Repeated calls to this function with the same watcher + * will always notify the given watcher of the current data. + */ + public abstract void watchAllXdsResources(WildcardResourceWatcher watcher); - abstract void shutdown(); + /** + * Initiates the RPC stream to the xDS server. + */ + public abstract void startRpcStream(); + + /** + * Shuts down the xDS client. + */ + public abstract void shutdown(); - abstract String getXdsServerAuthority(); + /** + * Returns the authority of the xDS server. + */ + public abstract String getXdsServerAuthority(); - abstract public XdsClientJmx getXdsClientJmx(); + /** + * Returns the JMX bean for the xDS client. + */ + public abstract XdsClientJmx getXdsClientJmx(); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index b5ee6b0126..05e4a922a3 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -42,7 +42,6 @@ import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -57,6 +56,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -73,10 +73,16 @@ public class XdsClientImpl extends XdsClient new RateLimitedLogger(_log, TimeUnit.MINUTES.toMillis(10), SystemClock.instance()); public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L; + /** + * The resource subscriber map stores the subscribers to specific resources of a given type. Note that it only has 2 + * keys: {@link ResourceType#D2_URI_MAP} and {@link ResourceType#NODE}. The {@link ResourceType#D2_URI} is absent from + * this map because it should not be used as a key, as glob collection updates are translated to appear as normal map + * updates to subscribers. + */ private final Map> _resourceSubscribers = Maps.immutableEnumMap( - Arrays.stream(ResourceType.values()) - .filter(e -> e.typeUrl() != null) + Stream.of(ResourceType.NODE, ResourceType.D2_URI_MAP) .collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); + private final Map _wildcardSubscribers = Maps.newEnumMap(ResourceType.class); private final Node _node; private final ManagedChannel _managedChannel; private final ScheduledExecutorService _executorService; @@ -131,7 +137,7 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor } @Override - void watchXdsResource(String resourceName, ResourceWatcher watcher) + public void watchXdsResource(String resourceName, ResourceWatcher watcher) { _executorService.execute(() -> { @@ -168,6 +174,38 @@ void watchXdsResource(String resourceName, ResourceWatcher watcher) }); } + @Override + public void watchAllXdsResources(WildcardResourceWatcher watcher) + { + _executorService.execute(() -> + { + WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(watcher.getType()); + if (subscriber == null) + { + subscriber = new WildcardResourceSubscriber(watcher.getType()); + _wildcardSubscribers.put(watcher.getType(), subscriber); + + ResourceType adjustedType = + (watcher.getType() == ResourceType.D2_URI_MAP && _subscribeToUriGlobCollection) + ? ResourceType.D2_URI + : watcher.getType(); + + _log.info("Subscribing to wildcard for resource type: {}", adjustedType); + + if (_adsStream == null && !isInBackoff()) + { + startRpcStreamLocal(); + } + if (_adsStream != null) + { + _adsStream.sendDiscoveryRequest(adjustedType, Collections.singletonList("*")); + } + } + + subscriber.addWatcher(watcher); + }); + } + @Override public void startRpcStream() { @@ -225,7 +263,7 @@ private void startRpcStreamLocal() { } @Override - void shutdown() + public void shutdown() { _executorService.execute(() -> { @@ -238,7 +276,7 @@ void shutdown() } @Override - String getXdsServerAuthority() + public String getXdsServerAuthority() { return _managedChannel.authority(); } @@ -390,7 +428,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) ResourceSubscriber subscriber = getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName()); - if (subscriber == null) + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP); + if (subscriber == null && wildcardSubscriber == null) { String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName); _log.warn(msg); @@ -401,7 +440,17 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) // Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster. D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k -> { - D2URIMapUpdate currentData = (D2URIMapUpdate) subscriber._data; + D2URIMapUpdate currentData; + // Use the existing data from whichever subscriber is present. If both are present, they will point to the same + // D2URIMapUpdate. + if (subscriber != null) + { + currentData = (D2URIMapUpdate) subscriber._data; + } + else + { + currentData = (D2URIMapUpdate) wildcardSubscriber._data.get(uriId.getClusterResourceName()); + } if (currentData == null || !currentData.isValid()) { return new D2URIMapUpdate(null); @@ -471,6 +520,11 @@ private void handleResourceUpdate(Map updates, { subscriber.onData(entry.getValue(), _serverMetricsProvider); } + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); + if (wildcardSubscriber != null) + { + wildcardSubscriber.onData(entry.getKey(), entry.getValue()); + } } } @@ -490,6 +544,11 @@ private void handleResourceRemoval(Collection removedResources, Resource subscriber.onRemoval(); } } + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); + if (wildcardSubscriber != null) + { + removedResources.forEach(wildcardSubscriber::onRemoval); + } } @@ -502,6 +561,10 @@ private void notifyStreamError(Status error) subscriber.onError(error); } } + for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) + { + wildcardResourceSubscriber.onError(error); + } _xdsClientJmx.setIsConnected(false); } @@ -514,6 +577,10 @@ private void notifyStreamReconnect() subscriber.onReconnect(); } } + for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) + { + wildcardResourceSubscriber.onReconnect(); + } _xdsClientJmx.setIsConnected(true); } @@ -523,6 +590,12 @@ Map getResourceSubscriberMap(ResourceType type) return _resourceSubscribers.get(type); } + @VisibleForTesting + WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type) + { + return _wildcardSubscribers.get(type); + } + static class ResourceSubscriber { private final ResourceType _type; @@ -690,24 +763,132 @@ void onRemoval() } } - final class RpcRetryTask implements Runnable { + static class WildcardResourceSubscriber + { + private final ResourceType _type; + private final Set _watchers = new HashSet<>(); + private final Map _data = new HashMap<>(); + + @VisibleForTesting + public ResourceUpdate getData(String resourceName) + { + return _data.get(resourceName); + } + + @VisibleForTesting + public void setData(String resourceName, ResourceUpdate data) + { + _data.put(resourceName, data); + } + + WildcardResourceSubscriber(ResourceType type) + { + _type = type; + } + + void addWatcher(WildcardResourceWatcher watcher) + { + _watchers.add(watcher); + for (Map.Entry entry : _data.entrySet()) + { + watcher.onChanged(entry.getKey(), entry.getValue()); + _log.debug("Notifying watcher of current data for resource {} of type {}: {}", + entry.getKey(), _type, entry.getValue()); + } + } + + private void onData(String resourceName, ResourceUpdate data) + { + if (Objects.equals(_data.get(resourceName), data)) + { + _log.debug("Received resource update data equal to the current data. Will not perform the update."); + return; + } + // null value guard to avoid overwriting the property with null + if (data != null && data.isValid()) + { + _data.put(resourceName, data); + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onChanged(resourceName, data); + } + } + else + { + if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI) + { + RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + } + else + { + _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + } + } + } + + public ResourceType getType() + { + return _type; + } + + private void onError(Status error) + { + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onError(error); + } + } + + private void onReconnect() + { + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onReconnect(); + } + } + + @VisibleForTesting + void onRemoval(String resourceName) + { + _data.remove(resourceName); + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onRemoval(resourceName); + } + } + } + + final class RpcRetryTask implements Runnable + { @Override - public void run() { + public void run() + { startRpcStreamLocal(); - for (ResourceType type : ResourceType.values()) { - Collection resources = getResourceSubscriberMap(type).keySet(); - if (resources.isEmpty()) + for (ResourceType type : _resourceSubscribers.keySet()) + { + Set resources = new HashSet<>(getResourceSubscriberMap(type).keySet()); + if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null) { continue; } + ResourceType rewrittenType; if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP) { resources = resources.stream() .map(GlobCollectionUtils::globCollectionUrlForClusterResource) - .collect(Collectors.toSet()); - type = ResourceType.D2_URI; + .collect(Collectors.toCollection(HashSet::new)); + rewrittenType = ResourceType.D2_URI; + } + else + { + rewrittenType = type; + } + // If there is a wildcard subscriber, we should always send a wildcard request to the server. + if (getWildcardResourceSubscriber(type) != null) + { + resources.add("*"); } - _adsStream.sendDiscoveryRequest(type, resources); + _adsStream.sendDiscoveryRequest(rewrittenType, resources); } } } @@ -990,8 +1171,7 @@ private void handleRpcStreamClosed(Status error) { return; } - _log.error("ADS stream closed with status {}: {}. Cause: {}", error.getCode(), error.getDescription(), - error.getCause()); + _log.error("ADS stream closed with status {}: {}", error.getCode(), error.getDescription(), error.getCause()); _closed = true; notifyStreamError(error); cleanUp(); diff --git a/d2/src/test/java/com/linkedin/d2/balancer/clients/TestBackupRequestsClient.java b/d2/src/test/java/com/linkedin/d2/balancer/clients/TestBackupRequestsClient.java index 8ad9590d70..03fa3b1441 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/clients/TestBackupRequestsClient.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/clients/TestBackupRequestsClient.java @@ -61,7 +61,6 @@ import com.linkedin.r2.transport.common.bridge.client.TransportClient; import com.linkedin.r2.transport.common.bridge.common.TransportCallback; import com.linkedin.r2.transport.common.bridge.common.TransportResponseImpl; -import com.linkedin.test.util.retry.SingleRetry; import com.linkedin.test.util.retry.ThreeRetries; import com.linkedin.util.clock.SystemClock; import java.io.IOException; @@ -234,7 +233,7 @@ public void onSuccess(StreamResponse result) { /** * Backup Request should still work when a hint is given together with the flag indicating that the hint is only a preference, not requirement. */ - @Test(invocationCount = 3, dataProvider = "isD2Async", retryAnalyzer = ThreeRetries.class) // Appears to be flaky in CI + @Test(dataProvider = "isD2Async", timeOut = 10_000L, retryAnalyzer = ThreeRetries.class) // Appears to be flaky in CI public void testRequestWithHint(boolean isD2Async) throws Exception { int responseDelayNano = 100000000; //1s till response comes back @@ -257,7 +256,7 @@ public void testRequestWithHint(boolean isD2Async) throws Exception RequestContext context1 = context.clone(); Future response1 = client.restRequest(restRequest, context1); assertEquals(response1.get().getStatus(), 200); - assertEquals(hostsReceivingRequest.size(), 2); + waitUntilTrue(() -> hostsReceivingRequest.size() == 2); assertEquals(new HashSet<>(hostsReceivingRequest).size(), 2); hostsReceivingRequest.clear(); @@ -267,7 +266,7 @@ public void testRequestWithHint(boolean isD2Async) throws Exception KeyMapper.TargetHostHints.setRequestContextTargetHost(context2, hint); Future response2 = client.restRequest(restRequest, context2); assertEquals(response2.get().getStatus(), 200); - assertEquals(hostsReceivingRequest.size(), 1); + waitUntilTrue(() -> hostsReceivingRequest.size() == 1); assertEquals(hostsReceivingRequest.poll(), hint); hostsReceivingRequest.clear(); @@ -277,7 +276,7 @@ public void testRequestWithHint(boolean isD2Async) throws Exception KeyMapper.TargetHostHints.setRequestContextOtherHostAcceptable(context3, true); Future response3 = client.restRequest(restRequest, context3); assertEquals(response3.get().getStatus(), 200); - assertEquals(hostsReceivingRequest.size(), 2); + waitUntilTrue(() -> hostsReceivingRequest.size() == 2); // The first request should be made to the hinted host while the second should go to the other. assertEquals(hostsReceivingRequest.toArray(), new URI[]{new URI("http://test1.com:123"), new URI("http://test2.com:123")}); assertEquals(new HashSet<>(hostsReceivingRequest).size(), 2); @@ -782,6 +781,22 @@ Optional getStrategyAfterUpdate(final String ser }; } + private static void waitUntilTrue(Supplier check) + { + while (!check.get()) + { + try + { + Thread.sleep(10); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + class TestLoadBalancer implements LoadBalancer { diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 565a95c11e..76457deef3 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -9,6 +9,7 @@ import com.linkedin.d2.xds.XdsClient.ResourceType; import com.linkedin.d2.xds.XdsClientImpl.DiscoveryResponseData; import com.linkedin.d2.xds.XdsClientImpl.ResourceSubscriber; +import com.linkedin.d2.xds.XdsClientImpl.WildcardResourceSubscriber; import indis.XdsD2; import io.envoyproxy.envoy.service.discovery.v3.Resource; import java.util.Collections; @@ -159,24 +160,33 @@ public void testHandleD2NodeResponseWithData() fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // initial update should not track latency XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // subscriber data should be updated to NODE_UPDATE1 Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); + actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); + // subscriber data should be updated to NODE_UPDATE1 + Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); // subscriber original data is invalid, xds server latency won't be tracked fixture._nodeSubscriber.setData(new XdsClient.NodeUpdate(null)); + fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, new XdsClient.NodeUpdate(null)); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(2); verify(fixture._resourceWatcher, times(2)).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // subscriber data should be updated to NODE_UPDATE2 fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA2); actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE2)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE2)); verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); + actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); + Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); } @Test @@ -204,6 +214,8 @@ public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boo fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 1); verify(fixture._resourceWatcher).onChanged(eq(NODE.emptyData())); + // The wildcard subscriber doesn't care about bad data, it doesn't need to notify the watcher + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); Assert.assertNull(Objects.requireNonNull(actualData).getNodeData()); @@ -211,6 +223,7 @@ public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boo fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 2); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // bad data will not overwrite the original valid data Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE1.getNodeData()); @@ -221,10 +234,13 @@ public void testHandleD2NodeResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._nodeSubscriber.setData(NODE_UPDATE1); + fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, NODE_UPDATE1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(SERVICE_RESOURCE_NAME)); verify(fixture._nodeSubscriber).onRemoval(); + verify(fixture._nodeWildcardSubscriber).onRemoval(eq(SERVICE_RESOURCE_NAME)); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // removed resource will not overwrite the original valid data Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); @@ -239,15 +255,20 @@ public void testHandleD2URIMapResponseWithData() fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); // subscriber original data is invalid, xds server latency won't be tracked fixture._clusterSubscriber.setData(new XdsClient.D2URIMapUpdate(null)); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new XdsClient.D2URIMapUpdate(null)); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); fixture.verifyAckSent(2); @@ -257,6 +278,8 @@ public void testHandleD2URIMapResponseWithData() verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA2)); verify(fixture._serverMetricsProvider, times(2)).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); fixture.verifyAckSent(3); } @@ -291,24 +314,37 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b ? (D2URIMapUpdate) D2_URI_MAP.emptyData() : new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + if (!invalidData) + { + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); + } verify(fixture._clusterSubscriber).setData(eq(null)); + verify(fixture._uriMapWildcardSubscriber, times(0)).setData(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Assert.assertEquals(actualData, expectedUpdate); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(invalidData, 2); actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Objects.requireNonNull(actualData); - if (invalidData) { + if (invalidData) + { verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); // bad data will not overwrite the original valid data Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); - } else { + } + else + { verify(fixture._resourceWatcher, times(2)).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); // But an empty cluster should clear the data Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); } verifyZeroInteractions(fixture._serverMetricsProvider); } @@ -318,10 +354,13 @@ public void testHandleD2URIMapResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verify(fixture._clusterSubscriber).onRemoval(); + verify(fixture._uriMapWildcardSubscriber).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data @@ -344,16 +383,21 @@ public void testHandleD2URICollectionResponseWithData() fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); // subscriber original data is invalid, xds server latency won't be tracked fixture._clusterSubscriber.setData(new D2URIMapUpdate(null)); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new D2URIMapUpdate(null)); fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(2); verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); DiscoveryResponseData createUri2Delete1 = new DiscoveryResponseData(D2_URI, Collections.singletonList( @@ -368,9 +412,12 @@ public void testHandleD2URICollectionResponseWithData() // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA2 D2URIMapUpdate expectedUpdate = new D2URIMapUpdate(Collections.singletonMap(URI2, D2URI_2)); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); // track latency only for updated/new uri (not for deletion) verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); fixture.verifyAckSent(3); // Finally sanity check that the client correctly handles the deletion of the final URI in the collection @@ -381,8 +428,11 @@ public void testHandleD2URICollectionResponseWithData() // subscriber data should be updated to empty map expectedUpdate = new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); verifyNoMoreInteractions(fixture._serverMetricsProvider); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); fixture.verifyAckSent(4); } @@ -412,16 +462,22 @@ public void testHandleD2URICollectionUpdateWithBadData() fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP.emptyData())); + // The wildcard subscriber doesn't care about bad data, and simply treats it as the resource not existing + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Assert.assertNull(Objects.requireNonNull(actualData).getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertNull(actualData); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(2); // Due to the way glob collection updates are handled, bad data is dropped rather than showing any visible side // effects other than NACKing the response. verify(fixture._resourceWatcher, times(0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); } @@ -433,14 +489,50 @@ public void testHandleD2URICollectionResponseWithRemoval() XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(removeClusterResponse); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verify(fixture._clusterSubscriber).onRemoval(); + verify(fixture._uriMapWildcardSubscriber).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertNull(actualData); + } + + @Test + public void testWildCardResourceSubscription() + { + XdsClientImplFixture fixture = new XdsClientImplFixture(); + + XdsClient.WildcardNodeResourceWatcher nodeWildCardWatcher = Mockito.mock(XdsClient.WildcardNodeResourceWatcher.class); + XdsClient.WildcardD2URIMapResourceWatcher uriMapWildCardWatcher = Mockito.mock(XdsClient.WildcardD2URIMapResourceWatcher.class); + fixture._xdsClientImpl.getWildcardResourceSubscriber(NODE).addWatcher(nodeWildCardWatcher); + fixture._xdsClientImpl.getWildcardResourceSubscriber(D2_URI_MAP).addWatcher(uriMapWildCardWatcher); + + // NODE resource added + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); + fixture.verifyAckSent(1); + nodeWildCardWatcher.onChanged(eq(SERVICE_RESOURCE_NAME) , eq(NODE_UPDATE1)); + + // NODE resource removed + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); + fixture.verifyAckSent(2); + nodeWildCardWatcher.onRemoval(eq(SERVICE_RESOURCE_NAME)); + + // URI_MAP resource added + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); + fixture.verifyAckSent(3); + uriMapWildCardWatcher.onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + + // URI_MAP resource removed + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); + fixture.verifyAckSent(4); + uriMapWildCardWatcher.onRemoval(eq(CLUSTER_RESOURCE_NAME)); } @Test @@ -467,10 +559,16 @@ private static class XdsClientImplFixture XdsClientJmx _xdsClientJmx; ResourceSubscriber _nodeSubscriber; ResourceSubscriber _clusterSubscriber; + XdsClientImpl.WildcardResourceSubscriber _nodeWildcardSubscriber; + XdsClientImpl.WildcardResourceSubscriber _uriMapWildcardSubscriber; Map> _subscribers = new HashMap<>(); + Map _wildcardSubscribers = new HashMap<>(); + @Mock XdsClient.ResourceWatcher _resourceWatcher; @Mock + XdsClient.WildcardResourceWatcher _wildcardResourceWatcher; + @Mock XdsServerMetricsProvider _serverMetricsProvider; XdsClientImplFixture() @@ -483,6 +581,9 @@ private static class XdsClientImplFixture MockitoAnnotations.initMocks(this); _nodeSubscriber = spy(new ResourceSubscriber(NODE, SERVICE_RESOURCE_NAME, _xdsClientJmx)); _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx)); + _nodeWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(NODE)); + _uriMapWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP)); + doNothing().when(_resourceWatcher).onChanged(any()); for (ResourceSubscriber subscriber : Lists.newArrayList(_nodeSubscriber, _clusterSubscriber)) @@ -490,7 +591,11 @@ private static class XdsClientImplFixture subscriber.addWatcher(_resourceWatcher); _subscribers.put(subscriber.getType(), Collections.singletonMap(subscriber.getResource(), subscriber)); } - + for (WildcardResourceSubscriber subscriber : Lists.newArrayList(_nodeWildcardSubscriber, _uriMapWildcardSubscriber)) + { + subscriber.addWatcher(_wildcardResourceWatcher); + _wildcardSubscribers.put(subscriber.getType(), subscriber); + } doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, useGlobCollections, _serverMetricsProvider)); @@ -498,6 +603,10 @@ private static class XdsClientImplFixture when(_xdsClientImpl.getXdsClientJmx()).thenReturn(_xdsClientJmx); when(_xdsClientImpl.getResourceSubscriberMap(any())) .thenAnswer(a -> _subscribers.get((ResourceType) a.getArguments()[0])); + doNothing().when(_xdsClientImpl).watchAllXdsResources(any()); + when(_xdsClientImpl.getWildcardResourceSubscriber(any())) + .thenAnswer(a -> _wildcardSubscribers.get((ResourceType) a.getArguments()[0])); + } void verifyAckSent(int count) diff --git a/gradle.properties b/gradle.properties index dcf79f962f..16ac2d1310 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.8 +version=29.58.9 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true diff --git a/test-util/src/main/java/com/linkedin/test/util/retry/TenRetries.java b/test-util/src/main/java/com/linkedin/test/util/retry/TenRetries.java new file mode 100644 index 0000000000..415d2d5c42 --- /dev/null +++ b/test-util/src/main/java/com/linkedin/test/util/retry/TenRetries.java @@ -0,0 +1,29 @@ +/* + Copyright (c) 2020 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.test.util.retry; + + +/** + * Allows ten retries for a given test method. This is useful for tests that are especially flaky. + */ +public class TenRetries extends Retries +{ + public TenRetries() + { + super(10); + } +}