Skip to content

Commit

Permalink
Dynamically switch jmx/sensor names based on dual read mode and sourc…
Browse files Browse the repository at this point in the history
…e type (#926)

* Differentiate LB metrics between ZK and xDS read flows

* dynamically switch jmx/sensor names based on dual read mode and source type

* move watcher management logic to a manager class

* pass updated dual read mode to watcher callbacks
  • Loading branch information
bohhyang authored Aug 15, 2023
1 parent 47784c3 commit 641694b
Show file tree
Hide file tree
Showing 14 changed files with 1,129 additions and 162 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.44.0] - 2023-08-06
- dynamically switch jmx/sensor names based on dual read mode and source type

## [29.43.11] - 2023-08-01
- fix logging issues about observer host and dual read mode

Expand Down Expand Up @@ -5512,7 +5515,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.43.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.44.0...master
[29.44.0]: https://github.com/linkedin/rest.li/compare/v29.43.11...v29.44.0
[29.43.11]: https://github.com/linkedin/rest.li/compare/v29.43.10...v29.43.11
[29.43.10]: https://github.com/linkedin/rest.li/compare/v29.43.9...v29.43.10
[29.43.9]: https://github.com/linkedin/rest.li/compare/v29.43.8...v29.43.9
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
LOG.info("Creating D2 LoadBalancer based on LastSeenLoadBalancerWithFacilities");

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

// init connection
ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
loadBalancerComponentFactory = config.componentFactory;
}

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);

return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory,
config.lbWaitTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.util.clock.Clock;
import com.linkedin.util.clock.SystemClock;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Checks and manages the global and per-service dual read state.
* Provides monitoring of the dual read data.
*
* The dual read state is broken down into global and per-service state. Per-service dual read
* mode has a higher priority. Only if per-service dual read mode is not defined, global
* dual read mode will be used.
Expand All @@ -52,6 +53,10 @@ public class DualReadStateManager
private final RateLimiter _rateLimiter;
// Stores global dual read mode
private volatile DualReadModeProvider.DualReadMode _dualReadMode = DualReadModeProvider.DualReadMode.OLD_LB_ONLY;
private final Set<DualReadModeWatcher> _globalDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _serviceDualReadModeWatchers;
private final ConcurrentMap<String, Set<DualReadModeWatcher>> _clusterDualReadModeWatchers;

private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;

private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor;
Expand All @@ -74,6 +79,9 @@ public DualReadStateManager(DualReadModeProvider dualReadModeProvider, Scheduled
_dualReadModeProvider = dualReadModeProvider;
_executorService = executorService;
_rateLimiter = RateLimiter.create((double) 1 / DUAL_READ_MODE_SWITCH_MIN_INTERVAL);
_globalDualReadModeWatchers = ConcurrentHashMap.newKeySet();
_serviceDualReadModeWatchers = new ConcurrentHashMap<>();
_clusterDualReadModeWatchers = new ConcurrentHashMap<>();
}

public void updateGlobal(DualReadModeProvider.DualReadMode mode)
Expand All @@ -82,6 +90,7 @@ public void updateGlobal(DualReadModeProvider.DualReadMode mode)
_dualReadMode = mode;
if (updated) {
LOG.info("Global dual read mode updated: {}", mode);
notifyGlobalWatchers(_dualReadMode);
}
}

Expand All @@ -90,6 +99,7 @@ public void updateService(String service, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _serviceDualReadModes.put(service, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for service {} updated: {}", service, mode);
notifyServiceWatchers(service, mode);
}
}

Expand All @@ -98,6 +108,7 @@ public void updateCluster(String cluster, DualReadModeProvider.DualReadMode mode
DualReadModeProvider.DualReadMode oldMode = _clusterDualReadModes.put(cluster, mode);
if (oldMode != mode) {
LOG.info("Dual read mode for cluster {} updated: {}", cluster, mode);
notifyClusterWatchers(cluster, mode);
}
}

Expand Down Expand Up @@ -198,4 +209,75 @@ public DualReadModeProvider getDualReadModeProvider()
{
return _dualReadModeProvider;
}

// Add watchers watching for global dual read mode. The watcher will be notified when the global dual read mode changes.
public void addGlobalWatcher(DualReadModeWatcher watcher)
{
_globalDualReadModeWatchers.add(watcher);
}

// Add watchers watching for dual read mode of a service. The watcher will be notified when the dual read mode changes.
public void addServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.computeIfAbsent(serviceName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

// Add watchers watching for dual read mode of a cluster. The watcher will be notified when the dual read mode changes.
public void addClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.computeIfAbsent(clusterName, k -> ConcurrentHashMap.newKeySet());
watchers.add(watcher);
}

