From 8ad594d4170db307ab4f9581207fadd257b33d0f Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Tue, 3 Sep 2024 22:24:22 -0400 Subject: [PATCH] Respect `startPublishing` call by always re-notifying watcher in XdsClientImpl (#1018) * Respect `startPublishing` call by always re-notifying watcher in XdsClientImpl When switching from the backup to the primary store (implemented in the XdsClientImpl), the data in the primary store is never replayed. This is different from the backup store behavior which respects the invocation of startPublishing and replays the contents of the store. This means that if the contents of the backup store are different from the contents of the primary store, and the client switches from the backup to the primary, the client will only see the backup values and not the primary values. * Address comments * Updated CHANGELOG --- CHANGELOG.md | 5 +- .../java/com/linkedin/d2/xds/XdsClient.java | 6 ++ .../com/linkedin/d2/xds/XdsClientImpl.java | 6 +- .../d2/xds/XdsToD2PropertiesAdaptor.java | 38 ++++------ .../linkedin/d2/xds/TestXdsClientImpl.java | 18 +++++ .../d2/xds/TestXdsToD2PropertiesAdaptor.java | 72 +++++++++++++------ gradle.properties | 2 +- 7 files changed, 93 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 720c28891c..a094c64db1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.58.4] - 2024-09-03 + ## [29.58.3] - 2024-08-12 - Disable the warmUp flaky unit test @@ -5719,7 +5721,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.3...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.4...master +[29.58.4]: https://github.com/linkedin/rest.li/compare/v29.58.3...v29.58.4 [29.58.3]: https://github.com/linkedin/rest.li/compare/v29.58.2...v29.58.3 [29.58.2]: https://github.com/linkedin/rest.li/compare/v29.58.1...v29.58.2 [29.58.1]: https://github.com/linkedin/rest.li/compare/v29.58.0...v29.58.1 diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java index f357c7e9c8..a0c08b04f0 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClient.java @@ -255,6 +255,12 @@ static ResourceType fromTypeUrl(String typeUrl) } } + /** + * Subscribes the given {@link ResourceWatcher} to the resource of the given name. The watcher will be notified when + * the resource is received from the backend. Repeated calls to this function with the same resource name and watcher + * will always notify the given watcher of the current data if it is already present, even if the given watcher was + * already subscribed to said resource. However, the subscription will only be added once. + */ abstract void watchXdsResource(String resourceName, ResourceWatcher watcher); abstract void startRpcStream(); diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index 6f8a64bb39..e58c6fc711 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -554,15 +554,11 @@ public void setData(@Nullable ResourceUpdate data) void addWatcher(ResourceWatcher watcher) { - if (_watchers.contains(watcher)) - { - _log.warn("Watcher {} already registered", watcher); - return; - } _watchers.add(watcher); if (_data != null) { watcher.onChanged(_data); + _log.debug("Notifying watcher of current data for resource {} of type {}: {}", _resource, _type, _data); } } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index a8b9293135..4e8deabfb7 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -150,12 +150,9 @@ public void listenToCluster(String clusterName) } else { - _watchedClusterResources.computeIfAbsent(clusterName, k -> - { - XdsClient.NodeResourceWatcher watcher = getClusterResourceWatcher(clusterName); - _xdsClient.watchXdsResource(resourceName, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedClusterResources.computeIfAbsent(clusterName, this::getClusterResourceWatcher); + _xdsClient.watchXdsResource(resourceName, watcher); } } @@ -169,36 +166,27 @@ public void listenToUris(String clusterName) } else { - _watchedUriResources.computeIfAbsent(clusterName, k -> - { - XdsClient.D2URIMapResourceWatcher watcher = getUriResourceWatcher(clusterName); - _xdsClient.watchXdsResource(resourceName, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedUriResources.computeIfAbsent(clusterName, this::getUriResourceWatcher); + _xdsClient.watchXdsResource(resourceName, watcher); } } public void listenToService(String serviceName) { - _watchedServiceResources.computeIfAbsent(serviceName, k -> - { - XdsClient.NodeResourceWatcher watcher = getServiceResourceWatcher(serviceName); - _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedServiceResources.computeIfAbsent(serviceName, this::getServiceResourceWatcher); + _xdsClient.watchXdsResource(D2_SERVICE_NODE_PREFIX + serviceName, watcher); } private void listenToSymlink(String name, String fullResourceName) { // use full resource name ("/d2/clusters/$FooClusterMater", "/d2/uris/$FooClusterMaster") as the key // instead of just the symlink name ("$FooClusterMaster") to differentiate clusters and uris symlink resources. - _watchedSymlinkResources.computeIfAbsent(fullResourceName, k -> - { - // use symlink name "$FooClusterMaster" to create the watcher - XdsClient.NodeResourceWatcher watcher = getSymlinkResourceWatcher(fullResourceName, name); - _xdsClient.watchXdsResource(k, watcher); - return watcher; - }); + XdsClient.ResourceWatcher watcher = + _watchedSymlinkResources.computeIfAbsent(fullResourceName, k -> getSymlinkResourceWatcher(k, name)); + // use symlink name "$FooClusterMaster" to create the watcher + _xdsClient.watchXdsResource(fullResourceName, watcher); } XdsClient.NodeResourceWatcher getServiceResourceWatcher(String serviceName) diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index b356a64a1e..565a95c11e 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Objects; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.Assert; import org.testng.annotations.DataProvider; @@ -442,6 +443,23 @@ public void testHandleD2URICollectionResponseWithRemoval() Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); } + @Test + public void testResourceSubscriberAddWatcher() + { + ResourceSubscriber subscriber = new ResourceSubscriber(NODE, "foo", null); + XdsClient.ResourceWatcher watcher = Mockito.mock(XdsClient.ResourceWatcher.class); + subscriber.addWatcher(watcher); + verify(watcher, times(0)).onChanged(any()); + + D2URIMapUpdate update = new D2URIMapUpdate(Collections.emptyMap()); + subscriber.setData(update); + for (int i = 0; i < 10; i++) + { + subscriber.addWatcher(watcher); + } + verify(watcher, times(10)).onChanged(eq(update)); + } + private static class XdsClientImplFixture { XdsClientImpl _xdsClientImpl; diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java index 95e4b09c85..b630ecc86e 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java @@ -46,7 +46,8 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import static com.linkedin.d2.balancer.properties.PropertyKeys.*; +import static com.linkedin.d2.balancer.properties.PropertyKeys.ALLOWED_CLIENT_OVERRIDE_KEYS; +import static com.linkedin.d2.balancer.properties.PropertyKeys.HTTP_REQUEST_TIMEOUT; import static org.mockito.Mockito.*; @@ -112,10 +113,13 @@ public void testListenToService(Map clientOverride, Map clientOverride, Map uriMap = new HashMap<>(Collections.singletonMap(URI_NAME, protoUri)); fixture._uriMapWatcher.onChanged(new XdsClient.D2URIMapUpdate(uriMap)); @@ -247,17 +266,23 @@ public void testListenToNormalUri() throws PropertySerializationException public void testListenToUriSymlink() throws PropertySerializationException { XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); - fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME); + for (int i = 0; i < 10; i++) + { + fixture.getSpiedAdaptor().listenToUris(SYMLINK_NAME); + } // verify symlink is watched - verify(fixture._xdsClient).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher()); + verify(fixture._xdsClient, times(10)).watchXdsResource(eq(URI_SYMLINK_RESOURCE_NAME), anyNodeWatcher()); // update symlink data NodeResourceWatcher symlinkNodeWatcher = fixture._nodeWatcher; - symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME)); + for (int i = 0; i < 10; i++) + { + symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(PRIMARY_URI_RESOURCE_NAME)); + } // verify actual cluster of the uris is watched - verify(fixture._xdsClient).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher()); + verify(fixture._xdsClient, times(10)).watchXdsResource(eq(PRIMARY_URI_RESOURCE_NAME), anyMapWatcher()); // update uri data D2URIMapResourceWatcher watcher = fixture._uriMapWatcher; @@ -269,9 +294,12 @@ public void testListenToUriSymlink() throws PropertySerializationException // test update symlink to a new primary cluster String primaryUriResourceName2 = URI_NODE_PREFIX + PRIMARY_CLUSTER_NAME_2; - symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2)); + for (int i = 0; i < 10; i++) + { + symlinkNodeWatcher.onChanged(getSymlinkNodeUpdate(primaryUriResourceName2)); + } - verify(fixture._xdsClient).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher()); + verify(fixture._xdsClient, times(10)).watchXdsResource(eq(primaryUriResourceName2), anyMapWatcher()); verifyUriUpdate(fixture, PRIMARY_CLUSTER_NAME_2, SYMLINK_NAME); // if the old primary cluster gets an update, it will be published under its original cluster name diff --git a/gradle.properties b/gradle.properties index 2bf5deb577..8782188e30 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.3 +version=29.58.4 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true