diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index e53ed30235..05e4a922a3 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -42,7 +42,6 @@ import io.grpc.stub.ClientResponseObserver; import io.grpc.stub.StreamObserver; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -57,6 +56,7 @@ import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -73,9 +73,14 @@ public class XdsClientImpl extends XdsClient new RateLimitedLogger(_log, TimeUnit.MINUTES.toMillis(10), SystemClock.instance()); public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L; + /** + * The resource subscriber map stores the subscribers to specific resources of a given type. Note that it only has 2 + * keys: {@link ResourceType#D2_URI_MAP} and {@link ResourceType#NODE}. The {@link ResourceType#D2_URI} is absent from + * this map because it should not be used as a key, as glob collection updates are translated to appear as normal map + * updates to subscribers. + */ private final Map> _resourceSubscribers = Maps.immutableEnumMap( - Arrays.stream(ResourceType.values()) - .filter(e -> e.typeUrl() != null) + Stream.of(ResourceType.NODE, ResourceType.D2_URI_MAP) .collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); private final Map _wildcardSubscribers = Maps.newEnumMap(ResourceType.class); private final Node _node; @@ -174,20 +179,26 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher) { _executorService.execute(() -> { - _log.info("Subscribing to wildcard for resource type: {}", watcher.getType()); WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(watcher.getType()); if (subscriber == null) { subscriber = new WildcardResourceSubscriber(watcher.getType()); _wildcardSubscribers.put(watcher.getType(), subscriber); + ResourceType adjustedType = + (watcher.getType() == ResourceType.D2_URI_MAP && _subscribeToUriGlobCollection) + ? ResourceType.D2_URI + : watcher.getType(); + + _log.info("Subscribing to wildcard for resource type: {}", adjustedType); + if (_adsStream == null && !isInBackoff()) { startRpcStreamLocal(); } if (_adsStream != null) { - _adsStream.sendDiscoveryRequest(watcher.getType(), Collections.singletonList("*")); + _adsStream.sendDiscoveryRequest(adjustedType, Collections.singletonList("*")); } } @@ -417,7 +428,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) ResourceSubscriber subscriber = getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName()); - if (subscriber == null) + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP); + if (subscriber == null && wildcardSubscriber == null) { String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName); _log.warn(msg); @@ -428,7 +440,17 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data) // Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster. D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k -> { - D2URIMapUpdate currentData = (D2URIMapUpdate) subscriber._data; + D2URIMapUpdate currentData; + // Use the existing data from whichever subscriber is present. If both are present, they will point to the same + // D2URIMapUpdate. + if (subscriber != null) + { + currentData = (D2URIMapUpdate) subscriber._data; + } + else + { + currentData = (D2URIMapUpdate) wildcardSubscriber._data.get(uriId.getClusterResourceName()); + } if (currentData == null || !currentData.isValid()) { return new D2URIMapUpdate(null); @@ -498,7 +520,7 @@ private void handleResourceUpdate(Map updates, { subscriber.onData(entry.getValue(), _serverMetricsProvider); } - WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); if (wildcardSubscriber != null) { wildcardSubscriber.onData(entry.getKey(), entry.getValue()); @@ -522,7 +544,7 @@ private void handleResourceRemoval(Collection removedResources, Resource subscriber.onRemoval(); } } - WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); if (wildcardSubscriber != null) { removedResources.forEach(wildcardSubscriber::onRemoval); @@ -745,18 +767,18 @@ static class WildcardResourceSubscriber { private final ResourceType _type; private final Set _watchers = new HashSet<>(); - private Map _data = new HashMap<>(); + private final Map _data = new HashMap<>(); @VisibleForTesting - public Map getData() + public ResourceUpdate getData(String resourceName) { - return _data; + return _data.get(resourceName); } @VisibleForTesting - public void setData(@Nullable Map data) + public void setData(String resourceName, ResourceUpdate data) { - _data = data; + _data.put(resourceName, data); } WildcardResourceSubscriber(ResourceType type) @@ -842,7 +864,7 @@ final class RpcRetryTask implements Runnable public void run() { startRpcStreamLocal(); - for (ResourceType type : ResourceType.values()) + for (ResourceType type : _resourceSubscribers.keySet()) { Set resources = new HashSet<>(getResourceSubscriberMap(type).keySet()); if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null) @@ -1149,8 +1171,7 @@ private void handleRpcStreamClosed(Status error) { return; } - _log.error("ADS stream closed with status {}: {}. Cause: {}", error.getCode(), error.getDescription(), - error.getCause()); + _log.error("ADS stream closed with status {}: {}", error.getCode(), error.getDescription(), error.getCause()); _closed = true; notifyStreamError(error); cleanUp(); diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 6c89dbabc1..76457deef3 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -9,6 +9,7 @@ import com.linkedin.d2.xds.XdsClient.ResourceType; import com.linkedin.d2.xds.XdsClientImpl.DiscoveryResponseData; import com.linkedin.d2.xds.XdsClientImpl.ResourceSubscriber; +import com.linkedin.d2.xds.XdsClientImpl.WildcardResourceSubscriber; import indis.XdsD2; import io.envoyproxy.envoy.service.discovery.v3.Resource; import java.util.Collections; @@ -159,24 +160,33 @@ public void testHandleD2NodeResponseWithData() fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // initial update should not track latency XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // subscriber data should be updated to NODE_UPDATE1 Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); + actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); + // subscriber data should be updated to NODE_UPDATE1 + Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); // subscriber original data is invalid, xds server latency won't be tracked fixture._nodeSubscriber.setData(new XdsClient.NodeUpdate(null)); + fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, new XdsClient.NodeUpdate(null)); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(2); verify(fixture._resourceWatcher, times(2)).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // subscriber data should be updated to NODE_UPDATE2 fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA2); actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE2)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE2)); verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); + actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); + Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); } @Test @@ -204,6 +214,8 @@ public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boo fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 1); verify(fixture._resourceWatcher).onChanged(eq(NODE.emptyData())); + // The wildcard subscriber doesn't care about bad data, it doesn't need to notify the watcher + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); Assert.assertNull(Objects.requireNonNull(actualData).getNodeData()); @@ -211,6 +223,7 @@ public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boo fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 2); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // bad data will not overwrite the original valid data Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE1.getNodeData()); @@ -221,10 +234,13 @@ public void testHandleD2NodeResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._nodeSubscriber.setData(NODE_UPDATE1); + fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, NODE_UPDATE1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(SERVICE_RESOURCE_NAME)); verify(fixture._nodeSubscriber).onRemoval(); + verify(fixture._nodeWildcardSubscriber).onRemoval(eq(SERVICE_RESOURCE_NAME)); XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // removed resource will not overwrite the original valid data Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); @@ -239,15 +255,20 @@ public void testHandleD2URIMapResponseWithData() fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); // subscriber original data is invalid, xds server latency won't be tracked fixture._clusterSubscriber.setData(new XdsClient.D2URIMapUpdate(null)); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new XdsClient.D2URIMapUpdate(null)); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); fixture.verifyAckSent(2); @@ -257,6 +278,8 @@ public void testHandleD2URIMapResponseWithData() verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA2)); verify(fixture._serverMetricsProvider, times(2)).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); fixture.verifyAckSent(3); } @@ -291,24 +314,37 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b ? (D2URIMapUpdate) D2_URI_MAP.emptyData() : new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + if (!invalidData) + { + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); + } verify(fixture._clusterSubscriber).setData(eq(null)); + verify(fixture._uriMapWildcardSubscriber, times(0)).setData(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Assert.assertEquals(actualData, expectedUpdate); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(invalidData, 2); actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Objects.requireNonNull(actualData); - if (invalidData) { + if (invalidData) + { verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); // bad data will not overwrite the original valid data Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); - } else { + } + else + { verify(fixture._resourceWatcher, times(2)).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); // But an empty cluster should clear the data Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); } verifyZeroInteractions(fixture._serverMetricsProvider); } @@ -318,10 +354,13 @@ public void testHandleD2URIMapResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verify(fixture._clusterSubscriber).onRemoval(); + verify(fixture._uriMapWildcardSubscriber).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data @@ -344,16 +383,21 @@ public void testHandleD2URICollectionResponseWithData() fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); // subscriber original data is invalid, xds server latency won't be tracked fixture._clusterSubscriber.setData(new D2URIMapUpdate(null)); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new D2URIMapUpdate(null)); fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(2); verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); DiscoveryResponseData createUri2Delete1 = new DiscoveryResponseData(D2_URI, Collections.singletonList( @@ -368,9 +412,12 @@ public void testHandleD2URICollectionResponseWithData() // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA2 D2URIMapUpdate expectedUpdate = new D2URIMapUpdate(Collections.singletonMap(URI2, D2URI_2)); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); // track latency only for updated/new uri (not for deletion) verify(fixture._serverMetricsProvider).trackLatency(anyLong()); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); fixture.verifyAckSent(3); // Finally sanity check that the client correctly handles the deletion of the final URI in the collection @@ -381,8 +428,11 @@ public void testHandleD2URICollectionResponseWithData() // subscriber data should be updated to empty map expectedUpdate = new D2URIMapUpdate(Collections.emptyMap()); verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); verifyNoMoreInteractions(fixture._serverMetricsProvider); Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertEquals(actualData.getURIMap(), expectedUpdate.getURIMap()); fixture.verifyAckSent(4); } @@ -412,16 +462,22 @@ public void testHandleD2URICollectionUpdateWithBadData() fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP.emptyData())); + // The wildcard subscriber doesn't care about bad data, and simply treats it as the resource not existing + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); Assert.assertNull(Objects.requireNonNull(actualData).getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertNull(actualData); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(2); // Due to the way glob collection updates are handled, bad data is dropped rather than showing any visible side // effects other than NACKing the response. verify(fixture._resourceWatcher, times(0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); verifyZeroInteractions(fixture._serverMetricsProvider); } @@ -433,14 +489,19 @@ public void testHandleD2URICollectionResponseWithRemoval() XdsClientImplFixture fixture = new XdsClientImplFixture(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(removeClusterResponse); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verify(fixture._clusterSubscriber).onRemoval(); + verify(fixture._uriMapWildcardSubscriber).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verifyZeroInteractions(fixture._serverMetricsProvider); D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); + Assert.assertNull(actualData); } @Test @@ -506,6 +567,8 @@ private static class XdsClientImplFixture @Mock XdsClient.ResourceWatcher _resourceWatcher; @Mock + XdsClient.WildcardResourceWatcher _wildcardResourceWatcher; + @Mock XdsServerMetricsProvider _serverMetricsProvider; XdsClientImplFixture() @@ -518,9 +581,8 @@ private static class XdsClientImplFixture MockitoAnnotations.initMocks(this); _nodeSubscriber = spy(new ResourceSubscriber(NODE, SERVICE_RESOURCE_NAME, _xdsClientJmx)); _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx)); - _nodeWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(NODE); - _uriMapWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP); - + _nodeWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(NODE)); + _uriMapWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP)); doNothing().when(_resourceWatcher).onChanged(any()); @@ -529,8 +591,11 @@ private static class XdsClientImplFixture subscriber.addWatcher(_resourceWatcher); _subscribers.put(subscriber.getType(), Collections.singletonMap(subscriber.getResource(), subscriber)); } - _wildcardSubscribers.put(NODE, _nodeWildcardSubscriber); - _wildcardSubscribers.put(D2_URI_MAP, _uriMapWildcardSubscriber); + for (WildcardResourceSubscriber subscriber : Lists.newArrayList(_nodeWildcardSubscriber, _uriMapWildcardSubscriber)) + { + subscriber.addWatcher(_wildcardResourceWatcher); + _wildcardSubscribers.put(subscriber.getType(), subscriber); + } doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, useGlobCollections, _serverMetricsProvider));