// Remove watchers for dual read mode of a service.
public void removeServiceWatcher(String serviceName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _serviceDualReadModeWatchers.get(serviceName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

// Remove watchers for dual read mode of a cluster.
public void removeClusterWatcher(String clusterName, DualReadModeWatcher watcher)
{
Set<DualReadModeWatcher> watchers = _clusterDualReadModeWatchers.get(clusterName);
if (watchers != null)
{
watchers.remove(watcher);
}
}

private void notifyGlobalWatchers(DualReadModeProvider.DualReadMode mode)
{
notifyWatchers(_globalDualReadModeWatchers, mode);
}

private void notifyServiceWatchers(String serviceName, DualReadModeProvider.DualReadMode mode)
{
notifyWatchers(_serviceDualReadModeWatchers.get(serviceName), mode);
}

private void notifyClusterWatchers(String clusterName, DualReadModeProvider.DualReadMode mode)
{
notifyWatchers(_clusterDualReadModeWatchers.get(clusterName), mode);
}

private static void notifyWatchers(Set<DualReadModeWatcher> watchers, DualReadModeProvider.DualReadMode mode)
{
if (watchers != null)
{
for (DualReadModeWatcher w : watchers)
{
w.onChanged(mode);
}
}
}

public interface DualReadModeWatcher
{
void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright (c) 2023 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.linkedin.d2.jmx;

import com.linkedin.d2.balancer.LoadBalancerStateItem;
import com.linkedin.d2.balancer.dualread.DualReadModeProvider;
import com.linkedin.d2.balancer.dualread.DualReadStateManager;
import com.linkedin.d2.balancer.properties.ClusterProperties;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.simple.ClusterInfoItem;
import com.linkedin.d2.balancer.simple.SimpleLoadBalancer;
import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.discovery.stores.file.FileStore;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;


/**
* Manage d2 client jmx dual read mode watchers for different types of load balancing related properties.
*/
public interface D2ClientJmxDualReadModeWatcherManager
{

void updateWatcher(SimpleLoadBalancer balancer, BiConsumer<SimpleLoadBalancer, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(SimpleLoadBalancerState state, BiConsumer<SimpleLoadBalancerState, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(String serviceName, String scheme, LoadBalancerStrategy strategy,
BiConsumer<LoadBalancerStrategy, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(String clusterName, ClusterInfoItem clusterInfoItem,
BiConsumer<ClusterInfoItem, DualReadModeProvider.DualReadMode> callback);

void updateWatcher(String serviceName, LoadBalancerStateItem<ServiceProperties> serviceProperties,
BiConsumer<LoadBalancerStateItem<ServiceProperties>, DualReadModeProvider.DualReadMode> callback);

void updateWatcherForFileStoreUriProperties(FileStore<UriProperties> uriStore,
BiConsumer<FileStore<UriProperties>, DualReadModeProvider.DualReadMode> callback);

void updateWatcherForFileStoreClusterProperties(FileStore<ClusterProperties> clusterStore,
BiConsumer<FileStore<ClusterProperties>, DualReadModeProvider.DualReadMode> callback);

void updateWatcherForFileStoreServiceProperties(FileStore<ServiceProperties> serviceStore,
BiConsumer<FileStore<ServiceProperties>, DualReadModeProvider.DualReadMode> callback);

void removeWatcherForLoadBalancerStrategy(String serviceName, String scheme);

void removeWatcherForClusterInfoItem(String clusterName);

void removeWatcherForServiceProperties(String serviceName);


final class D2ClientJmxDualReadModeWatcher<T> implements DualReadStateManager.DualReadModeWatcher
{
private T _latestJmxProperty;
private final BiConsumer<T, DualReadModeProvider.DualReadMode> _callback;

D2ClientJmxDualReadModeWatcher(T initialJmxProperty, BiConsumer<T, DualReadModeProvider.DualReadMode>callback)
{
_latestJmxProperty = initialJmxProperty;
_callback = callback;
}

public T getLatestJmxProperty()
{
return _latestJmxProperty;
}

public void setLatestJmxProperty(T latestJmxProperty)
{
_latestJmxProperty = latestJmxProperty;
}

@Override
public void onChanged(@Nonnull DualReadModeProvider.DualReadMode mode)
{
_callback.accept(_latestJmxProperty, mode);
}
}
}
Loading

0 comments on commit 641694b

Please sign in to comment.