Skip to content

Commit

Permalink
dynamically switch jmx/sensor names based on dual read mode and sourc…
Browse files Browse the repository at this point in the history
…e type
  • Loading branch information
bohhyang committed Aug 6, 2023
1 parent e18c663 commit e8b922c
Show file tree
Hide file tree
Showing 11 changed files with 844 additions and 160 deletions.
11 changes: 7 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ When updating the changelog, remember to be very clear about what behavior has c
and what APIs have changed, if applicable.

## [Unreleased]
- fix logging issues about observer host and dual read mode

## [29.43.11] - 2023-07-28
- differentiate LB metrics between ZK and xDS read flows.
## [29.44.0] - 2023-08-06
- dynamically switch jmx/sensor names based on dual read mode and source type

## [29.43.11] - 2023-08-01
- fix logging issues about observer host and dual read mode

## [29.43.10] - 2023-07-24
- set log level of dual read mode changes to info.
Expand Down Expand Up @@ -5513,7 +5515,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.44.0...master
[29.44.0]: https://github.com/linkedin/rest.li/compare/v29.43.11...v29.44.0
[29.43.11]: https://github.com/linkedin/rest.li/compare/v29.43.10...v29.43.11
[29.43.10]: https://github.com/linkedin/rest.li/compare/v29.43.9...v29.43.10
[29.43.9]: https://github.com/linkedin/rest.li/compare/v29.43.8...v29.43.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
LOG.info("Creating D2 LoadBalancer based on LastSeenLoadBalancerWithFacilities");

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

// init connection
ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
loadBalancerComponentFactory = config.componentFactory;
}

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory,
config.lbWaitTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.util.clock.Clock;
import com.linkedin.util.clock.SystemClock;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -32,7 +33,6 @@
/**
* Checks and manages the global and per-service dual read state.
* Provides monitoring of the dual read data.
*
* The dual read state is broken down into global and per-service state. Per-service dual read
* mode has a higher priority. Only if per-service dual read mode is not defined, global
* dual read mode will be used.
Expand All @@ -52,6 +52,10 @@ public class DualReadStateManager
private final RateLimiter _rateLimiter;
// Stores global dual read mode
private volatile DualReadModeProvider.DualReadMode _dualReadMode = DualReadModeProvider.DualReadMode.OLD_LB_ONLY;
private final Set<DualReadModeWatcher> _globalDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _serviceDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _clusterDualReadModeWatchers;

private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
Expand All @@ -74,6 +78,9 @@ public DualReadStateManager(DualReadModeProvider dualReadModeProvider, Scheduled
_dualReadModeProvider = dualReadModeProvider;
_executorService = executorService;
_rateLimiter = RateLimiter.create((double) 1 / DUAL_READ_MODE_SWITCH_MIN_INTERVAL);
_globalDualReadModeWatchers = ConcurrentHashMap.newKeySet();
_serviceDualReadModeWatchers = new ConcurrentHashMap<>();
_clusterDualReadModeWatchers = new ConcurrentHashMap<>();
}

public void updateGlobal(DualReadModeProvider.DualReadMode mode)
Expand All @@ -82,6 +89,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode)
_dualReadMode = mode;
if (updated) {
LOG.info("Global dual read mode updated: {}", mode);
notifyGlobalWatchers();
}
}

Expand All @@ -90,6 +98,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for service {} updated: {}", service, mode);
notifyServiceWatchers(service);
}
}

Expand All @@ -98,6 +107,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode);
notifyClusterWatchers(cluster);
}
}

Expand Down Expand Up @@ -198,4 +208,70 @@ public DualReadModeProvider getDualReadModeProvider()
{
return _dualReadModeProvider;
}

public void addGlobalWatcher(DualReadModeWatcher watcher)
{
_globalDualReadModeWatchers.add(watcher);
}

public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.get(serviceName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.get(clusterName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

private void notifyGlobalWatchers()
{
notifyWatchers(_globalDualReadModeWatchers);
}

private void notifyServiceWatchers(String serviceName)
{
notifyWatchers(_serviceDualReadModeWatchers.get(serviceName));
}

private void notifyClusterWatchers(String clusterName)
{
notifyWatchers(_clusterDualReadModeWatchers.get(clusterName));
}

private void notifyWatchers(Set<DualReadModeWatcher> watchers)
{
if (watchers != null)
{
for (DualReadModeWatcher w : watchers)
{
w.onChanged();
}
}
}

public interface DualReadModeWatcher
{
void onChanged();
}
}
Loading

0 comments on commit e8b922c

Please sign in to comment.