From 522bed0c8c2bfbc1520ae257ed1232d47d63c9a2 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Tue, 2 Apr 2024 09:01:48 -0700 Subject: [PATCH] Fix applying client side service config override in INDIS flow (#995) * fix applying client side service config override in INDIS flow * bump minor version and add null guard * fix SimpleLoadBalancerTest flaky failed issue * tiny cleanup --------- Co-authored-by: brycezhongqing --- CHANGELOG.md | 6 +- .../ServicePropertiesJsonSerializer.java | 4 + .../d2/xds/XdsToD2PropertiesAdaptor.java | 9 +- .../XdsLoadBalancerWithFacilitiesFactory.java | 2 +- .../simple/SimpleLoadBalancerTest.java | 135 ++++++++---------- .../d2/xds/TestXdsToD2PropertiesAdaptor.java | 55 ++++++- .../linkedin/d2/xds/XdsToD2SampleClient.java | 2 +- gradle.properties | 2 +- 8 files changed, 125 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1961879f60..a539ffe094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.52.0] - 2024-04-01 +- fix applying client side service config override in INDIS flow + ## [29.51.14] - 2024-03-27 - Support translating default values for optional non-record/union fields to Avro (when TRANSLATE_DEFAULT is enabled). @@ -5674,7 +5677,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.51.14...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.52.0...master +[29.52.0]: https://github.com/linkedin/rest.li/compare/v29.51.14...v29.52.0 [29.51.14]: https://github.com/linkedin/rest.li/compare/v29.51.13...v29.51.14 [29.51.13]: https://github.com/linkedin/rest.li/compare/v29.51.12...v29.51.13 [29.51.12]: https://github.com/linkedin/rest.li/compare/v29.51.11...v29.51.12 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java b/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java index 28da19ee02..d3e9918943 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/properties/ServicePropertiesJsonSerializer.java @@ -70,6 +70,10 @@ public ServicePropertiesJsonSerializer() public ServicePropertiesJsonSerializer(Map> clientServicesConfig) { + if (clientServicesConfig == null) + { + clientServicesConfig = Collections.emptyMap(); + } _clientServicesConfig = validateClientServicesConfig(clientServicesConfig); } diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java index 9484cb32ae..d265653c18 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsToD2PropertiesAdaptor.java @@ -61,7 +61,7 @@ public class XdsToD2PropertiesAdaptor private final XdsClient _xdsClient; private final List _xdsConnectionListeners = Collections.synchronizedList(new ArrayList<>()); - private final ServicePropertiesJsonSerializer _servicePropertiesJsonSerializer = new ServicePropertiesJsonSerializer(); + private final ServicePropertiesJsonSerializer _servicePropertiesJsonSerializer; private final ClusterPropertiesJsonSerializer _clusterPropertiesJsonSerializer = new ClusterPropertiesJsonSerializer(); private final UriPropertiesJsonSerializer _uriPropertiesJsonSerializer = new UriPropertiesJsonSerializer(); private final UriPropertiesMerger _uriPropertiesMerger = new UriPropertiesMerger(); @@ -94,10 +94,17 @@ public class XdsToD2PropertiesAdaptor public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualReadStateManager, ServiceDiscoveryEventEmitter eventEmitter) + { + this(xdsClient, dualReadStateManager, eventEmitter, Collections.emptyMap()); + } + + public XdsToD2PropertiesAdaptor(XdsClient xdsClient, DualReadStateManager dualReadStateManager, + ServiceDiscoveryEventEmitter eventEmitter, Map> clientServicesConfig) { _xdsClient = xdsClient; _dualReadStateManager = dualReadStateManager; _eventEmitter = eventEmitter; + _servicePropertiesJsonSerializer = new ServicePropertiesJsonSerializer(clientServicesConfig); } public void start() diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index 7c1b1ebf37..f8bf574053 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -62,7 +62,7 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) d2ClientJmxManager.registerXdsClientJmx(xdsClient.getXdsClientJmx()); XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, config.dualReadStateManager, - config.serviceDiscoveryEventEmitter); + config.serviceDiscoveryEventEmitter, config.clientServicesConfig); XdsLoadBalancer xdsLoadBalancer = new XdsLoadBalancer(adaptor, executorService, new XdsFsTogglingLoadBalancerFactory(config.lbWaitTimeout, config.lbWaitUnit, config.indisFsBasePath, diff --git a/d2/src/test/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerTest.java b/d2/src/test/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerTest.java index ca4b96fedd..62d45eb944 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerTest.java @@ -131,7 +131,7 @@ public class SimpleLoadBalancerTest private static final String NONEXISTENT_CLUSTER = "nonexistent_cluster"; private static final String SERVICE_NAME = "foo"; private static final ServiceProperties SERVICE_PROPERTIES = - new ServiceProperties(SERVICE_NAME, CLUSTER1_NAME, "/" + SERVICE_NAME, Arrays.asList("degrader"), + new ServiceProperties(SERVICE_NAME, CLUSTER1_NAME, "/" + SERVICE_NAME, Collections.singletonList("degrader"), Collections.emptyMap(), null, null, Collections.emptyList(), null); private static final ClusterProperties CLUSTER_PROPERTIES = @@ -182,25 +182,21 @@ public void doOneTimeTearDown() throws IOException } } - private SimpleLoadBalancer setupLoadBalancer(LoadBalancerState state, MockStore serviceRegistry, + private SimpleLoadBalancer setupLoadBalancer(MockStore serviceRegistry, MockStore clusterRegistry, MockStore uriRegistry) throws ExecutionException, InterruptedException { Map> loadBalancerStrategyFactories = new HashMap<>(); Map clientFactories = new HashMap<>(); - LoadBalancerState loadBalancerState = state; loadBalancerStrategyFactories.put("degrader", new DegraderLoadBalancerStrategyFactoryV3()); clientFactories.put(PropertyKeys.HTTP_SCHEME, new DoNothingClientFactory()); clientFactories.put(PropertyKeys.HTTPS_SCHEME, new DoNothingClientFactory()); - if (loadBalancerState == null) - { - loadBalancerState = - new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry, - clientFactories, loadBalancerStrategyFactories); - } + LoadBalancerState loadBalancerState = + new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry, + clientFactories, loadBalancerStrategyFactories); SimpleLoadBalancer loadBalancer = new SimpleLoadBalancer(loadBalancerState, 5, TimeUnit.SECONDS, _d2Executor); @@ -240,7 +236,7 @@ public void testClusterInfoProvider(int numHttp, int numHttps, int expectedNumHt MockStore serviceRegistry = new MockStore<>(); MockStore clusterRegistry = new MockStore<>(); MockStore uriRegistry = new MockStore<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); populateUriRegistry(numHttp, numHttps, partitionIdForAdd, uriRegistry); clusterRegistry.put(CLUSTER1_NAME, new ClusterProperties(CLUSTER1_NAME)); @@ -277,7 +273,7 @@ public void testClusterInfoProviderGetDarkClusters() MockStore serviceRegistry = new MockStore<>(); MockStore clusterRegistry = new MockStore<>(); MockStore uriRegistry = new MockStore<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); DarkClusterConfig darkClusterConfig = new DarkClusterConfig() .setMultiplier(1.0f) @@ -318,7 +314,7 @@ public void testClusterInfoProviderGetDarkClustersNoUris() MockStore serviceRegistry = new MockStore<>(); MockStore clusterRegistry = new MockStore<>(); MockStore uriRegistry = new MockStore<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); DarkClusterConfig darkClusterConfig = new DarkClusterConfig() .setMultiplier(1.0f) @@ -356,7 +352,7 @@ public void testClusterInfoProviderGetDarkClustersNoCluster() MockStore serviceRegistry = new MockStore<>(); MockStore clusterRegistry = new MockStore<>(); MockStore uriRegistry = new MockStore<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); loadBalancer.getDarkClusterConfigMap(NONEXISTENT_CLUSTER, new Callback() { @@ -385,7 +381,7 @@ public void testClusterInfoProviderRegisterClusterListener() MockStore serviceRegistry = new MockStore<>(); MockStore clusterRegistry = new MockStore<>(); MockStore uriRegistry = new MockStore<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); FutureCallback balancerCallback = new FutureCallback<>(); loadBalancer.start(balancerCallback); balancerCallback.get(); @@ -416,7 +412,7 @@ public void testClusterInfoProviderUnregisterClusterListener() MockStore serviceRegistry = new MockStore<>(); MockStore clusterRegistry = new MockStore<>(); MockStore uriRegistry = new MockStore<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); FutureCallback balancerCallback = new FutureCallback<>(); loadBalancer.start(balancerCallback); balancerCallback.get(); @@ -439,6 +435,7 @@ public void testClusterInfoProviderUnregisterClusterListener() } @Test + @SuppressWarnings("unchecked") public void testListenToServiceAndClusterTimeout() throws ExecutionException, InterruptedException { MockStore serviceRegistry = new MockStore<>(); @@ -454,7 +451,7 @@ public void testListenToServiceAndClusterTimeout() throws ExecutionException, In }).when(state).listenToService(any(), any()); SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 1, TimeUnit.MILLISECONDS, _d2Executor)); // case1: listenToService timeout, and simpleLoadBalancer not hit the cache value - FutureCallback callback = mock(FutureCallback.class); + FutureCallback callback = spy(new FutureCallback<>()); loadBalancer.listenToServiceAndCluster(SERVICE_NAME, callback); try { @@ -471,7 +468,7 @@ public void testListenToServiceAndClusterTimeout() throws ExecutionException, In // case2: listenToService timeout, and simpleLoadBalancer hit the cache value from state LoadBalancerStateItem serviceItem = new LoadBalancerStateItem<>(SERVICE_PROPERTIES, 1, 1); when(state.getServiceProperties(SERVICE_NAME)).thenReturn(serviceItem); - callback = mock(FutureCallback.class); + callback = spy(new FutureCallback<>()); loadBalancer.listenToServiceAndCluster(SERVICE_NAME, callback); // Make sure the onSuccess is called with SERVICE_PROPERTIES only once. callback.get(); @@ -483,7 +480,7 @@ public void testListenToServiceAndClusterTimeout() throws ExecutionException, In spy(new SimpleLoadBalancerState(new SynchronousExecutorService(), uriRegistry, clusterRegistry, serviceRegistry, new HashMap<>(), new HashMap<>())); loadBalancer = spy(new SimpleLoadBalancer(state, 5, TimeUnit.SECONDS, _d2Executor)); - callback = mock(FutureCallback.class); + callback = spy(new FutureCallback<>()); loadBalancer.listenToServiceAndCluster(SERVICE_NAME, callback); callback.get(); // Make sure there is no timeout. @@ -492,6 +489,7 @@ public void testListenToServiceAndClusterTimeout() throws ExecutionException, In } @Test + @SuppressWarnings("unchecked") public void testGetLoadBalancedClusterAndUriProperties() throws InterruptedException, ExecutionException { MockStore serviceRegistry = new MockStore<>(); @@ -508,7 +506,7 @@ public void testGetLoadBalancedClusterAndUriProperties() throws InterruptedExcep }).when(state).listenToCluster(any(), any()); SimpleLoadBalancer loadBalancer = spy(new SimpleLoadBalancer(state, 1, TimeUnit.MILLISECONDS, _d2Executor)); - FutureCallback> callback = mock(FutureCallback.class); + FutureCallback> callback = spy(new FutureCallback<>()); // case1: listenToCluster timeout, and simpleLoadBalancer not hit the cache value loadBalancer.getLoadBalancedClusterAndUriProperties(CLUSTER1_NAME, callback); try @@ -527,7 +525,7 @@ public void testGetLoadBalancedClusterAndUriProperties() throws InterruptedExcep LoadBalancerStateItem uriItem = new LoadBalancerStateItem<>(URI_PROPERTIES, 1, 1); when(state.getClusterProperties(CLUSTER1_NAME)).thenReturn(clusterItem); when(state.getUriProperties(CLUSTER1_NAME)).thenReturn(uriItem); - callback = mock(FutureCallback.class); + callback = spy(new FutureCallback<>()); loadBalancer.getLoadBalancedClusterAndUriProperties(CLUSTER1_NAME, callback); callback.get(); verify(callback).onSuccess(eq(Pair.of(CLUSTER_PROPERTIES, URI_PROPERTIES))); @@ -540,7 +538,7 @@ public void testGetLoadBalancedClusterAndUriProperties() throws InterruptedExcep loadBalancer = spy(new SimpleLoadBalancer(state, 5, TimeUnit.SECONDS, _d2Executor)); clusterRegistry.put(CLUSTER1_NAME, CLUSTER_PROPERTIES); uriRegistry.put(CLUSTER1_NAME, URI_PROPERTIES); - callback = mock(FutureCallback.class); + callback = spy(new FutureCallback<>()); loadBalancer.getLoadBalancedClusterAndUriProperties(CLUSTER1_NAME, callback); callback.get(); verify(loadBalancer, never()).handleTimeoutFromGetClusterAndUriProperties(any(), any()); @@ -659,8 +657,7 @@ public void testLoadBalancerSmoke() throws URISyntaxException, serviceRegistry.put("foo", new ServiceProperties("foo", "cluster-1", - "/foo", - Arrays.asList("degrader"), + "/foo", Collections.singletonList("degrader"), Collections.emptyMap(), null, null, @@ -733,7 +730,7 @@ public void testGetClientWithCustomAffinityRoutingURIProvider(boolean isAffinity MockStore uriRegistry = new MockStore<>(); List prioritizedSchemes = new ArrayList<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); //URI uri = URI.create("http://test.qd.com:5678"); Map partitionData = new HashMap<>(1); @@ -753,8 +750,7 @@ public void testGetClientWithCustomAffinityRoutingURIProvider(boolean isAffinity serviceRegistry.put("foo", new ServiceProperties("foo", CLUSTER1_NAME, - "/foo", - Arrays.asList("degrader"), + "/foo", Collections.singletonList("degrader"), Collections.emptyMap(), null, null, @@ -825,7 +821,7 @@ public void testCustomAffinityRoutingSkipped(boolean enableTargetHostHint, boole MockStore uriRegistry = new MockStore<>(); List prioritizedSchemes = new ArrayList<>(); - SimpleLoadBalancer loadBalancer = setupLoadBalancer(null, serviceRegistry, clusterRegistry, uriRegistry); + SimpleLoadBalancer loadBalancer = setupLoadBalancer(serviceRegistry, clusterRegistry, uriRegistry); URI uri1 = URI.create("http://test.qd.com:1234"); @@ -850,8 +846,7 @@ public void testCustomAffinityRoutingSkipped(boolean enableTargetHostHint, boole serviceRegistry.put("foo", new ServiceProperties("foo", CLUSTER1_NAME, - "/foo", - Arrays.asList("degrader"), + "/foo", Collections.singletonList("degrader"), Collections.emptyMap(), null, null, @@ -938,8 +933,7 @@ public void testGetClientWithBannedURI() throws Exception serviceRegistry.put("foo", new ServiceProperties("foo", "cluster-1", - "/foo", - Arrays.asList("degrader"), + "/foo", Collections.singletonList("degrader"), Collections.emptyMap(), null, null, @@ -959,7 +953,6 @@ public void testGetClientWithBannedURI() throws Exception /** * This tests getClient(). When TargetHints and scheme does not match, throw ServiceUnavailableException - * @throws Exception */ @Test (expectedExceptions = ServiceUnavailableException.class) @SuppressWarnings("deprecation") @@ -998,8 +991,6 @@ public void testGetClient() throws Exception loadBalancer.start(balancerCallback); balancerCallback.get(5, TimeUnit.SECONDS); - Map partitionData = new HashMap<>(1); - partitionData.put(DEFAULT_PARTITION_ID, new PartitionData(1d)); Map> uriData = new HashMap<>(3); prioritizedSchemes.add(PropertyKeys.HTTPS_SCHEME); @@ -1008,8 +999,7 @@ public void testGetClient() throws Exception serviceRegistry.put("foo", new ServiceProperties("foo", "cluster-1", - "/foo", - Arrays.asList("degrader"), + "/foo", Collections.singletonList("degrader"), Collections.emptyMap(), null, null, @@ -1030,7 +1020,6 @@ public void testGetClient() throws Exception /** * Tests getClient() when with host override list specified in the request context. - * @throws Exception */ @Test public void testGetClientHostOverrideList() throws Exception @@ -1080,8 +1069,7 @@ public void testGetClientHostOverrideList() throws Exception serviceRegistry.put(service1, new ServiceProperties(service1, cluster1, - "/service1Path", - Arrays.asList("degrader"), + "/service1Path", Collections.singletonList("degrader"), Collections.emptyMap(), null, null, @@ -1090,7 +1078,6 @@ public void testGetClientHostOverrideList() throws Exception uriRegistry.put(cluster1, new UriProperties(cluster1, uriData)); URI override = URI.create("http://override/path"); - URI expected = URI.create("http://override/path/service1Path"); URIRequest uriRequest = new URIRequest("d2://service1"); HostOverrideList clusterOverrides = new HostOverrideList(); @@ -1200,22 +1187,22 @@ public void testGetPartitionInfoOrdering() Assert.assertNull(result.getPartitionInfoMap().get(0)); // results for partition 1 should contain server1, server2 and server3 KeysAndHosts keysAndHosts1 = result.getPartitionInfoMap().get(1); - Assert.assertTrue(keysAndHosts1.getKeys().size() == 1); - Assert.assertTrue(keysAndHosts1.getKeys().iterator().next() == 1); + assertEquals(keysAndHosts1.getKeys().size(), 1); + assertEquals((int) keysAndHosts1.getKeys().iterator().next(), 1); List ordering1 = keysAndHosts1.getHosts(); // results for partition 2 should be the same as partition1. KeysAndHosts keysAndHosts2 = result.getPartitionInfoMap().get(2); - Assert.assertTrue(keysAndHosts2.getKeys().size() == 1); - Assert.assertTrue(keysAndHosts2.getKeys().iterator().next() == 2); + assertEquals(keysAndHosts2.getKeys().size(), 1); + assertEquals((int) keysAndHosts2.getKeys().iterator().next(), 2); List ordering2 = keysAndHosts2.getHosts(); //for partition 3 KeysAndHosts keysAndHosts3 = result.getPartitionInfoMap().get(3); - Assert.assertTrue(keysAndHosts3.getKeys().size() == 1); - Assert.assertTrue(keysAndHosts3.getKeys().iterator().next() == 3); + assertEquals(keysAndHosts3.getKeys().size(), 1); + assertEquals((int) keysAndHosts3.getKeys().iterator().next(), 3); List ordering3 = keysAndHosts3.getHosts(); // Just compare the size and contents of the list, not the ordering. - Assert.assertTrue(ordering1.size() == 3); + assertEquals(ordering1.size(), 3); List allServers = new ArrayList<>(); allServers.add(server1); allServers.add(server2); @@ -1388,7 +1375,7 @@ public void testGetAllPartitionMultipleHostsOrdering() allServers.add(server2); allServers.add(server3); - Assert.assertTrue(ordering1.size() == 3); + assertEquals(ordering1.size(), 3); Assert.assertTrue(ordering1.containsAll(allServers)); // partition 2 should be the same as partition 1 @@ -1446,12 +1433,6 @@ public void testLoadBalancerWithPartitionsSmoke() throws URISyntaxException, URI uri2 = URI.create("http://test.qa2.com:2345"); URI uri3 = URI.create("http://test.qa3.com:6789"); - Map uris = new HashMap<>(); - - uris.put(uri1, 1d); - uris.put(uri2, 1d); - uris.put(uri3, 1d); - Map> partitionDesc = new HashMap<>(); @@ -1498,8 +1479,7 @@ public void testLoadBalancerWithPartitionsSmoke() throws URISyntaxException, serviceRegistry.put("foo", new ServiceProperties("foo", "cluster-1", - "/foo", - Arrays.asList("degrader"), + "/foo", Collections.singletonList("degrader"), Collections.singletonMap(PropertyKeys.HTTP_LB_CONSISTENT_HASH_ALGORITHM, "pointBased"), null, null, @@ -1564,12 +1544,12 @@ public void testLoadBalancerWithPartitionsSmoke() throws URISyntaxException, } else if (partitionMethod == 1) { - assertTrue(ii % 2 == 0); + assertEquals(ii % 2, 0); } else { str[0] = ii + ""; - assertTrue(hashFunction.hash(str) % 2 == 0); + assertEquals(hashFunction.hash(str) % 2, 0); } } // check if only key belonging to partition 1 gets uri3 @@ -1581,12 +1561,12 @@ else if (partitionMethod == 1) } else if (partitionMethod == 1) { - assertTrue(ii % 2 == 1); + assertEquals(ii % 2, 1); } else { str[0] = ii + ""; - assertTrue(hashFunction.hash(str) % 2 == 1); + assertEquals(hashFunction.hash(str) % 2, 1); } } } @@ -1657,7 +1637,8 @@ else if (partitionMethod == 1) assertTrue(client.getDecoratedClient() instanceof RewriteClient); RewriteClient rewriteClient = (RewriteClient) client.getDecoratedClient(); assertTrue(rewriteClient.getDecoratedClient() instanceof TrackerClient); - assertTrue(((TrackerClient) rewriteClient.getDecoratedClient()).getPartitionWeight(DEFAULT_PARTITION_ID) == 1.0d); + assertEquals(((TrackerClient) rewriteClient.getDecoratedClient()).getPartitionWeight(DEFAULT_PARTITION_ID), + 1.0d); } final CountDownLatch latch = new CountDownLatch(1); @@ -1698,7 +1679,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 1"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1709,7 +1690,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 2"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1720,7 +1701,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 3"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1731,7 +1712,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 4"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1742,7 +1723,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 5"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1753,7 +1734,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 6"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1764,7 +1745,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 7"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1775,7 +1756,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 8"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1784,11 +1765,9 @@ public void testLoadBalancerWithWait() throws URISyntaxException, try { balancer.getClient(uriRequest, new RequestContext()); - fail("should have received a service unavailable exception, case 9" + - "" + - ""); + fail("should have received a service unavailable exception, case 9"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1799,7 +1778,7 @@ public void testLoadBalancerWithWait() throws URISyntaxException, balancer.getClient(uriRequest, new RequestContext()); fail("should have received a service unavailable exception, case 10"); } - catch (ServiceUnavailableException e) + catch (ServiceUnavailableException ignored) { } @@ -1977,9 +1956,8 @@ public static class DoNothingClientFactory implements TransportClientFactory { private final AtomicLong _count = new AtomicLong(); - @SuppressWarnings("unchecked") @Override - public TransportClient getClient(Map properties) + public TransportClient getClient(Map properties) { _count.incrementAndGet(); if (properties.containsKey("foobar")) @@ -2098,7 +2076,7 @@ public int getPartitionId(URI uri) public int getPartitionId(String key) throws PartitionAccessException { - Integer i = Integer.parseInt(key); + int i = Integer.parseInt(key); if (i == 1) { return 1; @@ -2182,8 +2160,7 @@ public void testLoadBalancerDropRate() throws ServiceUnavailableException, serviceRegistry.put("foo", new ServiceProperties("foo", "cluster-1", - "/foo", - Arrays.asList("degrader"), + "/foo", Collections.singletonList("degrader"), Collections.emptyMap(), null, null, diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java index 866558366d..9a98802b6b 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsToD2PropertiesAdaptor.java @@ -37,11 +37,15 @@ import indis.XdsD2; import java.net.URI; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import static com.linkedin.d2.balancer.properties.PropertyKeys.*; import static org.mockito.Mockito.*; @@ -70,12 +74,39 @@ public class TestXdsToD2PropertiesAdaptor { private static final XdsClient.NodeUpdate EMPTY_NODE_DATA = new XdsClient.NodeUpdate(null); private static final XdsClient.D2URIMapUpdate EMPTY_DATA_URI_MAP = new XdsClient.D2URIMapUpdate(null); - @Test - public void testListenToService() + /* Provide { + * @clientOverride transport port client properties set on client override + * @original original transport client properties fetched from SD backend + * @expected overridden transport client properties after applying client override + * } + */ + @DataProvider + public Object[][] provideTransportClientProperties() + { + + Map original = new HashMap<>(); + original.put(HTTP_REQUEST_TIMEOUT, "1000"); + original.put(ALLOWED_CLIENT_OVERRIDE_KEYS, + Collections.singletonList(HTTP_REQUEST_TIMEOUT)); + + Map overridden = new HashMap<>(); + overridden.put(HTTP_REQUEST_TIMEOUT, "20000"); + overridden.put(ALLOWED_CLIENT_OVERRIDE_KEYS, + Collections.singletonList(HTTP_REQUEST_TIMEOUT)); + + return new Object[][]{ + {Collections.emptyMap(), original, original}, + {Collections.singletonMap(HTTP_REQUEST_TIMEOUT, "20000"), original, overridden} + }; + } + @Test(dataProvider = "provideTransportClientProperties") + public void testListenToService(Map clientOverride, Map original, + Map overridden) { XdsToD2PropertiesAdaptorFixture fixture = new XdsToD2PropertiesAdaptorFixture(); String serviceName = "FooService"; - fixture.getSpiedAdaptor().listenToService(serviceName); + fixture.getSpiedAdaptor(Collections.singletonMap(serviceName, clientOverride)) + .listenToService(serviceName); verify(fixture._xdsClient).watchXdsResource(eq("/d2/services/" + serviceName), anyNodeWatcher()); @@ -88,7 +119,10 @@ public void testListenToService() serviceName, PRIMARY_CLUSTER_NAME, "", - Collections.singletonList("relative") + Collections.singletonList("relative"), + Collections.emptyMap(), + original, + Collections.emptyMap(), Collections.emptyList(), Collections.emptySet() ) ) ) @@ -98,7 +132,10 @@ public void testListenToService() ); verify(fixture._serviceEventBus).publishInitialize(serviceName, new ServiceStoreProperties(serviceName, PRIMARY_CLUSTER_NAME, "", - Collections.singletonList("relative")) + Collections.singletonList("relative"), + Collections.emptyMap(), + overridden, + Collections.emptyMap(), Collections.emptyList(), Collections.emptySet()) ); } @@ -375,7 +412,13 @@ private static class XdsToD2PropertiesAdaptorFixture XdsToD2PropertiesAdaptor getSpiedAdaptor() { - _adaptor = spy(new XdsToD2PropertiesAdaptor(_xdsClient, null, _eventEmitter)); + return getSpiedAdaptor(Collections.emptyMap()); + } + + XdsToD2PropertiesAdaptor getSpiedAdaptor(Map> clientServicesConfig) + { + _adaptor = spy(new XdsToD2PropertiesAdaptor(_xdsClient, null, + _eventEmitter, clientServicesConfig)); _adaptor.setClusterEventBus(_clusterEventBus); _adaptor.setServiceEventBus(_serviceEventBus); _adaptor.setUriEventBus(_uriEventBus); diff --git a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java index 224ba44e6b..c3741000ce 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java +++ b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java @@ -89,7 +89,7 @@ public static void main(String[] args) throws Exception XdsChannelFactory xdsChannelFactory = new XdsChannelFactory(sslContext, xdsServer); XdsClient xdsClient = new XdsClientImpl(node, xdsChannelFactory.createChannel(), - Executors.newSingleThreadScheduledExecutor(), XdsClientImpl.DEFAULT_READY_TIMEOUT_MILLIS); + Executors.newSingleThreadScheduledExecutor(), XdsClientImpl.DEFAULT_READY_TIMEOUT_MILLIS, false); DualReadStateManager dualReadStateManager = new DualReadStateManager( () -> DualReadModeProvider.DualReadMode.DUAL_READ, diff --git a/gradle.properties b/gradle.properties index dc3d2d563c..e541423404 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.51.14 +version=29.52.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true