diff --git a/CHANGELOG.md b/CHANGELOG.md index f7e7b3f27a..a47f99f11d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.44.0] - 2023-08-06 +- dynamically switch jmx/sensor names based on dual read mode and source type + ## [29.43.11] - 2023-08-01 - fix logging issues about observer host and dual read mode @@ -5512,7 +5515,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.11...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.44.0...master +[29.44.0]: https://github.com/linkedin/rest.li/compare/v29.43.11...v29.44.0 [29.43.11]: https://github.com/linkedin/rest.li/compare/v29.43.10...v29.43.11 [29.43.10]: https://github.com/linkedin/rest.li/compare/v29.43.9...v29.43.10 [29.43.9]: https://github.com/linkedin/rest.li/compare/v29.43.8...v29.43.9 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index 86c83d4003..92e6badba8 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -59,7 +59,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { LOG.info("Creating D2 LoadBalancer based on LastSeenLoadBalancerWithFacilities"); - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); // init connection ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java index 6c6046e12a..7b666c5bf3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java @@ -73,7 +73,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D loadBalancerComponentFactory = config.componentFactory; } - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory, config.lbWaitTimeout, diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java index 8f50401599..2ea8df28a3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java @@ -22,9 +22,11 @@ import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.util.clock.Clock; import com.linkedin.util.clock.SystemClock; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; +import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +34,6 @@ /** * Checks and manages the global and per-service dual read state. * Provides monitoring of the dual read data. - * * The dual read state is broken down into global and per-service state. Per-service dual read * mode has a higher priority. Only if per-service dual read mode is not defined, global * dual read mode will be used. @@ -52,6 +53,10 @@ public class DualReadStateManager private final RateLimiter _rateLimiter; // Stores global dual read mode private volatile DualReadModeProvider.DualReadMode _dualReadMode = DualReadModeProvider.DualReadMode.OLD_LB_ONLY; + private final Set _globalDualReadModeWatchers; + private final ConcurrentMap> _serviceDualReadModeWatchers; + private final ConcurrentMap> _clusterDualReadModeWatchers; + private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx; private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor; @@ -74,6 +79,9 @@ public DualReadStateManager(DualReadModeProvider dualReadModeProvider, Scheduled _dualReadModeProvider = dualReadModeProvider; _executorService = executorService; _rateLimiter = RateLimiter.create((double) 1 / DUAL_READ_MODE_SWITCH_MIN_INTERVAL); + _globalDualReadModeWatchers = ConcurrentHashMap.newKeySet(); + _serviceDualReadModeWatchers = new ConcurrentHashMap<>(); + _clusterDualReadModeWatchers = new ConcurrentHashMap<>(); } public void updateGlobal(DualReadModeProvider.DualReadMode mode) @@ -82,6 +90,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode) _dualReadMode = mode; if (updated) { LOG.info("Global dual read mode updated: {}", mode); + notifyGlobalWatchers(_dualReadMode); } } @@ -90,6 +99,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode); if (oldMode != mode) { LOG.info("Dual read mode for service {} updated: {}", service, mode); + notifyServiceWatchers(service, mode); } } @@ -98,6 +108,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode); if (oldMode != mode) { LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode); + notifyClusterWatchers(cluster, mode); } } @@ -198,4 +209,75 @@ public DualReadModeProvider getDualReadModeProvider() { return _dualReadModeProvider; } + + // Add watchers watching for global dual read mode. The watcher will be notified when the global dual read mode changes. + public void addGlobalWatcher(DualReadModeWatcher watcher) + { + _globalDualReadModeWatchers.add(watcher); + } + + // Add watchers watching for dual read mode of a service. The watcher will be notified when the dual read mode changes. + public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher) + { + Set watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet()); + watchers.add(watcher); + } + + // Add watchers watching for dual read mode of a cluster. The watcher will be notified when the dual read mode changes. + public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher) + { + Set watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()); + watchers.add(watcher); + } + + // Remove watchers for dual read mode of a service. + public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher) + { + Set watchers = _serviceDualReadModeWatchers.get(serviceName); + if (watchers != null) + { + watchers.remove(watcher); + } + } + + // Remove watchers for dual read mode of a cluster. + public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher) + { + Set watchers = _clusterDualReadModeWatchers.get(clusterName); + if (watchers != null) + { + watchers.remove(watcher); + } + } + + private void notifyGlobalWatchers(DualReadModeProvider.DualReadMode mode) + { + notifyWatchers(_globalDualReadModeWatchers, mode); + } + + private void notifyServiceWatchers(String serviceName, DualReadModeProvider.DualReadMode mode) + { + notifyWatchers(_serviceDualReadModeWatchers.get(serviceName), mode); + } + + private void notifyClusterWatchers(String clusterName, DualReadModeProvider.DualReadMode mode) + { + notifyWatchers(_clusterDualReadModeWatchers.get(clusterName), mode); + } + + private static void notifyWatchers(Set watchers, DualReadModeProvider.DualReadMode mode) + { + if (watchers != null) + { + for (DualReadModeWatcher w : watchers) + { + w.onChanged(mode); + } + } + } + + public interface DualReadModeWatcher + { + void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode); + } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java new file mode 100644 index 0000000000..fc4ffd865b --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxDualReadModeWatcherManager.java @@ -0,0 +1,96 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.discovery.stores.file.FileStore; +import java.util.function.BiConsumer; +import javax.annotation.Nonnull; + + +/** + * Manage d2 client jmx dual read mode watchers for different types of load balancing related properties. + */ +public interface D2ClientJmxDualReadModeWatcherManager +{ + + void updateWatcher(SimpleLoadBalancer balancer, BiConsumer callback); + + void updateWatcher(SimpleLoadBalancerState state, BiConsumer callback); + + void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, + BiConsumer callback); + + void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, + BiConsumer callback); + + void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + BiConsumer, DualReadModeProvider.DualReadMode> callback); + + void updateWatcherForFileStoreUriProperties(FileStore uriStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback); + + void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback); + + void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback); + + void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme); + + void removeWatcherForClusterInfoItem(String clusterName); + + void removeWatcherForServiceProperties(String serviceName); + + + final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher + { + private T _latestJmxProperty; + private final BiConsumer _callback; + + D2ClientJmxDualReadModeWatcher(T initialJmxProperty, BiConsumercallback) + { + _latestJmxProperty = initialJmxProperty; + _callback = callback; + } + + public T getLatestJmxProperty() + { + return _latestJmxProperty; + } + + public void setLatestJmxProperty(T latestJmxProperty) + { + _latestJmxProperty = latestJmxProperty; + } + + @Override + public void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode) + { + _callback.accept(_latestJmxProperty, mode); + } + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index e2e3591d49..a7f02cc14d 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -19,7 +19,11 @@ import com.linkedin.d2.balancer.LoadBalancerStateItem; import com.linkedin.d2.balancer.clients.TrackerClient; import com.linkedin.d2.balancer.dualread.DualReadLoadBalancerJmx; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; @@ -30,45 +34,111 @@ import com.linkedin.d2.discovery.stores.zk.ZooKeeperPermanentStore; import com.linkedin.util.ArgumentUtil; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * JMX manager to register the D2 client components */ public class D2ClientJmxManager { - private final String _prefix; + private static final Logger _log = LoggerFactory.getLogger(D2ClientJmxManager.class); + private final JmxManager _jmxManager; + // Service discovery source type: ZK, xDS, etc. + private final DiscoverySourceType _discoverySourceType; + + /* + When dual read state manager is null, only one discovery source is working (could be a new source other than ZK). We keep using + the same Jmx/sensor names as the ZK one so users can still monitor the same metrics. + + When dual read state manager is not null, it means dual read load balancer is in use, and there are two sets of load balancer, lb + state, and FS backup registering Jmx/sensors for different service discovery sources. + Depending on the specific dual read mode that is dynamically changing, controlled by lix on d2 service level, one source is primary, + the other is secondary. Jmx/sensor names need to be carefully handled to: + 1) for the primary source, use the primary names (the one ZK was using) so users can still monitor the same metrics. + 2) for the secondary source, use different names that include the source type to avoid conflicting the primary names. + */ + private final DualReadStateManager _dualReadStateManager; + + private final String _primaryGlobalPrefix; + + private final String _secondaryGlobalPrefix; + + private static final String PRIMARY_PREFIX_FOR_LB_PROPERTY_JMX_NAME = ""; + + private final String _secondaryPrefixForLbPropertyJmxName; + + private final D2ClientJmxDualReadModeWatcherManager _watcherManager; + + + public enum DiscoverySourceType + { + ZK("ZK"), + XDS("xDS"); + + private final String _printName; + + DiscoverySourceType(String printName) + { + _printName = printName; + } + + public String getPrintName() + { + return _printName; + } + } + public D2ClientJmxManager(String prefix, @Nonnull JmxManager jmxManager) + { + this(prefix, jmxManager, DiscoverySourceType.ZK, null); + } + + public D2ClientJmxManager(String prefix, + @Nonnull JmxManager jmxManager, + @Nonnull DiscoverySourceType discoverySourceType, + @Nullable DualReadStateManager dualReadStateManager) { ArgumentUtil.ensureNotNull(jmxManager,"jmxManager"); - _prefix = prefix; + _primaryGlobalPrefix = prefix; _jmxManager = jmxManager; + _discoverySourceType = discoverySourceType; + _dualReadStateManager = dualReadStateManager; + _secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName()); + _secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName()); + _watcherManager = _dualReadStateManager == null ? new NoOpD2ClientJmxDualReadModeWatcherManagerImpl() + : new DefaultD2ClientJmxDualReadModeWatcherManagerImpl(_dualReadStateManager); } public void setSimpleLoadBalancer(SimpleLoadBalancer balancer) { - final String jmxName = _prefix + "-LoadBalancer"; - - _jmxManager.registerLoadBalancer(jmxName, balancer); + _watcherManager.updateWatcher(balancer, this::doRegisterLoadBalancer); + doRegisterLoadBalancer(balancer, null); } public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) { - _jmxManager.registerLoadBalancerState(_prefix + "-LoadBalancerState", state); + _watcherManager.updateWatcher(state, this::doRegisterLoadBalancerState); + doRegisterLoadBalancerState(state, null); state.register(new SimpleLoadBalancerStateListener() { @Override public void onStrategyAdded(String serviceName, String scheme, LoadBalancerStrategy strategy) { - _jmxManager.registerLoadBalancerStrategy(getLoadBalancerStrategyJmxName(serviceName, scheme), strategy); + _watcherManager.updateWatcher(serviceName, scheme, strategy, + (item, mode) -> doRegisterLoadBalancerStrategy(serviceName, scheme, item, mode)); + doRegisterLoadBalancerStrategy(serviceName, scheme, strategy, null); } @Override public void onStrategyRemoved(String serviceName, String scheme, LoadBalancerStrategy strategy) { - _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme)); + _watcherManager.removeWatcherForLoadBalancerStrategy(serviceName, scheme); + _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme, null)); } @Override @@ -90,10 +160,12 @@ public void onClientRemoved(String clusterName, TrackerClient client) public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) { if (clusterInfoItem != null && clusterInfoItem.getClusterPropertiesItem() != null - && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { - _jmxManager.registerClusterInfo( - getClusterInfoJmxName(clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName()), - clusterInfoItem); + && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) + { + String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); + _watcherManager.updateWatcher(clusterName, clusterInfoItem, + (item, mode) -> doRegisterClusterInfo(clusterName, item, mode)); + doRegisterClusterInfo(clusterName, clusterInfoItem, null); } } @@ -101,81 +173,219 @@ public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) public void onClusterInfoRemoval(ClusterInfoItem clusterInfoItem) { if (clusterInfoItem != null && clusterInfoItem.getClusterPropertiesItem() != null - && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { - _jmxManager.unregister( - getClusterInfoJmxName(clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName()) - ); + && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) + { + String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); + _watcherManager.removeWatcherForClusterInfoItem(clusterName); + _jmxManager.unregister(getClusterInfoJmxName(clusterName, null)); } } @Override public void onServicePropertiesUpdate(LoadBalancerStateItem serviceProperties) { - if (serviceProperties != null && serviceProperties.getProperty() != null) { - _jmxManager.registerServiceProperties( - getServicePropertiesJmxName(serviceProperties.getProperty().getServiceName()), - serviceProperties); + if (serviceProperties != null && serviceProperties.getProperty() != null) + { + String serviceName = serviceProperties.getProperty().getServiceName(); + _watcherManager.updateWatcher(serviceName, serviceProperties, + (item, mode) -> doRegisterServiceProperties(serviceName, item, mode)); + doRegisterServiceProperties(serviceName, serviceProperties, null); } } - @Override public void onServicePropertiesRemoval(LoadBalancerStateItem serviceProperties) { - if (serviceProperties != null && serviceProperties.getProperty() != null) { - _jmxManager.unregister(getServicePropertiesJmxName(serviceProperties.getProperty().getServiceName())); + if (serviceProperties != null && serviceProperties.getProperty() != null) + { + String serviceName = serviceProperties.getProperty().getServiceName(); + _watcherManager.removeWatcherForServiceProperties(serviceName); + _jmxManager.unregister(getServicePropertiesJmxName(serviceName, null)); } } - private String getClusterInfoJmxName(String clusterName) + private void doRegisterLoadBalancerStrategy(String serviceName, String scheme, LoadBalancerStrategy strategy, + @Nullable DualReadModeProvider.DualReadMode mode) + { + String jmxName = getLoadBalancerStrategyJmxName(serviceName, scheme, mode); + _jmxManager.registerLoadBalancerStrategy(jmxName, strategy); + } + + private void doRegisterClusterInfo(String clusterName, ClusterInfoItem clusterInfoItem, + @Nullable DualReadModeProvider.DualReadMode mode) + { + String jmxName = getClusterInfoJmxName(clusterName, mode); + _jmxManager.registerClusterInfo(jmxName, clusterInfoItem); + } + + private void doRegisterServiceProperties(String serviceName, LoadBalancerStateItem serviceProperties, + @Nullable DualReadModeProvider.DualReadMode mode) { - return String.format("%s-ClusterInfo", clusterName); + _jmxManager.registerServiceProperties(getServicePropertiesJmxName(serviceName, mode), serviceProperties); } - private String getServicePropertiesJmxName(String serviceName) + private String getClusterInfoJmxName(String clusterName, @Nullable DualReadModeProvider.DualReadMode mode) { - return String.format("%s-ServiceProperties", serviceName); + return String.format("%s%s-ClusterInfo", getClusterPrefixForLBPropertyJmxNames(clusterName, mode), clusterName); } - private String getLoadBalancerStrategyJmxName(String serviceName, String scheme) + private String getServicePropertiesJmxName(String serviceName, @Nullable DualReadModeProvider.DualReadMode mode) { - return serviceName + "-" + scheme + "-LoadBalancerStrategy"; + return String.format("%s%s-ServiceProperties", getServicePrefixForLBPropertyJmxNames(serviceName, mode), serviceName); + } + + private String getLoadBalancerStrategyJmxName(String serviceName, String scheme, @Nullable DualReadModeProvider.DualReadMode mode) + { + return String.format("%s%s-%s-LoadBalancerStrategy", getServicePrefixForLBPropertyJmxNames(serviceName, mode), serviceName, scheme); } }); } public void setZkUriRegistry(ZooKeeperEphemeralStore uriRegistry) { - _jmxManager.registerZooKeeperEphemeralStore(_prefix + "-ZooKeeperUriRegistry", uriRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkUriRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperUriRegistry", getGlobalPrefix(null)); + _jmxManager.registerZooKeeperEphemeralStore(jmxName, uriRegistry); } public void setZkClusterRegistry(ZooKeeperPermanentStore clusterRegistry) { - _jmxManager.registerZooKeeperPermanentStore(_prefix + "-ZooKeeperClusterRegistry", clusterRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkClusterRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperClusterRegistry", getGlobalPrefix(null)); + _jmxManager.registerZooKeeperPermanentStore(jmxName, clusterRegistry); } public void setZkServiceRegistry(ZooKeeperPermanentStore serviceRegistry) { - _jmxManager.registerZooKeeperPermanentStore(_prefix + "-ZooKeeperServiceRegistry", serviceRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkServiceRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperServiceRegistry", getGlobalPrefix(null)); + _jmxManager.registerZooKeeperPermanentStore(jmxName, serviceRegistry); } - public void setFsUriStore(FileStore uriStore) + public void setFsUriStore(FileStore uriStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreUriStore", uriStore); + _watcherManager.updateWatcherForFileStoreUriProperties(uriStore, this::doRegisterUriFileStore); + doRegisterUriFileStore(uriStore, null); } - public void setFsClusterStore(FileStore clusterStore) + public void setFsClusterStore(FileStore clusterStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreClusterStore", clusterStore); + _watcherManager.updateWatcherForFileStoreClusterProperties(clusterStore, this::doRegisterClusterFileStore); + doRegisterClusterFileStore(clusterStore, null); } - public void setFsServiceStore(FileStore serviceStore) + public void setFsServiceStore(FileStore serviceStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreServiceStore", serviceStore); + _watcherManager.updateWatcherForFileStoreServiceProperties(serviceStore, this::doRegisterServiceFileStore); + doRegisterServiceFileStore(serviceStore, null); } public void registerDualReadLoadBalancerJmx(DualReadLoadBalancerJmx dualReadLoadBalancerJmx) { - _jmxManager.registerDualReadLoadBalancerJmxBean(_prefix + "-DualReadLoadBalancerJmx", dualReadLoadBalancerJmx); + if (_discoverySourceType != DiscoverySourceType.XDS) + { + _log.warn("Setting DualReadLoadBalancerJmx for Non-XDS source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-DualReadLoadBalancerJmx", getGlobalPrefix(null)); + _jmxManager.registerDualReadLoadBalancerJmxBean(jmxName, dualReadLoadBalancerJmx); + } + + private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualReadModeProvider.DualReadMode mode) + { + final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix(mode)); + _jmxManager.registerLoadBalancer(jmxName, balancer); + } + + private void doRegisterLoadBalancerState(SimpleLoadBalancerState state, @Nullable DualReadModeProvider.DualReadMode mode) + { + final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix(mode)); + _jmxManager.registerLoadBalancerState(jmxName, state); + } + + private void doRegisterUriFileStore(FileStore uriStore, @Nullable DualReadModeProvider.DualReadMode mode) + { + final String jmxName = String.format("%s-FileStoreUriStore", getGlobalPrefix(mode)); + _jmxManager.registerFileStore(jmxName, uriStore); + } + + private void doRegisterClusterFileStore(FileStore clusterStore, @Nullable DualReadModeProvider.DualReadMode mode) + { + final String jmxName = String.format("%s-FileStoreClusterStore", getGlobalPrefix(mode)); + _jmxManager.registerFileStore(jmxName, clusterStore); + } + + private void doRegisterServiceFileStore(FileStore serviceStore, @Nullable DualReadModeProvider.DualReadMode mode) + { + final String jmxName = String.format("%s-FileStoreServiceStore", getGlobalPrefix(mode)); + _jmxManager.registerFileStore(jmxName, serviceStore); + } + + // mode is null when the dual read mode is unknown and needs to be fetched from dual read manager + private String getGlobalPrefix(@Nullable DualReadModeProvider.DualReadMode mode) + { + return isGlobalPrimarySource(mode) ? _primaryGlobalPrefix : _secondaryGlobalPrefix; + } + + // mode is null when the dual read mode is unknown and needs to be fetched from dual read manager + private String getServicePrefixForLBPropertyJmxNames(String serviceName, @Nullable DualReadModeProvider.DualReadMode mode) + { + return isServicePrimarySource(serviceName, mode) ? PRIMARY_PREFIX_FOR_LB_PROPERTY_JMX_NAME : _secondaryPrefixForLbPropertyJmxName; + } + + // mode is null when the dual read mode is unknown and needs to be fetched from dual read manager + private String getClusterPrefixForLBPropertyJmxNames(String clusterName, @Nullable DualReadModeProvider.DualReadMode mode) + { + return isClusterPrimarySource(clusterName, mode) ? PRIMARY_PREFIX_FOR_LB_PROPERTY_JMX_NAME : _secondaryPrefixForLbPropertyJmxName; + } + + private boolean isGlobalPrimarySource(@Nullable DualReadModeProvider.DualReadMode mode) + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(mode == null ? _dualReadStateManager.getGlobalDualReadMode() : mode); + } + + private boolean isServicePrimarySource(String serviceName, @Nullable DualReadModeProvider.DualReadMode mode) + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(mode == null ? _dualReadStateManager.getServiceDualReadMode(serviceName) : mode); + } + + private boolean isClusterPrimarySource(String clusterName, @Nullable DualReadModeProvider.DualReadMode mode) + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(mode == null ? _dualReadStateManager.getClusterDualReadMode(clusterName) : mode); + } + + private boolean isPrimarySourceHelper(@Nonnull DualReadModeProvider.DualReadMode dualReadMode) + { + switch (dualReadMode) + { + case NEW_LB_ONLY: + return _discoverySourceType == DiscoverySourceType.XDS; + case DUAL_READ: + case OLD_LB_ONLY: + return _discoverySourceType == DiscoverySourceType.ZK; + default: + _log.warn("Unknown dual read mode {}, falling back to ZK as primary source.", dualReadMode); + return _discoverySourceType == DiscoverySourceType.ZK; + } } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/DefaultD2ClientJmxDualReadModeWatcherManagerImpl.java b/d2/src/main/java/com/linkedin/d2/jmx/DefaultD2ClientJmxDualReadModeWatcherManagerImpl.java new file mode 100644 index 0000000000..d40ad8075f --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/DefaultD2ClientJmxDualReadModeWatcherManagerImpl.java @@ -0,0 +1,192 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.discovery.stores.file.FileStore; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.BiConsumer; +import javax.annotation.Nonnull; + + +/** + * Default watcher manager impl that add/update/remove watchers. + */ +public class DefaultD2ClientJmxDualReadModeWatcherManagerImpl implements D2ClientJmxDualReadModeWatcherManager +{ + + private final DualReadStateManager _dualReadStateManager; + + private D2ClientJmxDualReadModeWatcher _lbDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher _lbStateDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreUriPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreClusterPropertiesDualReadModeWatcher; + private D2ClientJmxDualReadModeWatcher> _fileStoreServicePropertiesDualReadModeWatcher; + private final ConcurrentMap>> + _servicePropertiesDualReadModeWatchers; + private final ConcurrentMap> _lbStrategyDualReadModeWatchers; + private final ConcurrentMap> _clusterInfoDualReadModeWatchers; + + public DefaultD2ClientJmxDualReadModeWatcherManagerImpl(@Nonnull DualReadStateManager dualReadStateManager) + { + _dualReadStateManager = dualReadStateManager; + _lbDualReadModeWatcher = null; + _lbStateDualReadModeWatcher = null; + _fileStoreUriPropertiesDualReadModeWatcher = null; + _fileStoreClusterPropertiesDualReadModeWatcher = null; + _fileStoreServicePropertiesDualReadModeWatcher = null; + _servicePropertiesDualReadModeWatchers = new ConcurrentHashMap<>(); + _lbStrategyDualReadModeWatchers = new ConcurrentHashMap<>(); + _clusterInfoDualReadModeWatchers = new ConcurrentHashMap<>(); + } + + public void updateWatcher(SimpleLoadBalancer balancer, BiConsumer callback) + { + if (_lbDualReadModeWatcher == null) + { + _lbDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(balancer, callback); + _dualReadStateManager.addGlobalWatcher(_lbDualReadModeWatcher); + } + _lbDualReadModeWatcher.setLatestJmxProperty(balancer); + } + + public void updateWatcher(SimpleLoadBalancerState state, BiConsumer callback) + { + if (_lbStateDualReadModeWatcher == null) + { + _lbStateDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(state, callback); + _dualReadStateManager.addGlobalWatcher(_lbStateDualReadModeWatcher); + } + _lbStateDualReadModeWatcher.setLatestJmxProperty(state); + } + + public void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, + BiConsumer callback) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _lbStrategyDualReadModeWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(strategy); + } + + public void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, + BiConsumer callback) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _clusterInfoDualReadModeWatchers.computeIfAbsent(clusterName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); + _dualReadStateManager.addClusterWatcher(clusterName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(clusterInfoItem); + } + + public void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + D2ClientJmxDualReadModeWatcher> currentWatcher = + _servicePropertiesDualReadModeWatchers.computeIfAbsent(serviceName, k -> + { + D2ClientJmxDualReadModeWatcher> watcher = + new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(serviceProperties); + } + + public void updateWatcherForFileStoreUriProperties(FileStore uriStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + if (_fileStoreUriPropertiesDualReadModeWatcher == null) + { + _fileStoreUriPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(uriStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreUriPropertiesDualReadModeWatcher); + } + _fileStoreUriPropertiesDualReadModeWatcher.setLatestJmxProperty(uriStore); + } + + public void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + if (_fileStoreClusterPropertiesDualReadModeWatcher == null) + { + _fileStoreClusterPropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(clusterStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreClusterPropertiesDualReadModeWatcher); + } + _fileStoreClusterPropertiesDualReadModeWatcher.setLatestJmxProperty(clusterStore); + } + + public void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + if (_fileStoreServicePropertiesDualReadModeWatcher == null) + { + _fileStoreServicePropertiesDualReadModeWatcher = new D2ClientJmxDualReadModeWatcher<>(serviceStore, callback); + _dualReadStateManager.addGlobalWatcher(_fileStoreServicePropertiesDualReadModeWatcher); + } + _fileStoreServicePropertiesDualReadModeWatcher.setLatestJmxProperty(serviceStore); + } + + public void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme) + { + DualReadStateManager.DualReadModeWatcher watcher = _lbStrategyDualReadModeWatchers.remove( + getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); + if (watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + public void removeWatcherForClusterInfoItem(String clusterName) + { + DualReadStateManager.DualReadModeWatcher watcher = _clusterInfoDualReadModeWatchers.remove(clusterName); + if (watcher != null) + { + _dualReadStateManager.removeClusterWatcher(clusterName, watcher); + } + } + + public void removeWatcherForServiceProperties(String serviceName) + { + DualReadStateManager.DualReadModeWatcher watcher = _servicePropertiesDualReadModeWatchers.remove(serviceName); + if (watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + } + + private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) + { + return String.format("%s-%s", serviceName, scheme); + } +} diff --git a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java index ce4f9a52d9..d6ea394b99 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java @@ -246,8 +246,10 @@ public void unregister(ObjectName oName) { _server.unregisterMBean(oName); } - _registeredNames.remove(oName); - _log.info("Unregistered MBean {}", oName); + if (_registeredNames.remove(oName)) + { + _log.info("Unregistered MBean {}", oName); + } } catch (Exception e) { diff --git a/d2/src/main/java/com/linkedin/d2/jmx/NoOpD2ClientJmxDualReadModeWatcherManagerImpl.java b/d2/src/main/java/com/linkedin/d2/jmx/NoOpD2ClientJmxDualReadModeWatcherManagerImpl.java new file mode 100644 index 0000000000..366c583a50 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/jmx/NoOpD2ClientJmxDualReadModeWatcherManagerImpl.java @@ -0,0 +1,100 @@ +/* + Copyright (c) 2023 LinkedIn Corp. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package com.linkedin.d2.jmx; + +import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.properties.ClusterProperties; +import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; +import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy; +import com.linkedin.d2.discovery.stores.file.FileStore; +import java.util.function.BiConsumer; + + +/** + * No-op manager impl does nothing. Could be used when dual read load balancer is not in use. + */ +public class NoOpD2ClientJmxDualReadModeWatcherManagerImpl implements D2ClientJmxDualReadModeWatcherManager +{ + + @Override + public void updateWatcher(SimpleLoadBalancer balancer, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(SimpleLoadBalancerState state, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem, + BiConsumer callback) + { + } + + @Override + public void updateWatcher(String serviceName, LoadBalancerStateItem serviceProperties, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void updateWatcherForFileStoreUriProperties(FileStore uriStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void updateWatcherForFileStoreClusterProperties(FileStore clusterStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void updateWatcherForFileStoreServiceProperties(FileStore serviceStore, + BiConsumer, DualReadModeProvider.DualReadMode> callback) + { + } + + @Override + public void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme) + { + } + + @Override + public void removeWatcherForClusterInfoItem(String clusterName) + { + } + + @Override + public void removeWatcherForServiceProperties(String serviceName) + { + } +} diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index da42df1be3..f5834b73c6 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -40,8 +40,8 @@ public class XdsLoadBalancerWithFacilitiesFactory implements LoadBalancerWithFac @Override public LoadBalancerWithFacilities create(D2ClientConfig config) { - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix + "-xDS", - config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.XDS, config.dualReadStateManager); if (config.dualReadStateManager != null) { diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java index fa3c8a59c6..14d4028353 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java @@ -17,14 +17,21 @@ package com.linkedin.d2.jmx; import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.relative.RelativeLoadBalancerStrategy; import com.linkedin.d2.balancer.util.canary.CanaryDistributionProvider; import com.linkedin.d2.balancer.util.partitions.PartitionAccessor; +import com.linkedin.d2.discovery.stores.file.FileStore; import java.net.URI; import java.util.Collections; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import org.mockito.Captor; import org.mockito.ArgumentCaptor; @@ -32,150 +39,420 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.mockito.Mockito.*; public class D2ClientJmxManagerTest { - @Mock - SimpleLoadBalancerState _simpleLoadBalancerState; - @Mock - JmxManager _jmxManager; - @Captor - ArgumentCaptor _simpleLoadBalancerStateNameCaptor; - @Captor - ArgumentCaptor _simpleLoadBalancerStateCaptor; - @Captor - ArgumentCaptor _simpleLoadBalancerStateListenerCaptor; - @Captor - ArgumentCaptor _unregisteredObjectNameCaptor; - @Captor - ArgumentCaptor _registerObjectNameCaptor; - @Captor - ArgumentCaptor _clusterInfoArgumentCaptor; - @Captor - ArgumentCaptor> _servicePropertiesArgumentCaptor; - - D2ClientJmxManager _d2ClientJmxManager; - private ClusterInfoItem _clusterInfoItem; - private ClusterInfoItem _noPropertyClusterInfoItem; - private LoadBalancerStateItem _servicePropertiesLBState; - private LoadBalancerStateItem _noPropertyLBStateItem; - - @BeforeMethod - public void setUp() + + private static final LoadBalancerStateItem SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM = new LoadBalancerStateItem<>( + new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), + 0, + 0, + CanaryDistributionProvider.Distribution.CANARY + ); + + private static final LoadBalancerStateItem NO_PROPERTY_LB_STATE_ITEM = new LoadBalancerStateItem<>( + null, 0, 0, CanaryDistributionProvider.Distribution.STABLE); + + private static final LoadBalancerStateItem UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM = new LoadBalancerStateItem<>( + new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), + 0, + 0, + CanaryDistributionProvider.Distribution.STABLE + ); + + private static final class D2ClientJmxManagerFixture { - MockitoAnnotations.initMocks(this); - AtomicLong version = new AtomicLong(0); - when(_simpleLoadBalancerState.getVersionAccess()).thenReturn(version); - _clusterInfoItem = - new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), new PartitionAccessor() { - @Override - public int getMaxPartitionId() { - return 0; - } - - @Override - public int getPartitionId(URI uri) { - return 0; - } - }, CanaryDistributionProvider.Distribution.CANARY); - _noPropertyClusterInfoItem = new ClusterInfoItem(_simpleLoadBalancerState, null, null, - CanaryDistributionProvider.Distribution.STABLE); - _servicePropertiesLBState = new LoadBalancerStateItem<>( - new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), - 0, - 0, - CanaryDistributionProvider.Distribution.CANARY - ); - _noPropertyLBStateItem = new LoadBalancerStateItem(null, 0, 0, - CanaryDistributionProvider.Distribution.STABLE); - _d2ClientJmxManager = new D2ClientJmxManager("Foo", _jmxManager); - Mockito.doReturn(_jmxManager).when(_jmxManager).unregister(_unregisteredObjectNameCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerLoadBalancerState( - _simpleLoadBalancerStateNameCaptor.capture(), _simpleLoadBalancerStateCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerClusterInfo( - _registerObjectNameCaptor.capture(), - _clusterInfoArgumentCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerServiceProperties( - _registerObjectNameCaptor.capture(), - _servicePropertiesArgumentCaptor.capture()); - Mockito.doNothing().when(_simpleLoadBalancerState).register(_simpleLoadBalancerStateListenerCaptor.capture()); + @Mock + SimpleLoadBalancer _loadBalancer; + @Mock + SimpleLoadBalancerState _simpleLoadBalancerState; + @Mock + JmxManager _jmxManager; + @Mock + FileStore _uriStore; + @Mock + FileStore _clusterStore; + @Mock + FileStore _serviceStore; + @Mock + RelativeLoadBalancerStrategy _relativeLoadBalancerStrategy; + @Mock + DualReadModeProvider _dualReadModeProvider; + @Mock + ScheduledExecutorService _executorService; + @Captor + ArgumentCaptor _simpleLoadBalancerStateNameCaptor; + @Captor + ArgumentCaptor _simpleLoadBalancerStateCaptor; + @Captor + ArgumentCaptor _simpleLoadBalancerStateListenerCaptor; + @Captor + ArgumentCaptor _unregisteredObjectNameCaptor; + @Captor + ArgumentCaptor _registerObjectNameCaptor; + @Captor + ArgumentCaptor _clusterInfoArgumentCaptor; + @Captor + ArgumentCaptor> _servicePropertiesArgumentCaptor; + @SuppressWarnings("rawtypes") + @Captor + ArgumentCaptor _addWatcherCaptor; + + D2ClientJmxManager _d2ClientJmxManager; + private final ClusterInfoItem _clusterInfoItem; + private final ClusterInfoItem _updatedClusterInfoItem; + private final ClusterInfoItem _noPropertyClusterInfoItem; + private final DualReadStateManager _dualReadStateManager; + + D2ClientJmxManagerFixture() + { + MockitoAnnotations.initMocks(this); + AtomicLong version = new AtomicLong(0); + when(_simpleLoadBalancerState.getVersionAccess()).thenReturn(version); + PartitionAccessor partitionAccessor = new PartitionAccessor() { + @Override + public int getMaxPartitionId() { + return 0; + } + + @Override + public int getPartitionId(URI uri) { + return 0; + } + }; + _clusterInfoItem = + new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), partitionAccessor, + CanaryDistributionProvider.Distribution.CANARY); + _updatedClusterInfoItem = + new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), partitionAccessor, + CanaryDistributionProvider.Distribution.STABLE); + _noPropertyClusterInfoItem = new ClusterInfoItem(_simpleLoadBalancerState, null, null, + CanaryDistributionProvider.Distribution.STABLE); + Mockito.doReturn(_jmxManager).when(_jmxManager).unregister(_unregisteredObjectNameCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerLoadBalancerState( + _simpleLoadBalancerStateNameCaptor.capture(), _simpleLoadBalancerStateCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerClusterInfo( + _registerObjectNameCaptor.capture(), + _clusterInfoArgumentCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerServiceProperties( + _registerObjectNameCaptor.capture(), + _servicePropertiesArgumentCaptor.capture()); + Mockito.doNothing().when(_simpleLoadBalancerState).register(_simpleLoadBalancerStateListenerCaptor.capture()); + + _dualReadStateManager = spy(new DualReadStateManager(_dualReadModeProvider, _executorService)); + + doCallRealMethod().when(_dualReadStateManager).addGlobalWatcher(any()); + doCallRealMethod().when(_dualReadStateManager).addServiceWatcher(any(), any()); + doCallRealMethod().when(_dualReadStateManager).addClusterWatcher(any(), any()); + doCallRealMethod().when(_dualReadStateManager).updateGlobal(any()); + doCallRealMethod().when(_dualReadStateManager).updateService(any(), any()); + doCallRealMethod().when(_dualReadStateManager).updateCluster(any(), any()); + } + + D2ClientJmxManager getD2ClientJmxManager(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, Boolean isDualReadLB) + { + if (sourceType == null) + { // default to ZK source type, null dualReadStateManager + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager); + } + else + { + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, sourceType, isDualReadLB ? _dualReadStateManager : null); + } + return _d2ClientJmxManager; + } + } + + @DataProvider(name = "nonDualReadD2ClientJmxManagers") + public Object[][] nonDualReadD2ClientJmxManagers() + { + return new Object[][] + { + {"Foo", null, false}, + {"Foo", D2ClientJmxManager.DiscoverySourceType.ZK, false}, + {"Foo", D2ClientJmxManager.DiscoverySourceType.XDS, false} + }; } - @Test() - public void testSetSimpleLBStateListenerUpdateServiceProperties() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerUpdateServiceProperties(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(null); - Mockito.verify(_jmxManager, never()).registerServiceProperties(any(), any()); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(_noPropertyLBStateItem); - Mockito.verify(_jmxManager, never()).registerServiceProperties(any(), any()); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(_servicePropertiesLBState); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(null); + Mockito.verify(fixture._jmxManager, never()).registerServiceProperties(any(), any()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(NO_PROPERTY_LB_STATE_ITEM); + Mockito.verify(fixture._jmxManager, never()).registerServiceProperties(any(), any()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate( + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); Assert.assertEquals( - _registerObjectNameCaptor.getValue(), + fixture._registerObjectNameCaptor.getValue(), "S_Foo-ServiceProperties" ); Assert.assertEquals( - _servicePropertiesArgumentCaptor.getValue(), - _servicePropertiesLBState + fixture._servicePropertiesArgumentCaptor.getValue(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM ); } - @Test - public void testSetSimpleLBStateListenerUpdateClusterInfo() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerUpdateClusterInfo(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(null); - Mockito.verify(_jmxManager, never()).registerClusterInfo(any(), any()); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(_noPropertyClusterInfoItem); - Mockito.verify(_jmxManager, never()).registerClusterInfo(any(), any()); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(_clusterInfoItem); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(null); + Mockito.verify(fixture._jmxManager, never()).registerClusterInfo(any(), any()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(fixture._noPropertyClusterInfoItem); + Mockito.verify(fixture._jmxManager, never()).registerClusterInfo(any(), any()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(fixture._clusterInfoItem); Assert.assertEquals( - _registerObjectNameCaptor.getValue(), + fixture._registerObjectNameCaptor.getValue(), "C_Foo-ClusterInfo" ); Assert.assertEquals( - _clusterInfoArgumentCaptor.getValue(), - _clusterInfoItem + fixture._clusterInfoArgumentCaptor.getValue(), + fixture._clusterInfoItem ); } - @Test - public void testSetSimpleLBStateListenerRemoveClusterInfo() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerRemoveClusterInfo(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - Assert.assertEquals(_simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); - Assert.assertEquals(_simpleLoadBalancerStateCaptor.getValue(), _simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(null); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(_noPropertyClusterInfoItem); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(_clusterInfoItem); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + Assert.assertEquals(fixture._simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); + Assert.assertEquals(fixture._simpleLoadBalancerStateCaptor.getValue(), fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(null); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(fixture._noPropertyClusterInfoItem); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(fixture._clusterInfoItem); Assert.assertEquals( - _unregisteredObjectNameCaptor.getValue(), - _clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName() + "-ClusterInfo"); + fixture._unregisteredObjectNameCaptor.getValue(), + fixture._clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName() + "-ClusterInfo"); } - @Test - public void testSetSimpleLBStateListenerRemoveServiceProperties() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerRemoveServiceProperties(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - Assert.assertEquals(_simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); - Assert.assertEquals(_simpleLoadBalancerStateCaptor.getValue(), _simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(null); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(_noPropertyLBStateItem); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(_servicePropertiesLBState); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + Assert.assertEquals(fixture._simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); + Assert.assertEquals(fixture._simpleLoadBalancerStateCaptor.getValue(), fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(null); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(NO_PROPERTY_LB_STATE_ITEM); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval( + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); Assert.assertEquals( - _unregisteredObjectNameCaptor.getValue(), - _servicePropertiesLBState.getProperty().getServiceName() + "-ServiceProperties"); + fixture._unregisteredObjectNameCaptor.getValue(), + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM.getProperty().getServiceName() + "-ServiceProperties"); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testAddAndRemoveWatcherAtServicePropertiesUpdate() + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); + // Initial dual read mode is ZK only. + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(DualReadModeProvider.DualReadMode.OLD_LB_ONLY); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + + lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + // Verify watcher is added with properties inside + verify(dualReadStateManager).addServiceWatcher(eq("S_Foo"), addWatcherCaptor.capture()); + D2ClientJmxDualReadModeWatcherManager.D2ClientJmxDualReadModeWatcher> watcher = addWatcherCaptor.getValue(); + Assert.assertEquals(watcher.getLatestJmxProperty(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + + lbStateListener.onServicePropertiesUpdate(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + // Verify watcher is not added again, and properties in the watcher is updated + verify(dualReadStateManager, times(1)).addServiceWatcher(any(), any()); + Assert.assertEquals(watcher.getLatestJmxProperty(), UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + + // Verify watch is removed + lbStateListener.onServicePropertiesRemoval(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + verify(dualReadStateManager).removeServiceWatcher(eq("S_Foo"), eq(watcher)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testAddAndRemoveWatcherAtClusterInfoItemUpdate() + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + // Initial dual read mode is ZK only. + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(DualReadModeProvider.DualReadMode.OLD_LB_ONLY); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); + + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + + lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); + + // Verify watcher is added with properties inside + verify(dualReadStateManager).addClusterWatcher(eq("C_Foo"), addWatcherCaptor.capture()); + D2ClientJmxDualReadModeWatcherManager.D2ClientJmxDualReadModeWatcher watcher = addWatcherCaptor.getValue(); + Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._clusterInfoItem); + + lbStateListener.onClusterInfoUpdate(fixture._updatedClusterInfoItem); + // Verify watcher is not added again, and properties in the watcher is updated + verify(dualReadStateManager, times(1)).addClusterWatcher(any(), any()); + Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._updatedClusterInfoItem); + + // Verify watch is removed + lbStateListener.onClusterInfoRemoval(fixture._updatedClusterInfoItem); + verify(dualReadStateManager).removeClusterWatcher(eq("C_Foo"), eq(watcher)); + } + + @DataProvider(name = "sourceTypeAndDualReadModeForLixSwitch") + public Object[][] sourceTypeAndDualReadModeForDualReadModeSwitch() + { + return new Object[][] + { + // ZK source is still primary switching OLD_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.OLD_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, true, true}, + // XDS source is still secondary switching OLD_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.OLD_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, false, false}, + // ZK source becomes secondary switching DUAL_READ -> NEW_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.NEW_LB_ONLY, true, false}, + // XDS source becomes primary switching DUAL_READ -> NEW_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.NEW_LB_ONLY, false, true}, + // ZK source becomes primary switching NEW_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, false, true}, + // XDS source becomes secondary switching NEW_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, true, false}, + // ZK source is still primary switching DUAL_READ -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, true, true}, + // XDS source is still secondary switching DUAL_READ -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, false, false}, + // ZK source is still primary switching NEW_LB_ONLY -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, false, true}, + // XDS source is still secondary switching NEW_LB_ONLY -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, true, false} + }; + } + @Test(dataProvider = "sourceTypeAndDualReadModeForLixSwitch") + public void testJmxNamesOnDualReadModeSwitch(D2ClientJmxManager.DiscoverySourceType sourceType, + DualReadModeProvider.DualReadMode oldMode, DualReadModeProvider.DualReadMode newMode, boolean isPrimaryBefore, boolean isPrimaryAfter) + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", sourceType, true); + + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(oldMode); + doReturn(oldMode).when(dualReadStateManager).getGlobalDualReadMode(); + doReturn(oldMode).when(dualReadStateManager).getServiceDualReadMode(any()); + doReturn(oldMode).when(dualReadStateManager).getClusterDualReadMode(any()); + + d2ClientJmxManager.setSimpleLoadBalancer(fixture._loadBalancer); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); + lbStateListener.onStrategyAdded("S_Foo", "https", fixture._relativeLoadBalancerStrategy); + d2ClientJmxManager.setFsUriStore(fixture._uriStore); + d2ClientJmxManager.setFsClusterStore(fixture._clusterStore); + d2ClientJmxManager.setFsServiceStore(fixture._serviceStore); + + verifyJmxNames(fixture, sourceType, isPrimaryBefore, false); + + doReturn(newMode).when(dualReadStateManager).getGlobalDualReadMode(); + doReturn(newMode).when(dualReadStateManager).getServiceDualReadMode(any()); + doReturn(newMode).when(dualReadStateManager).getClusterDualReadMode(any()); + + // trigger notifying watchers + dualReadStateManager.updateGlobal(newMode); + dualReadStateManager.updateService("S_Foo", newMode); + dualReadStateManager.updateCluster("C_Foo", newMode); + + verifyJmxNames(fixture, sourceType, isPrimaryAfter, isPrimaryBefore == isPrimaryAfter); + } + + private void verifyJmxNames(D2ClientJmxManagerFixture fixture, D2ClientJmxManager.DiscoverySourceType sourceType, + boolean expectedToBePrimary, boolean calledTwice) + { + JmxManager jmxManager = fixture._jmxManager; + int callTimes = calledTwice ? 2 : 1; + if (expectedToBePrimary) + { + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + } + else + { // secondary source, include source type name in jmx names + switch (sourceType) + { + case XDS: + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-xDS-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-xDS-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("xDS-S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("xDS-C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("xDS-S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + break; + case ZK: + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-ZK-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-ZK-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("ZK-S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("ZK-C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("ZK-S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + break; + default: + Assert.fail(String.format("Unknown source type: %s", sourceType)); + } + } } } diff --git a/gradle.properties b/gradle.properties index e844d0a81d..93304a77b9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.43.11 +version=29.44.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true diff --git a/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java b/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java index 15241403f8..1d47bb6406 100644 --- a/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java +++ b/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java @@ -26,6 +26,7 @@ import com.linkedin.r2.transport.http.server.HttpServer; import com.linkedin.r2.transport.http.server.HttpServerFactory; import com.linkedin.test.util.retry.SingleRetry; +import com.linkedin.test.util.retry.ThreeRetries; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -74,7 +75,7 @@ public void handleStreamRequest(StreamRequest req, Map wireAttrs } // make sure jetty's default behavior will read all the request bytes in case of 404 - @Test(retryAnalyzer = SingleRetry.class) // Known to be flaky in CI + @Test(retryAnalyzer = ThreeRetries.class) // Known to be flaky in CI public void testJetty404() throws Exception { BytesWriter writer = new BytesWriter(200 * 1024, (byte)100); diff --git a/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java b/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java index 748b62137c..215ebd6dcd 100644 --- a/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java +++ b/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java @@ -16,6 +16,7 @@ package com.linkedin.r2.transport.http.client; +import com.linkedin.test.util.retry.ThreeRetries; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; @@ -74,7 +75,7 @@ public static Object[][] configsOpenedConnections() * End to end test. Testing all the client combinations (http/https stream/rest sharing/not sharing) and check they * are using the same channelPoolManager */ - @Test(dataProvider = "configsOpenedConnections") + @Test(dataProvider = "configsOpenedConnections", retryAnalyzer = ThreeRetries.class) // Known to be flaky in CI public void testSuccessfulRequests(boolean restOverStream, String protocolVersion, boolean shareConnection) throws Exception {