Skip to content

Commit

Permalink
Add watchAllResource subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing committed Sep 13, 2024
1 parent 5b3e6cc commit b374683
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 19 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.58.7] - 2024-09-13
Add WildcardResourceSubscriber for restli-resource-explorer subscribe to all resources

## [29.58.6] - 2024-09-08
- Allow for null paging inside Collection response envelopes

Expand Down Expand Up @@ -5728,7 +5731,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.6...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.7...master
[29.58.7]: https://github.com/linkedin/rest.li/compare/v29.58.6...v29.58.7
[29.58.6]: https://github.com/linkedin/rest.li/compare/v29.58.5...v29.58.6
[29.58.5]: https://github.com/linkedin/rest.li/compare/v29.58.4...v29.58.5
[29.58.4]: https://github.com/linkedin/rest.li/compare/v29.58.3...v29.58.4
Expand Down
84 changes: 78 additions & 6 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,71 @@ final void onChanged(ResourceUpdate update)
}
}

public static abstract class WildcardResourceWatcher
{
private final ResourceType _type;

/**
* Defining a private constructor means only classes that are defined in this file can extend this class (see
* {@link ResourceWatcher}).
*/
private WildcardResourceWatcher(ResourceType type)
{
_type = type;
}

final ResourceType getType()
{
return _type;
}

/**
* Called when the resource discovery RPC encounters some transient error.
*/
public abstract void onError(Status error);

/**
* Called when the resource discovery RPC reestablishes connection.
*/
public abstract void onReconnect();

abstract void onChanged(String resourceName, ResourceUpdate update);

public abstract void onRemoval(String resourceName);
}

public static abstract class WildcardNodeResourceWatcher extends WildcardResourceWatcher
{
public WildcardNodeResourceWatcher()
{
super(ResourceType.NODE);
}

public abstract void onChanged(String resourceName, NodeUpdate update);

@Override
final void onChanged(String resourceName, ResourceUpdate update)
{
onChanged(resourceName, (NodeUpdate) update);
}
}

public static abstract class WildcardD2URIMapResourceWatcher extends WildcardResourceWatcher
{
public WildcardD2URIMapResourceWatcher()
{
super(ResourceType.D2_URI_MAP);
}

public abstract void onChanged(String resourceName, D2URIMapUpdate update);

@Override
final void onChanged(String resourceName, ResourceUpdate update)
{
onChanged(resourceName, (D2URIMapUpdate) update);
}
}

public interface ResourceUpdate
{
boolean isValid();
Expand All @@ -109,7 +174,7 @@ public static final class NodeUpdate implements ResourceUpdate
_nodeData = nodeData;
}

XdsD2.Node getNodeData()
public XdsD2.Node getNodeData()
{
return _nodeData;
}
Expand Down Expand Up @@ -261,13 +326,20 @@ static ResourceType fromTypeUrl(String typeUrl)
* will always notify the given watcher of the current data if it is already present, even if the given watcher was
* already subscribed to said resource. However, the subscription will only be added once.
*/
abstract void watchXdsResource(String resourceName, ResourceWatcher watcher);
public abstract void watchXdsResource(String resourceName, ResourceWatcher watcher);

/**
* Subscribes the given {@link WildcardResourceWatcher} to all the resources of the corresponding type. The watcher
* will be notified whenever a resource is added or removed. Repeated calls to this function with the same watcher
* will always notify the given watcher of the current data.
*/
public abstract void watchAllXdsResources(WildcardResourceWatcher watcher);

abstract void startRpcStream();
public abstract void startRpcStream();

abstract void shutdown();
public abstract void shutdown();

abstract String getXdsServerAuthority();
public abstract String getXdsServerAuthority();

abstract public XdsClientJmx getXdsClientJmx();
public abstract XdsClientJmx getXdsClientJmx();
}
175 changes: 164 additions & 11 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class XdsClientImpl extends XdsClient
Arrays.stream(ResourceType.values())
.filter(e -> e.typeUrl() != null)
.collect(Collectors.toMap(Function.identity(), e -> new HashMap<>())));
private final Map<ResourceType, WildcardResourceSubscriber> _wildcardSubscribers = new HashMap<>();
private final Node _node;
private final ManagedChannel _managedChannel;
private final ScheduledExecutorService _executorService;
Expand Down Expand Up @@ -131,7 +132,7 @@ public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutor
}

@Override
void watchXdsResource(String resourceName, ResourceWatcher watcher)
public void watchXdsResource(String resourceName, ResourceWatcher watcher)
{
_executorService.execute(() ->
{
Expand Down Expand Up @@ -168,6 +169,32 @@ void watchXdsResource(String resourceName, ResourceWatcher watcher)
});
}

@Override
public void watchAllXdsResources(WildcardResourceWatcher watcher)
{
_executorService.execute(() ->
{
_log.info("Subscribing to wildcard for resource type: {}", watcher.getType());
WildcardResourceSubscriber subscriber = _wildcardSubscribers.get(watcher.getType());
if (subscriber == null)
{
subscriber = new WildcardResourceSubscriber(watcher.getType());
_wildcardSubscribers.put(watcher.getType(), subscriber);

if (_adsStream == null && !isInBackoff())
{
startRpcStreamLocal();
}
if (_adsStream != null)
{
_adsStream.sendDiscoveryRequest(watcher.getType(), Collections.singletonList("*"));
}
}

subscriber.addWatcher(watcher);
});
}

