From 7a55cea913f3097f33638ecef9dd3e2307aa83a0 Mon Sep 17 00:00:00 2001 From: PapaCharlie Date: Tue, 17 Sep 2024 18:32:06 -0700 Subject: [PATCH] Fix invalid handling of glob collections for wildcard subscribers As the title says, things weren't wired through correctly for glob collections, namely they were being ignored because they were triggering the check for whtehre the cluster is being watched at all. This fixes that (and the fact that the initial subscription did not respect the `useGlobCollections` flag). This change has been unit tested and tested through in restli-resource-explorer, and it works for both glob and non-glob. --- .../com/linkedin/d2/xds/XdsClientImpl.java | 55 +++++++++---- .../linkedin/d2/xds/TestXdsClientImpl.java | 79 +++++++++++++++++-- 2 files changed, 110 insertions(+), 24 deletions(-) diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index e53ed30235..05e4a922a3 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -42,7 +42,6 @@ import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -57,6 +56,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -73,9 +73,14 @@ public class XdsClientImpl extends XdsClient new RateLimitedLogger(_log, TimeUnit.MINUTES.toMillis(10), SystemClock.instance()); public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L; + /** + * The resource subscriber map stores the subscribers to specific resources of a given type. Note that it only has 2 + * keys: {@link ResourceType#D2_URI_MAP} and {@link ResourceType#NODE}. The {@link ResourceType#D2_URI} is absent from + * this map because it should not be used as a key, as glob collection updates are translated to appear as normal map + * updates to subscribers. + */ private final Map> _resourceSubscribers = Maps.immutableEnumMap( - Arrays.stream(ResourceType.values()) - .filter(e -> e.typeUrl() != null) + Stream.of(ResourceType.NODE, ResourceType.D2_URI_MAP) .collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); private final Map _wildcardSubscribers = Maps.newEnumMap(ResourceType.class); private final Node _node; @@ -174,20 +179,26 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher) { _executorService.execute(() -> { - _log.info("Subscribing to wildcard for resource type: {}", watcher.getType()); WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(watcher.getType()); if (subscriber == null) { subscriber = new WildcardResourceSubscriber(watcher.getType()); _wildcardSubscribers.put(watcher.getType(), subscriber); + ResourceType adjustedType = + (watcher.getType() == ResourceType.D2_URI_MAP && _subscribeToUriGlobCollection) + ? ResourceType.D2_URI + : watcher.getType(); + + _log.info("Subscribing to wildcard for resource type: {}", adjustedType); + if (_adsStream == null && !isInBackoff()) { startRpcStreamLocal(); } if (_adsStream != null) { - _adsStream.sendDiscoveryRequest(watcher.getType(), Collections.singletonList("*")); + _adsStream.sendDiscoveryRequest(adjustedType, Collections.singletonList("*")); } } @@ -417,7 +428,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) ResourceSubscriber subscriber = getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName()); - if (subscriber == null) + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP); + if (subscriber == null && wildcardSubscriber == null) { String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName); _log.warn(msg); @@ -428,7 +440,17 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) // Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster. D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k -> { - D2URIMapUpdate currentData = (D2URIMapUpdate) subscriber._data; + D2URIMapUpdate currentData; + // Use the existing data from whichever subscriber is present. If both are present, they will point to the same + // D2URIMapUpdate. + if (subscriber != null) + { + currentData = (D2URIMapUpdate) subscriber._data; + } + else + { + currentData = (D2URIMapUpdate) wildcardSubscriber._data.get(uriId.getClusterResourceName()); + } if (currentData == null || !currentData.isValid()) { return new D2URIMapUpdate(null); @@ -498,7 +520,7 @@ private void handleResourceUpdate(Map updates, { subscriber.onData(entry.getValue(), _serverMetricsProvider); } - WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); if (wildcardSubscriber != null) { wildcardSubscriber.onData(entry.getKey(), entry.getValue()); @@ -522,7 +544,7 @@ private void handleResourceRemoval(Collection removedResources, Resource subscriber.onRemoval(); } } - WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); if (wildcardSubscriber != null) { removedResources.forEach(wildcardSubscriber::onRemoval); @@ -745,18 +767,18 @@ static class WildcardResourceSubscriber { private final ResourceType _type; private final Set _watchers = new HashSet<>(); - private Map _data = new HashMap<>(); + private final Map _data = new HashMap<>(); @VisibleForTesting - public Map getData() + public ResourceUpdate getData(String resourceName) { - return _data; + return _data.get(resourceName); } @VisibleForTesting - public void setData(@Nullable Map data) + public void setData(String resourceName, ResourceUpdate data) { - _data = data; + _data.put(resourceName, data); } WildcardResourceSubscriber(ResourceType type) @@ -842,7 +864,7 @@ final class RpcRetryTask implements Runnable public void run() { startRpcStreamLocal(); - for (ResourceType type : ResourceType.values()) + for (ResourceType type : _resourceSubscribers.keySet()) { Set resources = new HashSet<>(getResourceSubscriberMap(type).keySet()); if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null) @@ -1149,8 +1171,7 @@ private void handleRpcStreamClosed(Status error) { return; } - _log.error("ADS stream closed with status {}: {}. Cause: {}", error.getCode(), error.getDescription(), - error.getCause()); + _log.error("ADS stream closed with status {}: {}", error.getCode(), error.getDescription(), error.getCause()); _closed = true; notifyStreamError(error); cleanUp(); diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 6c89dbabc1..76457deef3 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -9,6 +9,7 @@ import com.linkedin.d2.xds.XdsClient.ResourceType; import com.linkedin.d2.xds.XdsClientImpl.DiscoveryResponseData; import com.linkedin.d2.xds.XdsClientImpl.ResourceSubscriber; +import com.linkedin.d2.xds.XdsClientImpl.WildcardResourceSubscriber; import indis.XdsD2; import io.envoyproxy.envoy.service.discovery.v3.Resource; import java.util.Collections; @@ -159,24 +160,33 @@ public void testHandleD2NodeResponseWithData() fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // initial update should not track latency XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // subscriber data should be updated to NODE_UPDATE1 Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); + actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); + // subscriber data should be updated to NODE_UPDATE1 + Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); // subscriber original data is invalid, xds server latency won't be tracked fixture._nodeSubscriber.setData(new XdsClient.NodeUpdate(null)); + fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, new XdsClient.NodeUpdate(null)); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(2); verify(fixture._resourceWatcher, times(2)).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // subscriber data should be updated to NODE_UPDATE2 fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA2); actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE2)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE2)); verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); + actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); + Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); } @Test @@ -204,6 +214,8 @@ public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boo fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 1); verify(fixture._resourceWatcher).onChanged(eq(NODE.emptyData())); + // The wildcard subscriber doesn't care about bad data, it doesn't need to notify the watcher + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); Assert.assertNull(Objects.requireNonNull(actualData).getNodeData()); @@ -211,6 +223,7 @@ public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boo fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 2); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // bad data will not overwrite the original valid data Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE1.getNodeData()); @@ -221,10 +234,13 @@ public void testHandleD2NodeResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._nodeSubscriber.setData(NODE_UPDATE1); + fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, NODE_UPDATE1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(SERVICE_RESOURCE_NAME)); verify(fixture._nodeSubscriber).onRemoval(); + verify(fixture._nodeWildcardSubscriber).onRemoval(eq(SERVICE_RESOURCE_NAME)); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // removed resource will not overwrite the original valid data Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); @@ -239,15 +255,20 @@ public void testHandleD2URIMapResponseWithData() fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); // subscriber original data is invalid, xds server latency won't be tracked fixture._clusterSubscriber.setData(new XdsClient.D2URIMapUpdate(null)); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new XdsClient.D2URIMapUpdate(null)); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); fixture.verifyAckSent(2); @@ -257,6 +278,8 @@ public void testHandleD2URIMapResponseWithData() verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA2)); verify(fixture._serverMetricsProvider, times(2)).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); fixture.verifyAckSent(3); } @@ -291,24 +314,37 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b ? (D2URIMapUpdate) D2_URI_MAP.emptyData() : new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + if (!invalidData) + { + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); + } verify(fixture._clusterSubscriber).setData(eq(null)); + verify(fixture._uriMapWildcardSubscriber, times(0)).setData(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Assert.assertEquals(actualData, expectedUpdate); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(invalidData, 2); actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Objects.requireNonNull(actualData); - if (invalidData) { + if (invalidData) + { verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); // bad data will not overwrite the original valid data Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); - } else { + } + else + { verify(fixture._resourceWatcher, times(2)).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); // But an empty cluster should clear the data Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); } verifyZeroInteractions(fixture._serverMetricsProvider); } @@ -318,10 +354,13 @@ public void testHandleD2URIMapResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verify(fixture._clusterSubscriber).onRemoval(); + verify(fixture._uriMapWildcardSubscriber).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data @@ -344,16 +383,21 @@ public void testHandleD2URICollectionResponseWithData() fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); // subscriber original data is invalid, xds server latency won't be tracked fixture._clusterSubscriber.setData(new D2URIMapUpdate(null)); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new D2URIMapUpdate(null)); fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(2); verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); DiscoveryResponseData createUri2Delete1 = new DiscoveryResponseData(D2_URI, Collections.singletonList( @@ -368,9 +412,12 @@ public void testHandleD2URICollectionResponseWithData() // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA2 D2URIMapUpdate expectedUpdate = new D2URIMapUpdate(Collections.singletonMap(URI2, D2URI_2)); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); // track latency only for updated/new uri (not for deletion) verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); fixture.verifyAckSent(3); // Finally sanity check that the client correctly handles the deletion of the final URI in the collection @@ -381,8 +428,11 @@ public void testHandleD2URICollectionResponseWithData() // subscriber data should be updated to empty map expectedUpdate = new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); verifyNoMoreInteractions(fixture._serverMetricsProvider); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); fixture.verifyAckSent(4); } @@ -412,16 +462,22 @@ public void testHandleD2URICollectionUpdateWithBadData() fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP.emptyData())); + // The wildcard subscriber doesn't care about bad data, and simply treats it as the resource not existing + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Assert.assertNull(Objects.requireNonNull(actualData).getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertNull(actualData); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(2); // Due to the way glob collection updates are handled, bad data is dropped rather than showing any visible side // effects other than NACKing the response. verify(fixture._resourceWatcher, times(0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); } @@ -433,14 +489,19 @@ public void testHandleD2URICollectionResponseWithRemoval() XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(removeClusterResponse); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verify(fixture._clusterSubscriber).onRemoval(); + verify(fixture._uriMapWildcardSubscriber).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertNull(actualData); } @Test @@ -506,6 +567,8 @@ private static class XdsClientImplFixture @Mock XdsClient.ResourceWatcher _resourceWatcher; @Mock + XdsClient.WildcardResourceWatcher _wildcardResourceWatcher; + @Mock XdsServerMetricsProvider _serverMetricsProvider; XdsClientImplFixture() @@ -518,9 +581,8 @@ private static class XdsClientImplFixture MockitoAnnotations.initMocks(this); _nodeSubscriber = spy(new ResourceSubscriber(NODE, SERVICE_RESOURCE_NAME, _xdsClientJmx)); _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx)); - _nodeWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(NODE); - _uriMapWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP); - + _nodeWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(NODE)); + _uriMapWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP)); doNothing().when(_resourceWatcher).onChanged(any()); @@ -529,8 +591,11 @@ private static class XdsClientImplFixture subscriber.addWatcher(_resourceWatcher); _subscribers.put(subscriber.getType(), Collections.singletonMap(subscriber.getResource(), subscriber)); } - _wildcardSubscribers.put(NODE, _nodeWildcardSubscriber); - _wildcardSubscribers.put(D2_URI_MAP, _uriMapWildcardSubscriber); + for (WildcardResourceSubscriber subscriber : Lists.newArrayList(_nodeWildcardSubscriber, _uriMapWildcardSubscriber)) + { + subscriber.addWatcher(_wildcardResourceWatcher); + _wildcardSubscribers.put(subscriber.getType(), subscriber); + } doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, useGlobCollections, _serverMetricsProvider));