Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for initial resource versions in xds client #1028

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


Expand Down Expand Up @@ -183,15 +184,19 @@ final void onChanged(String resourceName, ResourceUpdate update)
public interface ResourceUpdate
{
boolean isValid();
@Nonnull
Map<String, String> getVersions();
}

public static final class NodeUpdate implements ResourceUpdate
{
XdsD2.Node _nodeData;
Map<String, String> _versions;

NodeUpdate(XdsD2.Node nodeData)
NodeUpdate(XdsD2.Node nodeData, @Nonnull Map<String, String> versions)
{
_nodeData = nodeData;
_versions = versions;
}

public XdsD2.Node getNodeData()
Expand All @@ -211,13 +216,13 @@ public boolean equals(Object object)
return false;
}
NodeUpdate that = (NodeUpdate) object;
return Objects.equals(_nodeData, that._nodeData);
return Objects.equals(_versions, that._versions) && Objects.equals(_nodeData, that._nodeData);
}

@Override
public int hashCode()
{
return Objects.hash(_nodeData);
return Objects.hash(_versions, _nodeData);
}

@Override
Expand All @@ -226,34 +231,43 @@ public boolean isValid()
return _nodeData != null && !_nodeData.getData().isEmpty();
}

@Nonnull
@Override
public Map<String, String> getVersions() {
return _versions;
}

@Override
public String toString()
{
return MoreObjects.toStringHelper(this).add("_nodeData", _nodeData).toString();
return MoreObjects.toStringHelper(this).add("_versions", _versions).add("_nodeData", _nodeData).toString();
}
}

public static final class D2URIMapUpdate implements ResourceUpdate
{
Map<String, XdsD2.D2URI> _uriMap;
Map<String, String> _versions;

D2URIMapUpdate(Map<String, XdsD2.D2URI> uriMap)
D2URIMapUpdate(Map<String, XdsD2.D2URI> uriMap, @Nonnull Map<String, String> versions)
{
_uriMap = uriMap;
_versions = versions;
}

public Map<String, XdsD2.D2URI> getURIMap()
{
return _uriMap;
}

D2URIMapUpdate putUri(String name, XdsD2.D2URI uri)
D2URIMapUpdate putUri(String name, XdsD2.D2URI uri, String version)
{
if (_uriMap == null)
{
_uriMap = new HashMap<>();
}
_uriMap.put(name, uri);
_versions.put(name, version);
return this;
}

Expand All @@ -263,6 +277,7 @@ D2URIMapUpdate removeUri(String name)
{
_uriMap.remove(name);
}
_versions.remove(name);
return this;
}

Expand All @@ -278,13 +293,13 @@ public boolean equals(Object object)
return false;
}
D2URIMapUpdate that = (D2URIMapUpdate) object;
return Objects.equals(_uriMap, that._uriMap);
return Objects.equals(_versions, that._versions) && Objects.equals(_uriMap, that._uriMap);
}

@Override
public int hashCode()
{
return Objects.hash(_uriMap);
return Objects.hash(_versions, _uriMap);
}

@Override
Expand All @@ -293,15 +308,21 @@ public boolean isValid()
return _uriMap != null;
}

@Nonnull
@Override
public Map<String, String> getVersions() {
return _versions;
}

@Override
public String toString()
{
return MoreObjects.toStringHelper(this).add("_uriMap", _uriMap).toString();
return MoreObjects.toStringHelper(this).add("_versions", _versions).add("_uriMap", _uriMap).toString();
}
}

public static final NodeUpdate EMPTY_NODE_UPDATE = new NodeUpdate(null);
public static final D2URIMapUpdate EMPTY_D2_URI_MAP_UPDATE = new D2URIMapUpdate(null);
public static final NodeUpdate EMPTY_NODE_UPDATE = new NodeUpdate(null, new HashMap<>());
public static final D2URIMapUpdate EMPTY_D2_URI_MAP_UPDATE = new D2URIMapUpdate(null, new HashMap<>());