@Override
public void startRpcStream()
{
Expand Down Expand Up @@ -225,7 +252,7 @@ private void startRpcStreamLocal() {
}

@Override
void shutdown()
public void shutdown()
{
_executorService.execute(() ->
{
Expand All @@ -238,7 +265,7 @@ void shutdown()
}

@Override
String getXdsServerAuthority()
public String getXdsServerAuthority()
{
return _managedChannel.authority();
}
Expand Down Expand Up @@ -471,6 +498,11 @@ private void handleResourceUpdate(Map<String, ? extends ResourceUpdate> updates,
{
subscriber.onData(entry.getValue(), _serverMetricsProvider);
}
WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type);
if (wildcardSubscriber != null)
{
wildcardSubscriber.onData(entry.getKey(), entry.getValue());
}
}
}

Expand All @@ -490,6 +522,11 @@ private void handleResourceRemoval(Collection<String> removedResources, Resource
subscriber.onRemoval();
}
}
WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type);
if (wildcardSubscriber != null)
{
removedResources.forEach(wildcardSubscriber::onRemoval);
}
}


Expand All @@ -502,6 +539,10 @@ private void notifyStreamError(Status error)
subscriber.onError(error);
}
}
for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values())
{
wildcardResourceSubscriber.onError(error);
}
_xdsClientJmx.setIsConnected(false);
}

Expand All @@ -514,6 +555,10 @@ private void notifyStreamReconnect()
subscriber.onReconnect();
}
}
for (WildcardResourceSubscriber wildcardResourceSubscriber : _wildcardSubscribers.values())
{
wildcardResourceSubscriber.onReconnect();
}
_xdsClientJmx.setIsConnected(true);
}

Expand Down Expand Up @@ -690,24 +735,132 @@ void onRemoval()
}
}

final class RpcRetryTask implements Runnable {
static class WildcardResourceSubscriber
{
private final ResourceType _type;
private final Set<WildcardResourceWatcher> _watchers = new HashSet<>();
private Map<String, ResourceUpdate> _data = new HashMap<>();

@VisibleForTesting
public Map<String, ResourceUpdate> getData()
{
return _data;
}

@VisibleForTesting
public void setData(@Nullable Map<String, ResourceUpdate> data)
{
_data = data;
}

WildcardResourceSubscriber(ResourceType type)
{
_type = type;
}

void addWatcher(WildcardResourceWatcher watcher)
{
_watchers.add(watcher);
for (Map.Entry<String, ResourceUpdate> entry : _data.entrySet())
{
watcher.onChanged(entry.getKey(), entry.getValue());
_log.debug("Notifying watcher of current data for resource {} of type {}: {}",
entry.getKey(), _type, entry.getValue());
}
}

private void onData(String resourceName, ResourceUpdate data)
{
if (Objects.equals(_data.get(resourceName), data))
{
_log.debug("Received resource update data equal to the current data. Will not perform the update.");
return;
}
// null value guard to avoid overwriting the property with null
if (data != null && data.isValid())
{
_data.put(resourceName, data);
for (WildcardResourceWatcher watcher : _watchers)
{
watcher.onChanged(resourceName, data);
}
}
else
{
if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI)
{
RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data);
}
else
{
_log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data);
}
}
}

public ResourceType getType()
{
return _type;
}

private void onError(Status error)
{
for (WildcardResourceWatcher watcher : _watchers)
{
watcher.onError(error);
}
}

private void onReconnect()
{
for (WildcardResourceWatcher watcher : _watchers)
{
watcher.onReconnect();
}
}

@VisibleForTesting
void onRemoval(String resourceName)
{
_log.info("{} resource deleted: {}", _type, resourceName);
_data.remove(resourceName);
for (WildcardResourceWatcher watcher : _watchers)
{
watcher.onRemoval(resourceName);
}
}
}

final class RpcRetryTask implements Runnable
{
@Override
public void run() {
public void run()
{
startRpcStreamLocal();
for (ResourceType type : ResourceType.values()) {
Collection<String> resources = getResourceSubscriberMap(type).keySet();
if (resources.isEmpty())
for (ResourceType type : ResourceType.values())
{
Set<String> resources = new HashSet<>(getResourceSubscriberMap(type).keySet());
if (resources.isEmpty() && !_wildcardSubscribers.containsKey(type))
{
continue;
}
ResourceType rewrittenType;
if (_subscribeToUriGlobCollection && type == ResourceType.D2_URI_MAP)
{
resources = resources.stream()
.map(GlobCollectionUtils::globCollectionUrlForClusterResource)
.collect(Collectors.toSet());
type = ResourceType.D2_URI;
.collect(Collectors.toCollection(HashSet::new));
rewrittenType = ResourceType.D2_URI;
}
else
{
rewrittenType = type;
}
if (_wildcardSubscribers.containsKey(type))
{
resources.add("*");
}
_adsStream.sendDiscoveryRequest(type, resources);
_adsStream.sendDiscoveryRequest(rewrittenType, resources);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.58.6
version=29.58.7
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit b374683

Please sign in to comment.