From 072ce1df4bd7166006198f9448304d85a4d4ad28 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Tue, 19 Sep 2023 17:53:04 -0700 Subject: [PATCH] add unit tests --- .../java/com/linkedin/d2/xds/XdsClient.java | 1 - .../com/linkedin/d2/xds/XdsClientImpl.java | 1 - .../d2/xds/XdsToD2PropertiesAdaptor.java | 142 +++++------ .../d2/xds/TestXdsToD2PropertiesAdaptor.java | 237 ++++++++++++++++++ 4 files changed, 308 insertions(+), 73 deletions(-) create mode 100644 d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index a2b3a8a81b..71ddde5ceb 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -128,7 +128,6 @@ public String getVersion() enum ResourceType { - // TODO: add D2_SYMLINK_NODE type UNKNOWN, D2_NODE, D2_SYMLINK_NODE, D2_NODE_MAP; static ResourceType fromTypeUrl(String typeUrl) diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 317f31a94d..307af9c75b 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -547,7 +547,6 @@ private void handleResponse(DiscoveryResponseData response) _responseReceived = true; String respNonce = response.getNonce(); ResourceType resourceType = response.getResourceType(); - // TODO: handle D2_SYMLINK_NODE type switch (resourceType) { case D2_NODE: diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index e889b37724..757ab4190e 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -128,11 +128,6 @@ public void setClusterEventBus(PropertyEventBus clusterEventB _clusterEventBus = clusterEventBus; } - private boolean isSymlinkNode(String nodeNameOrPath) - { - return nodeNameOrPath != null && nodeNameOrPath.indexOf(SYMLINK_NODE_IDENTIFIER) >= 0; - } - public void registerXdsConnectionListener(XdsConnectionListener listener) { _xdsConnectionListeners.add(listener); @@ -142,7 +137,7 @@ public void listenToCluster(String clusterName) { // if cluster name is a symlink, watch for D2SymlinkNode instead String resourceName = D2_CLUSTER_NODE_PREFIX + clusterName; - if (!checkAndListenToSymlink(clusterName, resourceName)) + if (shouldNotListenToSymlink(clusterName, resourceName)) { _watchedClusterResources.computeIfAbsent(clusterName, k -> { @@ -158,7 +153,7 @@ public void listenToUris(String clusterName) { // if cluster name is a symlink, watch for D2SymlinkNode instead String resourceName = D2_URI_NODE_PREFIX + clusterName; - if (!checkAndListenToSymlink(clusterName, resourceName)) + if (shouldNotListenToSymlink(clusterName, resourceName)) { _watchedUriResources.computeIfAbsent(clusterName, k -> { @@ -170,7 +165,18 @@ public void listenToUris(String clusterName) } } - private boolean checkAndListenToSymlink(String symlinkName, String fullResourceName) + public void listenToService(String serviceName) + { + _watchedServiceResources.computeIfAbsent(serviceName, k -> + { + XdsClient.D2NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName); + _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, XdsClient.ResourceType.D2_NODE, + watcher); + return watcher; + }); + } + + private boolean shouldNotListenToSymlink(String symlinkName, String fullResourceName) { boolean isSymlink = isSymlinkNode(symlinkName); if (isSymlink) @@ -187,18 +193,12 @@ private boolean checkAndListenToSymlink(String symlinkName, String fullResourceN }); } - return isSymlink; + return !isSymlink; } - public void listenToService(String serviceName) + private boolean isSymlinkNode(String nodeNameOrPath) { - _watchedServiceResources.computeIfAbsent(serviceName, k -> - { - XdsClient.D2NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName); - _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, XdsClient.ResourceType.D2_NODE, - watcher); - return watcher; - }); + return nodeNameOrPath != null && nodeNameOrPath.indexOf(SYMLINK_NODE_IDENTIFIER) >= 0; } XdsClient.D2NodeResourceWatcher getServiceResourceWatcher(String serviceName) @@ -241,6 +241,60 @@ public void onReconnect() }; } + XdsClient.D2NodeResourceWatcher getClusterResourceWatcher(String clusterName) + { + return new XdsClient.D2NodeResourceWatcher() + { + @Override + public void onChanged(XdsClient.D2NodeUpdate update) + { + if (_clusterEventBus != null) + { + try + { + ClusterProperties clusterProperties = toClusterProperties(update.getNodeData().getData(), + update.getNodeData().getStat().getMzxid()); + // For symlink clusters, ClusterLoadBalancerSubscriber subscribed to the symlinks, instead of the actual node, in event bus, + // so we need to publish under the symlink names. + // For other clusters, publish under its original name. Note that these clusters could be either: + // 1) regular clusters requested normally. + // 2) clusters that were pointed by a symlink previously, but no longer the case after the symlink points to other clusters. + // For case #2: the actualResourceToSymlink map will no longer has an entry for this cluster (removed in + // D2SymlinkNodeResourceWatcher::onChanged), thus the updates will be published under the original cluster name + // (like "FooCluster-prod-ltx1"), which has no subscribers anyway, so no harm to publish. + String clusterNameToPublish = _actualNodeToSymlink.getOrDefault(clusterName, clusterName); + _clusterEventBus.publishInitialize(clusterNameToPublish, clusterProperties); + if (_dualReadStateManager != null) + { + _dualReadStateManager.reportData(clusterName, clusterProperties, true); + } + } + catch (InvalidProtocolBufferException | PropertySerializationException e) + { + _log.error("Failed to parse D2 cluster properties from xDS update. Cluster name: " + clusterName, e); + } + } + } + + @Override + public void onError(Status error) + { + notifyAvailabilityChanges(false); + } + + @Override + public void onReconnect() + { + notifyAvailabilityChanges(true); + } + }; + } + + XdsClient.D2NodeMapResourceWatcher getUriResourceWatcher(String clusterName) + { + return new UriPropertiesResourceWatcher(clusterName); + } + XdsClient.D2SymlinkNodeResourceWatcher getSymlinkResourceWatcher(String symlinkName) { return new XdsClient.D2SymlinkNodeResourceWatcher() @@ -306,60 +360,6 @@ private String removeNodePathPrefix(String path, String prefix) } } - XdsClient.D2NodeResourceWatcher getClusterResourceWatcher(String clusterName) - { - return new XdsClient.D2NodeResourceWatcher() - { - @Override - public void onChanged(XdsClient.D2NodeUpdate update) - { - if (_clusterEventBus != null) - { - try - { - ClusterProperties clusterProperties = toClusterProperties(update.getNodeData().getData(), - update.getNodeData().getStat().getMzxid()); - // For symlink clusters, ClusterLoadBalancerSubscriber subscribed to the symlinks, instead of the actual node, in event bus, - // so we need to publish under the symlink names. - // For other clusters, publish under its original name. Note that these clusters could be either: - // 1) regular clusters requested normally. - // 2) clusters that were pointed by a symlink previously, but no longer the case after the symlink points to other clusters. - // For case #2: the actualResourceToSymlink map will no longer has an entry for this cluster (removed in - // D2SymlinkNodeResourceWatcher::onChanged), thus the updates will be published under the original cluster name - // (like "FooCluster-prod-ltx1"), which has no subscribers anyway, so no harm to publish. - String clusterNameToPublish = _actualNodeToSymlink.getOrDefault(clusterName, clusterName); - _clusterEventBus.publishInitialize(clusterNameToPublish, clusterProperties); - if (_dualReadStateManager != null) - { - _dualReadStateManager.reportData(clusterName, clusterProperties, true); - } - } - catch (InvalidProtocolBufferException | PropertySerializationException e) - { - _log.error("Failed to parse D2 cluster properties from xDS update. Cluster name: " + clusterName, e); - } - } - } - - @Override - public void onError(Status error) - { - notifyAvailabilityChanges(false); - } - - @Override - public void onReconnect() - { - notifyAvailabilityChanges(true); - } - }; - } - - XdsClient.D2NodeMapResourceWatcher getUriResourceWatcher(String clusterName) - { - return new UriPropertiesResourceWatcher(clusterName); - } - private void notifyAvailabilityChanges(boolean isAvailable) { synchronized (_xdsConnectionListeners) diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java new file mode 100644 index 0000000000..654e2aa65c --- /dev/null +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java @@ -0,0 +1,237 @@ +package com.linkedin.d2.xds; + +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ListValue; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ClusterStoreProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.ServiceStoreProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.discovery.event.PropertyEventBus; +import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; +import indis.XdsD2; +import java.util.Collections; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +public class TestXdsToD2PropertiesAdaptor { + private static final String CLUSTER_NODE_PREFIX = "/d2/clusters/"; + private static final String URI_NODE_PREFIX = "/d2/uris/"; + private static final String SYMLINK_NAME = "$FooClusterMaster"; + private static final String PRIMARY_CLUSTER_NAME = "FooClusterMaster-prod-ltx1"; + private static final String PRIMARY_CLUSTER_NAME_2 = "FooClusterMaster-prod-lor1"; + private static final String CLUSTER_SYMLINK_RESOURCE_NAME = CLUSTER_NODE_PREFIX + SYMLINK_NAME; + private static final String PRIMARY_CLUSTER_RESOURCE_NAME = CLUSTER_NODE_PREFIX + PRIMARY_CLUSTER_NAME; + private static final ClusterStoreProperties PRIMARY_CLUSTER_PROPERTIES = new ClusterStoreProperties(PRIMARY_CLUSTER_NAME); + private static final String URI_SYMLINK_RESOURCE_NAME = URI_NODE_PREFIX + SYMLINK_NAME; + private static final String PRIMARY_URI_RESOURCE_NAME = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME; + + private static final XdsClient.D2NodeMapUpdate DUMMY_NODE_MAP_UPDATE = new XdsClient.D2NodeMapUpdate("", + Collections.emptyMap()); + + @Test + public void testListenToService() + { + XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); + String serviceName = "FooService"; + fixture.getSpiedAdaptor().listenToService(serviceName); + + verify(fixture._xdsClient).watchXdsResource(eq("/d2/services/" + serviceName), eq(XdsClient.ResourceType.D2_NODE), any()); + + XdsClient.D2NodeResourceWatcher symlinkNodeWatcher = + (XdsClient.D2NodeResourceWatcher) fixture._watcherArgumentCaptor.getValue(); + symlinkNodeWatcher.onChanged(new XdsClient.D2NodeUpdate("", XdsD2.D2Node.newBuilder() + .setData(Struct.newBuilder().putAllFields( + ImmutableMap.of( + "serviceName", getProtoStringValue(serviceName), + "clusterName", getProtoStringValue(PRIMARY_CLUSTER_NAME), + "path", getProtoStringValue(""), + "loadBalancerStrategyList", Value.newBuilder().setListValue( + ListValue.newBuilder().addValues(getProtoStringValue("relative")).build() + ).build() + ) + )) + .setStat(XdsD2.Stat.newBuilder().setMzxid(1L).build()) + .build()) + ); + verify(fixture._serviceEventBus).publishInitialize(serviceName, + new ServiceStoreProperties(serviceName, PRIMARY_CLUSTER_NAME, "", + Collections.singletonList("relative")) + ); + } + + @Test + public void testListenToNormalCluster() + { + XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); + fixture.getSpiedAdaptor().listenToCluster(PRIMARY_CLUSTER_NAME); + + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE), any()); + verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME, PRIMARY_CLUSTER_NAME, PRIMARY_CLUSTER_PROPERTIES); + } + + @Test + public void testListenToClusterSymlink() { + XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); + fixture.getSpiedAdaptor().listenToCluster(SYMLINK_NAME); + + verify(fixture._xdsClient).watchXdsResource(eq(CLUSTER_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_SYMLINK_NODE), any()); + + XdsClient.D2SymlinkNodeResourceWatcher symlinkNodeWatcher = + (XdsClient.D2SymlinkNodeResourceWatcher) fixture._watcherArgumentCaptor.getValue(); + symlinkNodeWatcher.onChanged(CLUSTER_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(PRIMARY_CLUSTER_RESOURCE_NAME)); + + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_CLUSTER_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE), any()); + + XdsClient.D2NodeResourceWatcher clusterNodeWatcher = + (XdsClient.D2NodeResourceWatcher) fixture._watcherArgumentCaptor.getValue(); + clusterNodeWatcher.onChanged(getClusterNodeUpdate(PRIMARY_CLUSTER_NAME)); + + verify(fixture._clusterEventBus).publishInitialize(SYMLINK_NAME, PRIMARY_CLUSTER_PROPERTIES); + + // test update symlink to a new primary cluster + String primaryClusterResourceName2 = CLUSTER_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2; + ClusterStoreProperties primaryClusterProperties2 = new ClusterStoreProperties(PRIMARY_CLUSTER_NAME_2); + + symlinkNodeWatcher.onChanged(CLUSTER_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(primaryClusterResourceName2)); + + verify(fixture._xdsClient).watchXdsResource(eq(primaryClusterResourceName2), eq(XdsClient.ResourceType.D2_NODE), any()); + verifyClusterNodeUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME, primaryClusterProperties2); + + // if the old primary cluster gets an update, it will be published under its original cluster name + // since the symlink points to the new primary cluster now. + clusterNodeWatcher.onChanged(getClusterNodeUpdate(PRIMARY_CLUSTER_NAME_2)); + + verify(fixture._clusterEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, primaryClusterProperties2); + } + + @Test + public void testListenToNormalUri() + { + XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); + fixture.getSpiedAdaptor().listenToUris(PRIMARY_CLUSTER_NAME); + + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME, PRIMARY_CLUSTER_NAME); + } + + @Test + public void testListenToUriSymlink() + { + XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); + fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME); + + verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_SYMLINK_NODE), any()); + + XdsClient.D2SymlinkNodeResourceWatcher symlinkNodeWatcher = + (XdsClient.D2SymlinkNodeResourceWatcher) fixture._watcherArgumentCaptor.getValue(); + symlinkNodeWatcher.onChanged(URI_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME)); + + verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + + XdsClient.D2NodeMapResourceWatcher watcher = + (XdsClient.D2NodeMapResourceWatcher) fixture._watcherArgumentCaptor.getValue(); + watcher.onChanged(DUMMY_NODE_MAP_UPDATE); + + verify(fixture._uriEventBus).publishInitialize(SYMLINK_NAME, getDefaultUriProperties(PRIMARY_CLUSTER_NAME)); + + // test update symlink to a new primary cluster + String primaryUriResourceName2 = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2; + symlinkNodeWatcher.onChanged(URI_SYMLINK_RESOURCE_NAME, getSymlinkNodeUpdate(primaryUriResourceName2)); + + verify(fixture._xdsClient).watchXdsResource(eq(primaryUriResourceName2), eq(XdsClient.ResourceType.D2_NODE_MAP), any()); + verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME); + + // if the old primary cluster gets an update, it will be published under its original cluster name + // since the symlink points to the new primary cluster now. + watcher.onChanged(DUMMY_NODE_MAP_UPDATE); + + verify(fixture._uriEventBus).publishInitialize(PRIMARY_CLUSTER_NAME, getDefaultUriProperties(PRIMARY_CLUSTER_NAME)); + } + + private static Value getProtoStringValue(String v) + { + return Value.newBuilder().setStringValue(v).build(); + } + + private static XdsClient.D2SymlinkNodeUpdate getSymlinkNodeUpdate(String primaryClusterResourceName) + { + return new XdsClient.D2SymlinkNodeUpdate("", + XdsD2.D2SymlinkNode.newBuilder() + .setMasterClusterNodePath(primaryClusterResourceName) + .build() + ); + } + + private static XdsClient.D2NodeUpdate getClusterNodeUpdate(String clusterName) + { + return new XdsClient.D2NodeUpdate("", XdsD2.D2Node.newBuilder() + .setData(Struct.newBuilder().putFields("clusterName", getProtoStringValue(clusterName))) + .setStat(XdsD2.Stat.newBuilder().setMzxid(1L).build()) + .build() + ); + } + + private void verifyClusterNodeUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String expectedPublishName, + ClusterStoreProperties expectedPublishProp) + { + XdsClient.D2NodeResourceWatcher watcher = (XdsClient.D2NodeResourceWatcher) fixture._watcherArgumentCaptor.getValue(); + watcher.onChanged(getClusterNodeUpdate(clusterName)); + verify(fixture._clusterEventBus).publishInitialize(expectedPublishName, expectedPublishProp); + } + + private void verifyUriUpdate(XdsToD2PropertiesAdaptorFixture fixture, String clusterName, String expectedPublishName) + { + XdsClient.D2NodeMapResourceWatcher watcher = (XdsClient.D2NodeMapResourceWatcher) fixture._watcherArgumentCaptor.getValue(); + watcher.onChanged(DUMMY_NODE_MAP_UPDATE); + verify(fixture._uriEventBus).publishInitialize(expectedPublishName, getDefaultUriProperties(clusterName)); + } + + private UriProperties getDefaultUriProperties(String clusterName) + { + return new UriProperties(clusterName, Collections.emptyMap(), Collections.emptyMap(), -1); + } + + private static class XdsToD2PropertiesAdaptorFixture + { + @Mock + XdsClient _xdsClient; + @Mock + ServiceDiscoveryEventEmitter _eventEmitter; + @Mock + PropertyEventBus _clusterEventBus; + @Mock + PropertyEventBus _serviceEventBus; + @Mock + PropertyEventBus _uriEventBus; + @Captor + ArgumentCaptor _watcherArgumentCaptor; + + XdsToD2PropertiesAdaptor _adaptor; + + XdsToD2PropertiesAdaptorFixture() + { + MockitoAnnotations.initMocks(this); + doNothing().when(_xdsClient).watchXdsResource(any(), any(), _watcherArgumentCaptor.capture()); + doNothing().when(_clusterEventBus).publishInitialize(any(), any()); + doNothing().when(_serviceEventBus).publishInitialize(any(), any()); + doNothing().when(_uriEventBus).publishInitialize(any(), any()); + } + + XdsToD2PropertiesAdaptor getSpiedAdaptor() { + _adaptor = spy(new XdsToD2PropertiesAdaptor(_xdsClient, null, _eventEmitter)); + _adaptor.setClusterEventBus(_clusterEventBus); + _adaptor.setServiceEventBus(_serviceEventBus); + _adaptor.setUriEventBus(_uriEventBus); + return _adaptor; + } + } +}