diff --git a/CHANGELOG.md b/CHANGELOG.md index adf7ab8170..1efd4cf5a8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.58.7] - 2024-09-13 +Add WildcardResourceSubscriber for restli-resource-explorer subscribe to all resources + ## [29.58.6] - 2024-09-08 - Allow for null paging inside Collection response envelopes @@ -5728,7 +5731,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.6...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.7...master +[29.58.7]: https://github.com/linkedin/rest.li/compare/v29.58.6...v29.58.7 [29.58.6]: https://github.com/linkedin/rest.li/compare/v29.58.5...v29.58.6 [29.58.5]: https://github.com/linkedin/rest.li/compare/v29.58.4...v29.58.5 [29.58.4]: https://github.com/linkedin/rest.li/compare/v29.58.3...v29.58.4 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index a0c08b04f0..8ed0693978 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -95,6 +95,71 @@ final void onChanged(ResourceUpdate update) } } + public static abstract class WildcardResourceWatcher + { + private final ResourceType _type; + + /** + * Defining a private constructor means only classes that are defined in this file can extend this class (see + * {@link ResourceWatcher}). + */ + private WildcardResourceWatcher(ResourceType type) + { + _type = type; + } + + final ResourceType getType() + { + return _type; + } + + /** + * Called when the resource discovery RPC encounters some transient error. + */ + public abstract void onError(Status error); + + /** + * Called when the resource discovery RPC reestablishes connection. + */ + public abstract void onReconnect(); + + abstract void onChanged(String resourceName, ResourceUpdate update); + + public abstract void onRemoval(String resourceName); + } + + public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher + { + public WildcardNodeResourceWatcher() + { + super(ResourceType.NODE); + } + + public abstract void onChanged(String resourceName, NodeUpdate update); + + @Override + final void onChanged(String resourceName, ResourceUpdate update) + { + onChanged(resourceName, (NodeUpdate) update); + } + } + + public static abstract class WildcardD2URIMapResourceWatcher extends WildcardResourceWatcher + { + public WildcardD2URIMapResourceWatcher() + { + super(ResourceType.D2_URI_MAP); + } + + public abstract void onChanged(String resourceName, D2URIMapUpdate update); + + @Override + final void onChanged(String resourceName, ResourceUpdate update) + { + onChanged(resourceName, (D2URIMapUpdate) update); + } + } + public interface ResourceUpdate { boolean isValid(); @@ -109,7 +174,7 @@ public static final class NodeUpdate implements ResourceUpdate _nodeData = nodeData; } - XdsD2.Node getNodeData() + public XdsD2.Node getNodeData() { return _nodeData; } @@ -261,13 +326,20 @@ static ResourceType fromTypeUrl(String typeUrl) * will always notify the given watcher of the current data if it is already present, even if the given watcher was * already subscribed to said resource. However, the subscription will only be added once. */ - abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); + public abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); + + /** + * Subscribes the given {@link WildcardResourceWatcher} to all the resources of the corresponding type. The watcher + * will be notified whenever a resource is added or removed. Repeated calls to this function with the same watcher + * will always notify the given watcher of the current data. + */ + public abstract void watchAllXdsResources(WildcardResourceWatcher watcher); - abstract void startRpcStream(); + public abstract void startRpcStream(); - abstract void shutdown(); + public abstract void shutdown(); - abstract String getXdsServerAuthority(); + public abstract String getXdsServerAuthority(); - abstract public XdsClientJmx getXdsClientJmx(); + public abstract XdsClientJmx getXdsClientJmx(); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index b5ee6b0126..5e9ee583e2 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -77,6 +77,7 @@ public class XdsClientImpl extends XdsClient Arrays.stream(ResourceType.values()) .filter(e -> e.typeUrl() != null) .collect(Collectors.toMap(Function.identity(), e -> new HashMap<>()))); + private final Map _wildcardSubscribers = new HashMap<>(); private final Node _node; private final ManagedChannel _managedChannel; private final ScheduledExecutorService _executorService; @@ -131,7 +132,7 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor } @Override - void watchXdsResource(String resourceName, ResourceWatcher watcher) + public void watchXdsResource(String resourceName, ResourceWatcher watcher) { _executorService.execute(() -> { @@ -168,6 +169,32 @@ void watchXdsResource(String resourceName, ResourceWatcher watcher) }); } + @Override + public void watchAllXdsResources(WildcardResourceWatcher watcher) + { + _executorService.execute(() -> + { + _log.info("Subscribing to wildcard for resource type: {}", watcher.getType()); + WildcardResourceSubscriber subscriber = _wildcardSubscribers.get(watcher.getType()); + if (subscriber == null) + { + subscriber = new WildcardResourceSubscriber(watcher.getType()); + _wildcardSubscribers.put(watcher.getType(), subscriber); + + if (_adsStream == null && !isInBackoff()) + { + startRpcStreamLocal(); + } + if (_adsStream != null) + { + _adsStream.sendDiscoveryRequest(watcher.getType(), Collections.singletonList("*")); + } + } + + subscriber.addWatcher(watcher); + }); + } + @Override public void startRpcStream() { @@ -225,7 +252,7 @@ private void startRpcStreamLocal() { } @Override - void shutdown() + public void shutdown() { _executorService.execute(() -> { @@ -238,7 +265,7 @@ void shutdown() } @Override - String getXdsServerAuthority() + public String getXdsServerAuthority() { return _managedChannel.authority(); } @@ -471,6 +498,11 @@ private void handleResourceUpdate(Map updates, { subscriber.onData(entry.getValue(), _serverMetricsProvider); } + WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + if (wildcardSubscriber != null) + { + wildcardSubscriber.onData(entry.getKey(), entry.getValue()); + } } } @@ -490,6 +522,11 @@ private void handleResourceRemoval(Collection removedResources, Resource subscriber.onRemoval(); } } + WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type); + if (wildcardSubscriber != null) + { + removedResources.forEach(wildcardSubscriber::onRemoval); + } } @@ -502,6 +539,10 @@ private void notifyStreamError(Status error) subscriber.onError(error); } } + for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) + { + wildcardResourceSubscriber.onError(error); + } _xdsClientJmx.setIsConnected(false); } @@ -514,6 +555,10 @@ private void notifyStreamReconnect() subscriber.onReconnect(); } } + for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values()) + { + wildcardResourceSubscriber.onReconnect(); + } _xdsClientJmx.setIsConnected(true); } @@ -690,24 +735,132 @@ void onRemoval() } } - final class RpcRetryTask implements Runnable { + static class WildcardResourceSubscriber + { + private final ResourceType _type; + private final Set _watchers = new HashSet<>(); + private Map _data = new HashMap<>(); + + @VisibleForTesting + public Map getData() + { + return _data; + } + + @VisibleForTesting + public void setData(@Nullable Map data) + { + _data = data; + } + + WildcardResourceSubscriber(ResourceType type) + { + _type = type; + } + + void addWatcher(WildcardResourceWatcher watcher) + { + _watchers.add(watcher); + for (Map.Entry entry : _data.entrySet()) + { + watcher.onChanged(entry.getKey(), entry.getValue()); + _log.debug("Notifying watcher of current data for resource {} of type {}: {}", + entry.getKey(), _type, entry.getValue()); + } + } + + private void onData(String resourceName, ResourceUpdate data) + { + if (Objects.equals(_data.get(resourceName), data)) + { + _log.debug("Received resource update data equal to the current data. Will not perform the update."); + return; + } + // null value guard to avoid overwriting the property with null + if (data != null && data.isValid()) + { + _data.put(resourceName, data); + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onChanged(resourceName, data); + } + } + else + { + if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI) + { + RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + } + else + { + _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); + } + } + } + + public ResourceType getType() + { + return _type; + } + + private void onError(Status error) + { + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onError(error); + } + } + + private void onReconnect() + { + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onReconnect(); + } + } + + @VisibleForTesting + void onRemoval(String resourceName) + { + _log.info("{} resource deleted: {}", _type, resourceName); + _data.remove(resourceName); + for (WildcardResourceWatcher watcher : _watchers) + { + watcher.onRemoval(resourceName); + } + } + } + + final class RpcRetryTask implements Runnable + { @Override - public void run() { + public void run() + { startRpcStreamLocal(); - for (ResourceType type : ResourceType.values()) { - Collection resources = getResourceSubscriberMap(type).keySet(); - if (resources.isEmpty()) + for (ResourceType type : ResourceType.values()) + { + Set resources = new HashSet<>(getResourceSubscriberMap(type).keySet()); + if (resources.isEmpty() && !_wildcardSubscribers.containsKey(type)) { continue; } + ResourceType rewrittenType; if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP) { resources = resources.stream() .map(GlobCollectionUtils::globCollectionUrlForClusterResource) - .collect(Collectors.toSet()); - type = ResourceType.D2_URI; + .collect(Collectors.toCollection(HashSet::new)); + rewrittenType = ResourceType.D2_URI; + } + else + { + rewrittenType = type; + } + if (_wildcardSubscribers.containsKey(type)) + { + resources.add("*"); } - _adsStream.sendDiscoveryRequest(type, resources); + _adsStream.sendDiscoveryRequest(rewrittenType, resources); } } } diff --git a/gradle.properties b/gradle.properties index a42923ef16..61da0787d6 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.6 +version=29.58.7 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true