Skip to content

Commit

Permalink
Fix invalid handling of glob collections for wildcard subscribers
Browse files Browse the repository at this point in the history
As the title says, things weren't wired through correctly for glob collections,
namely they were being ignored because they were triggering the check for
whtehre the cluster is being watched at all. This fixes that (and the fact that
the initial subscription did not respect the `useGlobCollections` flag). This
change has been unit tested and tested through in restli-resource-explorer, and
it works for both glob and non-glob.
  • Loading branch information
PapaCharlie committed Sep 20, 2024
1 parent 4f561f8 commit 7a55cea
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 24 deletions.
55 changes: 38 additions & 17 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -57,6 +56,7 @@
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.slf4j.Logger;
Expand All @@ -73,9 +73,14 @@ public class XdsClientImpl extends XdsClient
new RateLimitedLogger(_log, TimeUnit.MINUTES.toMillis(10), SystemClock.instance());
public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L;

/**
* The resource subscriber map stores the subscribers to specific resources of a given type. Note that it only has 2
* keys: {@link ResourceType#D2_URI_MAP} and {@link ResourceType#NODE}. The {@link ResourceType#D2_URI} is absent from
* this map because it should not be used as a key, as glob collection updates are translated to appear as normal map
* updates to subscribers.
*/
private final Map<ResourceType, Map<String, ResourceSubscriber>> _resourceSubscribers = Maps.immutableEnumMap(
Arrays.stream(ResourceType.values())
.filter(e -> e.typeUrl() != null)
Stream.of(ResourceType.NODE, ResourceType.D2_URI_MAP)
.collect(Collectors.toMap(Function.identity(), e -> new HashMap<>())));
private final Map<ResourceType, WildcardResourceSubscriber> _wildcardSubscribers = Maps.newEnumMap(ResourceType.class);
private final Node _node;
Expand Down Expand Up @@ -174,20 +179,26 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher)
{
_executorService.execute(() ->
{
_log.info("Subscribing to wildcard for resource type: {}", watcher.getType());
WildcardResourceSubscriber subscriber = getWildcardResourceSubscriber(watcher.getType());
if (subscriber == null)
{
subscriber = new WildcardResourceSubscriber(watcher.getType());
_wildcardSubscribers.put(watcher.getType(), subscriber);

ResourceType adjustedType =
(watcher.getType() == ResourceType.D2_URI_MAP && _subscribeToUriGlobCollection)
? ResourceType.D2_URI
: watcher.getType();

_log.info("Subscribing to wildcard for resource type: {}", adjustedType);

if (_adsStream == null && !isInBackoff())
{
startRpcStreamLocal();
}
if (_adsStream != null)
{
_adsStream.sendDiscoveryRequest(watcher.getType(), Collections.singletonList("*"));
_adsStream.sendDiscoveryRequest(adjustedType, Collections.singletonList("*"));
}
}

Expand Down Expand Up @@ -417,7 +428,8 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)

ResourceSubscriber subscriber =
getResourceSubscriberMap(ResourceType.D2_URI_MAP).get(uriId.getClusterResourceName());
if (subscriber == null)
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(ResourceType.D2_URI_MAP);
if (subscriber == null && wildcardSubscriber == null)
{
String msg = String.format("Ignoring D2URI resource update for untracked cluster: %s", resourceName);
_log.warn(msg);
Expand All @@ -428,7 +440,17 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
// Get or create a new D2URIMapUpdate which is a copy of the existing data for that cluster.
D2URIMapUpdate update = updates.computeIfAbsent(uriId.getClusterResourceName(), k ->
{
D2URIMapUpdate currentData = (D2URIMapUpdate) subscriber._data;
D2URIMapUpdate currentData;
// Use the existing data from whichever subscriber is present. If both are present, they will point to the same
// D2URIMapUpdate.
if (subscriber != null)
{
currentData = (D2URIMapUpdate) subscriber._data;
}
else
{
currentData = (D2URIMapUpdate) wildcardSubscriber._data.get(uriId.getClusterResourceName());
}
if (currentData == null || !currentData.isValid())
{
return new D2URIMapUpdate(null);
Expand Down Expand Up @@ -498,7 +520,7 @@ private void handleResourceUpdate(Map<String, ? extends ResourceUpdate> updates,
{
subscriber.onData(entry.getValue(), _serverMetricsProvider);
}
WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type);
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type);
if (wildcardSubscriber != null)
{
wildcardSubscriber.onData(entry.getKey(), entry.getValue());
Expand All @@ -522,7 +544,7 @@ private void handleResourceRemoval(Collection<String> removedResources, Resource
subscriber.onRemoval();
}
}
WildcardResourceSubscriber wildcardSubscriber = _wildcardSubscribers.get(type);
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type);
if (wildcardSubscriber != null)
{
removedResources.forEach(wildcardSubscriber::onRemoval);
Expand Down Expand Up @@ -745,18 +767,18 @@ static class WildcardResourceSubscriber
{
private final ResourceType _type;
private final Set<WildcardResourceWatcher> _watchers = new HashSet<>();
private Map<String, ResourceUpdate> _data = new HashMap<>();
private final Map<String, ResourceUpdate> _data = new HashMap<>();

@VisibleForTesting
public Map<String, ResourceUpdate> getData()
public ResourceUpdate getData(String resourceName)
{
return _data;
return _data.get(resourceName);
}

@VisibleForTesting
public void setData(@Nullable Map<String, ResourceUpdate> data)
public void setData(String resourceName, ResourceUpdate data)
{
_data = data;
_data.put(resourceName, data);
}

WildcardResourceSubscriber(ResourceType type)
Expand Down Expand Up @@ -842,7 +864,7 @@ final class RpcRetryTask implements Runnable
public void run()
{
startRpcStreamLocal();
for (ResourceType type : ResourceType.values())
for (ResourceType type : _resourceSubscribers.keySet())
{
Set<String> resources = new HashSet<>(getResourceSubscriberMap(type).keySet());
if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null)
Expand Down Expand Up @@ -1149,8 +1171,7 @@ private void handleRpcStreamClosed(Status error)
{
return;
}
_log.error("ADS stream closed with status {}: {}. Cause: {}", error.getCode(), error.getDescription(),
error.getCause());
_log.error("ADS stream closed with status {}: {}", error.getCode(), error.getDescription(), error.getCause());
_closed = true;
notifyStreamError(error);
cleanUp();
Expand Down
Loading

0 comments on commit 7a55cea

Please sign in to comment.