diff --git a/CHANGELOG.md b/CHANGELOG.md index adf7ab8170..6bdfc3b4c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.58.7] - 2024-09-13 +- Add WildcardResourceSubscriber which could subscribe to all resources, like NODE and URIMap resources. + ## [29.58.6] - 2024-09-08 - Allow for null paging inside Collection response envelopes @@ -5728,7 +5731,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.6...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.7...master +[29.58.7]: https://github.com/linkedin/rest.li/compare/v29.58.6...v29.58.7 [29.58.6]: https://github.com/linkedin/rest.li/compare/v29.58.5...v29.58.6 [29.58.5]: https://github.com/linkedin/rest.li/compare/v29.58.4...v29.58.5 [29.58.4]: https://github.com/linkedin/rest.li/compare/v29.58.3...v29.58.4 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index a0c08b04f0..f5040dcb45 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -95,6 +95,91 @@ final void onChanged(ResourceUpdate update) } } + public static abstract class WildcardResourceWatcher + { + private final ResourceType _type; + + /** + * Defining a private constructor means only classes that are defined in this file can extend this class (see + * {@link ResourceWatcher}). + */ + WildcardResourceWatcher(ResourceType type) + { + _type = type; + } + + final ResourceType getType() + { + return _type; + } + + /** + * Called when the resource discovery RPC encounters some transient error. + */ + public abstract void onError(Status error); + + /** + * Called when the resource discovery RPC reestablishes connection. + */ + public abstract void onReconnect(); + + /** + * Called when a resource is added or updated. + * @param resourceName the name of the resource that was added or updated. + * @param update the new data {@link ResourceUpdate} for the resource. + */ + abstract void onChanged(String resourceName, ResourceUpdate update); + + /** + * Called when a resource is removed. + * @param resourceName the name of the resource that was removed. + */ + public abstract void onRemoval(String resourceName); + } + + public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher + { + public WildcardNodeResourceWatcher() + { + super(ResourceType.NODE); + } + + /** + * Called when a node resource is added or updated. + * @param resourceName the resource name of the {@link NodeUpdate} that was added or updated. + * @param update the new data for the {@link NodeUpdate}, including D2 cluster and service information. + */ + public abstract void onChanged(String resourceName, NodeUpdate update); + + @Override + final void onChanged(String resourceName, ResourceUpdate update) + { + onChanged(resourceName, (NodeUpdate) update); + } + } + + public static abstract class WildcardD2URIMapResourceWatcher extends WildcardResourceWatcher + { + public WildcardD2URIMapResourceWatcher() + { + super(ResourceType.D2_URI_MAP); + } + + /** + * Called when a {@link D2URIMapUpdate} resource is added or updated. + * @param resourceName the resource name of the {@link D2URIMapUpdate} map resource that was added or updated. + * like the /d2/uris/clusterName + * @param update the new data for the {@link D2URIMapUpdate} resource + */ + public abstract void onChanged(String resourceName, D2URIMapUpdate update); + + @Override + final void onChanged(String resourceName, ResourceUpdate update) + { + onChanged(resourceName, (D2URIMapUpdate) update); + } + } + public interface ResourceUpdate { boolean isValid(); @@ -109,7 +194,7 @@ public static final class NodeUpdate implements ResourceUpdate _nodeData = nodeData; } - XdsD2.Node getNodeData() + public XdsD2.Node getNodeData() { return _nodeData; } @@ -261,13 +346,32 @@ static ResourceType fromTypeUrl(String typeUrl) * will always notify the given watcher of the current data if it is already present, even if the given watcher was * already subscribed to said resource. However, the subscription will only be added once. */ - abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); + public abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); - abstract void startRpcStream(); + /** + * Subscribes the given {@link WildcardResourceWatcher} to all the resources of the corresponding type. The watcher + * will be notified whenever a resource is added or removed. Repeated calls to this function with the same watcher + * will always notify the given watcher of the current data. + */ + public abstract void watchAllXdsResources(WildcardResourceWatcher watcher); - abstract void shutdown(); + /** + * Initiates the RPC stream to the xDS server. + */ + public abstract void startRpcStream(); + + /** + * Shuts down the xDS client. + */ + public abstract void shutdown(); - abstract String getXdsServerAuthority(); + /** + * Returns the authority of the xDS server. + */ + public abstract String getXdsServerAuthority(); - abstract public XdsClientJmx getXdsClientJmx(); + /** + * Returns the JMX bean for the xDS client. + */ + public abstract XdsClientJmx getXdsClientJmx(); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index b5ee6b0126..e53ed30235 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -77,6 +77,7 @@ public class XdsClientImpl extends XdsClient Arrays.stream(ResourceType.values()) .filter(e -> e.typeUrl() != null) .collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); + private final Map _wildcardSubscribers = Maps.newEnumMap(ResourceType.class); private final Node _node; private final ManagedChannel _managedChannel; private final ScheduledExecutorService _executorService; @@ -131,7 +132,7 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor } @Override - void watchXdsResource(String resourceName, ResourceWatcher watcher) + public void watchXdsResource(String resourceName, ResourceWatcher watcher) { _executorService.execute(() -> { @@ -168,6 +169,32 @@ void watchXdsResource(String resourceName, ResourceWatcher watcher) }); } + @Override + public void watchAllXdsResources(WildcardResourceWatcher watcher) + { + _executorService.execute(() -> + { + _log.info("Subscribing to wildcard for resource type: {}", watcher.getType()); + WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(watcher.getType()); + if (subscriber == null) + { + subscriber = new WildcardResourceSubscriber(watcher.getType()); + _wildcardSubscribers.put(watcher.getType(), subscriber); + + if (_adsStream == null && !isInBackoff()) + { + startRpcStreamLocal(); + } + if (_adsStream != null) + { + _adsStream.sendDiscoveryRequest(watcher.getType(), Collections.singletonList("*")); + } + } + + subscriber.addWatcher(watcher); + }); + } + @Override public void startRpcStream() { @@ -225,7 +252,7 @@ private void startRpcStreamLocal() { } @Override - void shutdown() + public void shutdown() { _executorService.execute(() -> { @@ -238,7 +265,7 @@ void shutdown() } @Override - String getXdsServerAuthority() + public String getXdsServerAuthority() { return _managedChannel.authority(); } @@ -471,6 +498,11 @@ private void handleResourceUpdate(Map updates, { subscriber.onData(entry.getValue(), _serverMetricsProvider); } + WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + if (wildcardSubscriber != null) + { + wildcardSubscriber.onData(entry.getKey(), entry.getValue()); + } } } @@ -490,6 +522,11 @@ private void handleResourceRemoval(Collection removedResources, Resource subscriber.onRemoval(); } } + WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + if (wildcardSubscriber != null) + { + removedResources.forEach(wildcardSubscriber::onRemoval); + } } @@ -502,6 +539,10 @@ private void notifyStreamError(Status error) subscriber.onError(error); } } + for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) + { + wildcardResourceSubscriber.onError(error); + } _xdsClientJmx.setIsConnected(false); } @@ -514,6 +555,10 @@ private void notifyStreamReconnect() subscriber.onReconnect(); } } + for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) + { + wildcardResourceSubscriber.onReconnect(); + } _xdsClientJmx.setIsConnected(true); } @@ -523,6 +568,12 @@ Map getResourceSubscriberMap(ResourceType type) return _resourceSubscribers.get(type); } + @VisibleForTesting + WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type) + { + return _wildcardSubscribers.get(type); + } + static class ResourceSubscriber { private final ResourceType _type; @@ -690,24 +741,132 @@ void onRemoval() } } - final class RpcRetryTask implements Runnable { + static class WildcardResourceSubscriber + { + private final ResourceType _type; + private final Set _watchers = new HashSet<>(); + private Map _data = new HashMap<>(); + + @VisibleForTesting + public Map getData() + { + return _data; + } + + @VisibleForTesting + public void setData(@Nullable Map data) + { + _data = data; + } + + WildcardResourceSubscriber(ResourceType type) + { + _type = type; + } + + void addWatcher(WildcardResourceWatcher watcher) + { + _watchers.add(watcher); + for (Map.Entry entry : _data.entrySet()) + { + watcher.onChanged(entry.getKey(), entry.getValue()); + _log.debug("Notifying watcher of current data for resource {} of type {}: {}", + entry.getKey(), _type, entry.getValue()); + } + } + + private void onData(String resourceName, ResourceUpdate data) + { + if (Objects.equals(_data.get(resourceName), data)) + { + _log.debug("Received resource update data equal to the current data. Will not perform the update."); + return; + } + // null value guard to avoid overwriting the property with null + if (data != null && data.isValid()) + { + _data.put(resourceName, data); + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onChanged(resourceName, data); + } + } + else + { + if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI) + { + RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + } + else + { + _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + } + } + } + + public ResourceType getType() + { + return _type; + } + + private void onError(Status error) + { + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onError(error); + } + } + + private void onReconnect() + { + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onReconnect(); + } + } + + @VisibleForTesting + void onRemoval(String resourceName) + { + _data.remove(resourceName); + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onRemoval(resourceName); + } + } + } + + final class RpcRetryTask implements Runnable + { @Override - public void run() { + public void run() + { startRpcStreamLocal(); - for (ResourceType type : ResourceType.values()) { - Collection resources = getResourceSubscriberMap(type).keySet(); - if (resources.isEmpty()) + for (ResourceType type : ResourceType.values()) + { + Set resources = new HashSet<>(getResourceSubscriberMap(type).keySet()); + if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null) { continue; } + ResourceType rewrittenType; if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP) { resources = resources.stream() .map(GlobCollectionUtils::globCollectionUrlForClusterResource) - .collect(Collectors.toSet()); - type = ResourceType.D2_URI; + .collect(Collectors.toCollection(HashSet::new)); + rewrittenType = ResourceType.D2_URI; + } + else + { + rewrittenType = type; + } + // If there is a wildcard subscriber, we should always send a wildcard request to the server. + if (getWildcardResourceSubscriber(type) != null) + { + resources.add("*"); } - _adsStream.sendDiscoveryRequest(type, resources); + _adsStream.sendDiscoveryRequest(rewrittenType, resources); } } } diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 565a95c11e..6c89dbabc1 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -443,6 +443,37 @@ public void testHandleD2URICollectionResponseWithRemoval() Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); } + @Test + public void testWildCardResourceSubscription() + { + XdsClientImplFixture fixture = new XdsClientImplFixture(); + + XdsClient.WildcardNodeResourceWatcher nodeWildCardWatcher = Mockito.mock(XdsClient.WildcardNodeResourceWatcher.class); + XdsClient.WildcardD2URIMapResourceWatcher uriMapWildCardWatcher = Mockito.mock(XdsClient.WildcardD2URIMapResourceWatcher.class); + fixture._xdsClientImpl.getWildcardResourceSubscriber(NODE).addWatcher(nodeWildCardWatcher); + fixture._xdsClientImpl.getWildcardResourceSubscriber(D2_URI_MAP).addWatcher(uriMapWildCardWatcher); + + // NODE resource added + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); + fixture.verifyAckSent(1); + nodeWildCardWatcher.onChanged(eq(SERVICE_RESOURCE_NAME) , eq(NODE_UPDATE1)); + + // NODE resource removed + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); + fixture.verifyAckSent(2); + nodeWildCardWatcher.onRemoval(eq(SERVICE_RESOURCE_NAME)); + + // URI_MAP resource added + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); + fixture.verifyAckSent(3); + uriMapWildCardWatcher.onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + + // URI_MAP resource removed + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); + fixture.verifyAckSent(4); + uriMapWildCardWatcher.onRemoval(eq(CLUSTER_RESOURCE_NAME)); + } + @Test public void testResourceSubscriberAddWatcher() { @@ -467,7 +498,11 @@ private static class XdsClientImplFixture XdsClientJmx _xdsClientJmx; ResourceSubscriber _nodeSubscriber; ResourceSubscriber _clusterSubscriber; + XdsClientImpl.WildcardResourceSubscriber _nodeWildcardSubscriber; + XdsClientImpl.WildcardResourceSubscriber _uriMapWildcardSubscriber; Map> _subscribers = new HashMap<>(); + Map _wildcardSubscribers = new HashMap<>(); + @Mock XdsClient.ResourceWatcher _resourceWatcher; @Mock @@ -483,6 +518,10 @@ private static class XdsClientImplFixture MockitoAnnotations.initMocks(this); _nodeSubscriber = spy(new ResourceSubscriber(NODE, SERVICE_RESOURCE_NAME, _xdsClientJmx)); _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx)); + _nodeWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(NODE); + _uriMapWildcardSubscriber = new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP); + + doNothing().when(_resourceWatcher).onChanged(any()); for (ResourceSubscriber subscriber : Lists.newArrayList(_nodeSubscriber, _clusterSubscriber)) @@ -490,7 +529,8 @@ private static class XdsClientImplFixture subscriber.addWatcher(_resourceWatcher); _subscribers.put(subscriber.getType(), Collections.singletonMap(subscriber.getResource(), subscriber)); } - + _wildcardSubscribers.put(NODE, _nodeWildcardSubscriber); + _wildcardSubscribers.put(D2_URI_MAP, _uriMapWildcardSubscriber); doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, useGlobCollections, _serverMetricsProvider)); @@ -498,6 +538,10 @@ private static class XdsClientImplFixture when(_xdsClientImpl.getXdsClientJmx()).thenReturn(_xdsClientJmx); when(_xdsClientImpl.getResourceSubscriberMap(any())) .thenAnswer(a -> _subscribers.get((ResourceType) a.getArguments()[0])); + doNothing().when(_xdsClientImpl).watchAllXdsResources(any()); + when(_xdsClientImpl.getWildcardResourceSubscriber(any())) + .thenAnswer(a -> _wildcardSubscribers.get((ResourceType) a.getArguments()[0])); + } void verifyAckSent(int count) diff --git a/gradle.properties b/gradle.properties index a42923ef16..61da0787d6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.6 +version=29.58.7 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true