Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dynamically switch jmx/sensor names based on dual read mode and source type #926

Merged
merged 4 commits into from
Aug 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.44.0] - 2023-08-06
- dynamically switch jmx/sensor names based on dual read mode and source type

## [29.43.11] - 2023-08-01
- fix logging issues about observer host and dual read mode

Expand Down Expand Up @@ -5512,7 +5515,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.44.0...master
[29.44.0]: https://github.com/linkedin/rest.li/compare/v29.43.11...v29.44.0
[29.43.11]: https://github.com/linkedin/rest.li/compare/v29.43.10...v29.43.11
[29.43.10]: https://github.com/linkedin/rest.li/compare/v29.43.9...v29.43.10
[29.43.9]: https://github.com/linkedin/rest.li/compare/v29.43.8...v29.43.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
LOG.info("Creating D2 LoadBalancer based on LastSeenLoadBalancerWithFacilities");

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

// init connection
ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
loadBalancerComponentFactory = config.componentFactory;
}

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory,
config.lbWaitTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.util.clock.Clock;
import com.linkedin.util.clock.SystemClock;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -32,7 +33,6 @@
/**
* Checks and manages the global and per-service dual read state.
* Provides monitoring of the dual read data.
*
* The dual read state is broken down into global and per-service state. Per-service dual read
* mode has a higher priority. Only if per-service dual read mode is not defined, global
* dual read mode will be used.
Expand All @@ -52,6 +52,10 @@ public class DualReadStateManager
private final RateLimiter _rateLimiter;
// Stores global dual read mode
private volatile DualReadModeProvider.DualReadMode _dualReadMode = DualReadModeProvider.DualReadMode.OLD_LB_ONLY;
private final Set<DualReadModeWatcher> _globalDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _serviceDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _clusterDualReadModeWatchers;

private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
Expand All @@ -74,6 +78,9 @@ public DualReadStateManager(DualReadModeProvider dualReadModeProvider, Scheduled
_dualReadModeProvider = dualReadModeProvider;
_executorService = executorService;
_rateLimiter = RateLimiter.create((double) 1 / DUAL_READ_MODE_SWITCH_MIN_INTERVAL);
_globalDualReadModeWatchers = ConcurrentHashMap.newKeySet();
_serviceDualReadModeWatchers = new ConcurrentHashMap<>();
_clusterDualReadModeWatchers = new ConcurrentHashMap<>();
}

public void updateGlobal(DualReadModeProvider.DualReadMode mode)
Expand All @@ -82,6 +89,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode)
_dualReadMode = mode;
if (updated) {
LOG.info("Global dual read mode updated: {}", mode);
notifyGlobalWatchers();
}
}

Expand All @@ -90,6 +98,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for service {} updated: {}", service, mode);
notifyServiceWatchers(service);
}
}

Expand All @@ -98,6 +107,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode);
notifyClusterWatchers(cluster);
}
}

Expand Down Expand Up @@ -198,4 +208,70 @@ public DualReadModeProvider getDualReadModeProvider()
{
return _dualReadModeProvider;
}

bohhyang marked this conversation as resolved.
Show resolved Hide resolved
public void addGlobalWatcher(DualReadModeWatcher watcher)
{
_globalDualReadModeWatchers.add(watcher);
}

public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.get(serviceName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.get(clusterName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

private void notifyGlobalWatchers()
{
notifyWatchers(_globalDualReadModeWatchers);
}

private void notifyServiceWatchers(String serviceName)
{
notifyWatchers(_serviceDualReadModeWatchers.get(serviceName));
}

private void notifyClusterWatchers(String clusterName)
{
notifyWatchers(_clusterDualReadModeWatchers.get(clusterName));
}

private void notifyWatchers(Set<DualReadModeWatcher> watchers)
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
{
if (watchers != null)
{
for (DualReadModeWatcher w : watchers)
{
w.onChanged();
}
}
}

public interface DualReadModeWatcher
{
void onChanged();
bohhyang marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading
Loading