enum ResourceType
{
Expand Down
42 changes: 29 additions & 13 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.protobuf.InvalidProtocolBufferException;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class XdsClientImpl extends XdsClient

private final XdsClientJmx _xdsClientJmx;
private final XdsServerMetricsProvider _serverMetricsProvider;
private final Map<String, String> _resourceVersions = new HashMap<>();

@Deprecated
public XdsClientImpl(Node node, ManagedChannel managedChannel, ScheduledExecutorService executorService)
Expand Down Expand Up @@ -167,7 +169,7 @@ public void watchXdsResource(String resourceName, ResourceWatcher watcher)
}
if (_adsStream != null)
{
_adsStream.sendDiscoveryRequest(type, Collections.singletonList(adjustedResourceName));
_adsStream.sendDiscoveryRequest(type, Collections.singletonList(adjustedResourceName), null);
}
}
subscriber.addWatcher(watcher);
Expand Down Expand Up @@ -198,7 +200,7 @@ public void watchAllXdsResources(WildcardResourceWatcher watcher)
}
if (_adsStream != null)
{
_adsStream.sendDiscoveryRequest(adjustedType, Collections.singletonList("*"));
_adsStream.sendDiscoveryRequest(adjustedType, Collections.singletonList("*"), null);
}
}

Expand Down Expand Up @@ -356,7 +358,7 @@ private void handleD2NodeResponse(DiscoveryResponseData data)
{
_log.warn("Received a Node response with no data, resource is : {}", resourceName);
}
updates.put(resourceName, new NodeUpdate(d2Node));
updates.put(resourceName, new NodeUpdate(d2Node, ImmutableMap.of(resourceName, resource.getVersion())));
}
catch (InvalidProtocolBufferException e)
{
Expand Down Expand Up @@ -388,7 +390,7 @@ private void handleD2URIMapResponse(DiscoveryResponseData data)
{
_log.warn("Received a D2URIMap response with no data, resource is : {}", resourceName);
}
updates.put(resourceName, new D2URIMapUpdate(nodeData));
updates.put(resourceName, new D2URIMapUpdate(nodeData, ImmutableMap.of(resourceName, resource.getVersion())));
}
catch (InvalidProtocolBufferException e)
{
Expand Down Expand Up @@ -453,11 +455,11 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
}
if (currentData == null || !currentData.isValid())
{
return new D2URIMapUpdate(null);
return new D2URIMapUpdate(null, new HashMap<>());
}
else
{
return new D2URIMapUpdate(new HashMap<>(currentData.getURIMap()));
return new D2URIMapUpdate(new HashMap<>(currentData.getURIMap()), currentData.getVersions());
}
});

Expand All @@ -481,7 +483,7 @@ private void handleD2URICollectionResponse(DiscoveryResponseData data)
try
{
XdsD2.D2URI uri = resource.getResource().unpack(XdsD2.D2URI.class);
update.putUri(uriId.getUriName(), uri);
update.putUri(uriId.getUriName(), uri, resource.getVersion());
}
catch (InvalidProtocolBufferException e)
{
Expand Down Expand Up @@ -866,7 +868,14 @@ public void run()
startRpcStreamLocal();
for (ResourceType type : _resourceSubscribers.keySet())
{
Set<String> resources = new HashSet<>(getResourceSubscriberMap(type).keySet());
Map<String, String> resourceVersions = new HashMap<>();
Set<String> resources = new HashSet<>();
for (Map.Entry<String, ResourceSubscriber> entry : getResourceSubscriberMap(type).entrySet()) {
resources.add(entry.getKey());
if (entry.getValue() != null && entry.getValue().getData() != null) {
resourceVersions.putAll(entry.getValue().getData().getVersions());
}
}
if (resources.isEmpty() && getWildcardResourceSubscriber(type) == null)
{
continue;
Expand All @@ -888,7 +897,7 @@ public void run()
{
resources.add("*");
}
_adsStream.sendDiscoveryRequest(rewrittenType, resources);
_adsStream.sendDiscoveryRequest(rewrittenType, resources, resourceVersions);
}
}
}
Expand All @@ -898,12 +907,16 @@ private static final class DiscoveryRequestData
private final Node _node;
private final ResourceType _resourceType;
private final Collection<String> _resourceNames;
@Nullable
private final Map<String, String> _resourceVersions;

