diff --git a/CHANGELOG.md b/CHANGELOG.md index 502845ad87..a47f99f11d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,10 +13,12 @@ When updating the changelog, remember to be very clear about what behavior has c and what APIs have changed, if applicable. ## [Unreleased] -- fix logging issues about observer host and dual read mode -## [29.43.11] - 2023-07-28 -- differentiate LB metrics between ZK and xDS read flows. +## [29.44.0] - 2023-08-06 +- dynamically switch jmx/sensor names based on dual read mode and source type + +## [29.43.11] - 2023-08-01 +- fix logging issues about observer host and dual read mode ## [29.43.10] - 2023-07-24 - set log level of dual read mode changes to info. @@ -5513,7 +5515,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.11...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.44.0...master +[29.44.0]: https://github.com/linkedin/rest.li/compare/v29.43.11...v29.44.0 [29.43.11]: https://github.com/linkedin/rest.li/compare/v29.43.10...v29.43.11 [29.43.10]: https://github.com/linkedin/rest.li/compare/v29.43.9...v29.43.10 [29.43.9]: https://github.com/linkedin/rest.li/compare/v29.43.8...v29.43.9 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java index 86c83d4003..92e6badba8 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LastSeenBalancerWithFacilitiesFactory.java @@ -59,7 +59,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config) { LOG.info("Creating D2 LoadBalancer based on LastSeenLoadBalancerWithFacilities"); - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); // init connection ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts); diff --git a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java index 6c6046e12a..7b666c5bf3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/ZKFSLoadBalancerWithFacilitiesFactory.java @@ -73,7 +73,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D loadBalancerComponentFactory = config.componentFactory; } - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager); return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory, config.lbWaitTimeout, diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java index 8f50401599..3318f8d0bb 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java @@ -22,6 +22,7 @@ import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.util.clock.Clock; import com.linkedin.util.clock.SystemClock; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -32,7 +33,6 @@ /** * Checks and manages the global and per-service dual read state. * Provides monitoring of the dual read data. - * * The dual read state is broken down into global and per-service state. Per-service dual read * mode has a higher priority. Only if per-service dual read mode is not defined, global * dual read mode will be used. @@ -52,6 +52,10 @@ public class DualReadStateManager private final RateLimiter _rateLimiter; // Stores global dual read mode private volatile DualReadModeProvider.DualReadMode _dualReadMode = DualReadModeProvider.DualReadMode.OLD_LB_ONLY; + private final Set _globalDualReadModeWatchers; + private final ConcurrentMap> _serviceDualReadModeWatchers; + private final ConcurrentMap> _clusterDualReadModeWatchers; + private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx; private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor; @@ -74,6 +78,9 @@ public DualReadStateManager(DualReadModeProvider dualReadModeProvider, Scheduled _dualReadModeProvider = dualReadModeProvider; _executorService = executorService; _rateLimiter = RateLimiter.create((double) 1 / DUAL_READ_MODE_SWITCH_MIN_INTERVAL); + _globalDualReadModeWatchers = ConcurrentHashMap.newKeySet(); + _serviceDualReadModeWatchers = new ConcurrentHashMap<>(); + _clusterDualReadModeWatchers = new ConcurrentHashMap<>(); } public void updateGlobal(DualReadModeProvider.DualReadMode mode) @@ -82,6 +89,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode) _dualReadMode = mode; if (updated) { LOG.info("Global dual read mode updated: {}", mode); + notifyGlobalWatchers(); } } @@ -90,6 +98,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode); if (oldMode != mode) { LOG.info("Dual read mode for service {} updated: {}", service, mode); + notifyServiceWatchers(service); } } @@ -98,6 +107,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode); if (oldMode != mode) { LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode); + notifyClusterWatchers(cluster); } } @@ -198,4 +208,70 @@ public DualReadModeProvider getDualReadModeProvider() { return _dualReadModeProvider; } + + public void addGlobalWatcher(DualReadModeWatcher watcher) + { + _globalDualReadModeWatchers.add(watcher); + } + + public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher) + { + Set watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet()); + watchers.add(watcher); + } + + public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher) + { + Set watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet()); + watchers.add(watcher); + } + + public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher) + { + Set watchers = _serviceDualReadModeWatchers.get(serviceName); + if (watchers != null) + { + watchers.remove(watcher); + } + } + + public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher) + { + Set watchers = _clusterDualReadModeWatchers.get(clusterName); + if (watchers != null) + { + watchers.remove(watcher); + } + } + + private void notifyGlobalWatchers() + { + notifyWatchers(_globalDualReadModeWatchers); + } + + private void notifyServiceWatchers(String serviceName) + { + notifyWatchers(_serviceDualReadModeWatchers.get(serviceName)); + } + + private void notifyClusterWatchers(String clusterName) + { + notifyWatchers(_clusterDualReadModeWatchers.get(clusterName)); + } + + private void notifyWatchers(Set watchers) + { + if (watchers != null) + { + for (DualReadModeWatcher w : watchers) + { + w.onChanged(); + } + } + } + + public interface DualReadModeWatcher + { + void onChanged(); + } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java index 5a9a64e49d..cc5ee6d12c 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java @@ -16,9 +16,12 @@ package com.linkedin.d2.jmx; +import com.google.common.annotations.VisibleForTesting; import com.linkedin.d2.balancer.LoadBalancerStateItem; import com.linkedin.d2.balancer.clients.TrackerClient; import com.linkedin.d2.balancer.dualread.DualReadLoadBalancerJmx; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.properties.ServiceProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; @@ -29,45 +32,149 @@ import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.d2.discovery.stores.zk.ZooKeeperPermanentStore; import com.linkedin.util.ArgumentUtil; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Consumer; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * JMX manager to register the D2 client components */ public class D2ClientJmxManager { - private final String _prefix; + private static final Logger _log = LoggerFactory.getLogger(D2ClientJmxManager.class); + private final JmxManager _jmxManager; + // Service discovery source type: ZK, xDS, etc. + private final DiscoverySourceType _discoverySourceType; + + /* + When dual read state manager is null, only one discovery source is working (could be a new source other than ZK). We keep using the same Jmx/sensor + names as the ZK one so users can still monitor the same metrics. + + When dual read state manager is not null, it means dual read load balancer is in use, and there are two sets of load balancer, lb state, and FS backup + registering Jmx/sensors for different service discovery sources. + Depending on the specific dual read mode that is dynamically changing, controlled by lix on d2 service level, one source is primary, the other is secondary. + Jmx/sensor names need to be carefully handled to: + 1) for the primary source, use the primary names (the one ZK was using) so users can still monitor the same metrics. + 2) for the secondary source, use different names that include the source type to avoid conflicting the primary names. + */ + private final DualReadStateManager _dualReadStateManager; + + private final String _primaryGlobalPrefix; + + private final String _secondaryGlobalPrefix; + + private static final String _primaryPrefixForLbPropertyJmxName = ""; + + private final String _secondaryPrefixForLbPropertyJmxName; + + @SuppressWarnings("rawtypes") + private final ConcurrentMap _dualReadStateWatchers; + + public enum DiscoverySourceType + { + ZK("ZK"), + XDS("xDS"); + + private final String _printName; + + DiscoverySourceType(String printName) + { + _printName = printName; + } + + public String getPrintName() + { + return _printName; + } + } + public D2ClientJmxManager(String prefix, @Nonnull JmxManager jmxManager) + { + this(prefix, jmxManager, DiscoverySourceType.ZK, null); + } + + public D2ClientJmxManager(String prefix, + @Nonnull JmxManager jmxManager, + @Nonnull DiscoverySourceType discoverySourceType, + @Nullable DualReadStateManager dualReadStateManager) { ArgumentUtil.ensureNotNull(jmxManager,"jmxManager"); - _prefix = prefix; + _primaryGlobalPrefix = prefix; _jmxManager = jmxManager; + _discoverySourceType = discoverySourceType; + _dualReadStateManager = dualReadStateManager; + _secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName()); + _secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName()); + _dualReadStateWatchers = new ConcurrentHashMap<>(); } + @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancer(SimpleLoadBalancer balancer) { - final String jmxName = _prefix + "-LoadBalancer"; - - _jmxManager.registerLoadBalancer(jmxName, balancer); + if (_dualReadStateManager != null) + { + String watcherName = balancer.getClass().getSimpleName(); + D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(balancer, this::doRegisterLoadBalancer); + _dualReadStateManager.addGlobalWatcher(watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(balancer); + } + doRegisterLoadBalancer(balancer); } + @SuppressWarnings({"unchecked"}) public void setSimpleLoadBalancerState(SimpleLoadBalancerState state) { - _jmxManager.registerLoadBalancerState(_prefix + "-LoadBalancerState", state); + if (_dualReadStateManager != null) + { + String watcherName = state.getClass().getSimpleName(); + D2ClientJmxDualReadModeWatcher currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> + { + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(state, this::doRegisterLoadBalancerState); + _dualReadStateManager.addGlobalWatcher(watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(state); + } + doRegisterLoadBalancerState(state); state.register(new SimpleLoadBalancerStateListener() { @Override public void onStrategyAdded(String serviceName, String scheme, LoadBalancerStrategy strategy) { - _jmxManager.registerLoadBalancerStrategy(getLoadBalancerStrategyJmxName(serviceName, scheme), strategy); + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _dualReadStateWatchers.computeIfAbsent(getWatcherNameForLoadBalancerStrategy(serviceName, scheme), k -> + { + Consumer callback = i -> doRegisterLoadBalancerStrategy(serviceName, scheme, i); + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(strategy, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(strategy); + } + doRegisterLoadBalancerStrategy(serviceName, scheme, strategy); } @Override public void onStrategyRemoved(String serviceName, String scheme, LoadBalancerStrategy strategy) { + DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForLoadBalancerStrategy(serviceName, scheme)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } _jmxManager.unregister(getLoadBalancerStrategyJmxName(serviceName, scheme)); } @@ -90,10 +197,22 @@ public void onClientRemoved(String clusterName, TrackerClient client) public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) { if (clusterInfoItem != null && clusterInfoItem.getClusterPropertiesItem() != null - && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { - _jmxManager.registerClusterInfo( - getClusterInfoJmxName(clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName()), - clusterInfoItem); + && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) + { + String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher currentWatcher = + _dualReadStateWatchers.computeIfAbsent(getWatcherNameForClusterInfoItem(clusterName), k -> + { + Consumer callback = i -> doRegisterClusterInfo(clusterName, i); + D2ClientJmxDualReadModeWatcher watcher = new D2ClientJmxDualReadModeWatcher<>(clusterInfoItem, callback); + _dualReadStateManager.addClusterWatcher(clusterName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(clusterInfoItem); + } + doRegisterClusterInfo(clusterName, clusterInfoItem); } } @@ -101,85 +220,290 @@ public void onClusterInfoUpdate(ClusterInfoItem clusterInfoItem) public void onClusterInfoRemoval(ClusterInfoItem clusterInfoItem) { if (clusterInfoItem != null && clusterInfoItem.getClusterPropertiesItem() != null - && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) { - _jmxManager.unregister( - getClusterInfoJmxName(clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName()) - ); + && clusterInfoItem.getClusterPropertiesItem().getProperty() != null) + { + String clusterName = clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName(); + DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForClusterInfoItem(clusterName)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeClusterWatcher(clusterName, watcher); + } + _jmxManager.unregister(getClusterInfoJmxName(clusterName)); } } @Override public void onServicePropertiesUpdate(LoadBalancerStateItem serviceProperties) { - if (serviceProperties != null && serviceProperties.getProperty() != null) { - _jmxManager.registerServiceProperties( - getServicePropertiesJmxName(serviceProperties.getProperty().getServiceName()), - serviceProperties); + if (serviceProperties != null && serviceProperties.getProperty() != null) + { + String serviceName = serviceProperties.getProperty().getServiceName(); + if (_dualReadStateManager != null) + { + D2ClientJmxDualReadModeWatcher> currentWatcher = + _dualReadStateWatchers.computeIfAbsent(getWatcherNameForServiceProperties(serviceName), k -> + { + Consumer> callback = i -> doRegisterServiceProperties(serviceName, i); + D2ClientJmxDualReadModeWatcher> watcher = + new D2ClientJmxDualReadModeWatcher<>(serviceProperties, callback); + _dualReadStateManager.addServiceWatcher(serviceName, watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(serviceProperties); + } + doRegisterServiceProperties(serviceName, serviceProperties); } } - @Override public void onServicePropertiesRemoval(LoadBalancerStateItem serviceProperties) { - if (serviceProperties != null && serviceProperties.getProperty() != null) { - _jmxManager.unregister(getServicePropertiesJmxName(serviceProperties.getProperty().getServiceName())); + if (serviceProperties != null && serviceProperties.getProperty() != null) + { + String serviceName = serviceProperties.getProperty().getServiceName(); + DualReadStateManager.DualReadModeWatcher watcher = _dualReadStateWatchers.remove(getWatcherNameForServiceProperties(serviceName)); + if (_dualReadStateManager != null && watcher != null) + { + _dualReadStateManager.removeServiceWatcher(serviceName, watcher); + } + _jmxManager.unregister(getServicePropertiesJmxName(serviceName)); } } - private String getPrefixForLBJmxNames() { - return _prefix.contains("-xDS") ? String.format("%s-", _prefix) : ""; + private void doRegisterLoadBalancerStrategy(String serviceName, String scheme, LoadBalancerStrategy strategy) + { + String jmxName = getLoadBalancerStrategyJmxName(serviceName, scheme); + _jmxManager.registerLoadBalancerStrategy(jmxName, strategy); + } + + private void doRegisterClusterInfo(String clusterName, ClusterInfoItem clusterInfoItem) + { + String jmxName = getClusterInfoJmxName(clusterName); + _jmxManager.registerClusterInfo(jmxName, clusterInfoItem); + } + + private void doRegisterServiceProperties(String serviceName, LoadBalancerStateItem serviceProperties) + { + _jmxManager.registerServiceProperties(getServicePropertiesJmxName(serviceName), serviceProperties); + } + + private String getWatcherNameForLoadBalancerStrategy(String serviceName, String scheme) + { + return String.format("%s-%s-LoadBalancerStrategy", serviceName, scheme); + } + + private String getWatcherNameForClusterInfoItem(String clusterName) { + return String.format("%s-ClusterInfoItem", clusterName); + } + + private String getWatcherNameForServiceProperties(String serviceName) { + return String.format("%s-LoadBalancerStateItem-ServiceProperties", serviceName); } private String getClusterInfoJmxName(String clusterName) { - return String.format("%s%s-ClusterInfo", getPrefixForLBJmxNames(), clusterName); + return String.format("%s%s-ClusterInfo", getClusterPrefixForLBPropertyJmxNames(clusterName), clusterName); } private String getServicePropertiesJmxName(String serviceName) { - return String.format("%s%s-ServiceProperties", getPrefixForLBJmxNames(), serviceName); + return String.format("%s%s-ServiceProperties", getServicePrefixForLBPropertyJmxNames(serviceName), serviceName); } private String getLoadBalancerStrategyJmxName(String serviceName, String scheme) { - return String.format("%s%s-%s-LoadBalancerStrategy", getPrefixForLBJmxNames(), serviceName, scheme); + return String.format("%s%s-%s-LoadBalancerStrategy", getServicePrefixForLBPropertyJmxNames(serviceName), serviceName, scheme); } }); } public void setZkUriRegistry(ZooKeeperEphemeralStore uriRegistry) { - _jmxManager.registerZooKeeperEphemeralStore(_prefix + "-ZooKeeperUriRegistry", uriRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkUriRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperUriRegistry", getGlobalPrefix()); + _jmxManager.registerZooKeeperEphemeralStore(jmxName, uriRegistry); } public void setZkClusterRegistry(ZooKeeperPermanentStore clusterRegistry) { - _jmxManager.registerZooKeeperPermanentStore(_prefix + "-ZooKeeperClusterRegistry", clusterRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkClusterRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperClusterRegistry", getGlobalPrefix()); + _jmxManager.registerZooKeeperPermanentStore(jmxName, clusterRegistry); } public void setZkServiceRegistry(ZooKeeperPermanentStore serviceRegistry) { - _jmxManager.registerZooKeeperPermanentStore(_prefix + "-ZooKeeperServiceRegistry", serviceRegistry); + if (_discoverySourceType != DiscoverySourceType.ZK) + { + _log.warn("Setting ZkServiceRegistry for Non-ZK source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-ZooKeeperServiceRegistry", getGlobalPrefix()); + _jmxManager.registerZooKeeperPermanentStore(jmxName, serviceRegistry); } public void setFsUriStore(FileStore uriStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreUriStore", uriStore); + addDualReadModeWatcherForFileStore("UriProperties", uriStore, this::doRegisterUriFileStore); + doRegisterUriFileStore(uriStore); } public void setFsClusterStore(FileStore clusterStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreClusterStore", clusterStore); + addDualReadModeWatcherForFileStore("ClusterProperties", clusterStore, this::doRegisterClusterFileStore); + doRegisterClusterFileStore(clusterStore); } public void setFsServiceStore(FileStore serviceStore) { - _jmxManager.registerFileStore(_prefix + "-FileStoreServiceStore", serviceStore); + addDualReadModeWatcherForFileStore("ServiceProperties", serviceStore, this::doRegisterServiceFileStore); + doRegisterServiceFileStore(serviceStore); } public void registerDualReadLoadBalancerJmx(DualReadLoadBalancerJmx dualReadLoadBalancerJmx) { - _jmxManager.registerDualReadLoadBalancerJmxBean(_prefix + "-DualReadLoadBalancerJmx", dualReadLoadBalancerJmx); + if (_discoverySourceType != DiscoverySourceType.XDS) + { + _log.warn("Setting DualReadLoadBalancerJmx for Non-XDS source type: {}", _discoverySourceType); + } + final String jmxName = String.format("%s-DualReadLoadBalancerJmx", getGlobalPrefix()); + _jmxManager.registerDualReadLoadBalancerJmxBean(jmxName, dualReadLoadBalancerJmx); + } + + private void doRegisterLoadBalancer(SimpleLoadBalancer balancer) + { + final String jmxName = String.format("%s-LoadBalancer", getGlobalPrefix()); + _jmxManager.registerLoadBalancer(jmxName, balancer); + } + + private void doRegisterLoadBalancerState(SimpleLoadBalancerState state) + { + final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix()); + _jmxManager.registerLoadBalancerState(jmxName, state); + } + + private void doRegisterUriFileStore(FileStore uriStore) + { + final String jmxName = String.format("%s-FileStoreUriStore", getGlobalPrefix()); + _jmxManager.registerFileStore(jmxName, uriStore); + } + + private void doRegisterClusterFileStore(FileStore clusterStore) + { + final String jmxName = String.format("%s-FileStoreClusterStore", getGlobalPrefix()); + _jmxManager.registerFileStore(jmxName, clusterStore); + } + + private void doRegisterServiceFileStore(FileStore serviceStore) + { + final String jmxName = String.format("%s-FileStoreServiceStore", getGlobalPrefix()); + _jmxManager.registerFileStore(jmxName, serviceStore); + } + + @SuppressWarnings("unchecked") + private void addDualReadModeWatcherForFileStore(String watcherNameSuffix, FileStore store, Consumer> watcherCallback) + { + if (_dualReadStateManager != null) + { + String watcherName = String.format("%s-%s", store.getClass().getSimpleName(), watcherNameSuffix); + D2ClientJmxDualReadModeWatcher> currentWatcher = _dualReadStateWatchers.computeIfAbsent(watcherName, k -> + { + D2ClientJmxDualReadModeWatcher> watcher = new D2ClientJmxDualReadModeWatcher<>(store, watcherCallback); + _dualReadStateManager.addGlobalWatcher(watcher); + return watcher; + }); + currentWatcher.setLatestJmxProperty(store); + } + } + + private String getGlobalPrefix() + { + return isGlobalPrimarySource() ? _primaryGlobalPrefix : _secondaryGlobalPrefix; + } + + private String getServicePrefixForLBPropertyJmxNames(String serviceName) + { + return isServicePrimarySource(serviceName) ? _primaryPrefixForLbPropertyJmxName : _secondaryPrefixForLbPropertyJmxName; + } + + private String getClusterPrefixForLBPropertyJmxNames(String clusterName) + { + return isClusterPrimarySource(clusterName) ? _primaryPrefixForLbPropertyJmxName : _secondaryPrefixForLbPropertyJmxName; + } + + private boolean isGlobalPrimarySource() + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(_dualReadStateManager.getGlobalDualReadMode()); + } + + private boolean isServicePrimarySource(String serviceName) + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(_dualReadStateManager.getServiceDualReadMode(serviceName)); + } + + private boolean isClusterPrimarySource(String clusterName) + { + if (_dualReadStateManager == null) + { + return true; // only one source, it is the primary. + } + return isPrimarySourceHelper(_dualReadStateManager.getClusterDualReadMode(clusterName)); + } + + private boolean isPrimarySourceHelper(DualReadModeProvider.DualReadMode dualReadMode) + { + switch (dualReadMode) + { + case NEW_LB_ONLY: + return _discoverySourceType == DiscoverySourceType.XDS; + case DUAL_READ: + case OLD_LB_ONLY: + return _discoverySourceType == DiscoverySourceType.ZK; + default: + _log.warn("Unknown dual read mode {}, falling back to ZK as primary source.", dualReadMode); + return _discoverySourceType == DiscoverySourceType.ZK; + } + } + + static final class D2ClientJmxDualReadModeWatcher implements DualReadStateManager.DualReadModeWatcher + { + private T _latestJmxProperty; + private final Consumer _callback; + + D2ClientJmxDualReadModeWatcher(T initialJmxProperty, Consumer callback) + { + _latestJmxProperty = initialJmxProperty; + _callback = callback; + } + + @VisibleForTesting + T getLatestJmxProperty() + { + return _latestJmxProperty; + } + + public void setLatestJmxProperty(T latestJmxProperty) + { + _latestJmxProperty = latestJmxProperty; + } + + @Override + public void onChanged() + { + _callback.accept(_latestJmxProperty); + } } } diff --git a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java index ce4f9a52d9..d6ea394b99 100644 --- a/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java +++ b/d2/src/main/java/com/linkedin/d2/jmx/JmxManager.java @@ -246,8 +246,10 @@ public void unregister(ObjectName oName) { _server.unregisterMBean(oName); } - _registeredNames.remove(oName); - _log.info("Unregistered MBean {}", oName); + if (_registeredNames.remove(oName)) + { + _log.info("Unregistered MBean {}", oName); + } } catch (Exception e) { diff --git a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java index da42df1be3..f5834b73c6 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java +++ b/d2/src/main/java/com/linkedin/d2/xds/balancer/XdsLoadBalancerWithFacilitiesFactory.java @@ -40,8 +40,8 @@ public class XdsLoadBalancerWithFacilitiesFactory implements LoadBalancerWithFac @Override public LoadBalancerWithFacilities create(D2ClientConfig config) { - D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix + "-xDS", - config.jmxManager); + D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager, + D2ClientJmxManager.DiscoverySourceType.XDS, config.dualReadStateManager); if (config.dualReadStateManager != null) { diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java index fa3c8a59c6..5cbfc105bd 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java @@ -17,14 +17,21 @@ package com.linkedin.d2.jmx; import com.linkedin.d2.balancer.LoadBalancerStateItem; +import com.linkedin.d2.balancer.dualread.DualReadModeProvider; +import com.linkedin.d2.balancer.dualread.DualReadStateManager; import com.linkedin.d2.balancer.properties.ClusterProperties; import com.linkedin.d2.balancer.properties.ServiceProperties; +import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.simple.ClusterInfoItem; +import com.linkedin.d2.balancer.simple.SimpleLoadBalancer; import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState; +import com.linkedin.d2.balancer.strategies.relative.RelativeLoadBalancerStrategy; import com.linkedin.d2.balancer.util.canary.CanaryDistributionProvider; import com.linkedin.d2.balancer.util.partitions.PartitionAccessor; +import com.linkedin.d2.discovery.stores.file.FileStore; import java.net.URI; import java.util.Collections; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import org.mockito.Captor; import org.mockito.ArgumentCaptor; @@ -32,150 +39,418 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import org.testng.Assert; -import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static org.mockito.Mockito.*; public class D2ClientJmxManagerTest { - @Mock - SimpleLoadBalancerState _simpleLoadBalancerState; - @Mock - JmxManager _jmxManager; - @Captor - ArgumentCaptor _simpleLoadBalancerStateNameCaptor; - @Captor - ArgumentCaptor _simpleLoadBalancerStateCaptor; - @Captor - ArgumentCaptor _simpleLoadBalancerStateListenerCaptor; - @Captor - ArgumentCaptor _unregisteredObjectNameCaptor; - @Captor - ArgumentCaptor _registerObjectNameCaptor; - @Captor - ArgumentCaptor _clusterInfoArgumentCaptor; - @Captor - ArgumentCaptor> _servicePropertiesArgumentCaptor; - - D2ClientJmxManager _d2ClientJmxManager; - private ClusterInfoItem _clusterInfoItem; - private ClusterInfoItem _noPropertyClusterInfoItem; - private LoadBalancerStateItem _servicePropertiesLBState; - private LoadBalancerStateItem _noPropertyLBStateItem; - - @BeforeMethod - public void setUp() + + private static final LoadBalancerStateItem SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM = new LoadBalancerStateItem<>( + new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), + 0, + 0, + CanaryDistributionProvider.Distribution.CANARY + ); + + private static final LoadBalancerStateItem NO_PROPERTY_LB_STATE_ITEM = new LoadBalancerStateItem<>( + null, 0, 0, CanaryDistributionProvider.Distribution.STABLE); + + private static final LoadBalancerStateItem UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM = new LoadBalancerStateItem<>( + new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), + 0, + 0, + CanaryDistributionProvider.Distribution.STABLE + ); + + private static final class D2ClientJmxManagerFixture { - MockitoAnnotations.initMocks(this); - AtomicLong version = new AtomicLong(0); - when(_simpleLoadBalancerState.getVersionAccess()).thenReturn(version); - _clusterInfoItem = - new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), new PartitionAccessor() { - @Override - public int getMaxPartitionId() { - return 0; - } - - @Override - public int getPartitionId(URI uri) { - return 0; - } - }, CanaryDistributionProvider.Distribution.CANARY); - _noPropertyClusterInfoItem = new ClusterInfoItem(_simpleLoadBalancerState, null, null, - CanaryDistributionProvider.Distribution.STABLE); - _servicePropertiesLBState = new LoadBalancerStateItem<>( - new ServiceProperties("S_Foo", "Bar", "/", Collections.singletonList("Random")), - 0, - 0, - CanaryDistributionProvider.Distribution.CANARY - ); - _noPropertyLBStateItem = new LoadBalancerStateItem(null, 0, 0, - CanaryDistributionProvider.Distribution.STABLE); - _d2ClientJmxManager = new D2ClientJmxManager("Foo", _jmxManager); - Mockito.doReturn(_jmxManager).when(_jmxManager).unregister(_unregisteredObjectNameCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerLoadBalancerState( - _simpleLoadBalancerStateNameCaptor.capture(), _simpleLoadBalancerStateCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerClusterInfo( - _registerObjectNameCaptor.capture(), - _clusterInfoArgumentCaptor.capture()); - Mockito.doReturn(_jmxManager).when(_jmxManager).registerServiceProperties( - _registerObjectNameCaptor.capture(), - _servicePropertiesArgumentCaptor.capture()); - Mockito.doNothing().when(_simpleLoadBalancerState).register(_simpleLoadBalancerStateListenerCaptor.capture()); + @Mock + SimpleLoadBalancer _loadBalancer; + @Mock + SimpleLoadBalancerState _simpleLoadBalancerState; + @Mock + JmxManager _jmxManager; + @Mock + FileStore _uriStore; + @Mock + FileStore _clusterStore; + @Mock + FileStore _serviceStore; + @Mock + RelativeLoadBalancerStrategy _relativeLoadBalancerStrategy; + @Mock + DualReadModeProvider _dualReadModeProvider; + @Mock + ScheduledExecutorService _executorService; + @Captor + ArgumentCaptor _simpleLoadBalancerStateNameCaptor; + @Captor + ArgumentCaptor _simpleLoadBalancerStateCaptor; + @Captor + ArgumentCaptor _simpleLoadBalancerStateListenerCaptor; + @Captor + ArgumentCaptor _unregisteredObjectNameCaptor; + @Captor + ArgumentCaptor _registerObjectNameCaptor; + @Captor + ArgumentCaptor _clusterInfoArgumentCaptor; + @Captor + ArgumentCaptor> _servicePropertiesArgumentCaptor; + @SuppressWarnings("rawtypes") + @Captor + ArgumentCaptor _addWatcherCaptor; + + D2ClientJmxManager _d2ClientJmxManager; + private final ClusterInfoItem _clusterInfoItem; + private final ClusterInfoItem _updatedClusterInfoItem; + private final ClusterInfoItem _noPropertyClusterInfoItem; + private final DualReadStateManager _dualReadStateManager; + + D2ClientJmxManagerFixture() + { + MockitoAnnotations.initMocks(this); + AtomicLong version = new AtomicLong(0); + when(_simpleLoadBalancerState.getVersionAccess()).thenReturn(version); + PartitionAccessor partitionAccessor = new PartitionAccessor() { + @Override + public int getMaxPartitionId() { + return 0; + } + + @Override + public int getPartitionId(URI uri) { + return 0; + } + }; + _clusterInfoItem = + new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), partitionAccessor, + CanaryDistributionProvider.Distribution.CANARY); + _updatedClusterInfoItem = + new ClusterInfoItem(_simpleLoadBalancerState, new ClusterProperties("C_Foo"), partitionAccessor, + CanaryDistributionProvider.Distribution.STABLE); + _noPropertyClusterInfoItem = new ClusterInfoItem(_simpleLoadBalancerState, null, null, + CanaryDistributionProvider.Distribution.STABLE); + Mockito.doReturn(_jmxManager).when(_jmxManager).unregister(_unregisteredObjectNameCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerLoadBalancerState( + _simpleLoadBalancerStateNameCaptor.capture(), _simpleLoadBalancerStateCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerClusterInfo( + _registerObjectNameCaptor.capture(), + _clusterInfoArgumentCaptor.capture()); + Mockito.doReturn(_jmxManager).when(_jmxManager).registerServiceProperties( + _registerObjectNameCaptor.capture(), + _servicePropertiesArgumentCaptor.capture()); + Mockito.doNothing().when(_simpleLoadBalancerState).register(_simpleLoadBalancerStateListenerCaptor.capture()); + + _dualReadStateManager = spy(new DualReadStateManager(_dualReadModeProvider, _executorService)); + + doCallRealMethod().when(_dualReadStateManager).addGlobalWatcher(any()); + doCallRealMethod().when(_dualReadStateManager).addServiceWatcher(any(), any()); + doCallRealMethod().when(_dualReadStateManager).addClusterWatcher(any(), any()); + doCallRealMethod().when(_dualReadStateManager).updateGlobal(any()); + doCallRealMethod().when(_dualReadStateManager).updateService(any(), any()); + doCallRealMethod().when(_dualReadStateManager).updateCluster(any(), any()); + } + + D2ClientJmxManager getD2ClientJmxManager(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, Boolean isDualReadLB) + { + if (sourceType == null) + { // default to ZK source type, null dualReadStateManager + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager); + } + else + { + _d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, sourceType, isDualReadLB ? _dualReadStateManager : null); + } + return _d2ClientJmxManager; + } + } + + @DataProvider(name = "nonDualReadD2ClientJmxManagers") + public Object[][] nonDualReadD2ClientJmxManagers() + { + return new Object[][] + { + {"Foo", null, false}, + {"Foo", D2ClientJmxManager.DiscoverySourceType.ZK, false}, + {"Foo", D2ClientJmxManager.DiscoverySourceType.XDS, false} + }; } - @Test() - public void testSetSimpleLBStateListenerUpdateServiceProperties() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerUpdateServiceProperties(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(null); - Mockito.verify(_jmxManager, never()).registerServiceProperties(any(), any()); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(_noPropertyLBStateItem); - Mockito.verify(_jmxManager, never()).registerServiceProperties(any(), any()); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(_servicePropertiesLBState); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(null); + Mockito.verify(fixture._jmxManager, never()).registerServiceProperties(any(), any()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate(NO_PROPERTY_LB_STATE_ITEM); + Mockito.verify(fixture._jmxManager, never()).registerServiceProperties(any(), any()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesUpdate( + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); Assert.assertEquals( - _registerObjectNameCaptor.getValue(), + fixture._registerObjectNameCaptor.getValue(), "S_Foo-ServiceProperties" ); Assert.assertEquals( - _servicePropertiesArgumentCaptor.getValue(), - _servicePropertiesLBState + fixture._servicePropertiesArgumentCaptor.getValue(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM ); } - @Test - public void testSetSimpleLBStateListenerUpdateClusterInfo() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerUpdateClusterInfo(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(null); - Mockito.verify(_jmxManager, never()).registerClusterInfo(any(), any()); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(_noPropertyClusterInfoItem); - Mockito.verify(_jmxManager, never()).registerClusterInfo(any(), any()); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(_clusterInfoItem); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(null); + Mockito.verify(fixture._jmxManager, never()).registerClusterInfo(any(), any()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(fixture._noPropertyClusterInfoItem); + Mockito.verify(fixture._jmxManager, never()).registerClusterInfo(any(), any()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoUpdate(fixture._clusterInfoItem); Assert.assertEquals( - _registerObjectNameCaptor.getValue(), + fixture._registerObjectNameCaptor.getValue(), "C_Foo-ClusterInfo" ); Assert.assertEquals( - _clusterInfoArgumentCaptor.getValue(), - _clusterInfoItem + fixture._clusterInfoArgumentCaptor.getValue(), + fixture._clusterInfoItem ); } - @Test - public void testSetSimpleLBStateListenerRemoveClusterInfo() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerRemoveClusterInfo(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - Assert.assertEquals(_simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); - Assert.assertEquals(_simpleLoadBalancerStateCaptor.getValue(), _simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(null); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(_noPropertyClusterInfoItem); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - - _simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(_clusterInfoItem); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + Assert.assertEquals(fixture._simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); + Assert.assertEquals(fixture._simpleLoadBalancerStateCaptor.getValue(), fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(null); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(fixture._noPropertyClusterInfoItem); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onClusterInfoRemoval(fixture._clusterInfoItem); Assert.assertEquals( - _unregisteredObjectNameCaptor.getValue(), - _clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName() + "-ClusterInfo"); + fixture._unregisteredObjectNameCaptor.getValue(), + fixture._clusterInfoItem.getClusterPropertiesItem().getProperty().getClusterName() + "-ClusterInfo"); } - @Test - public void testSetSimpleLBStateListenerRemoveServiceProperties() + @Test(dataProvider = "nonDualReadD2ClientJmxManagers") + public void testSetSimpleLBStateListenerRemoveServiceProperties(String prefix, D2ClientJmxManager.DiscoverySourceType sourceType, + Boolean isDualReadLB) { - _d2ClientJmxManager.setSimpleLoadBalancerState(_simpleLoadBalancerState); - Assert.assertEquals(_simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); - Assert.assertEquals(_simpleLoadBalancerStateCaptor.getValue(), _simpleLoadBalancerState); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(null); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(_noPropertyLBStateItem); - Mockito.verify(_jmxManager, never()).unregister(anyString()); - - _simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(_servicePropertiesLBState); + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager(prefix, sourceType, isDualReadLB); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + Assert.assertEquals(fixture._simpleLoadBalancerStateNameCaptor.getValue(), "Foo-LoadBalancerState"); + Assert.assertEquals(fixture._simpleLoadBalancerStateCaptor.getValue(), fixture._simpleLoadBalancerState); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(null); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval(NO_PROPERTY_LB_STATE_ITEM); + Mockito.verify(fixture._jmxManager, never()).unregister(anyString()); + + fixture._simpleLoadBalancerStateListenerCaptor.getValue().onServicePropertiesRemoval( + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); Assert.assertEquals( - _unregisteredObjectNameCaptor.getValue(), - _servicePropertiesLBState.getProperty().getServiceName() + "-ServiceProperties"); + fixture._unregisteredObjectNameCaptor.getValue(), + SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM.getProperty().getServiceName() + "-ServiceProperties"); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testAddAndRemoveWatcherAtServicePropertiesUpdate() + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); + // Initial dual read mode is ZK only. + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); + + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + + lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + // Verify watcher is added with properties inside + verify(dualReadStateManager).addServiceWatcher(eq("S_Foo"), addWatcherCaptor.capture()); + D2ClientJmxManager.D2ClientJmxDualReadModeWatcher> watcher = addWatcherCaptor.getValue(); + Assert.assertEquals(watcher.getLatestJmxProperty(), SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + + lbStateListener.onServicePropertiesUpdate(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + // Verify watcher is not added again, and properties in the watcher is updated + verify(dualReadStateManager, times(1)).addServiceWatcher(any(), any()); + Assert.assertEquals(watcher.getLatestJmxProperty(), UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + + // Verify watch is removed + lbStateListener.onServicePropertiesRemoval(UPDATED_SERVICE_PROPERTIES_LB_STATE_ITEM); + verify(dualReadStateManager).removeServiceWatcher(eq("S_Foo"), eq(watcher)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testAddAndRemoveWatcherAtClusterInfoItemUpdate() + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", D2ClientJmxManager.DiscoverySourceType.XDS, true); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + // Initial dual read mode is ZK only. + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getGlobalDualReadMode(); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getServiceDualReadMode(any()); + Mockito.doReturn(DualReadModeProvider.DualReadMode.OLD_LB_ONLY).when(dualReadStateManager).getClusterDualReadMode(any()); + + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + ArgumentCaptor addWatcherCaptor = fixture._addWatcherCaptor; + + lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); + + // Verify watcher is added with properties inside + verify(dualReadStateManager).addClusterWatcher(eq("C_Foo"), addWatcherCaptor.capture()); + D2ClientJmxManager.D2ClientJmxDualReadModeWatcher watcher = addWatcherCaptor.getValue(); + Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._clusterInfoItem); + + lbStateListener.onClusterInfoUpdate(fixture._updatedClusterInfoItem); + // Verify watcher is not added again, and properties in the watcher is updated + verify(dualReadStateManager, times(1)).addClusterWatcher(any(), any()); + Assert.assertEquals(watcher.getLatestJmxProperty(), fixture._updatedClusterInfoItem); + + // Verify watch is removed + lbStateListener.onClusterInfoRemoval(fixture._updatedClusterInfoItem); + verify(dualReadStateManager).removeClusterWatcher(eq("C_Foo"), eq(watcher)); + } + + @DataProvider(name = "sourceTypeAndDualReadModeForLixSwitch") + public Object[][] sourceTypeAndDualReadModeForDualReadModeSwitch() + { + return new Object[][] + { + // ZK source is still primary switching OLD_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.OLD_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, true, true}, + // XDS source is still secondary switching OLD_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.OLD_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, false, false}, + // ZK source becomes secondary switching DUAL_READ -> NEW_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.NEW_LB_ONLY, true, false}, + // XDS source becomes primary switching DUAL_READ -> NEW_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.NEW_LB_ONLY, false, true}, + // ZK source becomes primary switching NEW_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, false, true}, + // XDS source becomes secondary switching NEW_LB_ONLY -> DUAL_READ + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.DUAL_READ, true, false}, + // ZK source is still primary switching DUAL_READ -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, true, true}, + // XDS source is still secondary switching DUAL_READ -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.DUAL_READ, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, false, false}, + // ZK source is still primary switching NEW_LB_ONLY -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.ZK, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, false, true}, + // XDS source is still secondary switching NEW_LB_ONLY -> OLD_LB_ONLY + {D2ClientJmxManager.DiscoverySourceType.XDS, DualReadModeProvider.DualReadMode.NEW_LB_ONLY, + DualReadModeProvider.DualReadMode.OLD_LB_ONLY, true, false} + }; + } + @Test(dataProvider = "sourceTypeAndDualReadModeForLixSwitch") + public void testJmxNamesOnDualReadModeSwitch(D2ClientJmxManager.DiscoverySourceType sourceType, + DualReadModeProvider.DualReadMode oldMode, DualReadModeProvider.DualReadMode newMode, boolean isPrimaryBefore, boolean isPrimaryAfter) + { + D2ClientJmxManagerFixture fixture = new D2ClientJmxManagerFixture(); + D2ClientJmxManager d2ClientJmxManager = fixture.getD2ClientJmxManager("Foo", sourceType, true); + + DualReadStateManager dualReadStateManager = fixture._dualReadStateManager; + dualReadStateManager.updateGlobal(oldMode); + doReturn(oldMode).when(dualReadStateManager).getGlobalDualReadMode(); + doReturn(oldMode).when(dualReadStateManager).getServiceDualReadMode(any()); + doReturn(oldMode).when(dualReadStateManager).getClusterDualReadMode(any()); + + d2ClientJmxManager.setSimpleLoadBalancer(fixture._loadBalancer); + d2ClientJmxManager.setSimpleLoadBalancerState(fixture._simpleLoadBalancerState); + SimpleLoadBalancerState.SimpleLoadBalancerStateListener lbStateListener = fixture._simpleLoadBalancerStateListenerCaptor.getValue(); + lbStateListener.onServicePropertiesUpdate(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM); + lbStateListener.onClusterInfoUpdate(fixture._clusterInfoItem); + lbStateListener.onStrategyAdded("S_Foo", "https", fixture._relativeLoadBalancerStrategy); + d2ClientJmxManager.setFsUriStore(fixture._uriStore); + d2ClientJmxManager.setFsClusterStore(fixture._clusterStore); + d2ClientJmxManager.setFsServiceStore(fixture._serviceStore); + + verifyJmxNames(fixture, sourceType, isPrimaryBefore, false); + + doReturn(newMode).when(dualReadStateManager).getGlobalDualReadMode(); + doReturn(newMode).when(dualReadStateManager).getServiceDualReadMode(any()); + doReturn(newMode).when(dualReadStateManager).getClusterDualReadMode(any()); + + // trigger notifying watchers + dualReadStateManager.updateGlobal(newMode); + dualReadStateManager.updateService("S_Foo", newMode); + dualReadStateManager.updateCluster("C_Foo", newMode); + + verifyJmxNames(fixture, sourceType, isPrimaryAfter, isPrimaryBefore == isPrimaryAfter); + } + + private void verifyJmxNames(D2ClientJmxManagerFixture fixture, D2ClientJmxManager.DiscoverySourceType sourceType, + boolean expectedToBePrimary, boolean calledTwice) + { + JmxManager jmxManager = fixture._jmxManager; + int callTimes = calledTwice ? 2 : 1; + if (expectedToBePrimary) + { + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + } + else + { // secondary source, include source type name in jmx names + switch (sourceType) + { + case XDS: + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-xDS-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-xDS-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-xDS-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("xDS-S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("xDS-C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("xDS-S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + break; + case ZK: + verify(jmxManager, times(callTimes)).registerLoadBalancer(eq("Foo-ZK-LoadBalancer"), eq(fixture._loadBalancer)); + verify(jmxManager, times(callTimes)).registerLoadBalancerState(eq("Foo-ZK-LoadBalancerState"), eq(fixture._simpleLoadBalancerState)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreUriStore"), eq(fixture._uriStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreClusterStore"), eq(fixture._clusterStore)); + verify(jmxManager, times(callTimes)).registerFileStore(eq("Foo-ZK-FileStoreServiceStore"), eq(fixture._serviceStore)); + verify(jmxManager, times(callTimes)).registerServiceProperties(eq("ZK-S_Foo-ServiceProperties"), eq(SERVICE_PROPERTIES_LOAD_BALANCER_STATE_ITEM)); + verify(jmxManager, times(callTimes)).registerClusterInfo(eq("ZK-C_Foo-ClusterInfo"), eq(fixture._clusterInfoItem)); + verify(jmxManager, times(callTimes)).registerLoadBalancerStrategy(eq("ZK-S_Foo-https-LoadBalancerStrategy"), eq(fixture._relativeLoadBalancerStrategy)); + break; + default: + Assert.fail(String.format("Unknown source type: %s", sourceType)); + } + } } } diff --git a/gradle.properties b/gradle.properties index e844d0a81d..93304a77b9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.43.11 +version=29.44.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true diff --git a/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java b/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java index 15241403f8..1d47bb6406 100644 --- a/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java +++ b/r2-int-test/src/test/java/test/r2/integ/TestJetty404.java @@ -26,6 +26,7 @@ import com.linkedin.r2.transport.http.server.HttpServer; import com.linkedin.r2.transport.http.server.HttpServerFactory; import com.linkedin.test.util.retry.SingleRetry; +import com.linkedin.test.util.retry.ThreeRetries; import java.io.IOException; import java.net.URI; import java.util.Collections; @@ -74,7 +75,7 @@ public void handleStreamRequest(StreamRequest req, Map wireAttrs } // make sure jetty's default behavior will read all the request bytes in case of 404 - @Test(retryAnalyzer = SingleRetry.class) // Known to be flaky in CI + @Test(retryAnalyzer = ThreeRetries.class) // Known to be flaky in CI public void testJetty404() throws Exception { BytesWriter writer = new BytesWriter(200 * 1024, (byte)100); diff --git a/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java b/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java index 748b62137c..215ebd6dcd 100644 --- a/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java +++ b/r2-netty/src/test/java/com/linkedin/r2/transport/http/client/TestChannelPoolManagerFactorySharingConnection.java @@ -16,6 +16,7 @@ package com.linkedin.r2.transport.http.client; +import com.linkedin.test.util.retry.ThreeRetries; import java.net.URI; import java.util.ArrayList; import java.util.HashMap; @@ -74,7 +75,7 @@ public static Object[][] configsOpenedConnections() * End to end test. Testing all the client combinations (http/https stream/rest sharing/not sharing) and check they * are using the same channelPoolManager */ - @Test(dataProvider = "configsOpenedConnections") + @Test(dataProvider = "configsOpenedConnections", retryAnalyzer = ThreeRetries.class) // Known to be flaky in CI public void testSuccessfulRequests(boolean restOverStream, String protocolVersion, boolean shareConnection) throws Exception {