From 5128dd56eb2a4dc6091a410ca3c04384500ba80a Mon Sep 17 00:00:00 2001 From: Paul Chesnais Date: Fri, 10 May 2024 15:32:55 -0700 Subject: [PATCH] Implement new DualReadMonitor for UriProperties (#999) * Implement new DualReadMonitor for UriProperties This new monitor should be as expensive as the previous one since the previous one called the `.equals` method on `UriProperties`, which compares every single URI. It now produces a similarity metric which represents the fraction of matching hosts present in the ZK response and the observer response. The Cluster and Service monitors remain unchanged. * Update logging strategy * add jmx method and tests, and put behind a config * update changelog * make it thread safe * lock per cluster * use cluster match record itself as lock and add test * execute multi-thread test multiple times * check if tasks completed before executor timeout * modify test: ensure properties for the same lb are reported in order * adding more multi-thread test cases * clarify comments in test * add a debug log and clean up test * use one queue for each lb in test * address comments * adjust log msg * Just lock the whole damn thing --------- Co-authored-by: Bohan Yang --- CHANGELOG.md | 6 +- .../dualread/DualReadLoadBalancerJmx.java | 67 +++-- .../DualReadLoadBalancerJmxMBean.java | 10 + .../dualread/DualReadLoadBalancerMonitor.java | 87 +----- .../dualread/DualReadStateManager.java | 21 +- .../UriPropertiesDualReadMonitor.java | 248 +++++++++++++++++ .../DualReadLoadBalancerMonitorTest.java | 90 ------- .../UriPropertiesDualReadMonitorTest.java | 255 ++++++++++++++++++ .../d2/jmx/D2ClientJmxManagerTest.java | 2 +- .../linkedin/d2/xds/XdsToD2SampleClient.java | 2 +- gradle.properties | 2 +- .../com/linkedin/util/RateLimitedLogger.java | 2 +- 12 files changed, 587 insertions(+), 205 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitor.java create mode 100644 d2/src/test/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitorTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e1e310c95b..53f1f4bdcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.54.0] - 2024-05-08 +- Dual read monitors cluster uris similarity + ## [29.53.1] - 2024-04-24 - Remove emitting SD event for receiving URI data update @@ -5686,7 +5689,8 @@ patch operations can re-use these classes for generating patch messages. 
 ## [0.14.1]

-[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.53.1...master
+[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.54.0...master
+[29.54.0]: https://github.com/linkedin/rest.li/compare/v29.53.1...v29.54.0
 [29.53.1]: https://github.com/linkedin/rest.li/compare/v29.53.0...v29.53.1
 [29.53.0]: https://github.com/linkedin/rest.li/compare/v29.52.1...v29.53.0
 [29.52.1]: https://github.com/linkedin/rest.li/compare/v29.52.0...v29.52.1
diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmx.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmx.java
index 299830ddb8..2551983176 100644
--- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmx.java
+++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmx.java
@@ -16,22 +16,27 @@
 package com.linkedin.d2.balancer.dualread;

+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;


 public class DualReadLoadBalancerJmx implements DualReadLoadBalancerJmxMBean
 {
   private final AtomicInteger _servicePropertiesErrorCount = new AtomicInteger();
   private final AtomicInteger _clusterPropertiesErrorCount = new AtomicInteger();
-  private final AtomicInteger _uriPropertiesErrorCount = new AtomicInteger();

   private final AtomicInteger _servicePropertiesEvictCount = new AtomicInteger();
   private final AtomicInteger _clusterPropertiesEvictCount = new AtomicInteger();
-  private final AtomicInteger _uriPropertiesEvictCount = new AtomicInteger();

   private final AtomicInteger _servicePropertiesOutOfSyncCount = new AtomicInteger();
   private final AtomicInteger _clusterPropertiesOutOfSyncCount = new AtomicInteger();
-  private final AtomicInteger _uriPropertiesOutOfSyncCount = new AtomicInteger();
+
+  private final AtomicReference<Double> _uriPropertiesSimilarity = new AtomicReference<>(0d);
+
+  private final Map<String, UriPropertiesDualReadMonitor.ClusterMatchRecord> _clusters = new HashMap<>();

   @Override
@@ -46,10 +51,11 @@ public int getClusterPropertiesErrorCount()
     return _clusterPropertiesErrorCount.get();
   }

+  @Deprecated
   @Override
   public int getUriPropertiesErrorCount()
   {
-    return _uriPropertiesErrorCount.get();
+    return 0;
   }

   @Override
@@ -64,25 +70,41 @@ public int getClusterPropertiesEvictCount()
     return _clusterPropertiesEvictCount.get();
   }

+  @Deprecated
   @Override
   public int getUriPropertiesEvictCount()
   {
-    return _uriPropertiesEvictCount.get();
+    return 0;
   }

   @Override
-  public int getServicePropertiesOutOfSyncCount() {
+  public int getServicePropertiesOutOfSyncCount()
+  {
     return _servicePropertiesOutOfSyncCount.get();
   }

   @Override
-  public int getClusterPropertiesOutOfSyncCount() {
+  public int getClusterPropertiesOutOfSyncCount()
+  {
     return _clusterPropertiesOutOfSyncCount.get();
   }

+  @Deprecated
+  @Override
+  public int getUriPropertiesOutOfSyncCount()
+  {
+    return 0;
+  }
+
+  @Override
+  public double getUriPropertiesSimilarity()
+  {
+    return _uriPropertiesSimilarity.get();
+  }
+
   @Override
-  public int getUriPropertiesOutOfSyncCount() {
-    return _uriPropertiesOutOfSyncCount.get();
+  public @Nullable UriPropertiesDualReadMonitor.ClusterMatchRecord getClusterMatchRecord(String clusterName) {
+    return _clusters.get(clusterName);
   }

   public void incrementServicePropertiesErrorCount()
@@ -95,11 +117,6 @@ public void incrementClusterPropertiesErrorCount()
     _clusterPropertiesErrorCount.incrementAndGet();
   }

-  public void incrementUriPropertiesErrorCount()
-  {
-
_uriPropertiesErrorCount.incrementAndGet(); - } - public void incrementServicePropertiesEvictCount() { _servicePropertiesEvictCount.incrementAndGet(); @@ -110,11 +127,6 @@ public void incrementClusterPropertiesEvictCount() _clusterPropertiesEvictCount.incrementAndGet(); } - public void incrementUriPropertiesEvictCount() - { - _uriPropertiesEvictCount.incrementAndGet(); - } - public void incrementServicePropertiesOutOfSyncCount() { _servicePropertiesOutOfSyncCount.incrementAndGet(); @@ -125,11 +137,6 @@ public void incrementClusterPropertiesOutOfSyncCount() _clusterPropertiesOutOfSyncCount.incrementAndGet(); } - public void incrementUriPropertiesOutOfSyncCount() - { - _uriPropertiesOutOfSyncCount.incrementAndGet(); - } - public void decrementServicePropertiesOutOfSyncCount() { _servicePropertiesOutOfSyncCount.decrementAndGet(); @@ -140,8 +147,14 @@ public void decrementClusterPropertiesOutOfSyncCount() _clusterPropertiesOutOfSyncCount.decrementAndGet(); } - public void decrementUriPropertiesOutOfSyncCount() + public void setUriPropertiesSimilarity(double similarity) + { + _uriPropertiesSimilarity.set(similarity); + } + + public void setClusterMatchRecord(String clusterName, + UriPropertiesDualReadMonitor.ClusterMatchRecord clusterMatchRecord) { - _uriPropertiesOutOfSyncCount.decrementAndGet(); + _clusters.put(clusterName, clusterMatchRecord); } -} +} \ No newline at end of file diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmxMBean.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmxMBean.java index c47bc18fc9..99aeb758d5 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmxMBean.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerJmxMBean.java @@ -23,6 +23,7 @@ public interface DualReadLoadBalancerJmxMBean int getClusterPropertiesErrorCount(); + @Deprecated int getUriPropertiesErrorCount(); // Evict count is incremented when cache grows to the max size and entries get evicted. @@ -30,6 +31,7 @@ public interface DualReadLoadBalancerJmxMBean int getClusterPropertiesEvictCount(); + @Deprecated int getUriPropertiesEvictCount(); // Entries become out of sync when: @@ -41,5 +43,13 @@ public interface DualReadLoadBalancerJmxMBean int getClusterPropertiesOutOfSyncCount(); + @Deprecated int getUriPropertiesOutOfSyncCount(); + + // Similarity is calculated as the ratio of the number of URIs that are common between the two LBs to the total + // number of URIs in both LBs. + double getUriPropertiesSimilarity(); + + // Returns the ClusterMatchRecord for the given clusterName. 
+  UriPropertiesDualReadMonitor.ClusterMatchRecord getClusterMatchRecord(String clusterName);
 }
diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java
index efdf59e3c1..e8e8a45993 100644
--- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java
+++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitor.java
@@ -22,8 +22,6 @@
 import com.google.common.cache.RemovalCause;
 import com.linkedin.d2.balancer.properties.ClusterProperties;
 import com.linkedin.d2.balancer.properties.ServiceProperties;
-import com.linkedin.d2.balancer.properties.UriProperties;
-import com.linkedin.util.RateLimitedLogger;
 import com.linkedin.util.clock.Clock;
 import java.time.Instant;
 import java.time.ZoneId;
@@ -42,26 +40,27 @@
  *
  * When a new service discovery data is reported, it will check if the cache of the other data source
  * has data for the same property name. If there is, it will compare whether the two data are equal.
+ *
+ * Note that there are only two implementations of this class, one for {@link ServiceProperties} and one for
+ * {@link ClusterProperties}, and not one for {@link com.linkedin.d2.balancer.properties.UriProperties}. This is because
+ * the URI properties need to be compared holistically at the cluster level.
  */
 public abstract class DualReadLoadBalancerMonitor<T>
 {
   private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancerMonitor.class);
   public final static String DEFAULT_DATE_FORMAT = "YYYY/MM/dd HH:mm:ss.SSS";
   public final static String VERSION_FROM_FS = "-1";
-  private static final long ERROR_REPORT_PERIOD = 600 * 1000; // Limit error report logging to every 10 minutes
   private static final int MAX_CACHE_SIZE = 10000;

   private final Cache<String, CacheEntry<T>> _oldLbPropertyCache;
   private final Cache<String, CacheEntry<T>> _newLbPropertyCache;
-  private final RateLimitedLogger _rateLimitedLogger;
   private final Clock _clock;
   private final DateTimeFormatter _format;

-  public DualReadLoadBalancerMonitor(Clock clock)
+  private DualReadLoadBalancerMonitor(Clock clock)
   {
     _oldLbPropertyCache = buildCache();
     _newLbPropertyCache = buildCache();
-    _rateLimitedLogger = new RateLimitedLogger(LOG, ERROR_REPORT_PERIOD, clock);
     _clock = clock;
     _format = DateTimeFormatter.ofPattern(DEFAULT_DATE_FORMAT);
   }
@@ -72,7 +71,6 @@ public void reportData(String propertyName, T property, String propertyVersion,
     Cache<String, CacheEntry<T>> cacheToAdd = fromNewLb ? _newLbPropertyCache : _oldLbPropertyCache;
     CacheEntry<T> existingEntry = cacheToAdd.getIfPresent(propertyName);
     String propertyClassName = property.getClass().getSimpleName();
-    boolean isUriProp = property instanceof UriProperties;

     if (existingEntry != null && existingEntry._data.equals(property))
     {
@@ -92,19 +90,10 @@
       // different.
       if (!isReadFromFS(existingEntry._version, propertyVersion))
       {
-        String msg = String.format("Received same data of different versions in %s LB for %s: %s."
-            + " Old version: %s, New version: %s, Data: %s",
-            fromNewLb ? "New" : "Old", propertyClassName, propertyName, existingEntry._version,
-            propertyVersion, property);
-
-        if (isUriProp)
-        {
-          LOG.debug(msg);
-        }
-        else
-        {
-          warnByPropType(isUriProp, msg);
-        }
+        LOG.warn("Received same data of different versions in {} LB for {} {}"
+            + " Old version: {} New version: {} Data: {}",
+            fromNewLb ?
"New" : "Old", propertyClassName, propertyName, existingEntry._version, + propertyVersion, property); } // still need to put in the cache, don't skip } @@ -136,23 +125,14 @@ else if (!isDataEqual && isVersionEqual) { // data is not the same but version is the same, a mismatch! incrementPropertiesErrorCount(); incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later - warnByPropType(isUriProp, - String.format("Received mismatched %s for %s. %s", propertyClassName, propertyName, entriesLogMsg)); + LOG.warn("Received mismatched {} for {}. {}", propertyClassName, propertyName, entriesLogMsg); cacheToCompare.invalidate(propertyName); } else { if (isDataEqual) { - String msg = String.format("Received same data of %s for %s but with different versions: %s", + LOG.warn("Received same data of {} for {} but with different versions: {}", propertyClassName, propertyName, entriesLogMsg); - if (isUriProp) - { - LOG.debug(msg); - } - else - { - warnByPropType(isUriProp, msg); - } } cacheToAdd.put(propertyName, newEntry); incrementEntryOutOfSyncCount(); @@ -209,18 +189,6 @@ private boolean isReadFromFS(String v1, String v2) return v1.startsWith(VERSION_FROM_FS) || v2.startsWith(VERSION_FROM_FS); } - private void warnByPropType(boolean isUriProp, String msg) - { - if (isUriProp) - { - _rateLimitedLogger.warn(msg); - } - else - { - LOG.warn(msg); - } - } - @VisibleForTesting String getEntriesMessage(boolean fromNewLb, CacheEntry oldE, CacheEntry newE) { @@ -319,37 +287,4 @@ void onEvict() _dualReadLoadBalancerJmx.incrementServicePropertiesEvictCount(); } } - - public static final class UriPropertiesDualReadMonitor extends DualReadLoadBalancerMonitor - { - private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx; - - public UriPropertiesDualReadMonitor(DualReadLoadBalancerJmx dualReadLoadBalancerJmx, Clock clock) - { - super(clock); - _dualReadLoadBalancerJmx = dualReadLoadBalancerJmx; - } - - @Override - void incrementEntryOutOfSyncCount() { - _dualReadLoadBalancerJmx.incrementUriPropertiesOutOfSyncCount(); - } - - @Override - void decrementEntryOutOfSyncCount() { - _dualReadLoadBalancerJmx.decrementUriPropertiesOutOfSyncCount(); - } - - @Override - void incrementPropertiesErrorCount() - { - _dualReadLoadBalancerJmx.incrementUriPropertiesErrorCount(); - } - - @Override - void onEvict() - { - _dualReadLoadBalancerJmx.incrementUriPropertiesEvictCount(); - } - } } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java index eafd26f0fa..d413e454da 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/DualReadStateManager.java @@ -60,17 +60,24 @@ public class DualReadStateManager private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx; - private final DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor; private final DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor _servicePropertiesDualReadMonitor; private final DualReadLoadBalancerMonitor.ClusterPropertiesDualReadMonitor _clusterPropertiesDualReadMonitor; + private final UriPropertiesDualReadMonitor _uriPropertiesDualReadMonitor; + private final boolean _monitorUriProperties; - + @Deprecated public DualReadStateManager(DualReadModeProvider dualReadModeProvider, ScheduledExecutorService executorService) + { + this(dualReadModeProvider, executorService, false); 
+  }
+
+  public DualReadStateManager(DualReadModeProvider dualReadModeProvider, ScheduledExecutorService executorService,
+      boolean monitorUriProperties)
   {
     _dualReadLoadBalancerJmx = new DualReadLoadBalancerJmx();
     Clock clock = SystemClock.instance();
-    _uriPropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor(
-        _dualReadLoadBalancerJmx, clock);
+    _monitorUriProperties = monitorUriProperties;
+    _uriPropertiesDualReadMonitor = new UriPropertiesDualReadMonitor(_dualReadLoadBalancerJmx);
     _servicePropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor(
         _dualReadLoadBalancerJmx, clock);
     _clusterPropertiesDualReadMonitor = new DualReadLoadBalancerMonitor.ClusterPropertiesDualReadMonitor(
@@ -172,10 +179,10 @@ private void reportClusterPropertiesData(String propertyName, ClusterProperties
   private void reportUriPropertiesData(String propertyName, UriProperties property, boolean fromNewLb)
   {
-    if (_clusterDualReadModes.getOrDefault(propertyName, _dualReadMode) == DualReadModeProvider.DualReadMode.DUAL_READ)
+    if (_monitorUriProperties &&
+        _clusterDualReadModes.getOrDefault(propertyName, _dualReadMode) == DualReadModeProvider.DualReadMode.DUAL_READ)
     {
-      String version = property.getVersion() + "|" + property.Uris().size();
-      _uriPropertiesDualReadMonitor.reportData(propertyName, property, version, fromNewLb);
+      _uriPropertiesDualReadMonitor.reportData(propertyName, property, fromNewLb);
     }
   }
diff --git a/d2/src/main/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitor.java b/d2/src/main/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitor.java
new file mode 100644
index 0000000000..771605d99a
--- /dev/null
+++ b/d2/src/main/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitor.java
@@ -0,0 +1,248 @@
+package com.linkedin.d2.balancer.dualread;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.linkedin.d2.balancer.properties.UriProperties;
+import com.linkedin.util.RateLimitedLogger;
+import com.linkedin.util.clock.SystemClock;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class UriPropertiesDualReadMonitor
+{
+  private static final Logger LOG = LoggerFactory.getLogger(UriPropertiesDualReadMonitor.class);
+
+  private final HashMap<String, ClusterMatchRecord> _clusters = new HashMap<>();
+  // Limit error report logging to every 10 minutes
+  private final RateLimitedLogger RATE_LIMITED_LOGGER =
+      new RateLimitedLogger(LOG, TimeUnit.MINUTES.toMillis(10), SystemClock.instance());
+  private int _totalUris = 0;
+  private int _matchedUris = 0;
+  private final DualReadLoadBalancerJmx _dualReadLoadBalancerJmx;
+
+  public UriPropertiesDualReadMonitor(DualReadLoadBalancerJmx dualReadLoadBalancerJmx)
+  {
+    _dualReadLoadBalancerJmx = dualReadLoadBalancerJmx;
+  }
+
+  public synchronized void reportData(String clusterName, UriProperties property, boolean fromNewLb)
+  {
+    ClusterMatchRecord cluster = _clusters.computeIfAbsent(clusterName, k -> new ClusterMatchRecord());
+
+    if (fromNewLb)
+    {
+      cluster._newLb = property;
+    }
+    else
+    {
+      cluster._oldLb = property;
+    }
+
+    _totalUris -= cluster._uris;
+    _matchedUris -= cluster._matched;
+
+    LOG.debug("Updated URI properties for cluster {}:\nOld LB: {}\nNew LB: {}",
+        clusterName,
+        cluster._oldLb, cluster._newLb);
+
+    if (cluster._oldLb == null && cluster._newLb == null)
+    {
+      _clusters.remove(clusterName);
+      updateJmxMetrics(clusterName, null);
+      return;
+    }
+
+    cluster._matched = 0;
+
+    if (cluster._oldLb == null || cluster._newLb == null)
+    {
+      LOG.debug("Added new URI properties for {} for {} LB.", clusterName, fromNewLb ? "New" : "Old");
+
+      cluster._uris = (cluster._oldLb == null) ? cluster._newLb.Uris().size() : cluster._oldLb.Uris().size();
+      _totalUris += cluster._uris;
+
+      updateJmxMetrics(clusterName, cluster);
+      return;
+    }
+
+    cluster._uris = cluster._oldLb.Uris().size();
+    Set<URI> newLbUris = new HashSet<>(cluster._newLb.Uris());
+
+    for (URI uri : cluster._oldLb.Uris())
+    {
+      if (!newLbUris.remove(uri))
+      {
+        continue;
+      }
+
+      if (compareURI(uri, cluster._oldLb, cluster._newLb))
+      {
+        cluster._matched++;
+      }
+    }
+    // add the remaining unmatched URIs in newLbUris to the uri count
+    cluster._uris += newLbUris.size();
+
+    if (cluster._matched != cluster._uris)
+    {
+      infoOrDebugIfLimited(
+          "Mismatched uri properties for cluster {} (match score: {}, total uris: {}):\nOld LB: {}\nNew LB: {}",
+          clusterName, (double) cluster._matched / (double) cluster._uris, cluster._uris, cluster._oldLb,
+          cluster._newLb);
+    }
+    else
+    {
+      LOG.debug("Matched uri properties for cluster {} (matched {} out of {} URIs)", clusterName,
+          cluster._matched, cluster._uris);
+    }
+
+    _totalUris += cluster._uris;
+    _matchedUris += cluster._matched;
+
+    updateJmxMetrics(clusterName, cluster);
+  }
+
+  private void updateJmxMetrics(String clusterName, ClusterMatchRecord cluster)
+  {
+    // set a copy of cluster match record to jmx to avoid jmx reading the record in the middle of an update
+    _dualReadLoadBalancerJmx.setClusterMatchRecord(clusterName, cluster == null ? null : cluster.copy());
+    _dualReadLoadBalancerJmx.setUriPropertiesSimilarity((double) _matchedUris / (double) _totalUris);
+  }
+
+  private static boolean compareURI(URI uri, UriProperties oldLb, UriProperties newLb)
+  {
+    String clusterName = oldLb.getClusterName();
+    return compareMaps("partition desc", clusterName, uri, UriProperties::getPartitionDesc, oldLb, newLb) &&
+        compareMaps("specific properties", clusterName, uri, UriProperties::getUriSpecificProperties, oldLb, newLb);
+  }
+
+  private static <K, V> boolean compareMaps(
+      String type, String cluster, URI uri, Function<UriProperties, Map<URI, Map<K, V>>> extractor,
+      UriProperties oldLb, UriProperties newLb
+  )
+  {
+    Map<K, V> oldData = extractor.apply(oldLb).get(uri);
+    Map<K, V> newData = extractor.apply(newLb).get(uri);
+    if (Objects.equals(oldData, newData))
+    {
+      return true;
+    }
+
+    LOG.debug("URI {} for {}/{} mismatched between old and new LB.\nOld LB: {}\nNew LB: {}",
+        type, cluster, uri, oldData, newData);
+    return false;
+  }
+
+  private void infoOrDebugIfLimited(String msg, Object...
args) + { + if (RATE_LIMITED_LOGGER.logAllowed()) + { + LOG.info(msg, args); + } + else + { + LOG.debug(msg, args); + } + } + + @VisibleForTesting + synchronized int getTotalUris() + { + return _totalUris; + } + + @VisibleForTesting + synchronized int getMatchedUris() + { + return _matchedUris; + } + + @VisibleForTesting + synchronized ClusterMatchRecord getMatchRecord(String cluster) + { + return _clusters.get(cluster).copy(); + } + + public static class ClusterMatchRecord + { + @Nullable + @VisibleForTesting + UriProperties _oldLb; + + @Nullable + @VisibleForTesting + UriProperties _newLb; + + @VisibleForTesting + int _uris; + + @VisibleForTesting + int _matched; + + private ClusterMatchRecord() + { + } + + @VisibleForTesting + ClusterMatchRecord(@Nullable UriProperties oldLb, @Nullable UriProperties newLb, int uris, int matched) + { + _oldLb = oldLb; + _newLb = newLb; + _uris = uris; + _matched = matched; + } + + ClusterMatchRecord copy() + { + return new ClusterMatchRecord(_oldLb, _newLb, _uris, _matched); + } + + @Override + public String toString() + { + return "ClusterMatchRecord{ " + + "\nTotal Uris: " + _uris + ", Matched: " + _matched + + "\nOld LB: " + _oldLb + + "\nNew LB: " + _newLb + + '}'; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (getClass() != obj.getClass()) + { + return false; + } + + ClusterMatchRecord o = (ClusterMatchRecord) obj; + + return Objects.equals(_oldLb, o._oldLb) + && Objects.equals(_newLb, o._newLb) + && _uris == o._uris + && _matched == o._matched; + } + + @Override + public int hashCode() + { + return Objects.hash(_oldLb, _newLb, _uris, _matched); + } + } +} diff --git a/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java b/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java index c6fed50145..65a50df126 100644 --- a/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java +++ b/d2/src/test/java/com/linkedin/d2/balancer/dualread/DualReadLoadBalancerMonitorTest.java @@ -3,12 +3,10 @@ import com.google.common.cache.Cache; import com.linkedin.d2.balancer.properties.ServiceProperties; import com.linkedin.d2.balancer.properties.ServiceStoreProperties; -import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.util.TestDataHelper; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.testng.Assert; -import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import static com.linkedin.d2.util.TestDataHelper.*; @@ -26,7 +24,6 @@ private static class DualReadLoadBalancerMonitorTestFixure { MockitoAnnotations.initMocks(this); doNothing().when(_mockJmx).incrementServicePropertiesOutOfSyncCount(); - doNothing().when(_mockJmx).decrementUriPropertiesOutOfSyncCount(); doNothing().when(_mockJmx).incrementServicePropertiesErrorCount(); } @@ -34,11 +31,6 @@ DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor getServiceMonitor() { return new DualReadLoadBalancerMonitor.ServicePropertiesDualReadMonitor(_mockJmx, TestDataHelper.getClock()); } - - DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor getUriMonitor() - { - return new DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor(_mockJmx, TestDataHelper.getClock()); - } } @Test @@ -105,26 +97,6 @@ public void testServiceDataMatch() verifyNoMoreInteractions(fixture._mockJmx); } - @Test // since uri version read from FS is "-1|x", test for 
this case specifically - public void testUriDataMatch() - { - DualReadLoadBalancerMonitorTestFixure fixture = new DualReadLoadBalancerMonitorTestFixure(); - DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor monitor = fixture.getUriMonitor(); - - // put in one new lb entry and one old lb entry, with different data and version - putInUri(monitor, CLUSTER_NAME, PROPERTIES_1, "1|2", true, 1); - putInUri(monitor, CLUSTER_NAME, PROPERTIES_2, "-1|2", false, 1); - verify(fixture._mockJmx, times(2)).incrementUriPropertiesOutOfSyncCount(); - verifyNoMoreInteractions(fixture._mockJmx); - - // data match, version differs with version read FS - monitor.reportData(CLUSTER_NAME, PROPERTIES_2, "2|2", true); - Assert.assertEquals(monitor.getOldLbCache().size(), 0); - verifyUriOnCache(monitor.getNewLbCache(), CLUSTER_NAME, PROPERTIES_1, "1|2"); - verify(fixture._mockJmx).decrementUriPropertiesOutOfSyncCount(); - verifyNoMoreInteractions(fixture._mockJmx); - } - @Test public void testMismatch() { @@ -165,66 +137,4 @@ private void verifyServiceOnCache(Cache> cache = - isFromNewLb ? monitor.getNewLbCache() : monitor.getOldLbCache(); - Assert.assertEquals(cache.size(), expectedSizeAfter); - verifyUriOnCache(cache, name, prop, version); - } - - private void verifyUriOnCache(Cache> cache, - String name, UriProperties prop, String v) - { - DualReadLoadBalancerMonitor.CacheEntry entry = cache.getIfPresent(name); - Assert.assertNotNull(entry); - Assert.assertEquals(entry._data, prop); - Assert.assertEquals(entry._version, v); - } - - @DataProvider - public Object[][] getEntriesMessageDataProvider() - { - return new Object[][] { - { - true, new DualReadLoadBalancerMonitor.CacheEntry<>("", "", PROPERTIES_1), - new DualReadLoadBalancerMonitor.CacheEntry<>("", "", PROPERTIES_2), - "\nOld LB: CacheEntry{_version=, _timeStamp='', _data=UriProperties [_clusterName=TestCluster," - + " _urisBySchemeAndPartition={http={0=[http://google.com:1], 1=[http://google.com:1]}}," - + " _partitions={http://google.com:1={0=[ weight =1.0 ], 1=[ weight =2.0 ]}}," - + " _uriSpecificProperties={}]}\n" - + "New LB: CacheEntry{_version=, _timeStamp='', " - + "_data=UriProperties [_clusterName=TestCluster, " - + "_urisBySchemeAndPartition={http={1=[http://linkedin.com:2]}}, " - + "_partitions={http://linkedin.com:2={1=[ weight =0.5 ]}}, _uriSpecificProperties={}]}" - }, - { - false, new DualReadLoadBalancerMonitor.CacheEntry<>("", "", PROPERTIES_1), - new DualReadLoadBalancerMonitor.CacheEntry<>("", "", PROPERTIES_2), - "\nOld LB: CacheEntry{_version=, _timeStamp='', _data=UriProperties [_clusterName=TestCluster," - + " _urisBySchemeAndPartition={http={1=[http://linkedin.com:2]}}, " - + "_partitions={http://linkedin.com:2={1=[ weight =0.5 ]}}, _uriSpecificProperties={}]}\n" - + "New LB: CacheEntry{_version=, _timeStamp='', _data=UriProperties [_clusterName=TestCluster," - + " _urisBySchemeAndPartition={http={0=[http://google.com:1], 1=[http://google.com:1]}}," - + " _partitions={http://google.com:1={0=[ weight =1.0 ], 1=[ weight =2.0 ]}}," - + " _uriSpecificProperties={}]}" - } - }; - } - @Test(dataProvider = "getEntriesMessageDataProvider") - public void testGetEntriesMessage(Boolean isFromNewLb, - DualReadLoadBalancerMonitor.CacheEntry oldE, - DualReadLoadBalancerMonitor.CacheEntry newE, - String expected) - { - DualReadLoadBalancerMonitor.UriPropertiesDualReadMonitor monitor = - new DualReadLoadBalancerMonitorTestFixure().getUriMonitor(); - - Assert.assertEquals(monitor.getEntriesMessage(isFromNewLb, oldE, newE), - expected, - "entry 
message is not the same"); - } } diff --git a/d2/src/test/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitorTest.java b/d2/src/test/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitorTest.java new file mode 100644 index 0000000000..ee4eae8af0 --- /dev/null +++ b/d2/src/test/java/com/linkedin/d2/balancer/dualread/UriPropertiesDualReadMonitorTest.java @@ -0,0 +1,255 @@ +package com.linkedin.d2.balancer.dualread; + +import com.google.common.collect.ImmutableMap; +import com.linkedin.d2.balancer.properties.PartitionData; +import com.linkedin.d2.balancer.properties.UriProperties; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static com.linkedin.d2.balancer.dualread.UriPropertiesDualReadMonitor.*; +import static com.linkedin.d2.util.TestDataHelper.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; + + +public class UriPropertiesDualReadMonitorTest { + private static final String CLUSTER_1 = "cluster1"; + private static final String CLUSTER_2 = "cluster2"; + private static final Map WEIGHT_1_PARTITION_DATA = ImmutableMap.of(0, new PartitionData(1)); + private static final Map WEIGHT_2_PARTITION_DATA = ImmutableMap.of(0, new PartitionData(2)); + private static final Map SIZE_ONE_URI_SPECIFIC_PROPERTIES = ImmutableMap.of("foo", "foo-value"); + private static final Map SIZE_TWO_URI_SPECIFIC_PROPERTIES = ImmutableMap.of( + "foo", "foo-value", + "bar", 1); + private static final UriProperties URI_PROPERTIES_1 = new UriProperties(CLUSTER_1, + ImmutableMap.of(URI_1, WEIGHT_1_PARTITION_DATA), + ImmutableMap.of(URI_1, Collections.emptyMap())); + + private static final UriProperties URI_PROPERTIES_2 = new UriProperties(CLUSTER_1, + ImmutableMap.of(URI_2, WEIGHT_1_PARTITION_DATA), + ImmutableMap.of(URI_2, Collections.emptyMap())); + + private static final UriProperties URI_PROPERTIES_URI_1_AND_2 = new UriProperties(CLUSTER_1, + ImmutableMap.of(URI_1, WEIGHT_1_PARTITION_DATA, URI_2, WEIGHT_1_PARTITION_DATA), + ImmutableMap.of(URI_1, Collections.emptyMap(), URI_2, Collections.emptyMap())); + + private static final UriProperties URI_PROPERTIES_URI_3_AND_4 = new UriProperties(CLUSTER_2, + ImmutableMap.of(URI_3, WEIGHT_1_PARTITION_DATA, URI_4, WEIGHT_1_PARTITION_DATA), + ImmutableMap.of(URI_3, Collections.emptyMap(), URI_4, SIZE_ONE_URI_SPECIFIC_PROPERTIES)); + + private static final UriProperties URI_PROPERTIES_URI_3_DIFF_SPECIFIC_PROPERTIES_AND_4_DIFF_WEIGHT = + new UriProperties(CLUSTER_2, + ImmutableMap.of(URI_3, WEIGHT_1_PARTITION_DATA, URI_4, WEIGHT_2_PARTITION_DATA), + ImmutableMap.of(URI_3, SIZE_ONE_URI_SPECIFIC_PROPERTIES, URI_4, SIZE_ONE_URI_SPECIFIC_PROPERTIES)); + + private static final UriProperties URI_PROPERTIES_URI_3_ANOTHER_DIFF_SPECIFIC_PROPERTIES_AND_4_DIFF_WEIGHT = + new UriProperties(CLUSTER_2, + ImmutableMap.of(URI_3, WEIGHT_1_PARTITION_DATA, URI_4, WEIGHT_2_PARTITION_DATA), + ImmutableMap.of(URI_3, SIZE_TWO_URI_SPECIFIC_PROPERTIES, URI_4, SIZE_ONE_URI_SPECIFIC_PROPERTIES)); + + @Test + public void testReportData() { + UriPropertiesDualReadMonitorTestFixture fixture = 
new UriPropertiesDualReadMonitorTestFixture(); + UriPropertiesDualReadMonitor monitor = fixture.getMonitor(); + + // new lb has uri 1 + monitor.reportData(CLUSTER_1, URI_PROPERTIES_1, true); + verifyJmxMetricParams(fixture, CLUSTER_1, + new ClusterMatchRecord(null, URI_PROPERTIES_1, 1, 0), + 0.0); + + // old lb has uri 2 + monitor.reportData(CLUSTER_1, URI_PROPERTIES_2, false); + verifyJmxMetricParams(fixture, CLUSTER_1, + new ClusterMatchRecord(URI_PROPERTIES_2, URI_PROPERTIES_1, 2, 0), + 0.0); + + // old lb updated with both uri 1 and 2 + monitor.reportData(CLUSTER_1, URI_PROPERTIES_URI_1_AND_2, false); + verifyJmxMetricParams(fixture, CLUSTER_1, + new ClusterMatchRecord(URI_PROPERTIES_URI_1_AND_2, URI_PROPERTIES_1, 2, 1), + 0.5); + + // new lb updated with both uri 1 and 2 + monitor.reportData(CLUSTER_1, URI_PROPERTIES_URI_1_AND_2, true); + verifyJmxMetricParams(fixture, CLUSTER_1, + new ClusterMatchRecord(URI_PROPERTIES_URI_1_AND_2, URI_PROPERTIES_URI_1_AND_2, 2, 2), + 1.0); + + // add data for cluster 2, old lb with uri 3 and 4 + monitor.reportData(CLUSTER_2, URI_PROPERTIES_URI_3_AND_4, false); + assertEquals(monitor.getTotalUris(), 4); + assertEquals(monitor.getMatchedUris(), 2); + verifyJmxMetricParams(fixture, CLUSTER_2, + new ClusterMatchRecord(URI_PROPERTIES_URI_3_AND_4, null, 2, 0), + 0.5); + + // new lb updated with uri 3 with different uri specific properties and uri 4 with different weight + monitor.reportData(CLUSTER_2, URI_PROPERTIES_URI_3_DIFF_SPECIFIC_PROPERTIES_AND_4_DIFF_WEIGHT, true); + assertEquals(monitor.getTotalUris(), 4); + assertEquals(monitor.getMatchedUris(), 2); + verifyJmxMetricParams(fixture, CLUSTER_2, + new ClusterMatchRecord(URI_PROPERTIES_URI_3_AND_4, URI_PROPERTIES_URI_3_DIFF_SPECIFIC_PROPERTIES_AND_4_DIFF_WEIGHT, + 2, 0), + 0.5); + + // old lb updated with uri 3 with still different uri specific properties and uri 4 with same weight as new lb + monitor.reportData(CLUSTER_2, URI_PROPERTIES_URI_3_ANOTHER_DIFF_SPECIFIC_PROPERTIES_AND_4_DIFF_WEIGHT, false); + assertEquals(monitor.getTotalUris(), 4); + assertEquals(monitor.getMatchedUris(), 3); + verifyJmxMetricParams(fixture, CLUSTER_2, + new ClusterMatchRecord(URI_PROPERTIES_URI_3_ANOTHER_DIFF_SPECIFIC_PROPERTIES_AND_4_DIFF_WEIGHT, + URI_PROPERTIES_URI_3_DIFF_SPECIFIC_PROPERTIES_AND_4_DIFF_WEIGHT, + 2, 1), + 0.75); + + // delete both lbs data for cluster 2 + monitor.reportData(CLUSTER_2, null, true); + monitor.reportData(CLUSTER_2, null, false); + verifyJmxMetricParams(fixture, CLUSTER_2, null, 1.0); + } + + @DataProvider + public Object[][] reportDataInMultiThreadsDataProvider() { + Queue twoUpdates = new ConcurrentLinkedDeque<>(Arrays.asList( + URI_PROPERTIES_1, + URI_PROPERTIES_URI_1_AND_2)); + + Queue threeUpdates = new ConcurrentLinkedDeque<>(Arrays.asList( + URI_PROPERTIES_1, + URI_PROPERTIES_URI_1_AND_2, + URI_PROPERTIES_1)); + + Queue fiveUpdates = new ConcurrentLinkedDeque<>(Arrays.asList( + URI_PROPERTIES_1, + URI_PROPERTIES_URI_1_AND_2, + URI_PROPERTIES_2, + URI_PROPERTIES_URI_1_AND_2, + URI_PROPERTIES_1)); + + /* + * Params: + * oldLbProps - uri properties to be reported by old lb + * newLbProps - uri properties to be reported by new lb + */ + return new Object[][]{ + { + twoUpdates, + new ConcurrentLinkedDeque<>(twoUpdates) + }, + { + threeUpdates, + new ConcurrentLinkedDeque<>(threeUpdates) + }, + { + fiveUpdates, + new ConcurrentLinkedDeque<>(fiveUpdates) + } + }; + } + + @Test(dataProvider = "reportDataInMultiThreadsDataProvider", invocationCount = 100, timeOut = 5_000) + public void 
testReportDataInMultiThreads(Queue oldLbProps, Queue newLbProps) + throws InterruptedException { + UriPropertiesDualReadMonitorTestFixture fixture = new UriPropertiesDualReadMonitorTestFixture(); + UriPropertiesDualReadMonitor monitor = fixture.getMonitor(); + + ScheduledExecutorService executor = fixture.getExecutor(); + + CountDownLatch done = fixture.getDoneSignal(oldLbProps.size() + newLbProps.size()); + // randomly report properties from old and new Lbs but properties in same lb are reported in order + executor.execute(() -> runNext(fixture, oldLbProps, false)); + executor.execute(() -> runNext(fixture, newLbProps, true)); + + done.await(); + // similarity eventually converge to 1 + assertEquals((double) monitor.getMatchedUris() / (double) monitor.getTotalUris(), 1.0, + "Similarity score not 1. Match record: " + monitor.getMatchRecord(CLUSTER_1)); + executor.shutdownNow(); + } + + // ensure properties for the same lb are reported in order + private void runNext(UriPropertiesDualReadMonitorTestFixture fixture, Queue props, boolean fromNewLb) { + UriProperties p = props.poll(); + + if (p != null && !fixture._executor.isShutdown()) { + fixture._executor.execute(() -> { + reportAndVerifyState(fixture._monitor, p, fromNewLb); + runNext(fixture, props, fromNewLb); + fixture._doneSignal.countDown(); + }); + } + } + + private void reportAndVerifyState(UriPropertiesDualReadMonitor monitor, UriProperties prop, boolean fromNewLb) { + monitor.reportData(CLUSTER_1, prop, fromNewLb); + // if reportData on the same cluster are NOT synchronized, the total uris and matched uris counts could become < 0 + // or > 2. + // e.g: when total uris = 1, matched uris = 1, if reporting URI_PROPERTIES_URI_1_AND_2 for new and old Lbs are + // executed concurrently, both counts will decrement by 1 twice and become -1 first, and total uris will increment + // by 2 twice and become 4. + // We verify that doesn't happen no matter what order the data is reported between the old and new Lbs. + int totalUris = monitor.getTotalUris(); + int matchedUris = monitor.getMatchedUris(); + double similarity = (double) matchedUris / (double) totalUris; + + assertTrue(totalUris >= 0 && totalUris <= 2); + assertTrue(matchedUris >= 0 && matchedUris <= 2); + assertTrue(similarity >= 0.0 && similarity <= 1.0, "Similarity score should be >= 0 and <= 1." 
+ + " Match record: " + monitor.getMatchRecord(CLUSTER_1)); + } + + private void verifyJmxMetricParams(UriPropertiesDualReadMonitorTestFixture fixture, String clusterName, + ClusterMatchRecord clusterMatchRecord, double totalSimilarity) { + assertEquals(fixture._clusterNameCaptor.getValue(), clusterName); + assertEquals(fixture._clusterMatchCaptor.getValue(), clusterMatchRecord); + assertEquals(fixture._similarityCaptor.getValue(), totalSimilarity); + } + + private static class UriPropertiesDualReadMonitorTestFixture { + @Mock + DualReadLoadBalancerJmx _mockJmx; + @Captor + ArgumentCaptor _clusterNameCaptor; + @Captor + ArgumentCaptor _clusterMatchCaptor; + @Captor + ArgumentCaptor _similarityCaptor; + UriPropertiesDualReadMonitor _monitor; + ScheduledExecutorService _executor; + CountDownLatch _doneSignal; + + UriPropertiesDualReadMonitorTestFixture() { + MockitoAnnotations.initMocks(this); + doNothing().when(_mockJmx).setUriPropertiesSimilarity(_similarityCaptor.capture()); + doNothing().when(_mockJmx).setClusterMatchRecord(_clusterNameCaptor.capture(), _clusterMatchCaptor.capture()); + } + + UriPropertiesDualReadMonitor getMonitor() { + _monitor = new UriPropertiesDualReadMonitor(_mockJmx); + return _monitor; + } + + ScheduledExecutorService getExecutor() { + _executor = Executors.newScheduledThreadPool(2); + return _executor; + } + + CountDownLatch getDoneSignal(int size) { + _doneSignal = new CountDownLatch(size); + return _doneSignal; + } + } +} \ No newline at end of file diff --git a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java index 14d4028353..7f5747bad5 100644 --- a/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java +++ b/d2/src/test/java/com/linkedin/d2/jmx/D2ClientJmxManagerTest.java @@ -143,7 +143,7 @@ public int getPartitionId(URI uri) { _servicePropertiesArgumentCaptor.capture()); Mockito.doNothing().when(_simpleLoadBalancerState).register(_simpleLoadBalancerStateListenerCaptor.capture()); - _dualReadStateManager = spy(new DualReadStateManager(_dualReadModeProvider, _executorService)); + _dualReadStateManager = spy(new DualReadStateManager(_dualReadModeProvider, _executorService, true)); doCallRealMethod().when(_dualReadStateManager).addGlobalWatcher(any()); doCallRealMethod().when(_dualReadStateManager).addServiceWatcher(any(), any()); diff --git a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java index 52cde627b1..7cb1714da5 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java +++ b/d2/src/test/java/com/linkedin/d2/xds/XdsToD2SampleClient.java @@ -95,7 +95,7 @@ public static void main(String[] args) throws Exception DualReadStateManager dualReadStateManager = new DualReadStateManager( () -> DualReadModeProvider.DualReadMode.DUAL_READ, - Executors.newSingleThreadScheduledExecutor()); + Executors.newSingleThreadScheduledExecutor(), true); XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, dualReadStateManager, null); adaptor.listenToService(serviceName); diff --git a/gradle.properties b/gradle.properties index a382e12121..61d65c82df 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.53.1 +version=29.54.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true diff --git a/pegasus-common/src/main/java/com/linkedin/util/RateLimitedLogger.java 
b/pegasus-common/src/main/java/com/linkedin/util/RateLimitedLogger.java index 4da8181413..de8b3a5caa 100644 --- a/pegasus-common/src/main/java/com/linkedin/util/RateLimitedLogger.java +++ b/pegasus-common/src/main/java/com/linkedin/util/RateLimitedLogger.java @@ -551,7 +551,7 @@ public void error(Marker marker, String msg, Throwable t) } } - private boolean logAllowed() + public boolean logAllowed() { final long now = _clock.currentTimeMillis(); final long lastLog = _lastLog.get();
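
As an aside for reviewers, the snippet below is a minimal, self-contained sketch of the similarity metric the new monitor reports for a single cluster: the number of hosts reported by both load balancers with equal properties, divided by the union of hosts reported by either. It is illustrative only and does not use the rest.li classes from the patch; the class and method names (SimilaritySketch, similarity) and the plain String/Map host model are invented for the example.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Illustrative only: mirrors the matched/total bookkeeping of UriPropertiesDualReadMonitor
// for one cluster, using plain String hosts and property maps instead of UriProperties.
public final class SimilaritySketch
{
  public static double similarity(Map<String, Map<String, Object>> oldLb,
                                  Map<String, Map<String, Object>> newLb)
  {
    Set<String> allHosts = new HashSet<>(oldLb.keySet());
    allHosts.addAll(newLb.keySet());
    if (allHosts.isEmpty())
    {
      return 1.0; // both sides empty: nothing to mismatch
    }

    int matched = 0;
    for (String host : allHosts)
    {
      // a host counts as matched only if both LBs report it with equal properties
      if (oldLb.containsKey(host) && newLb.containsKey(host)
          && Objects.equals(oldLb.get(host), newLb.get(host)))
      {
        matched++;
      }
    }
    return (double) matched / (double) allHosts.size();
  }

  public static void main(String[] args)
  {
    Map<String, Map<String, Object>> oldLb = new HashMap<>();
    Map<String, Map<String, Object>> newLb = new HashMap<>();
    oldLb.put("host-a:1234", Map.of("weight", 1.0));
    oldLb.put("host-b:1234", Map.of("weight", 1.0));
    newLb.put("host-a:1234", Map.of("weight", 1.0)); // present in both with equal properties
    newLb.put("host-c:1234", Map.of("weight", 1.0)); // only in the new LB

    // 1 matched host out of 3 distinct hosts -> prints 0.333...
    System.out.println(similarity(oldLb, newLb));
  }
}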