DiscoveryRequestData(Node node, ResourceType resourceType, Collection<String> resourceNames)
DiscoveryRequestData(Node node, ResourceType resourceType, Collection<String> resourceNames,
@Nullable Map<String, String> resourceVersions)
{
_node = node;
_resourceType = resourceType;
_resourceNames = resourceNames;
_resourceVersions = resourceVersions;
}

DeltaDiscoveryRequest toEnvoyProto()
Expand All @@ -912,7 +925,9 @@ DeltaDiscoveryRequest toEnvoyProto()
.setNode(_node.toEnvoyProtoNode())
.addAllResourceNamesSubscribe(_resourceNames)
.setTypeUrl(_resourceType.typeUrl());

if (_resourceVersions != null) {
builder.putAllInitialResourceVersions(_resourceVersions);
}
return builder.build();
}

Expand Down Expand Up @@ -1122,10 +1137,11 @@ public void onCompleted()
/**
* Sends a client-initiated discovery request.
*/
private void sendDiscoveryRequest(ResourceType type, Collection<String> resources)
private void sendDiscoveryRequest(ResourceType type, Collection<String> resources,
@Nullable Map<String, String> resourceVersions)
{
_log.info("Sending {} request for resources: {}", type, resources);
DeltaDiscoveryRequest request = new DiscoveryRequestData(_node, type, resources).toEnvoyProto();
DeltaDiscoveryRequest request = new DiscoveryRequestData(_node, type, resources, resourceVersions).toEnvoyProto();
_requestWriter.onNext(request);
_log.debug("Sent DiscoveryRequest\n{}", request);
}
Expand Down
28 changes: 14 additions & 14 deletions d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public class TestXdsClientImpl
private static final Any PACKED_NODE_WITH_DATA = Any.pack(NODE_WITH_DATA);
private static final Any PACKED_NODE_WITH_DATA2 = Any.pack(NODE_WITH_DATA2);
private static final Any PACKED_NODE_WITH_EMPTY_DATA = Any.pack(NODE_WITH_EMPTY_DATA);
private static final XdsClient.NodeUpdate NODE_UPDATE1 = new XdsClient.NodeUpdate(NODE_WITH_DATA);
private static final XdsClient.NodeUpdate NODE_UPDATE2 = new XdsClient.NodeUpdate(NODE_WITH_DATA2);
private static final XdsClient.NodeUpdate NODE_UPDATE1 = new XdsClient.NodeUpdate(NODE_WITH_DATA, new HashMap<>());
private static final XdsClient.NodeUpdate NODE_UPDATE2 = new XdsClient.NodeUpdate(NODE_WITH_DATA2, new HashMap<>());
private static final List<Resource> NODE_RESOURCES_WITH_DATA1 = Collections.singletonList(
Resource.newBuilder().setVersion(VERSION1).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NODE_WITH_DATA).build());
private static final List<Resource> NODE_RESOURCES_WITH_DATA2 = Collections.singletonList(
Expand All @@ -72,9 +72,9 @@ public class TestXdsClientImpl
.putUris(URI1, D2URI_1_1) // updated uri1
.putUris(URI2, D2URI_2).build(); // added ur2
private static final D2URIMapUpdate D2_URI_MAP_UPDATE_WITH_DATA1 =
new D2URIMapUpdate(D2_URI_MAP_WITH_DATA1.getUrisMap());
new D2URIMapUpdate(D2_URI_MAP_WITH_DATA1.getUrisMap(), new HashMap<>());
private static final D2URIMapUpdate D2_URI_MAP_UPDATE_WITH_DATA2 =
new D2URIMapUpdate(D2_URI_MAP_WITH_DATA2.getUrisMap());
new D2URIMapUpdate(D2_URI_MAP_WITH_DATA2.getUrisMap(), new HashMap<>());
private static final Any PACKED_D2_URI_MAP_WITH_DATA1 = Any.pack(D2_URI_MAP_WITH_DATA1);
private static final Any PACKED_D2_URI_MAP_WITH_DATA2 = Any.pack(D2_URI_MAP_WITH_DATA2);
private static final Any PACKED_D2_URI_MAP_WITH_EMPTY_DATA = Any.pack(D2_URI_MAP_WITH_EMPTY_DATA);
Expand Down Expand Up @@ -170,8 +170,8 @@ public void testHandleD2NodeResponseWithData()
Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData());

