From e18c663e481e71b25541ac3939b2e80fc11174e6 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Fri, 28 Jul 2023 11:33:56 -0700 Subject: [PATCH 1/4] Differentiate LB metrics between ZK and xDS read flows --- CHANGELOG.md | 5 +++-- .../java/com/linkedin/d2/jmx/D2ClientJmxManager.java | 10 +++++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7e7b3f27a..502845ad87 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,10 +13,11 @@ When updating the changelog, remember to be very clear about what behavior has c and what APIs have changed, if applicable. ## [Unreleased] - -## [29.43.11] - 2023-08-01 - fix logging issues about observer host and dual read mode +## [29.43.11] - 2023-07-28 +- differentiate LB metrics between ZK and xDS read flows. + ## [29.43.10] - 2023-07-24 - set log level of dual read mode changes to info. diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index e2e3591d49..5a9a64e49d 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -127,19 +127,23 @@ public void onServicePropertiesRemoval(LoadBalancerStateItem } } + private String getPrefixForLBJmxNames() { + return _prefix.contains("-xDS") ? String.format("%s-", _prefix) : ""; + } + private String getClusterInfoJmxName(String clusterName) { - return String.format("%s-ClusterInfo", clusterName); + return String.format("%s%s-ClusterInfo", getPrefixForLBJmxNames(), clusterName); } private String getServicePropertiesJmxName(String serviceName) { - return String.format("%s-ServiceProperties", serviceName); + return String.format("%s%s-ServiceProperties", getPrefixForLBJmxNames(), serviceName); } private String getLoadBalancerStrategyJmxName(String serviceName, String scheme) { - return serviceName + "-" + scheme + "-LoadBalancerStrategy"; + return String.format("%s%s-%s-LoadBalancerStrategy", getPrefixForLBJmxNames(), serviceName, scheme); } }); } From e8b922c848ba5123f86fd801dde494ba9c483b7b Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Tue, 1 Aug 2023 09:13:14 -0700 Subject: [PATCH 2/4] dynamically switch jmx/sensor names based on dual read mode and source type --- CHANGELOG.md | 11 +- ...LastSeenBalancerWithFacilitiesFactory.java | 3 +- ...ZKFSLoadBalancerWithFacilitiesFactory.java | 3 +- .../dualread/DualReadStateManager.java | 78 ++- .../linkedin/d2/jmx/D2ClientJmxManager.java | 392 ++++++++++++-- .../java/com/linkedin/d2/jmx/JmxManager.java | 6 +- .../XdsLoadBalancerWithFacilitiesFactory.java | 4 +- .../d2/jmx/D2ClientJmxManagerTest.java | 499 ++++++++++++++---- gradle.properties | 2 +- .../test/java/test/r2/integ/TestJetty404.java | 3 +- ...elPoolManagerFactorySharingConnection.java | 3 +- 11 files changed, 844 insertions(+), 160 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 502845ad87..a47f99f11d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,10 +13,12 @@ When updating the changelog, remember to be very clear about what behavior has c and what APIs have changed, if applicable. ## [Unreleased] -- fix logging issues about observer host and dual read mode -## [29.43.11] - 2023-07-28 -- differentiate LB metrics between ZK and xDS read flows. +## [29.44.0] - 2023-08-06 +- dynamically switch jmx/sensor names based on dual read mode and source type + +## [29.43.11] - 2023-08-01 +- fix logging issues about observer host and dual read mode ## [29.43.10] - 2023-07-24 - set log level of dual read mode changes to info. @@ -5513,7 +5515,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.11...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.44.0...master +[29.44.0]: https://github.com/linkedin/rest.li/compare/v29.43.11...v29.44.0 [29.43.11]: https://github.com/linkedin/rest.li/compare/v29.43.10...v29.43.11 [29.43.10]: https://github.com/linkedin/rest.li/compare/v29.43.9...v29.43.10 [29.43.9]: https://github.com/linkedin/rest.li/compare/v29.43.8...v29.43.9 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index 86c83d4003..92e6badba8 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -59,7 +59,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { LOG.info("Creating D2 LoadBalancer based on LastSeenLoadBalancerWithFacilities"); - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); // init connection ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java index 6c6046e12a..7b666c5bf3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java @@ -73,7 +73,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D loadBalancerComponentFactory = config.componentFactory; } - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory, config.lbWaitTimeout, diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java index 8f50401599..3318f8d0bb 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java @@ -22,6 +22,7 @@ import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.util.clock.Clock; import com.linkedin.util.clock.SystemClock; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -32,7 +33,6 @@ /** * Checks and manages the global and per-service dual read state. * Provides monitoring of the dual read data. - * * The dual read state is broken down into global and per-service state. Per-service dual read * mode has a higher priority. Only if per-service dual read mode is not defined, global * dual read mode will be used. @@ -52,6 +52,10 @@ public class DualReadStateManager private final RateLimiter _rateLimiter; // Stores global dual read mode private volatile DualReadModeProvider.DualReadMode _dualReadMode = DualReadModeProvider.DualReadMode.OLD_LB_ONLY; + private final Set _globalDualReadModeWatchers; + private final ConcurrentMap> _serviceDualReadModeWatchers; + private final ConcurrentMap> _clusterDualReadModeWatchers; + private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx; private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor; @@ -74,6 +78,9 @@ public DualReadStateManager(DualReadModeProvider dualReadModeProvider, Scheduled _dualReadModeProvider = dualReadModeProvider; _executorService = executorService; _rateLimiter = RateLimiter.create((double) 1 / DUAL_READ_MODE_SWITCH_MIN_INTERVAL); + _globalDualReadModeWatchers = ConcurrentHashMap.newKeySet(); + _serviceDualReadModeWatchers = new ConcurrentHashMap<>(); + _clusterDualReadModeWatchers = new ConcurrentHashMap<>(); } public void updateGlobal(DualReadModeProvider.DualReadMode mode) @@ -82,6 +89,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode) _dualReadMode = mode; if (updated) { LOG.info("Global dual read mode updated: {}", mode); + notifyGlobalWatchers(); } } @@ -90,6 +98,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode); if (oldMode != mode) { LOG.info("Dual read mode for service {} updated: {}", service, mode); + notifyServiceWatchers(service); } } @@ -98,6 +107,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode); if (oldMode != mode) { LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode); + notifyClusterWatchers(cluster); } } @@ -198,4 +208,70 @@ public DualReadModeProvider getDualReadModeProvider() { return _dualReadModeProvider; } + + public void addGlobalWatcher(DualReadModeWatcher watcher) + { + _globalDualReadModeWatchers.add(watcher); + } + + public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher) + { + Set watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet()); + watchers.add(watcher); + } + + public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher) + { + Set watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()); + watchers.add(watcher); + } + + public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher) + { + Set watchers = _serviceDualReadModeWatchers.get(serviceName); + if (watchers != null) + { + watchers.remove(watcher); + } + } + + public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher) + { + Set watchers = _clusterDualReadModeWatchers.get(clusterName); + if (watchers != null) + { + watchers.remove(watcher); + } + } + + private void notifyGlobalWatchers() + { + notifyWatchers(_globalDualReadModeWatchers); + } + + private void notifyServiceWatchers(String serviceName) + { + notifyWatchers(_serviceDualReadModeWatchers.get(serviceName)); + } + + private void notifyClusterWatchers(String clusterName) + { + notifyWatchers(_clusterDualReadModeWatchers.get(clusterName)); + } + + private void notifyWatchers(Set watchers) + { + if (watchers != null) + { + for (DualReadModeWatcher w : watchers) + { + w.onChanged(); + } + } + } + + public interface DualReadModeWatcher + { + void onChanged(); + } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index 5a9a64e49d..cc5ee6d12c 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -16,9 +16,12 @@ package com.linkedin.d2.jmx; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.d2.balancer.LoadBalancerStateItem; import com.linkedin.d2.balancer.clients.TrackerClient; import com.linkedin.d2.balancer.dualread.DualReadLoadBalancerJmx; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.properties.ServiceProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; @@ -29,45 +32,149 @@ import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperPermanentStore; import com.linkedin.util.ArgumentUtil; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * JMX manager to register the D2 client components */ public class D2ClientJmxManager { - private final String _prefix; + private static final Logger _log = LoggerFactory.getLogger(D2ClientJmxManager.class); + private final JmxManager _jmxManager; + // Service discovery source type: ZK, xDS, etc. + private final DiscoverySourceType _discoverySourceType; + + /* + When dual read state manager is null, only one discovery source is working (could be a new source other than ZK). We keep using the same Jmx/sensor + names as the ZK one so users can still monitor the same metrics. + + When dual read state manager is not null, it means dual read load balancer is in use, and there are two sets of load balancer, lb state, and FS backup + registering Jmx/sensors for different service discovery sources. + Depending on the specific dual read mode that is dynamically changing, controlled by lix on d2 service level, one source is primary, the other is secondary. + Jmx/sensor names need to be carefully handled to: + 1) for the primary source, use the primary names (the one ZK was using) so users can still monitor the same metrics. + 2) for the secondary source, use different names that include the source type to avoid conflicting the primary names. + */ + private final DualReadStateManager _dualReadStateManager; + + private final String _primaryGlobalPrefix; + + private final String _secondaryGlobalPrefix; + + private static final String _primaryPrefixForLbPropertyJmxName = ""; + + private final String _secondaryPrefixForLbPropertyJmxName; + + @SuppressWarnings("rawtypes") + private final ConcurrentMap _dualReadStateWatchers; + + public enum DiscoverySourceType + { + ZK("ZK"), + XDS("xDS"); + + private final String _printName; + + DiscoverySourceType(String printName) + { + _printName = printName; + } + + public String getPrintName() + { + return _printName; + } + } + public D2ClientJmxManager(String prefix, @Nonnull JmxManager jmxManager) + { + this(prefix, jmxManager, DiscoverySourceType.ZK, null); + } + + public D2ClientJmxManager(String prefix, + @Nonnull JmxManager jmxManager, + @Nonnull DiscoverySourceType discoverySourceType, + @Nullable DualReadStateManager dualReadStateManager) { ArgumentUtil.ensureNotNull(jmxManager,"jmxManager"); - _prefix = prefix; + _primaryGlobalPrefix = prefix; _jmxManager = jmxManager; + _discoverySourceType = discoverySourceType; + _dualReadStateManager = dualReadStateManager; + _secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName()); + _secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName()); + _dualReadStateWatchers = new ConcurrentHashMap<>(); } + @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancer(SimpleLoadBalancer balancer) { - final String jmxName = _prefix + "-LoadBalancer"; - - _jmxManager.registerLoadBalancer(jmxName, balancer); + if (_dualReadStateManager != null) + { + String watcherName = balancer.getClass().getSimpleName(); + D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(balancer, this::doRegisterLoadBalancer); + _dualReadStateManager.addGlobalWatcher(watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(balancer); + } + doRegisterLoadBalancer(balancer); } + @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) { - _jmxManager.registerLoadBalancerState(_prefix + "-LoadBalancerState", state); + if (_dualReadStateManager != null) + { + String watcherName = state.getClass().getSimpleName(); + D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(state, this::doRegisterLoadBalancerState); + _dualReadStateManager.addGlobalWatcher(watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(state); + } + doRegisterLoadBalancerState(state); state.register(new SimpleLoadBalancerStateListener() { @Override public void onStrategyAdded(String serviceName, String scheme, LoadBalancerStrategy strategy) { - _jmxManager.registerLoadBalancerStrategy(getLoadBalancerStrategyJmxName(serviceName, scheme), strategy); + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _dualReadStateWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> + { + Consumer callback = i -> doRegisterLoadBalancerStrategy(serviceName, scheme, i); + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(strategy); + } + doRegisterLoadBalancerStrategy(serviceName, scheme, strategy); } @Override public void onStrategyRemoved(String serviceName, String scheme, LoadBalancerStrategy strategy) { + DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme)); } @@ -90,10 +197,22 @@ public void onClientRemoved(String clusterName, TrackerClient client) public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) { if (clusterInfoItem != null && clusterInfoItem.getClusterPropertiesItem() != null - && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { - _jmxManager.registerClusterInfo( - getClusterInfoJmxName(clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName()), - clusterInfoItem); + && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) + { + String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _dualReadStateWatchers.computeIfAbsent(getWatcherNameForClusterInfoItem(clusterName), k -> + { + Consumer callback = i -> doRegisterClusterInfo(clusterName, i); + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); + _dualReadStateManager.addClusterWatcher(clusterName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(clusterInfoItem); + } + doRegisterClusterInfo(clusterName, clusterInfoItem); } } @@ -101,85 +220,290 @@ public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) public void onClusterInfoRemoval(ClusterInfoItem clusterInfoItem) { if (clusterInfoItem != null && clusterInfoItem.getClusterPropertiesItem() != null - && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { - _jmxManager.unregister( - getClusterInfoJmxName(clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName()) - ); + && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) + { + String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); + DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForClusterInfoItem(clusterName)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeClusterWatcher(clusterName, watcher); + } + _jmxManager.unregister(getClusterInfoJmxName(clusterName)); } } @Override public void onServicePropertiesUpdate(LoadBalancerStateItem serviceProperties) { - if (serviceProperties != null && serviceProperties.getProperty() != null) { - _jmxManager.registerServiceProperties( - getServicePropertiesJmxName(serviceProperties.getProperty().getServiceName()), - serviceProperties); + if (serviceProperties != null && serviceProperties.getProperty() != null) + { + String serviceName = serviceProperties.getProperty().getServiceName(); + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher> currentWatcher = + _dualReadStateWatchers.computeIfAbsent(getWatcherNameForServiceProperties(serviceName), k -> + { + Consumer> callback = i -> doRegisterServiceProperties(serviceName, i); + D2ClientJmxDualReadModeWatcher> watcher = + new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(serviceProperties); + } + doRegisterServiceProperties(serviceName, serviceProperties); } } - @Override public void onServicePropertiesRemoval(LoadBalancerStateItem serviceProperties) { - if (serviceProperties != null && serviceProperties.getProperty() != null) { - _jmxManager.unregister(getServicePropertiesJmxName(serviceProperties.getProperty().getServiceName())); + if (serviceProperties != null && serviceProperties.getProperty() != null) + { + String serviceName = serviceProperties.getProperty().getServiceName(); + DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForServiceProperties(serviceName)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + _jmxManager.unregister(getServicePropertiesJmxName(serviceName)); } } - private String getPrefixForLBJmxNames() { - return _prefix.contains("-xDS") ? String.format("%s-", _prefix) : ""; + private void doRegisterLoadBalancerStrategy(String serviceName, String scheme, LoadBalancerStrategy strategy) + { + String jmxName = getLoadBalancerStrategyJmxName(serviceName, scheme); + _jmxManager.registerLoadBalancerStrategy(jmxName, strategy); + } + + private void doRegisterClusterInfo(String clusterName, ClusterInfoItem clusterInfoItem) + { + String jmxName = getClusterInfoJmxName(clusterName); + _jmxManager.registerClusterInfo(jmxName, clusterInfoItem); + } + + private void doRegisterServiceProperties(String serviceName, LoadBalancerStateItem serviceProperties) + { + _jmxManager.registerServiceProperties(getServicePropertiesJmxName(serviceName), serviceProperties); + } + + private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) + { + return String.format("%s-%s-LoadBalancerStrategy", serviceName, scheme); + } + + private String getWatcherNameForClusterInfoItem(String clusterName) { + return String.format("%s-ClusterInfoItem", clusterName); + } + + private String getWatcherNameForServiceProperties(String serviceName) { + return String.format("%s-LoadBalancerStateItem-ServiceProperties", serviceName); } private String getClusterInfoJmxName(String clusterName) { - return String.format("%s%s-ClusterInfo", getPrefixForLBJmxNames(), clusterName); + return String.format("%s%s-ClusterInfo", getClusterPrefixForLBPropertyJmxNames(clusterName), clusterName); } private String getServicePropertiesJmxName(String serviceName) { - return String.format("%s%s-ServiceProperties", getPrefixForLBJmxNames(), serviceName); + return String.format("%s%s-ServiceProperties", getServicePrefixForLBPropertyJmxNames(serviceName), serviceName); } private String getLoadBalancerStrategyJmxName(String serviceName, String scheme) { - return String.format("%s%s-%s-LoadBalancerStrategy", getPrefixForLBJmxNames(), serviceName, scheme); + return String.format("%s%s-%s-LoadBalancerStrategy", getServicePrefixForLBPropertyJmxNames(serviceName), serviceName, scheme); } }); } public void setZkUriRegistry(ZooKeeperEphemeralStore uriRegistry) { - _jmxManager.registerZooKeeperEphemeralStore(_prefix + "-ZooKeeperUriRegistry", uriRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkUriRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperUriRegistry", getGlobalPrefix()); + _jmxManager.registerZooKeeperEphemeralStore(jmxName, uriRegistry); } public void setZkClusterRegistry(ZooKeeperPermanentStore clusterRegistry) { - _jmxManager.registerZooKeeperPermanentStore(_prefix + "-ZooKeeperClusterRegistry", clusterRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkClusterRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperClusterRegistry", getGlobalPrefix()); + _jmxManager.registerZooKeeperPermanentStore(jmxName, clusterRegistry); } public void setZkServiceRegistry(ZooKeeperPermanentStore serviceRegistry) { - _jmxManager.registerZooKeeperPermanentStore(_prefix + "-ZooKeeperServiceRegistry", serviceRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkServiceRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperServiceRegistry", getGlobalPrefix()); + _jmxManager.registerZooKeeperPermanentStore(jmxName, serviceRegistry); } public void setFsUriStore(FileStore uriStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreUriStore", uriStore); + addDualReadModeWatcherForFileStore("UriProperties", uriStore, this::doRegisterUriFileStore); + doRegisterUriFileStore(uriStore); } public void setFsClusterStore(FileStore clusterStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreClusterStore", clusterStore); + addDualReadModeWatcherForFileStore("ClusterProperties", clusterStore, this::doRegisterClusterFileStore); + doRegisterClusterFileStore(clusterStore); } public void setFsServiceStore(FileStore serviceStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreServiceStore", serviceStore); + addDualReadModeWatcherForFileStore("ServiceProperties", serviceStore, this::doRegisterServiceFileStore); + doRegisterServiceFileStore(serviceStore); } public void registerDualReadLoadBalancerJmx(DualReadLoadBalancerJmx dualReadLoadBalancerJmx) { - _jmxManager.registerDualReadLoadBalancerJmxBean(_prefix + "-DualReadLoadBalancerJmx", dualReadLoadBalancerJmx); + if (_discoverySourceType != DiscoverySourceType.XDS) + { + _log.warn("Setting DualReadLoadBalancerJmx for Non-XDS source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-DualReadLoadBalancerJmx", getGlobalPrefix()); + _jmxManager.registerDualReadLoadBalancerJmxBean(jmxName, dualReadLoadBalancerJmx); + } + + private void doRegisterLoadBalancer(SimpleLoadBalancer balancer) + { + final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix()); + _jmxManager.registerLoadBalancer(jmxName, balancer); + } + + private void doRegisterLoadBalancerState(SimpleLoadBalancerState state) + { + final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix()); + _jmxManager.registerLoadBalancerState(jmxName, state); + } + + private void doRegisterUriFileStore(FileStore uriStore) + { + final String jmxName = String.format("%s-FileStoreUriStore", getGlobalPrefix()); + _jmxManager.registerFileStore(jmxName, uriStore); + } + + private void doRegisterClusterFileStore(FileStore clusterStore) + { + final String jmxName = String.format("%s-FileStoreClusterStore", getGlobalPrefix()); + _jmxManager.registerFileStore(jmxName, clusterStore); + } + + private void doRegisterServiceFileStore(FileStore serviceStore) + { + final String jmxName = String.format("%s-FileStoreServiceStore", getGlobalPrefix()); + _jmxManager.registerFileStore(jmxName, serviceStore); + } + + @SuppressWarnings("unchecked") + private void addDualReadModeWatcherForFileStore(String watcherNameSuffix, FileStore store, Consumer> watcherCallback) + { + if (_dualReadStateManager != null) + { + String watcherName = String.format("%s-%s", store.getClass().getSimpleName(), watcherNameSuffix); + D2ClientJmxDualReadModeWatcher> currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> + { + D2ClientJmxDualReadModeWatcher> watcher = new D2ClientJmxDualReadModeWatcher<>(store, watcherCallback); + _dualReadStateManager.addGlobalWatcher(watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(store); + } + } + + private String getGlobalPrefix() + { + return isGlobalPrimarySource() ? _primaryGlobalPrefix : _secondaryGlobalPrefix; + } + + private String getServicePrefixForLBPropertyJmxNames(String serviceName) + { + return isServicePrimarySource(serviceName) ? _primaryPrefixForLbPropertyJmxName : _secondaryPrefixForLbPropertyJmxName; + } + + private String getClusterPrefixForLBPropertyJmxNames(String clusterName) + { + return isClusterPrimarySource(clusterName) ? _primaryPrefixForLbPropertyJmxName : _secondaryPrefixForLbPropertyJmxName; + } + + private boolean isGlobalPrimarySource() + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(_dualReadStateManager.getGlobalDualReadMode()); + } + + private boolean isServicePrimarySource(String serviceName) + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(_dualReadStateManager.getServiceDualReadMode(serviceName)); + } + + private boolean isClusterPrimarySource(String clusterName) + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(_dualReadStateManager.getClusterDualReadMode(clusterName)); + } + + private boolean isPrimarySourceHelper(DualReadModeProvider.DualReadMode dualReadMode) + { + switch (dualReadMode) + { + case NEW_LB_ONLY: + return _discoverySourceType == DiscoverySourceType.XDS; + case DUAL_READ: + case OLD_LB_ONLY: + return _discoverySourceType == DiscoverySourceType.ZK; + default: + _log.warn("Unknown dual read mode {}, falling back to ZK as primary source.", dualReadMode); + return _discoverySourceType == DiscoverySourceType.ZK; + } + } + + static final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher + { + private T _latestJmxProperty; + private final Consumer _callback; + + D2ClientJmxDualReadModeWatcher(T initialJmxProperty, Consumer callback) + { + _latestJmxProperty = initialJmxProperty; + _callback = callback; + } + + @VisibleForTesting + T getLatestJmxProperty() + { + return _latestJmxProperty; + } + + public void setLatestJmxProperty(T latestJmxProperty) + { + _latestJmxProperty = latestJmxProperty; + } + + @Override + public void onChanged() + { + _callback.accept(_latestJmxProperty); + } } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java index ce4f9a52d9..d6ea394b99 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java @@ -246,8 +246,10 @@ public void unregister(ObjectName oName) { _server.unregisterMBean(oName); } - _registeredNames.remove(oName); - _log.info("Unregistered MBean {}", oName); + if (_registeredNames.remove(oName)) + { + _log.info("Unregistered MBean {}", oName); + } } catch (Exception e) { diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index da42df1be3..f5834b73c6 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -40,8 +40,8 @@ public class XdsLoadBalancerWithFacilitiesFactory implements LoadBalancerWithFac @Override public LoadBalancerWithFacilities create(D2ClientConfig config) { - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix + "-xDS", - config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.XDS, config.dualReadStateManager); if (config.dualReadStateManager != null) { diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java index fa3c8a59c6..5cbfc105bd 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java @@ -17,14 +17,21 @@ package com.linkedin.d2.jmx; import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.relative.RelativeLoadBalancerStrategy; import com.linkedin.d2.balancer.util.canary.CanaryDistributionProvider; import com.linkedin.d2.balancer.util.partitions.PartitionAccessor; +import com.linkedin.d2.discovery.stores.file.FileStore; import java.net.URI; import java.util.Collections; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import org.mockito.Captor; import org.mockito.ArgumentCaptor; @@ -32,150 +39,418 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.mockito.Mockito.*; public class D2ClientJmxManagerTest { - @Mock - SimpleLoadBalancerState _simpleLoadBalancerState; - @Mock - JmxManager _jmxManager; - @Captor - ArgumentCaptor _simpleLoadBalancerStateNameCaptor; - @Captor - ArgumentCaptor _simpleLoadBalancerStateCaptor; - @Captor - ArgumentCaptor _simpleLoadBalancerStateListenerCaptor; - @Captor - ArgumentCaptor _unregisteredObjectNameCaptor; - @Captor - ArgumentCaptor _registerObjectNameCaptor; - @Captor - ArgumentCaptor _clusterInfoArgumentCaptor; - @Captor - ArgumentCaptor> _servicePropertiesArgumentCaptor; - - D2ClientJmxManager _d2ClientJmxManager; - private ClusterInfoItem _clusterInfoItem; - private ClusterInfoItem _noPropertyClusterInfoItem; - private LoadBalancerStateItem _servicePropertiesLBState; - private LoadBalancerStateItem _noPropertyLBStateItem; - - @BeforeMethod - public void setUp() + + private static final LoadBalancerStateItem SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM = new LoadBalancerStateItem<>( + new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), + 0, + 0, + CanaryDistributionProvider.Distribution.CANARY + ); + + private static final LoadBalancerStateItem NO_PROPERTY_LB_STATE_ITEM = new LoadBalancerStateItem<>( + null, 0, 0, CanaryDistributionProvider.Distribution.STABLE); + + private static final LoadBalancerStateItem UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM = new LoadBalancerStateItem<>( + new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), + 0, + 0, + CanaryDistributionProvider.Distribution.STABLE + ); + + private static final class D2ClientJmxManagerFixture { - MockitoAnnotations.initMocks(this); - AtomicLong version = new AtomicLong(0); - when(_simpleLoadBalancerState.getVersionAccess()).thenReturn(version); - _clusterInfoItem = - new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), new PartitionAccessor() { - @Override - public int getMaxPartitionId() { - return 0; - } - - @Override - public int getPartitionId(URI uri) { - return 0; - } - }, CanaryDistributionProvider.Distribution.CANARY); - _noPropertyClusterInfoItem = new ClusterInfoItem(_simpleLoadBalancerState, null, null, - CanaryDistributionProvider.Distribution.STABLE); - _servicePropertiesLBState = new LoadBalancerStateItem<>( - new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), - 0, - 0, - CanaryDistributionProvider.Distribution.CANARY - ); - _noPropertyLBStateItem = new LoadBalancerStateItem(null, 0, 0, - CanaryDistributionProvider.Distribution.STABLE); - _d2ClientJmxManager = new D2ClientJmxManager("Foo", _jmxManager); - Mockito.doReturn(_jmxManager).when(_jmxManager).unregister(_unregisteredObjectNameCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerLoadBalancerState( - _simpleLoadBalancerStateNameCaptor.capture(), _simpleLoadBalancerStateCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerClusterInfo( - _registerObjectNameCaptor.capture(), - _clusterInfoArgumentCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerServiceProperties( - _registerObjectNameCaptor.capture(), - _servicePropertiesArgumentCaptor.capture()); - Mockito.doNothing().when(_simpleLoadBalancerState).register(_simpleLoadBalancerStateListenerCaptor.capture()); + @Mock + SimpleLoadBalancer _loadBalancer; + @Mock + SimpleLoadBalancerState _simpleLoadBalancerState; + @Mock + JmxManager _jmxManager; + @Mock + FileStore _uriStore; + @Mock + FileStore _clusterStore; + @Mock + FileStore _serviceStore; + @Mock + RelativeLoadBalancerStrategy _relativeLoadBalancerStrategy; + @Mock + DualReadModeProvider _dualReadModeProvider; + @Mock + ScheduledExecutorService _executorService; + @Captor + ArgumentCaptor _simpleLoadBalancerStateNameCaptor; + @Captor + ArgumentCaptor _simpleLoadBalancerStateCaptor; + @Captor + ArgumentCaptor _simpleLoadBalancerStateListenerCaptor; + @Captor + ArgumentCaptor _unregisteredObjectNameCaptor; + @Captor + ArgumentCaptor _registerObjectNameCaptor; + @Captor + ArgumentCaptor _clusterInfoArgumentCaptor; + @Captor + ArgumentCaptor> _servicePropertiesArgumentCaptor; + @SuppressWarnings("rawtypes") + @Captor + ArgumentCaptor _addWatcherCaptor; + + D2ClientJmxManager _d2ClientJmxManager; + private final ClusterInfoItem _clusterInfoItem; + private final ClusterInfoItem _updatedClusterInfoItem; + private final ClusterInfoItem _noPropertyClusterInfoItem; + private final DualReadStateManager _dualReadStateManager; + + D2ClientJmxManagerFixture() + { + MockitoAnnotations.initMocks(this); + AtomicLong version = new AtomicLong(0); + when(_simpleLoadBalancerState.getVersionAccess()).thenReturn(version); + PartitionAccessor partitionAccessor = new PartitionAccessor() { + @Override + public int getMaxPartitionId() { + return 0; + } + + @Override + public int getPartitionId(URI uri) { + return 0; + } + }; + _clusterInfoItem = + new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), partitionAccessor, + CanaryDistributionProvider.Distribution.CANARY); + _updatedClusterInfoItem = + new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), partitionAccessor, + CanaryDistributionProvider.Distribution.STABLE); + _noPropertyClusterInfoItem = new ClusterInfoItem(_simpleLoadBalancerState, null, null, + CanaryDistributionProvider.Distribution.STABLE); + Mockito.doReturn(_jmxManager).when(_jmxManager).unregister(_unregisteredObjectNameCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerLoadBalancerState( + _simpleLoadBalancerStateNameCaptor.capture(), _simpleLoadBalancerStateCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerClusterInfo( + _registerObjectNameCaptor.capture(), + _clusterInfoArgumentCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerServiceProperties( + _registerObjectNameCaptor.capture(), + _servicePropertiesArgumentCaptor.capture()); + Mockito.doNothing().when(_simpleLoadBalancerState).register(_simpleLoadBalancerStateListenerCaptor.capture()); + + _dualReadStateManager = spy(new DualReadStateManager(_dualReadModeProvider, _executorService)); + + doCallRealMethod().when(_dualReadStateManager).addGlobalWatcher(any()); + doCallRealMethod().when(_dualReadStateManager).addServiceWatcher(any(), any()); + doCallRealMethod().when(_dualReadStateManager).addClusterWatcher(any(), any()); + doCallRealMethod().when(_dualReadStateManager).updateGlobal(any()); + doCallRealMethod().when(_dualReadStateManager).updateService(any(), any()); + doCallRealMethod().when(_dualReadStateManager).updateCluster(any(), any()); + } + + D2ClientJmxManager getD2ClientJmxManager(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, Boolean isDualReadLB) + { + if (sourceType == null) + { // default to ZK source type, null dualReadStateManager + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager); + } + else + { + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, sourceType, isDualReadLB ? _dualReadStateManager : null); + } + return _d2ClientJmxManager; + } + } + + @DataProvider(name = "nonDualReadD2ClientJmxManagers") + public Object[][] nonDualReadD2ClientJmxManagers() + { + return new Object[][] + { + {"Foo", null, false}, + {"Foo", D2ClientJmxManager.DiscoverySourceType.ZK, false}, + {"Foo", D2ClientJmxManager.DiscoverySourceType.XDS, false} + }; } - @Test() - public void testSetSimpleLBStateListenerUpdateServiceProperties() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerUpdateServiceProperties(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(null); - Mockito.verify(_jmxManager, never()).registerServiceProperties(any(), any()); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(_noPropertyLBStateItem); - Mockito.verify(_jmxManager, never()).registerServiceProperties(any(), any()); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(_servicePropertiesLBState); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(null); + Mockito.verify(fixture._jmxManager, never()).registerServiceProperties(any(), any()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(NO_PROPERTY_LB_STATE_ITEM); + Mockito.verify(fixture._jmxManager, never()).registerServiceProperties(any(), any()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate( + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); Assert.assertEquals( - _registerObjectNameCaptor.getValue(), + fixture._registerObjectNameCaptor.getValue(), "S_Foo-ServiceProperties" ); Assert.assertEquals( - _servicePropertiesArgumentCaptor.getValue(), - _servicePropertiesLBState + fixture._servicePropertiesArgumentCaptor.getValue(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM ); } - @Test - public void testSetSimpleLBStateListenerUpdateClusterInfo() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerUpdateClusterInfo(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(null); - Mockito.verify(_jmxManager, never()).registerClusterInfo(any(), any()); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(_noPropertyClusterInfoItem); - Mockito.verify(_jmxManager, never()).registerClusterInfo(any(), any()); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(_clusterInfoItem); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(null); + Mockito.verify(fixture._jmxManager, never()).registerClusterInfo(any(), any()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(fixture._noPropertyClusterInfoItem); + Mockito.verify(fixture._jmxManager, never()).registerClusterInfo(any(), any()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(fixture._clusterInfoItem); Assert.assertEquals( - _registerObjectNameCaptor.getValue(), + fixture._registerObjectNameCaptor.getValue(), "C_Foo-ClusterInfo" ); Assert.assertEquals( - _clusterInfoArgumentCaptor.getValue(), - _clusterInfoItem + fixture._clusterInfoArgumentCaptor.getValue(), + fixture._clusterInfoItem ); } - @Test - public void testSetSimpleLBStateListenerRemoveClusterInfo() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerRemoveClusterInfo(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - Assert.assertEquals(_simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); - Assert.assertEquals(_simpleLoadBalancerStateCaptor.getValue(), _simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(null); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(_noPropertyClusterInfoItem); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(_clusterInfoItem); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + Assert.assertEquals(fixture._simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); + Assert.assertEquals(fixture._simpleLoadBalancerStateCaptor.getValue(), fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(null); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(fixture._noPropertyClusterInfoItem); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(fixture._clusterInfoItem); Assert.assertEquals( - _unregisteredObjectNameCaptor.getValue(), - _clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName() + "-ClusterInfo"); + fixture._unregisteredObjectNameCaptor.getValue(), + fixture._clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName() + "-ClusterInfo"); } - @Test - public void testSetSimpleLBStateListenerRemoveServiceProperties() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerRemoveServiceProperties(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - Assert.assertEquals(_simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); - Assert.assertEquals(_simpleLoadBalancerStateCaptor.getValue(), _simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(null); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(_noPropertyLBStateItem); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(_servicePropertiesLBState); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + Assert.assertEquals(fixture._simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); + Assert.assertEquals(fixture._simpleLoadBalancerStateCaptor.getValue(), fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(null); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(NO_PROPERTY_LB_STATE_ITEM); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval( + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); Assert.assertEquals( - _unregisteredObjectNameCaptor.getValue(), - _servicePropertiesLBState.getProperty().getServiceName() + "-ServiceProperties"); + fixture._unregisteredObjectNameCaptor.getValue(), + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM.getProperty().getServiceName() + "-ServiceProperties"); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testAddAndRemoveWatcherAtServicePropertiesUpdate() + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); + // Initial dual read mode is ZK only. + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + + lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + // Verify watcher is added with properties inside + verify(dualReadStateManager).addServiceWatcher(eq("S_Foo"), addWatcherCaptor.capture()); + D2ClientJmxManager.D2ClientJmxDualReadModeWatcher> watcher = addWatcherCaptor.getValue(); + Assert.assertEquals(watcher.getLatestJmxProperty(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + + lbStateListener.onServicePropertiesUpdate(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + // Verify watcher is not added again, and properties in the watcher is updated + verify(dualReadStateManager, times(1)).addServiceWatcher(any(), any()); + Assert.assertEquals(watcher.getLatestJmxProperty(), UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + + // Verify watch is removed + lbStateListener.onServicePropertiesRemoval(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + verify(dualReadStateManager).removeServiceWatcher(eq("S_Foo"), eq(watcher)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testAddAndRemoveWatcherAtClusterInfoItemUpdate() + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + // Initial dual read mode is ZK only. + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); + + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + + lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); + + // Verify watcher is added with properties inside + verify(dualReadStateManager).addClusterWatcher(eq("C_Foo"), addWatcherCaptor.capture()); + D2ClientJmxManager.D2ClientJmxDualReadModeWatcher watcher = addWatcherCaptor.getValue(); + Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._clusterInfoItem); + + lbStateListener.onClusterInfoUpdate(fixture._updatedClusterInfoItem); + // Verify watcher is not added again, and properties in the watcher is updated + verify(dualReadStateManager, times(1)).addClusterWatcher(any(), any()); + Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._updatedClusterInfoItem); + + // Verify watch is removed + lbStateListener.onClusterInfoRemoval(fixture._updatedClusterInfoItem); + verify(dualReadStateManager).removeClusterWatcher(eq("C_Foo"), eq(watcher)); + } + + @DataProvider(name = "sourceTypeAndDualReadModeForLixSwitch") + public Object[][] sourceTypeAndDualReadModeForDualReadModeSwitch() + { + return new Object[][] + { + // ZK source is still primary switching OLD_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.OLD_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, true, true}, + // XDS source is still secondary switching OLD_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.OLD_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, false, false}, + // ZK source becomes secondary switching DUAL_READ -> NEW_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.NEW_LB_ONLY, true, false}, + // XDS source becomes primary switching DUAL_READ -> NEW_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.NEW_LB_ONLY, false, true}, + // ZK source becomes primary switching NEW_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, false, true}, + // XDS source becomes secondary switching NEW_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, true, false}, + // ZK source is still primary switching DUAL_READ -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, true, true}, + // XDS source is still secondary switching DUAL_READ -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, false, false}, + // ZK source is still primary switching NEW_LB_ONLY -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, false, true}, + // XDS source is still secondary switching NEW_LB_ONLY -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, true, false} + }; + } + @Test(dataProvider = "sourceTypeAndDualReadModeForLixSwitch") + public void testJmxNamesOnDualReadModeSwitch(D2ClientJmxManager.DiscoverySourceType sourceType, + DualReadModeProvider.DualReadMode oldMode, DualReadModeProvider.DualReadMode newMode, boolean isPrimaryBefore, boolean isPrimaryAfter) + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", sourceType, true); + + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(oldMode); + doReturn(oldMode).when(dualReadStateManager).getGlobalDualReadMode(); + doReturn(oldMode).when(dualReadStateManager).getServiceDualReadMode(any()); + doReturn(oldMode).when(dualReadStateManager).getClusterDualReadMode(any()); + + d2ClientJmxManager.setSimpleLoadBalancer(fixture._loadBalancer); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); + lbStateListener.onStrategyAdded("S_Foo", "https", fixture._relativeLoadBalancerStrategy); + d2ClientJmxManager.setFsUriStore(fixture._uriStore); + d2ClientJmxManager.setFsClusterStore(fixture._clusterStore); + d2ClientJmxManager.setFsServiceStore(fixture._serviceStore); + + verifyJmxNames(fixture, sourceType, isPrimaryBefore, false); + + doReturn(newMode).when(dualReadStateManager).getGlobalDualReadMode(); + doReturn(newMode).when(dualReadStateManager).getServiceDualReadMode(any()); + doReturn(newMode).when(dualReadStateManager).getClusterDualReadMode(any()); + + // trigger notifying watchers + dualReadStateManager.updateGlobal(newMode); + dualReadStateManager.updateService("S_Foo", newMode); + dualReadStateManager.updateCluster("C_Foo", newMode); + + verifyJmxNames(fixture, sourceType, isPrimaryAfter, isPrimaryBefore == isPrimaryAfter); + } + + private void verifyJmxNames(D2ClientJmxManagerFixture fixture, D2ClientJmxManager.DiscoverySourceType sourceType, + boolean expectedToBePrimary, boolean calledTwice) + { + JmxManager jmxManager = fixture._jmxManager; + int callTimes = calledTwice ? 2 : 1; + if (expectedToBePrimary) + { + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + } + else + { // secondary source, include source type name in jmx names + switch (sourceType) + { + case XDS: + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-xDS-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-xDS-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("xDS-S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("xDS-C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("xDS-S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + break; + case ZK: + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-ZK-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-ZK-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("ZK-S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("ZK-C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("ZK-S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + break; + default: + Assert.fail(String.format("Unknown source type: %s", sourceType)); + } + } } } diff --git a/gradle.properties b/gradle.properties index e844d0a81d..93304a77b9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.43.11 +version=29.44.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true diff --git a/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java b/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java index 15241403f8..1d47bb6406 100644 --- a/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java +++ b/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java @@ -26,6 +26,7 @@ import com.linkedin.r2.transport.http.server.HttpServer; import com.linkedin.r2.transport.http.server.HttpServerFactory; import com.linkedin.test.util.retry.SingleRetry; +import com.linkedin.test.util.retry.ThreeRetries; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -74,7 +75,7 @@ public void handleStreamRequest(StreamRequest req, Map wireAttrs } // make sure jetty's default behavior will read all the request bytes in case of 404 - @Test(retryAnalyzer = SingleRetry.class) // Known to be flaky in CI + @Test(retryAnalyzer = ThreeRetries.class) // Known to be flaky in CI public void testJetty404() throws Exception { BytesWriter writer = new BytesWriter(200 * 1024, (byte)100); diff --git a/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java b/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java index 748b62137c..215ebd6dcd 100644 --- a/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java +++ b/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java @@ -16,6 +16,7 @@ package com.linkedin.r2.transport.http.client; +import com.linkedin.test.util.retry.ThreeRetries; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; @@ -74,7 +75,7 @@ public static Object[][] configsOpenedConnections() * End to end test. Testing all the client combinations (http/https stream/rest sharing/not sharing) and check they * are using the same channelPoolManager */ - @Test(dataProvider = "configsOpenedConnections") + @Test(dataProvider = "configsOpenedConnections", retryAnalyzer = ThreeRetries.class) // Known to be flaky in CI public void testSuccessfulRequests(boolean restOverStream, String protocolVersion, boolean shareConnection) throws Exception { From 2dd4db837e41df3bd776b9dd54ad88b5ea1a854c Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Mon, 7 Aug 2023 16:54:03 -0700 Subject: [PATCH 3/4] move watcher management logic to a manager class --- ...D2ClientJmxDualReadModeWatcherManager.java | 235 ++++++++++++++++++ .../linkedin/d2/jmx/D2ClientJmxManager.java | 166 ++----------- .../d2/jmx/D2ClientJmxManagerTest.java | 12 +- 3 files changed, 261 insertions(+), 152 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java new file mode 100644 index 0000000000..7539bc9bea --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java @@ -0,0 +1,235 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import com.google.common.annotations.VisibleForTesting; +import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.discovery.stores.file.FileStore; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; + + +public class D2ClientJmxDualReadModeWatcherManager { + + private final DualReadStateManager _dualReadStateManager; + + private D2ClientJmxDualReadModeWatcher _lbDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher _lbStateDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreUriPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreClusterPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreServicePropertiesDualReadModeWatcher; + private final ConcurrentMap>> + _servicePropertiesDualReadModeWatchers; + private final ConcurrentMap> _lbStrategyDualReadModeWatchers; + private final ConcurrentMap> _clusterInfoDualReadModeWatchers; + + public D2ClientJmxDualReadModeWatcherManager(DualReadStateManager dualReadStateManager) + { + _dualReadStateManager = dualReadStateManager; + _lbDualReadModeWatcher = null; + _lbStateDualReadModeWatcher = null; + _fileStoreUriPropertiesDualReadModeWatcher = null; + _fileStoreClusterPropertiesDualReadModeWatcher = null; + _fileStoreServicePropertiesDualReadModeWatcher = null; + _servicePropertiesDualReadModeWatchers = new ConcurrentHashMap<>(); + _lbStrategyDualReadModeWatchers = new ConcurrentHashMap<>(); + _clusterInfoDualReadModeWatchers = new ConcurrentHashMap<>(); + } + + public void updateWatcher(SimpleLoadBalancer balancer, Consumer callback) + { + if (_dualReadStateManager != null) + { + if (_lbDualReadModeWatcher == null) + { + _lbDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(balancer, callback); + _dualReadStateManager.addGlobalWatcher(_lbDualReadModeWatcher); + } + _lbDualReadModeWatcher.setLatestJmxProperty(balancer); + } + } + + public void updateWatcher(SimpleLoadBalancerState state, Consumer callback) + { + if (_dualReadStateManager != null) + { + if (_lbStateDualReadModeWatcher == null) + { + _lbStateDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(state, callback); + _dualReadStateManager.addGlobalWatcher(_lbStateDualReadModeWatcher); + } + _lbStateDualReadModeWatcher.setLatestJmxProperty(state); + } + } + + public void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, Consumer callback) + { + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _lbStrategyDualReadModeWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(strategy); + } + } + + public void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, Consumer callback) + { + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _clusterInfoDualReadModeWatchers.computeIfAbsent(clusterName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); + _dualReadStateManager.addClusterWatcher(clusterName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(clusterInfoItem); + } + } + + public void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + Consumer> callback) + { + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher> currentWatcher = + _servicePropertiesDualReadModeWatchers.computeIfAbsent(serviceName, k -> + { + D2ClientJmxDualReadModeWatcher> watcher = + new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(serviceProperties); + } + } + + public void updateWatcherForFileStoreUriProperties(FileStore uriStore, Consumer> callback) + { + if (_dualReadStateManager != null) + { + if (_fileStoreUriPropertiesDualReadModeWatcher == null) + { + _fileStoreUriPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(uriStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreUriPropertiesDualReadModeWatcher); + } + _fileStoreUriPropertiesDualReadModeWatcher.setLatestJmxProperty(uriStore); + } + } + + public void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, Consumer> callback) + { + if (_dualReadStateManager != null) + { + if (_fileStoreClusterPropertiesDualReadModeWatcher == null) + { + _fileStoreClusterPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(clusterStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreClusterPropertiesDualReadModeWatcher); + } + _fileStoreClusterPropertiesDualReadModeWatcher.setLatestJmxProperty(clusterStore); + } + } + + public void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, Consumer> callback) + { + if (_dualReadStateManager != null) + { + if (_fileStoreServicePropertiesDualReadModeWatcher == null) + { + _fileStoreServicePropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(serviceStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreServicePropertiesDualReadModeWatcher); + } + _fileStoreServicePropertiesDualReadModeWatcher.setLatestJmxProperty(serviceStore); + } + } + + public void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme) + { + DualReadStateManager.DualReadModeWatcher watcher = _lbStrategyDualReadModeWatchers.remove(getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + public void removeWatcherForClusterInfoItem(String clusterName) + { + DualReadStateManager.DualReadModeWatcher watcher = _clusterInfoDualReadModeWatchers.remove(clusterName); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeClusterWatcher(clusterName, watcher); + } + } + + public void removeWatcherForServiceProperties(String serviceName) + { + DualReadStateManager.DualReadModeWatcher watcher = _servicePropertiesDualReadModeWatchers.remove(serviceName); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) + { + return String.format("%s-%s", serviceName, scheme); + } + + + public static final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher + { + private T _latestJmxProperty; + private final Consumer _callback; + + D2ClientJmxDualReadModeWatcher(T initialJmxProperty, Consumer callback) + { + _latestJmxProperty = initialJmxProperty; + _callback = callback; + } + + @VisibleForTesting + T getLatestJmxProperty() + { + return _latestJmxProperty; + } + + public void setLatestJmxProperty(T latestJmxProperty) + { + _latestJmxProperty = latestJmxProperty; + } + + @Override + public void onChanged() + { + _callback.accept(_latestJmxProperty); + } + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index cc5ee6d12c..9c1d49ab4b 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -16,13 +16,14 @@ package com.linkedin.d2.jmx; -import com.google.common.annotations.VisibleForTesting; import com.linkedin.d2.balancer.LoadBalancerStateItem; import com.linkedin.d2.balancer.clients.TrackerClient; import com.linkedin.d2.balancer.dualread.DualReadLoadBalancerJmx; import com.linkedin.d2.balancer.dualread.DualReadModeProvider; import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; @@ -32,9 +33,6 @@ import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperPermanentStore; import com.linkedin.util.ArgumentUtil; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -73,8 +71,8 @@ When dual read state manager is null, only one discovery source is working (coul private final String _secondaryPrefixForLbPropertyJmxName; - @SuppressWarnings("rawtypes") - private final ConcurrentMap _dualReadStateWatchers; + private final D2ClientJmxDualReadModeWatcherManager _watcherManager; + public enum DiscoverySourceType { @@ -111,40 +109,18 @@ public D2ClientJmxManager(String prefix, _dualReadStateManager = dualReadStateManager; _secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName()); _secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName()); - _dualReadStateWatchers = new ConcurrentHashMap<>(); + _watcherManager = new D2ClientJmxDualReadModeWatcherManager(_dualReadStateManager); } - @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancer(SimpleLoadBalancer balancer) { - if (_dualReadStateManager != null) - { - String watcherName = balancer.getClass().getSimpleName(); - D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> - { - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(balancer, this::doRegisterLoadBalancer); - _dualReadStateManager.addGlobalWatcher(watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(balancer); - } + _watcherManager.updateWatcher(balancer, this::doRegisterLoadBalancer); doRegisterLoadBalancer(balancer); } - @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) { - if (_dualReadStateManager != null) - { - String watcherName = state.getClass().getSimpleName(); - D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> - { - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(state, this::doRegisterLoadBalancerState); - _dualReadStateManager.addGlobalWatcher(watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(state); - } + _watcherManager.updateWatcher(state, this::doRegisterLoadBalancerState); doRegisterLoadBalancerState(state); state.register(new SimpleLoadBalancerStateListener() @@ -152,29 +128,14 @@ public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) @Override public void onStrategyAdded(String serviceName, String scheme, LoadBalancerStrategy strategy) { - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher currentWatcher = - _dualReadStateWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> - { - Consumer callback = i -> doRegisterLoadBalancerStrategy(serviceName, scheme, i); - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); - _dualReadStateManager.addServiceWatcher(serviceName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(strategy); - } + _watcherManager.updateWatcher(serviceName, scheme, strategy, i -> doRegisterLoadBalancerStrategy(serviceName, scheme, i)); doRegisterLoadBalancerStrategy(serviceName, scheme, strategy); } @Override public void onStrategyRemoved(String serviceName, String scheme, LoadBalancerStrategy strategy) { - DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeServiceWatcher(serviceName, watcher); - } + _watcherManager.removeWatcherForLoadBalancerStrategy(serviceName, scheme); _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme)); } @@ -200,18 +161,7 @@ public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher currentWatcher = - _dualReadStateWatchers.computeIfAbsent(getWatcherNameForClusterInfoItem(clusterName), k -> - { - Consumer callback = i -> doRegisterClusterInfo(clusterName, i); - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); - _dualReadStateManager.addClusterWatcher(clusterName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(clusterInfoItem); - } + _watcherManager.updateWatcher(clusterName, clusterInfoItem, i -> doRegisterClusterInfo(clusterName, i)); doRegisterClusterInfo(clusterName, clusterInfoItem); } } @@ -223,11 +173,7 @@ public void onClusterInfoRemoval(ClusterInfoItem clusterInfoItem) && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); - DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForClusterInfoItem(clusterName)); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeClusterWatcher(clusterName, watcher); - } + _watcherManager.removeWatcherForClusterInfoItem(clusterName); _jmxManager.unregister(getClusterInfoJmxName(clusterName)); } } @@ -238,19 +184,7 @@ public void onServicePropertiesUpdate(LoadBalancerStateItem s if (serviceProperties != null && serviceProperties.getProperty() != null) { String serviceName = serviceProperties.getProperty().getServiceName(); - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher> currentWatcher = - _dualReadStateWatchers.computeIfAbsent(getWatcherNameForServiceProperties(serviceName), k -> - { - Consumer> callback = i -> doRegisterServiceProperties(serviceName, i); - D2ClientJmxDualReadModeWatcher> watcher = - new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); - _dualReadStateManager.addServiceWatcher(serviceName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(serviceProperties); - } + _watcherManager.updateWatcher(serviceName, serviceProperties, i -> doRegisterServiceProperties(serviceName, i)); doRegisterServiceProperties(serviceName, serviceProperties); } } @@ -261,11 +195,7 @@ public void onServicePropertiesRemoval(LoadBalancerStateItem if (serviceProperties != null && serviceProperties.getProperty() != null) { String serviceName = serviceProperties.getProperty().getServiceName(); - DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForServiceProperties(serviceName)); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeServiceWatcher(serviceName, watcher); - } + _watcherManager.removeWatcherForServiceProperties(serviceName); _jmxManager.unregister(getServicePropertiesJmxName(serviceName)); } } @@ -287,19 +217,6 @@ private void doRegisterServiceProperties(String serviceName, LoadBalancerStateIt _jmxManager.registerServiceProperties(getServicePropertiesJmxName(serviceName), serviceProperties); } - private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) - { - return String.format("%s-%s-LoadBalancerStrategy", serviceName, scheme); - } - - private String getWatcherNameForClusterInfoItem(String clusterName) { - return String.format("%s-ClusterInfoItem", clusterName); - } - - private String getWatcherNameForServiceProperties(String serviceName) { - return String.format("%s-LoadBalancerStateItem-ServiceProperties", serviceName); - } - private String getClusterInfoJmxName(String clusterName) { return String.format("%s%s-ClusterInfo", getClusterPrefixForLBPropertyJmxNames(clusterName), clusterName); @@ -347,21 +264,21 @@ public void setZkServiceRegistry(ZooKeeperPermanentStore serviceRegistry) _jmxManager.registerZooKeeperPermanentStore(jmxName, serviceRegistry); } - public void setFsUriStore(FileStore uriStore) + public void setFsUriStore(FileStore uriStore) { - addDualReadModeWatcherForFileStore("UriProperties", uriStore, this::doRegisterUriFileStore); + _watcherManager.updateWatcherForFileStoreUriProperties(uriStore, this::doRegisterUriFileStore); doRegisterUriFileStore(uriStore); } - public void setFsClusterStore(FileStore clusterStore) + public void setFsClusterStore(FileStore clusterStore) { - addDualReadModeWatcherForFileStore("ClusterProperties", clusterStore, this::doRegisterClusterFileStore); + _watcherManager.updateWatcherForFileStoreClusterProperties(clusterStore, this::doRegisterClusterFileStore); doRegisterClusterFileStore(clusterStore); } - public void setFsServiceStore(FileStore serviceStore) + public void setFsServiceStore(FileStore serviceStore) { - addDualReadModeWatcherForFileStore("ServiceProperties", serviceStore, this::doRegisterServiceFileStore); + _watcherManager.updateWatcherForFileStoreServiceProperties(serviceStore, this::doRegisterServiceFileStore); doRegisterServiceFileStore(serviceStore); } @@ -405,22 +322,6 @@ private void doRegisterServiceFileStore(FileStore serviceStore) _jmxManager.registerFileStore(jmxName, serviceStore); } - @SuppressWarnings("unchecked") - private void addDualReadModeWatcherForFileStore(String watcherNameSuffix, FileStore store, Consumer> watcherCallback) - { - if (_dualReadStateManager != null) - { - String watcherName = String.format("%s-%s", store.getClass().getSimpleName(), watcherNameSuffix); - D2ClientJmxDualReadModeWatcher> currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> - { - D2ClientJmxDualReadModeWatcher> watcher = new D2ClientJmxDualReadModeWatcher<>(store, watcherCallback); - _dualReadStateManager.addGlobalWatcher(watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(store); - } - } - private String getGlobalPrefix() { return isGlobalPrimarySource() ? _primaryGlobalPrefix : _secondaryGlobalPrefix; @@ -477,33 +378,4 @@ private boolean isPrimarySourceHelper(DualReadModeProvider.DualReadMode dualRead return _discoverySourceType == DiscoverySourceType.ZK; } } - - static final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher - { - private T _latestJmxProperty; - private final Consumer _callback; - - D2ClientJmxDualReadModeWatcher(T initialJmxProperty, Consumer callback) - { - _latestJmxProperty = initialJmxProperty; - _callback = callback; - } - - @VisibleForTesting - T getLatestJmxProperty() - { - return _latestJmxProperty; - } - - public void setLatestJmxProperty(T latestJmxProperty) - { - _latestJmxProperty = latestJmxProperty; - } - - @Override - public void onChanged() - { - _callback.accept(_latestJmxProperty); - } - } } diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java index 5cbfc105bd..14d4028353 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java @@ -100,7 +100,7 @@ private static final class D2ClientJmxManagerFixture ArgumentCaptor> _servicePropertiesArgumentCaptor; @SuppressWarnings("rawtypes") @Captor - ArgumentCaptor _addWatcherCaptor; + ArgumentCaptor _addWatcherCaptor; D2ClientJmxManager _d2ClientJmxManager; private final ClusterInfoItem _clusterInfoItem; @@ -277,18 +277,19 @@ public void testAddAndRemoveWatcherAtServicePropertiesUpdate() D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); // Initial dual read mode is ZK only. DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(DualReadModeProvider.DualReadMode.OLD_LB_ONLY); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); - ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); // Verify watcher is added with properties inside verify(dualReadStateManager).addServiceWatcher(eq("S_Foo"), addWatcherCaptor.capture()); - D2ClientJmxManager.D2ClientJmxDualReadModeWatcher> watcher = addWatcherCaptor.getValue(); + D2ClientJmxDualReadModeWatcherManager.D2ClientJmxDualReadModeWatcher> watcher = addWatcherCaptor.getValue(); Assert.assertEquals(watcher.getLatestJmxProperty(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); lbStateListener.onServicePropertiesUpdate(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); @@ -310,18 +311,19 @@ public void testAddAndRemoveWatcherAtClusterInfoItemUpdate() d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); // Initial dual read mode is ZK only. DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(DualReadModeProvider.DualReadMode.OLD_LB_ONLY); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); - ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); // Verify watcher is added with properties inside verify(dualReadStateManager).addClusterWatcher(eq("C_Foo"), addWatcherCaptor.capture()); - D2ClientJmxManager.D2ClientJmxDualReadModeWatcher watcher = addWatcherCaptor.getValue(); + D2ClientJmxDualReadModeWatcherManager.D2ClientJmxDualReadModeWatcher watcher = addWatcherCaptor.getValue(); Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._clusterInfoItem); lbStateListener.onClusterInfoUpdate(fixture._updatedClusterInfoItem); From 2901b636045e9fb93cd2e9081ff44d712b3485d8 Mon Sep 17 00:00:00 2001 From: Bohan Yang Date: Mon, 14 Aug 2023 09:00:34 -0700 Subject: [PATCH 4/4] pass updated dual read mode to watcher callbacks --- .../dualread/DualReadStateManager.java | 30 +-- ...D2ClientJmxDualReadModeWatcherManager.java | 201 +++--------------- .../linkedin/d2/jmx/D2ClientJmxManager.java | 132 ++++++------ ...ientJmxDualReadModeWatcherManagerImpl.java | 192 +++++++++++++++++ ...ientJmxDualReadModeWatcherManagerImpl.java | 100 +++++++++ 5 files changed, 412 insertions(+), 243 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/DefaultD2ClientJmxDualReadModeWatcherManagerImpl.java create mode 100644 d2/src/main/java/com/linkedin/d2/jmx/NoOpD2ClientJmxDualReadModeWatcherManagerImpl.java diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java index 3318f8d0bb..2ea8df28a3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,7 +90,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode) _dualReadMode = mode; if (updated) { LOG.info("Global dual read mode updated: {}", mode); - notifyGlobalWatchers(); + notifyGlobalWatchers(_dualReadMode); } } @@ -98,7 +99,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode); if (oldMode != mode) { LOG.info("Dual read mode for service {} updated: {}", service, mode); - notifyServiceWatchers(service); + notifyServiceWatchers(service, mode); } } @@ -107,7 +108,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode); if (oldMode != mode) { LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode); - notifyClusterWatchers(cluster); + notifyClusterWatchers(cluster, mode); } } @@ -209,23 +210,27 @@ public DualReadModeProvider getDualReadModeProvider() return _dualReadModeProvider; } + // Add watchers watching for global dual read mode. The watcher will be notified when the global dual read mode changes. public void addGlobalWatcher(DualReadModeWatcher watcher) { _globalDualReadModeWatchers.add(watcher); } + // Add watchers watching for dual read mode of a service. The watcher will be notified when the dual read mode changes. public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher) { Set watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet()); watchers.add(watcher); } + // Add watchers watching for dual read mode of a cluster. The watcher will be notified when the dual read mode changes. public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher) { Set watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()); watchers.add(watcher); } + // Remove watchers for dual read mode of a service. public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher) { Set watchers = _serviceDualReadModeWatchers.get(serviceName); @@ -235,6 +240,7 @@ public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher } } + // Remove watchers for dual read mode of a cluster. public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher) { Set watchers = _clusterDualReadModeWatchers.get(clusterName); @@ -244,34 +250,34 @@ public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher } } - private void notifyGlobalWatchers() + private void notifyGlobalWatchers(DualReadModeProvider.DualReadMode mode) { - notifyWatchers(_globalDualReadModeWatchers); + notifyWatchers(_globalDualReadModeWatchers, mode); } - private void notifyServiceWatchers(String serviceName) + private void notifyServiceWatchers(String serviceName, DualReadModeProvider.DualReadMode mode) { - notifyWatchers(_serviceDualReadModeWatchers.get(serviceName)); + notifyWatchers(_serviceDualReadModeWatchers.get(serviceName), mode); } - private void notifyClusterWatchers(String clusterName) + private void notifyClusterWatchers(String clusterName, DualReadModeProvider.DualReadMode mode) { - notifyWatchers(_clusterDualReadModeWatchers.get(clusterName)); + notifyWatchers(_clusterDualReadModeWatchers.get(clusterName), mode); } - private void notifyWatchers(Set watchers) + private static void notifyWatchers(Set watchers, DualReadModeProvider.DualReadMode mode) { if (watchers != null) { for (DualReadModeWatcher w : watchers) { - w.onChanged(); + w.onChanged(mode); } } } public interface DualReadModeWatcher { - void onChanged(); + void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode); } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java index 7539bc9bea..fc4ffd865b 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java @@ -16,8 +16,8 @@ package com.linkedin.d2.jmx; -import com.google.common.annotations.VisibleForTesting; import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ServiceProperties; @@ -27,196 +27,57 @@ import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; import com.linkedin.d2.discovery.stores.file.FileStore; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; +import java.util.function.BiConsumer; +import javax.annotation.Nonnull; -public class D2ClientJmxDualReadModeWatcherManager { +/** + * Manage d2 client jmx dual read mode watchers for different types of load balancing related properties. + */ +public interface D2ClientJmxDualReadModeWatcherManager +{ - private final DualReadStateManager _dualReadStateManager; + void updateWatcher(SimpleLoadBalancer balancer, BiConsumer callback); - private D2ClientJmxDualReadModeWatcher _lbDualReadModeWatcher; - private D2ClientJmxDualReadModeWatcher _lbStateDualReadModeWatcher; - private D2ClientJmxDualReadModeWatcher> _fileStoreUriPropertiesDualReadModeWatcher; - private D2ClientJmxDualReadModeWatcher> _fileStoreClusterPropertiesDualReadModeWatcher; - private D2ClientJmxDualReadModeWatcher> _fileStoreServicePropertiesDualReadModeWatcher; - private final ConcurrentMap>> - _servicePropertiesDualReadModeWatchers; - private final ConcurrentMap> _lbStrategyDualReadModeWatchers; - private final ConcurrentMap> _clusterInfoDualReadModeWatchers; + void updateWatcher(SimpleLoadBalancerState state, BiConsumer callback); - public D2ClientJmxDualReadModeWatcherManager(DualReadStateManager dualReadStateManager) - { - _dualReadStateManager = dualReadStateManager; - _lbDualReadModeWatcher = null; - _lbStateDualReadModeWatcher = null; - _fileStoreUriPropertiesDualReadModeWatcher = null; - _fileStoreClusterPropertiesDualReadModeWatcher = null; - _fileStoreServicePropertiesDualReadModeWatcher = null; - _servicePropertiesDualReadModeWatchers = new ConcurrentHashMap<>(); - _lbStrategyDualReadModeWatchers = new ConcurrentHashMap<>(); - _clusterInfoDualReadModeWatchers = new ConcurrentHashMap<>(); - } - - public void updateWatcher(SimpleLoadBalancer balancer, Consumer callback) - { - if (_dualReadStateManager != null) - { - if (_lbDualReadModeWatcher == null) - { - _lbDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(balancer, callback); - _dualReadStateManager.addGlobalWatcher(_lbDualReadModeWatcher); - } - _lbDualReadModeWatcher.setLatestJmxProperty(balancer); - } - } - - public void updateWatcher(SimpleLoadBalancerState state, Consumer callback) - { - if (_dualReadStateManager != null) - { - if (_lbStateDualReadModeWatcher == null) - { - _lbStateDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(state, callback); - _dualReadStateManager.addGlobalWatcher(_lbStateDualReadModeWatcher); - } - _lbStateDualReadModeWatcher.setLatestJmxProperty(state); - } - } - - public void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, Consumer callback) - { - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher currentWatcher = - _lbStrategyDualReadModeWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> - { - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); - _dualReadStateManager.addServiceWatcher(serviceName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(strategy); - } - } + void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, + BiConsumer callback); - public void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, Consumer callback) - { - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher currentWatcher = - _clusterInfoDualReadModeWatchers.computeIfAbsent(clusterName, k -> - { - D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); - _dualReadStateManager.addClusterWatcher(clusterName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(clusterInfoItem); - } - } - - public void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, - Consumer> callback) - { - if (_dualReadStateManager != null) - { - D2ClientJmxDualReadModeWatcher> currentWatcher = - _servicePropertiesDualReadModeWatchers.computeIfAbsent(serviceName, k -> - { - D2ClientJmxDualReadModeWatcher> watcher = - new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); - _dualReadStateManager.addServiceWatcher(serviceName, watcher); - return watcher; - }); - currentWatcher.setLatestJmxProperty(serviceProperties); - } - } + void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, + BiConsumer callback); - public void updateWatcherForFileStoreUriProperties(FileStore uriStore, Consumer> callback) - { - if (_dualReadStateManager != null) - { - if (_fileStoreUriPropertiesDualReadModeWatcher == null) - { - _fileStoreUriPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(uriStore, callback); - _dualReadStateManager.addGlobalWatcher(_fileStoreUriPropertiesDualReadModeWatcher); - } - _fileStoreUriPropertiesDualReadModeWatcher.setLatestJmxProperty(uriStore); - } - } + void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + BiConsumer, DualReadModeProvider.DualReadMode> callback); - public void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, Consumer> callback) - { - if (_dualReadStateManager != null) - { - if (_fileStoreClusterPropertiesDualReadModeWatcher == null) - { - _fileStoreClusterPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(clusterStore, callback); - _dualReadStateManager.addGlobalWatcher(_fileStoreClusterPropertiesDualReadModeWatcher); - } - _fileStoreClusterPropertiesDualReadModeWatcher.setLatestJmxProperty(clusterStore); - } - } + void updateWatcherForFileStoreUriProperties(FileStore uriStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback); - public void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, Consumer> callback) - { - if (_dualReadStateManager != null) - { - if (_fileStoreServicePropertiesDualReadModeWatcher == null) - { - _fileStoreServicePropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(serviceStore, callback); - _dualReadStateManager.addGlobalWatcher(_fileStoreServicePropertiesDualReadModeWatcher); - } - _fileStoreServicePropertiesDualReadModeWatcher.setLatestJmxProperty(serviceStore); - } - } + void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback); - public void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme) - { - DualReadStateManager.DualReadModeWatcher watcher = _lbStrategyDualReadModeWatchers.remove(getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeServiceWatcher(serviceName, watcher); - } - } + void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback); - public void removeWatcherForClusterInfoItem(String clusterName) - { - DualReadStateManager.DualReadModeWatcher watcher = _clusterInfoDualReadModeWatchers.remove(clusterName); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeClusterWatcher(clusterName, watcher); - } - } + void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme); - public void removeWatcherForServiceProperties(String serviceName) - { - DualReadStateManager.DualReadModeWatcher watcher = _servicePropertiesDualReadModeWatchers.remove(serviceName); - if (_dualReadStateManager != null && watcher != null) - { - _dualReadStateManager.removeServiceWatcher(serviceName, watcher); - } - } + void removeWatcherForClusterInfoItem(String clusterName); - private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) - { - return String.format("%s-%s", serviceName, scheme); - } + void removeWatcherForServiceProperties(String serviceName); - public static final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher + final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher { private T _latestJmxProperty; - private final Consumer _callback; + private final BiConsumer _callback; - D2ClientJmxDualReadModeWatcher(T initialJmxProperty, Consumer callback) + D2ClientJmxDualReadModeWatcher(T initialJmxProperty, BiConsumercallback) { _latestJmxProperty = initialJmxProperty; _callback = callback; } - @VisibleForTesting - T getLatestJmxProperty() + public T getLatestJmxProperty() { return _latestJmxProperty; } @@ -227,9 +88,9 @@ public void setLatestJmxProperty(T latestJmxProperty) } @Override - public void onChanged() + public void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode) { - _callback.accept(_latestJmxProperty); + _callback.accept(_latestJmxProperty, mode); } } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index 9c1d49ab4b..a7f02cc14d 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -51,13 +51,13 @@ public class D2ClientJmxManager private final DiscoverySourceType _discoverySourceType; /* - When dual read state manager is null, only one discovery source is working (could be a new source other than ZK). We keep using the same Jmx/sensor - names as the ZK one so users can still monitor the same metrics. + When dual read state manager is null, only one discovery source is working (could be a new source other than ZK). We keep using + the same Jmx/sensor names as the ZK one so users can still monitor the same metrics. - When dual read state manager is not null, it means dual read load balancer is in use, and there are two sets of load balancer, lb state, and FS backup - registering Jmx/sensors for different service discovery sources. - Depending on the specific dual read mode that is dynamically changing, controlled by lix on d2 service level, one source is primary, the other is secondary. - Jmx/sensor names need to be carefully handled to: + When dual read state manager is not null, it means dual read load balancer is in use, and there are two sets of load balancer, lb + state, and FS backup registering Jmx/sensors for different service discovery sources. + Depending on the specific dual read mode that is dynamically changing, controlled by lix on d2 service level, one source is primary, + the other is secondary. Jmx/sensor names need to be carefully handled to: 1) for the primary source, use the primary names (the one ZK was using) so users can still monitor the same metrics. 2) for the secondary source, use different names that include the source type to avoid conflicting the primary names. */ @@ -67,7 +67,7 @@ When dual read state manager is null, only one discovery source is working (coul private final String _secondaryGlobalPrefix; - private static final String _primaryPrefixForLbPropertyJmxName = ""; + private static final String PRIMARY_PREFIX_FOR_LB_PROPERTY_JMX_NAME = ""; private final String _secondaryPrefixForLbPropertyJmxName; @@ -109,34 +109,36 @@ public D2ClientJmxManager(String prefix, _dualReadStateManager = dualReadStateManager; _secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName()); _secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName()); - _watcherManager = new D2ClientJmxDualReadModeWatcherManager(_dualReadStateManager); + _watcherManager = _dualReadStateManager == null ? new NoOpD2ClientJmxDualReadModeWatcherManagerImpl() + : new DefaultD2ClientJmxDualReadModeWatcherManagerImpl(_dualReadStateManager); } public void setSimpleLoadBalancer(SimpleLoadBalancer balancer) { _watcherManager.updateWatcher(balancer, this::doRegisterLoadBalancer); - doRegisterLoadBalancer(balancer); + doRegisterLoadBalancer(balancer, null); } public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) { _watcherManager.updateWatcher(state, this::doRegisterLoadBalancerState); - doRegisterLoadBalancerState(state); + doRegisterLoadBalancerState(state, null); state.register(new SimpleLoadBalancerStateListener() { @Override public void onStrategyAdded(String serviceName, String scheme, LoadBalancerStrategy strategy) { - _watcherManager.updateWatcher(serviceName, scheme, strategy, i -> doRegisterLoadBalancerStrategy(serviceName, scheme, i)); - doRegisterLoadBalancerStrategy(serviceName, scheme, strategy); + _watcherManager.updateWatcher(serviceName, scheme, strategy, + (item, mode) -> doRegisterLoadBalancerStrategy(serviceName, scheme, item, mode)); + doRegisterLoadBalancerStrategy(serviceName, scheme, strategy, null); } @Override public void onStrategyRemoved(String serviceName, String scheme, LoadBalancerStrategy strategy) { _watcherManager.removeWatcherForLoadBalancerStrategy(serviceName, scheme); - _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme)); + _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme, null)); } @Override @@ -161,8 +163,9 @@ public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); - _watcherManager.updateWatcher(clusterName, clusterInfoItem, i -> doRegisterClusterInfo(clusterName, i)); - doRegisterClusterInfo(clusterName, clusterInfoItem); + _watcherManager.updateWatcher(clusterName, clusterInfoItem, + (item, mode) -> doRegisterClusterInfo(clusterName, item, mode)); + doRegisterClusterInfo(clusterName, clusterInfoItem, null); } } @@ -174,7 +177,7 @@ public void onClusterInfoRemoval(ClusterInfoItem clusterInfoItem) { String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); _watcherManager.removeWatcherForClusterInfoItem(clusterName); - _jmxManager.unregister(getClusterInfoJmxName(clusterName)); + _jmxManager.unregister(getClusterInfoJmxName(clusterName, null)); } } @@ -184,8 +187,9 @@ public void onServicePropertiesUpdate(LoadBalancerStateItem s if (serviceProperties != null && serviceProperties.getProperty() != null) { String serviceName = serviceProperties.getProperty().getServiceName(); - _watcherManager.updateWatcher(serviceName, serviceProperties, i -> doRegisterServiceProperties(serviceName, i)); - doRegisterServiceProperties(serviceName, serviceProperties); + _watcherManager.updateWatcher(serviceName, serviceProperties, + (item, mode) -> doRegisterServiceProperties(serviceName, item, mode)); + doRegisterServiceProperties(serviceName, serviceProperties, null); } } @@ -196,40 +200,43 @@ public void onServicePropertiesRemoval(LoadBalancerStateItem { String serviceName = serviceProperties.getProperty().getServiceName(); _watcherManager.removeWatcherForServiceProperties(serviceName); - _jmxManager.unregister(getServicePropertiesJmxName(serviceName)); + _jmxManager.unregister(getServicePropertiesJmxName(serviceName, null)); } } - private void doRegisterLoadBalancerStrategy(String serviceName, String scheme, LoadBalancerStrategy strategy) + private void doRegisterLoadBalancerStrategy(String serviceName, String scheme, LoadBalancerStrategy strategy, + @Nullable DualReadModeProvider.DualReadMode mode) { - String jmxName = getLoadBalancerStrategyJmxName(serviceName, scheme); + String jmxName = getLoadBalancerStrategyJmxName(serviceName, scheme, mode); _jmxManager.registerLoadBalancerStrategy(jmxName, strategy); } - private void doRegisterClusterInfo(String clusterName, ClusterInfoItem clusterInfoItem) + private void doRegisterClusterInfo(String clusterName, ClusterInfoItem clusterInfoItem, + @Nullable DualReadModeProvider.DualReadMode mode) { - String jmxName = getClusterInfoJmxName(clusterName); + String jmxName = getClusterInfoJmxName(clusterName, mode); _jmxManager.registerClusterInfo(jmxName, clusterInfoItem); } - private void doRegisterServiceProperties(String serviceName, LoadBalancerStateItem serviceProperties) + private void doRegisterServiceProperties(String serviceName, LoadBalancerStateItem serviceProperties, + @Nullable DualReadModeProvider.DualReadMode mode) { - _jmxManager.registerServiceProperties(getServicePropertiesJmxName(serviceName), serviceProperties); + _jmxManager.registerServiceProperties(getServicePropertiesJmxName(serviceName, mode), serviceProperties); } - private String getClusterInfoJmxName(String clusterName) + private String getClusterInfoJmxName(String clusterName, @Nullable DualReadModeProvider.DualReadMode mode) { - return String.format("%s%s-ClusterInfo", getClusterPrefixForLBPropertyJmxNames(clusterName), clusterName); + return String.format("%s%s-ClusterInfo", getClusterPrefixForLBPropertyJmxNames(clusterName, mode), clusterName); } - private String getServicePropertiesJmxName(String serviceName) + private String getServicePropertiesJmxName(String serviceName, @Nullable DualReadModeProvider.DualReadMode mode) { - return String.format("%s%s-ServiceProperties", getServicePrefixForLBPropertyJmxNames(serviceName), serviceName); + return String.format("%s%s-ServiceProperties", getServicePrefixForLBPropertyJmxNames(serviceName, mode), serviceName); } - private String getLoadBalancerStrategyJmxName(String serviceName, String scheme) + private String getLoadBalancerStrategyJmxName(String serviceName, String scheme, @Nullable DualReadModeProvider.DualReadMode mode) { - return String.format("%s%s-%s-LoadBalancerStrategy", getServicePrefixForLBPropertyJmxNames(serviceName), serviceName, scheme); + return String.format("%s%s-%s-LoadBalancerStrategy", getServicePrefixForLBPropertyJmxNames(serviceName, mode), serviceName, scheme); } }); } @@ -240,7 +247,7 @@ public void setZkUriRegistry(ZooKeeperEphemeralStore uriRegistry) { _log.warn("Setting ZkUriRegistry for Non-ZK source type: {}", _discoverySourceType); } - final String jmxName = String.format("%s-ZooKeeperUriRegistry", getGlobalPrefix()); + final String jmxName = String.format("%s-ZooKeeperUriRegistry", getGlobalPrefix(null)); _jmxManager.registerZooKeeperEphemeralStore(jmxName, uriRegistry); } @@ -250,7 +257,7 @@ public void setZkClusterRegistry(ZooKeeperPermanentStore clusterRegistry) { _log.warn("Setting ZkClusterRegistry for Non-ZK source type: {}", _discoverySourceType); } - final String jmxName = String.format("%s-ZooKeeperClusterRegistry", getGlobalPrefix()); + final String jmxName = String.format("%s-ZooKeeperClusterRegistry", getGlobalPrefix(null)); _jmxManager.registerZooKeeperPermanentStore(jmxName, clusterRegistry); } @@ -260,26 +267,26 @@ public void setZkServiceRegistry(ZooKeeperPermanentStore serviceRegistry) { _log.warn("Setting ZkServiceRegistry for Non-ZK source type: {}", _discoverySourceType); } - final String jmxName = String.format("%s-ZooKeeperServiceRegistry", getGlobalPrefix()); + final String jmxName = String.format("%s-ZooKeeperServiceRegistry", getGlobalPrefix(null)); _jmxManager.registerZooKeeperPermanentStore(jmxName, serviceRegistry); } public void setFsUriStore(FileStore uriStore) { _watcherManager.updateWatcherForFileStoreUriProperties(uriStore, this::doRegisterUriFileStore); - doRegisterUriFileStore(uriStore); + doRegisterUriFileStore(uriStore, null); } public void setFsClusterStore(FileStore clusterStore) { _watcherManager.updateWatcherForFileStoreClusterProperties(clusterStore, this::doRegisterClusterFileStore); - doRegisterClusterFileStore(clusterStore); + doRegisterClusterFileStore(clusterStore, null); } public void setFsServiceStore(FileStore serviceStore) { _watcherManager.updateWatcherForFileStoreServiceProperties(serviceStore, this::doRegisterServiceFileStore); - doRegisterServiceFileStore(serviceStore); + doRegisterServiceFileStore(serviceStore, null); } public void registerDualReadLoadBalancerJmx(DualReadLoadBalancerJmx dualReadLoadBalancerJmx) @@ -288,83 +295,86 @@ public void registerDualReadLoadBalancerJmx(DualReadLoadBalancerJmx dualReadLoad { _log.warn("Setting DualReadLoadBalancerJmx for Non-XDS source type: {}", _discoverySourceType); } - final String jmxName = String.format("%s-DualReadLoadBalancerJmx", getGlobalPrefix()); + final String jmxName = String.format("%s-DualReadLoadBalancerJmx", getGlobalPrefix(null)); _jmxManager.registerDualReadLoadBalancerJmxBean(jmxName, dualReadLoadBalancerJmx); } - private void doRegisterLoadBalancer(SimpleLoadBalancer balancer) + private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualReadModeProvider.DualReadMode mode) { - final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix()); + final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix(mode)); _jmxManager.registerLoadBalancer(jmxName, balancer); } - private void doRegisterLoadBalancerState(SimpleLoadBalancerState state) + private void doRegisterLoadBalancerState(SimpleLoadBalancerState state, @Nullable DualReadModeProvider.DualReadMode mode) { - final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix()); + final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix(mode)); _jmxManager.registerLoadBalancerState(jmxName, state); } - private void doRegisterUriFileStore(FileStore uriStore) + private void doRegisterUriFileStore(FileStore uriStore, @Nullable DualReadModeProvider.DualReadMode mode) { - final String jmxName = String.format("%s-FileStoreUriStore", getGlobalPrefix()); + final String jmxName = String.format("%s-FileStoreUriStore", getGlobalPrefix(mode)); _jmxManager.registerFileStore(jmxName, uriStore); } - private void doRegisterClusterFileStore(FileStore clusterStore) + private void doRegisterClusterFileStore(FileStore clusterStore, @Nullable DualReadModeProvider.DualReadMode mode) { - final String jmxName = String.format("%s-FileStoreClusterStore", getGlobalPrefix()); + final String jmxName = String.format("%s-FileStoreClusterStore", getGlobalPrefix(mode)); _jmxManager.registerFileStore(jmxName, clusterStore); } - private void doRegisterServiceFileStore(FileStore serviceStore) + private void doRegisterServiceFileStore(FileStore serviceStore, @Nullable DualReadModeProvider.DualReadMode mode) { - final String jmxName = String.format("%s-FileStoreServiceStore", getGlobalPrefix()); + final String jmxName = String.format("%s-FileStoreServiceStore", getGlobalPrefix(mode)); _jmxManager.registerFileStore(jmxName, serviceStore); } - private String getGlobalPrefix() + // mode is null when the dual read mode is unknown and needs to be fetched from dual read manager + private String getGlobalPrefix(@Nullable DualReadModeProvider.DualReadMode mode) { - return isGlobalPrimarySource() ? _primaryGlobalPrefix : _secondaryGlobalPrefix; + return isGlobalPrimarySource(mode) ? _primaryGlobalPrefix : _secondaryGlobalPrefix; } - private String getServicePrefixForLBPropertyJmxNames(String serviceName) + // mode is null when the dual read mode is unknown and needs to be fetched from dual read manager + private String getServicePrefixForLBPropertyJmxNames(String serviceName, @Nullable DualReadModeProvider.DualReadMode mode) { - return isServicePrimarySource(serviceName) ? _primaryPrefixForLbPropertyJmxName : _secondaryPrefixForLbPropertyJmxName; + return isServicePrimarySource(serviceName, mode) ? PRIMARY_PREFIX_FOR_LB_PROPERTY_JMX_NAME : _secondaryPrefixForLbPropertyJmxName; } - private String getClusterPrefixForLBPropertyJmxNames(String clusterName) + // mode is null when the dual read mode is unknown and needs to be fetched from dual read manager + private String getClusterPrefixForLBPropertyJmxNames(String clusterName, @Nullable DualReadModeProvider.DualReadMode mode) { - return isClusterPrimarySource(clusterName) ? _primaryPrefixForLbPropertyJmxName : _secondaryPrefixForLbPropertyJmxName; + return isClusterPrimarySource(clusterName, mode) ? PRIMARY_PREFIX_FOR_LB_PROPERTY_JMX_NAME : _secondaryPrefixForLbPropertyJmxName; } - private boolean isGlobalPrimarySource() + private boolean isGlobalPrimarySource(@Nullable DualReadModeProvider.DualReadMode mode) { if (_dualReadStateManager == null) { return true; // only one source, it is the primary. } - return isPrimarySourceHelper(_dualReadStateManager.getGlobalDualReadMode()); + return isPrimarySourceHelper(mode == null ? _dualReadStateManager.getGlobalDualReadMode() : mode); } - private boolean isServicePrimarySource(String serviceName) + private boolean isServicePrimarySource(String serviceName, @Nullable DualReadModeProvider.DualReadMode mode) { if (_dualReadStateManager == null) { return true; // only one source, it is the primary. } - return isPrimarySourceHelper(_dualReadStateManager.getServiceDualReadMode(serviceName)); + return isPrimarySourceHelper(mode == null ? _dualReadStateManager.getServiceDualReadMode(serviceName) : mode); } - private boolean isClusterPrimarySource(String clusterName) + private boolean isClusterPrimarySource(String clusterName, @Nullable DualReadModeProvider.DualReadMode mode) { if (_dualReadStateManager == null) { return true; // only one source, it is the primary. } - return isPrimarySourceHelper(_dualReadStateManager.getClusterDualReadMode(clusterName)); + return isPrimarySourceHelper(mode == null ? _dualReadStateManager.getClusterDualReadMode(clusterName) : mode); } - private boolean isPrimarySourceHelper(DualReadModeProvider.DualReadMode dualReadMode) + private boolean isPrimarySourceHelper(@Nonnull DualReadModeProvider.DualReadMode dualReadMode) { switch (dualReadMode) { diff --git a/d2/src/main/java/com/linkedin/d2/jmx/DefaultD2ClientJmxDualReadModeWatcherManagerImpl.java b/d2/src/main/java/com/linkedin/d2/jmx/DefaultD2ClientJmxDualReadModeWatcherManagerImpl.java new file mode 100644 index 0000000000..d40ad8075f --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/DefaultD2ClientJmxDualReadModeWatcherManagerImpl.java @@ -0,0 +1,192 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.discovery.stores.file.FileStore; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; +import javax.annotation.Nonnull; + + +/** + * Default watcher manager impl that add/update/remove watchers. + */ +public class DefaultD2ClientJmxDualReadModeWatcherManagerImpl implements D2ClientJmxDualReadModeWatcherManager +{ + + private final DualReadStateManager _dualReadStateManager; + + private D2ClientJmxDualReadModeWatcher _lbDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher _lbStateDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreUriPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreClusterPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreServicePropertiesDualReadModeWatcher; + private final ConcurrentMap>> + _servicePropertiesDualReadModeWatchers; + private final ConcurrentMap> _lbStrategyDualReadModeWatchers; + private final ConcurrentMap> _clusterInfoDualReadModeWatchers; + + public DefaultD2ClientJmxDualReadModeWatcherManagerImpl(@Nonnull DualReadStateManager dualReadStateManager) + { + _dualReadStateManager = dualReadStateManager; + _lbDualReadModeWatcher = null; + _lbStateDualReadModeWatcher = null; + _fileStoreUriPropertiesDualReadModeWatcher = null; + _fileStoreClusterPropertiesDualReadModeWatcher = null; + _fileStoreServicePropertiesDualReadModeWatcher = null; + _servicePropertiesDualReadModeWatchers = new ConcurrentHashMap<>(); + _lbStrategyDualReadModeWatchers = new ConcurrentHashMap<>(); + _clusterInfoDualReadModeWatchers = new ConcurrentHashMap<>(); + } + + public void updateWatcher(SimpleLoadBalancer balancer, BiConsumer callback) + { + if (_lbDualReadModeWatcher == null) + { + _lbDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(balancer, callback); + _dualReadStateManager.addGlobalWatcher(_lbDualReadModeWatcher); + } + _lbDualReadModeWatcher.setLatestJmxProperty(balancer); + } + + public void updateWatcher(SimpleLoadBalancerState state, BiConsumer callback) + { + if (_lbStateDualReadModeWatcher == null) + { + _lbStateDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(state, callback); + _dualReadStateManager.addGlobalWatcher(_lbStateDualReadModeWatcher); + } + _lbStateDualReadModeWatcher.setLatestJmxProperty(state); + } + + public void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, + BiConsumer callback) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _lbStrategyDualReadModeWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(strategy); + } + + public void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, + BiConsumer callback) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _clusterInfoDualReadModeWatchers.computeIfAbsent(clusterName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); + _dualReadStateManager.addClusterWatcher(clusterName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(clusterInfoItem); + } + + public void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + D2ClientJmxDualReadModeWatcher> currentWatcher = + _servicePropertiesDualReadModeWatchers.computeIfAbsent(serviceName, k -> + { + D2ClientJmxDualReadModeWatcher> watcher = + new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(serviceProperties); + } + + public void updateWatcherForFileStoreUriProperties(FileStore uriStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + if (_fileStoreUriPropertiesDualReadModeWatcher == null) + { + _fileStoreUriPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(uriStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreUriPropertiesDualReadModeWatcher); + } + _fileStoreUriPropertiesDualReadModeWatcher.setLatestJmxProperty(uriStore); + } + + public void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + if (_fileStoreClusterPropertiesDualReadModeWatcher == null) + { + _fileStoreClusterPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(clusterStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreClusterPropertiesDualReadModeWatcher); + } + _fileStoreClusterPropertiesDualReadModeWatcher.setLatestJmxProperty(clusterStore); + } + + public void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + if (_fileStoreServicePropertiesDualReadModeWatcher == null) + { + _fileStoreServicePropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(serviceStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreServicePropertiesDualReadModeWatcher); + } + _fileStoreServicePropertiesDualReadModeWatcher.setLatestJmxProperty(serviceStore); + } + + public void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme) + { + DualReadStateManager.DualReadModeWatcher watcher = _lbStrategyDualReadModeWatchers.remove( + getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); + if (watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + public void removeWatcherForClusterInfoItem(String clusterName) + { + DualReadStateManager.DualReadModeWatcher watcher = _clusterInfoDualReadModeWatchers.remove(clusterName); + if (watcher != null) + { + _dualReadStateManager.removeClusterWatcher(clusterName, watcher); + } + } + + public void removeWatcherForServiceProperties(String serviceName) + { + DualReadStateManager.DualReadModeWatcher watcher = _servicePropertiesDualReadModeWatchers.remove(serviceName); + if (watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) + { + return String.format("%s-%s", serviceName, scheme); + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/NoOpD2ClientJmxDualReadModeWatcherManagerImpl.java b/d2/src/main/java/com/linkedin/d2/jmx/NoOpD2ClientJmxDualReadModeWatcherManagerImpl.java new file mode 100644 index 0000000000..366c583a50 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/NoOpD2ClientJmxDualReadModeWatcherManagerImpl.java @@ -0,0 +1,100 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.discovery.stores.file.FileStore; +import java.util.function.BiConsumer; + + +/** + * No-op manager impl does nothing. Could be used when dual read load balancer is not in use. + */ +public class NoOpD2ClientJmxDualReadModeWatcherManagerImpl implements D2ClientJmxDualReadModeWatcherManager +{ + + @Override + public void updateWatcher(SimpleLoadBalancer balancer, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(SimpleLoadBalancerState state, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void updateWatcherForFileStoreUriProperties(FileStore uriStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme) + { + } + + @Override + public void removeWatcherForClusterInfoItem(String clusterName) + { + } + + @Override + public void removeWatcherForServiceProperties(String serviceName) + { + } +}