From 2dd4db837e41df3bd776b9dd54ad88b5ea1a854c Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Mon, 7 Aug 2023 16:54:03 -0700 Subject: [PATCH] move watcher management logic to a manager class --- ...D2ClientJmxDualReadModeWatcherManager.java | 235 ++++++++++++++++++ .../linkedin/d2/jmx/D2ClientJmxManager.java | 166 ++----------- .../d2/jmx/D2ClientJmxManagerTest.java | 12 +- 3 files changed, 261 insertions(+), 152 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java new file mode 100644 index 0000000000..7539bc9bea --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java @@ -0,0 +1,235 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.discovery.stores.file.FileStore; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; + + +public class D2ClientJmxDualReadModeWatcherManager { + + private final DualReadStateManager _dualReadStateManager; + + private D2ClientJmxDualReadModeWatcher _lbDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher _lbStateDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreUriPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreClusterPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreServicePropertiesDualReadModeWatcher; + private final ConcurrentMap>> + _servicePropertiesDualReadModeWatchers; + private final ConcurrentMap> _lbStrategyDualReadModeWatchers; + private final ConcurrentMap> _clusterInfoDualReadModeWatchers; + + public D2ClientJmxDualReadModeWatcherManager(DualReadStateManager dualReadStateManager) + { + _dualReadStateManager = dualReadStateManager; + _lbDualReadModeWatcher = null; + _lbStateDualReadModeWatcher = null; + _fileStoreUriPropertiesDualReadModeWatcher = null; + _fileStoreClusterPropertiesDualReadModeWatcher = null; + _fileStoreServicePropertiesDualReadModeWatcher = null; + _servicePropertiesDualReadModeWatchers = new ConcurrentHashMap<>(); + _lbStrategyDualReadModeWatchers = new ConcurrentHashMap<>(); + _clusterInfoDualReadModeWatchers = new ConcurrentHashMap<>(); + } + + public void updateWatcher(SimpleLoadBalancer balancer, Consumer callback) + { + if (_dualReadStateManager != null) + { + if (_lbDualReadModeWatcher == null) + { + _lbDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(balancer, callback); + _dualReadStateManager.addGlobalWatcher(_lbDualReadModeWatcher); + } + _lbDualReadModeWatcher.setLatestJmxProperty(balancer); + } + } + + public void updateWatcher(SimpleLoadBalancerState state, Consumer callback) + { + if (_dualReadStateManager != null) + { + if (_lbStateDualReadModeWatcher == null) + { + _lbStateDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(state, callback); + _dualReadStateManager.addGlobalWatcher(_lbStateDualReadModeWatcher); + } + _lbStateDualReadModeWatcher.setLatestJmxProperty(state); + } + } + + public void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, Consumer callback) + { + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _lbStrategyDualReadModeWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(strategy); + } + } + + public void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, Consumer callback) + { + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _clusterInfoDualReadModeWatchers.computeIfAbsent(clusterName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); + _dualReadStateManager.addClusterWatcher(clusterName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(clusterInfoItem); + } + } + + public void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + Consumer> callback) + { + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher> currentWatcher = + _servicePropertiesDualReadModeWatchers.computeIfAbsent(serviceName, k -> + { + D2ClientJmxDualReadModeWatcher> watcher = + new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(serviceProperties); + } + } + + public void updateWatcherForFileStoreUriProperties(FileStore uriStore, Consumer> callback) + { + if (_dualReadStateManager != null) + { + if (_fileStoreUriPropertiesDualReadModeWatcher == null) + { + _fileStoreUriPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(uriStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreUriPropertiesDualReadModeWatcher); + } + _fileStoreUriPropertiesDualReadModeWatcher.setLatestJmxProperty(uriStore); + } + } + + public void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, Consumer> callback) + { + if (_dualReadStateManager != null) + { + if (_fileStoreClusterPropertiesDualReadModeWatcher == null) + { + _fileStoreClusterPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(clusterStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreClusterPropertiesDualReadModeWatcher); + } + _fileStoreClusterPropertiesDualReadModeWatcher.setLatestJmxProperty(clusterStore); + } + } + + public void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, Consumer> callback) + { + if (_dualReadStateManager != null) + { + if (_fileStoreServicePropertiesDualReadModeWatcher == null) + { + _fileStoreServicePropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(serviceStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreServicePropertiesDualReadModeWatcher); + } + _fileStoreServicePropertiesDualReadModeWatcher.setLatestJmxProperty(serviceStore); + } + } + + public void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme) + { + DualReadStateManager.DualReadModeWatcher watcher = _lbStrategyDualReadModeWatchers.remove(getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + public void removeWatcherForClusterInfoItem(String clusterName) + { + DualReadStateManager.DualReadModeWatcher watcher = _clusterInfoDualReadModeWatchers.remove(clusterName); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeClusterWatcher(clusterName, watcher); + } + } + + public void removeWatcherForServiceProperties(String serviceName) + { + DualReadStateManager.DualReadModeWatcher watcher = _servicePropertiesDualReadModeWatchers.remove(serviceName); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) + { + return String.format("%s-%s", serviceName, scheme); + } + + + public static final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher + { + private T _latestJmxProperty; + private final Consumer _callback; + + D2ClientJmxDualReadModeWatcher(T initialJmxProperty, Consumer callback) + { + _latestJmxProperty = initialJmxProperty; + _callback = callback; + } + + @VisibleForTesting + T getLatestJmxProperty() + { + return _latestJmxProperty; + } + + public void setLatestJmxProperty(T latestJmxProperty) + { + _latestJmxProperty = latestJmxProperty; + } + + @Override + public void onChanged() + { + _callback.accept(_latestJmxProperty); + } + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index cc5ee6d12c..9c1d49ab4b 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -16,13 +16,14 @@ package com.linkedin.d2.jmx; -import com.google.common.annotations.VisibleForTesting; import com.linkedin.d2.balancer.LoadBalancerStateItem; import com.linkedin.d2.balancer.clients.TrackerClient; import com.linkedin.d2.balancer.dualread.DualReadLoadBalancerJmx; import com.linkedin.d2.balancer.dualread.DualReadModeProvider; import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; @@ -32,9 +33,6 @@ import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperPermanentStore; import com.linkedin.util.ArgumentUtil; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -73,8 +71,8 @@ When dual read state manager is null, only one discovery source is working (coul private final String _secondaryPrefixForLbPropertyJmxName; - @SuppressWarnings("rawtypes") - private final ConcurrentMap _dualReadStateWatchers; + private final D2ClientJmxDualReadModeWatcherManager _watcherManager; + public enum DiscoverySourceType { @@ -111,40 +109,18 @@ public D2ClientJmxManager(String prefix, _dualReadStateManager = dualReadStateManager; _secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName()); _secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName()); - _dualReadStateWatchers = new ConcurrentHashMap<>(); + _watcherManager = new D2ClientJmxDualReadModeWatcherManager(_dualReadStateManager); } - @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancer(SimpleLoadBalancer balancer) { - if (_dualReadStateManager != null) - { - String watcherName = balancer.getClass().getSimpleName(); - D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> - { - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(balancer, this::doRegisterLoadBalancer); - _dualReadStateManager.addGlobalWatcher(watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(balancer); - } + _watcherManager.updateWatcher(balancer, this::doRegisterLoadBalancer); doRegisterLoadBalancer(balancer); } - @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) { - if (_dualReadStateManager != null) - { - String watcherName = state.getClass().getSimpleName(); - D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> - { - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(state, this::doRegisterLoadBalancerState); - _dualReadStateManager.addGlobalWatcher(watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(state); - } + _watcherManager.updateWatcher(state, this::doRegisterLoadBalancerState); doRegisterLoadBalancerState(state); state.register(new SimpleLoadBalancerStateListener() @@ -152,29 +128,14 @@ public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) @Override public void onStrategyAdded(String serviceName, String scheme, LoadBalancerStrategy strategy) { - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher currentWatcher = - _dualReadStateWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> - { - Consumer callback = i -> doRegisterLoadBalancerStrategy(serviceName, scheme, i); - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); - _dualReadStateManager.addServiceWatcher(serviceName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(strategy); - } + _watcherManager.updateWatcher(serviceName, scheme, strategy, i -> doRegisterLoadBalancerStrategy(serviceName, scheme, i)); doRegisterLoadBalancerStrategy(serviceName, scheme, strategy); } @Override public void onStrategyRemoved(String serviceName, String scheme, LoadBalancerStrategy strategy) { - DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeServiceWatcher(serviceName, watcher); - } + _watcherManager.removeWatcherForLoadBalancerStrategy(serviceName, scheme); _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme)); } @@ -200,18 +161,7 @@ public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher currentWatcher = - _dualReadStateWatchers.computeIfAbsent(getWatcherNameForClusterInfoItem(clusterName), k -> - { - Consumer callback = i -> doRegisterClusterInfo(clusterName, i); - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); - _dualReadStateManager.addClusterWatcher(clusterName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(clusterInfoItem); - } + _watcherManager.updateWatcher(clusterName, clusterInfoItem, i -> doRegisterClusterInfo(clusterName, i)); doRegisterClusterInfo(clusterName, clusterInfoItem); } } @@ -223,11 +173,7 @@ public void onClusterInfoRemoval(ClusterInfoItem clusterInfoItem) && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); - DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForClusterInfoItem(clusterName)); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeClusterWatcher(clusterName, watcher); - } + _watcherManager.removeWatcherForClusterInfoItem(clusterName); _jmxManager.unregister(getClusterInfoJmxName(clusterName)); } } @@ -238,19 +184,7 @@ public void onServicePropertiesUpdate(LoadBalancerStateItem s if (serviceProperties != null && serviceProperties.getProperty() != null) { String serviceName = serviceProperties.getProperty().getServiceName(); - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher> currentWatcher = - _dualReadStateWatchers.computeIfAbsent(getWatcherNameForServiceProperties(serviceName), k -> - { - Consumer> callback = i -> doRegisterServiceProperties(serviceName, i); - D2ClientJmxDualReadModeWatcher> watcher = - new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); - _dualReadStateManager.addServiceWatcher(serviceName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(serviceProperties); - } + _watcherManager.updateWatcher(serviceName, serviceProperties, i -> doRegisterServiceProperties(serviceName, i)); doRegisterServiceProperties(serviceName, serviceProperties); } } @@ -261,11 +195,7 @@ public void onServicePropertiesRemoval(LoadBalancerStateItem if (serviceProperties != null && serviceProperties.getProperty() != null) { String serviceName = serviceProperties.getProperty().getServiceName(); - DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForServiceProperties(serviceName)); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeServiceWatcher(serviceName, watcher); - } + _watcherManager.removeWatcherForServiceProperties(serviceName); _jmxManager.unregister(getServicePropertiesJmxName(serviceName)); } } @@ -287,19 +217,6 @@ private void doRegisterServiceProperties(String serviceName, LoadBalancerStateIt _jmxManager.registerServiceProperties(getServicePropertiesJmxName(serviceName), serviceProperties); } - private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) - { - return String.format("%s-%s-LoadBalancerStrategy", serviceName, scheme); - } - - private String getWatcherNameForClusterInfoItem(String clusterName) { - return String.format("%s-ClusterInfoItem", clusterName); - } - - private String getWatcherNameForServiceProperties(String serviceName) { - return String.format("%s-LoadBalancerStateItem-ServiceProperties", serviceName); - } - private String getClusterInfoJmxName(String clusterName) { return String.format("%s%s-ClusterInfo", getClusterPrefixForLBPropertyJmxNames(clusterName), clusterName); @@ -347,21 +264,21 @@ public void setZkServiceRegistry(ZooKeeperPermanentStore serviceRegistry) _jmxManager.registerZooKeeperPermanentStore(jmxName, serviceRegistry); } - public void setFsUriStore(FileStore uriStore) + public void setFsUriStore(FileStore uriStore) { - addDualReadModeWatcherForFileStore("UriProperties", uriStore, this::doRegisterUriFileStore); + _watcherManager.updateWatcherForFileStoreUriProperties(uriStore, this::doRegisterUriFileStore); doRegisterUriFileStore(uriStore); } - public void setFsClusterStore(FileStore clusterStore) + public void setFsClusterStore(FileStore clusterStore) { - addDualReadModeWatcherForFileStore("ClusterProperties", clusterStore, this::doRegisterClusterFileStore); + _watcherManager.updateWatcherForFileStoreClusterProperties(clusterStore, this::doRegisterClusterFileStore); doRegisterClusterFileStore(clusterStore); } - public void setFsServiceStore(FileStore serviceStore) + public void setFsServiceStore(FileStore serviceStore) { - addDualReadModeWatcherForFileStore("ServiceProperties", serviceStore, this::doRegisterServiceFileStore); + _watcherManager.updateWatcherForFileStoreServiceProperties(serviceStore, this::doRegisterServiceFileStore); doRegisterServiceFileStore(serviceStore); } @@ -405,22 +322,6 @@ private void doRegisterServiceFileStore(FileStore serviceStore) _jmxManager.registerFileStore(jmxName, serviceStore); } - @SuppressWarnings("unchecked") - private void addDualReadModeWatcherForFileStore(String watcherNameSuffix, FileStore store, Consumer> watcherCallback) - { - if (_dualReadStateManager != null) - { - String watcherName = String.format("%s-%s", store.getClass().getSimpleName(), watcherNameSuffix); - D2ClientJmxDualReadModeWatcher> currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> - { - D2ClientJmxDualReadModeWatcher> watcher = new D2ClientJmxDualReadModeWatcher<>(store, watcherCallback); - _dualReadStateManager.addGlobalWatcher(watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(store); - } - } - private String getGlobalPrefix() { return isGlobalPrimarySource() ? _primaryGlobalPrefix : _secondaryGlobalPrefix; @@ -477,33 +378,4 @@ private boolean isPrimarySourceHelper(DualReadModeProvider.DualReadMode dualRead return _discoverySourceType == DiscoverySourceType.ZK; } } - - static final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher - { - private T _latestJmxProperty; - private final Consumer _callback; - - D2ClientJmxDualReadModeWatcher(T initialJmxProperty, Consumer callback) - { - _latestJmxProperty = initialJmxProperty; - _callback = callback; - } - - @VisibleForTesting - T getLatestJmxProperty() - { - return _latestJmxProperty; - } - - public void setLatestJmxProperty(T latestJmxProperty) - { - _latestJmxProperty = latestJmxProperty; - } - - @Override - public void onChanged() - { - _callback.accept(_latestJmxProperty); - } - } } diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java index 5cbfc105bd..14d4028353 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java @@ -100,7 +100,7 @@ private static final class D2ClientJmxManagerFixture ArgumentCaptor> _servicePropertiesArgumentCaptor; @SuppressWarnings("rawtypes") @Captor - ArgumentCaptor _addWatcherCaptor; + ArgumentCaptor _addWatcherCaptor; D2ClientJmxManager _d2ClientJmxManager; private final ClusterInfoItem _clusterInfoItem; @@ -277,18 +277,19 @@ public void testAddAndRemoveWatcherAtServicePropertiesUpdate() D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); // Initial dual read mode is ZK only. DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(DualReadModeProvider.DualReadMode.OLD_LB_ONLY); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); - ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); // Verify watcher is added with properties inside verify(dualReadStateManager).addServiceWatcher(eq("S_Foo"), addWatcherCaptor.capture()); - D2ClientJmxManager.D2ClientJmxDualReadModeWatcher> watcher = addWatcherCaptor.getValue(); + D2ClientJmxDualReadModeWatcherManager.D2ClientJmxDualReadModeWatcher> watcher = addWatcherCaptor.getValue(); Assert.assertEquals(watcher.getLatestJmxProperty(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); lbStateListener.onServicePropertiesUpdate(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); @@ -310,18 +311,19 @@ public void testAddAndRemoveWatcherAtClusterInfoItemUpdate() d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); // Initial dual read mode is ZK only. DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(DualReadModeProvider.DualReadMode.OLD_LB_ONLY); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); - ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); // Verify watcher is added with properties inside verify(dualReadStateManager).addClusterWatcher(eq("C_Foo"), addWatcherCaptor.capture()); - D2ClientJmxManager.D2ClientJmxDualReadModeWatcher watcher = addWatcherCaptor.getValue(); + D2ClientJmxDualReadModeWatcherManager.D2ClientJmxDualReadModeWatcher watcher = addWatcherCaptor.getValue(); Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._clusterInfoItem); lbStateListener.onClusterInfoUpdate(fixture._updatedClusterInfoItem);