// subscriber original data is invalid, xds server latency won't be tracked
fixture._nodeSubscriber.setData(new XdsClient.NodeUpdate(null));
fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, new XdsClient.NodeUpdate(null));
fixture._nodeSubscriber.setData(new XdsClient.NodeUpdate(null, new HashMap<>()));
fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, new XdsClient.NodeUpdate(null, new HashMap<>()));
fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1);
fixture.verifyAckSent(2);
verify(fixture._resourceWatcher, times(2)).onChanged(eq(NODE_UPDATE1));
Expand Down Expand Up @@ -264,8 +264,8 @@ public void testHandleD2URIMapResponseWithData()
Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap());

// subscriber original data is invalid, xds server latency won't be tracked
fixture._clusterSubscriber.setData(new XdsClient.D2URIMapUpdate(null));
fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new XdsClient.D2URIMapUpdate(null));
fixture._clusterSubscriber.setData(new XdsClient.D2URIMapUpdate(null, new HashMap<>()));
fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new XdsClient.D2URIMapUpdate(null, new HashMap<>()));
fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1);
verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1));
verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1));
Expand Down Expand Up @@ -312,7 +312,7 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b
D2URIMapUpdate expectedUpdate =
invalidData
? (D2URIMapUpdate) D2_URI_MAP.emptyData()
: new D2URIMapUpdate(Collections.emptyMap());
: new D2URIMapUpdate(Collections.emptyMap(), new HashMap<>());
verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate));
if (!invalidData)
{
Expand Down Expand Up @@ -392,8 +392,8 @@ public void testHandleD2URICollectionResponseWithData()
Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap());

// subscriber original data is invalid, xds server latency won't be tracked
fixture._clusterSubscriber.setData(new D2URIMapUpdate(null));
fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new D2URIMapUpdate(null));
fixture._clusterSubscriber.setData(new D2URIMapUpdate(null, new HashMap<>()));
fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new D2URIMapUpdate(null, new HashMap<>()));
fixture._xdsClientImpl.handleResponse(createUri1);
fixture.verifyAckSent(2);
verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1));
Expand All @@ -410,7 +410,7 @@ public void testHandleD2URICollectionResponseWithData()
fixture._xdsClientImpl.handleResponse(createUri2Delete1);
actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData();
// subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA2
D2URIMapUpdate expectedUpdate = new D2URIMapUpdate(Collections.singletonMap(URI2, D2URI_2));
D2URIMapUpdate expectedUpdate = new D2URIMapUpdate(Collections.singletonMap(URI2, D2URI_2), new HashMap<>());
verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate));
verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate));
// track latency only for updated/new uri (not for deletion)
Expand All @@ -426,7 +426,7 @@ public void testHandleD2URICollectionResponseWithData()
fixture._xdsClientImpl.handleResponse(deleteUri2);
actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData();
// subscriber data should be updated to empty map
expectedUpdate = new D2URIMapUpdate(Collections.emptyMap());
expectedUpdate = new D2URIMapUpdate(Collections.emptyMap(), new HashMap<>());
verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate));
verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate));
verifyNoMoreInteractions(fixture._serverMetricsProvider);
Expand Down Expand Up @@ -543,7 +543,7 @@ public void testResourceSubscriberAddWatcher()
subscriber.addWatcher(watcher);
verify(watcher, times(0)).onChanged(any());

D2URIMapUpdate update = new D2URIMapUpdate(Collections.emptyMap());
D2URIMapUpdate update = new D2URIMapUpdate(Collections.emptyMap(), new HashMap<>());
subscriber.setData(update);
for (int i = 0; i < 10; i++)
{
Expand Down
Loading
Loading