Skip to content

Commit

Permalink
add service/cluster-not-found count to simple load balancer jmx. And …
Browse files Browse the repository at this point in the history
…add entry-out-of-sync count to dual read monitoring.
  • Loading branch information
bohhyang committed Sep 26, 2023
1 parent fe20597 commit f0d4b74
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 2 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.46.2] - 2023-09-25
- Add service/cluster-not-found counts to the simple load balancer JMX, and add entry-out-of-sync counts to dual read monitoring.

## [29.46.1] - 2023-09-20
- Keep the old convention (using a variable java of type matrix) in publish.yml

Expand Down Expand Up @@ -5533,7 +5536,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.46.2...master
[29.46.2]: https://github.com/linkedin/rest.li/compare/v29.46.1...v29.46.2
[29.46.1]: https://github.com/linkedin/rest.li/compare/v29.46.0...v29.46.1
[29.46.0]: https://github.com/linkedin/rest.li/compare/v29.45.1...v29.46.0
[29.45.1]: https://github.com/linkedin/rest.li/compare/v29.45.0...v29.45.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public class DualReadLoadBalancerJmx implements DualReadLoadBalancerJmxMBean
private final AtomicInteger _clusterPropertiesEvictCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesEvictCount = new AtomicInteger();

private final AtomicInteger _servicePropertiesOutOfSyncCount = new AtomicInteger();
private final AtomicInteger _clusterPropertiesOutOfSyncCount = new AtomicInteger();
private final AtomicInteger _uriPropertiesOutOfSyncCount = new AtomicInteger();


@Override
public int getServicePropertiesErrorCount()
{
Expand Down Expand Up @@ -65,6 +70,21 @@ public int getUriPropertiesEvictCount()
return _uriPropertiesEvictCount.get();
}

/**
 * Reports how many service properties entries are currently counted as out of sync.
 *
 * @return the current value of the service properties out-of-sync counter
 */
@Override
public int getServicePropertiesOutOfSyncCount()
{
  return _servicePropertiesOutOfSyncCount.get();
}

/**
 * Reports how many cluster properties entries are currently counted as out of sync.
 *
 * @return the current value of the cluster properties out-of-sync counter
 */
@Override
public int getClusterPropertiesOutOfSyncCount()
{
  return _clusterPropertiesOutOfSyncCount.get();
}

/**
 * Reports how many URI properties entries are currently counted as out of sync.
 *
 * @return the current value of the URI properties out-of-sync counter
 */
@Override
public int getUriPropertiesOutOfSyncCount()
{
  return _uriPropertiesOutOfSyncCount.get();
}

public void incrementServicePropertiesErrorCount()
{
_servicePropertiesErrorCount.incrementAndGet();
Expand Down Expand Up @@ -94,4 +114,34 @@ public void incrementUriPropertiesEvictCount()
{
_uriPropertiesEvictCount.incrementAndGet();
}

/**
 * Atomically adds one to the service properties out-of-sync counter.
 */
public void incrementServicePropertiesOutOfSyncCount()
{
_servicePropertiesOutOfSyncCount.incrementAndGet();
}

/**
 * Atomically adds one to the cluster properties out-of-sync counter.
 */
public void incrementClusterPropertiesOutOfSyncCount()
{
_clusterPropertiesOutOfSyncCount.incrementAndGet();
}

/**
 * Atomically adds one to the URI properties out-of-sync counter.
 */
public void incrementUriPropertiesOutOfSyncCount()
{
_uriPropertiesOutOfSyncCount.incrementAndGet();
}

/**
 * Atomically subtracts one from the service properties out-of-sync counter.
 * No lower bound is enforced here; callers are expected to pair each decrement
 * with an earlier increment.
 */
public void decrementServicePropertiesOutOfSyncCount()
{
_servicePropertiesOutOfSyncCount.decrementAndGet();
}

/**
 * Atomically subtracts one from the cluster properties out-of-sync counter.
 * No lower bound is enforced here; callers are expected to pair each decrement
 * with an earlier increment.
 */
public void decrementClusterPropertiesOutOfSyncCount()
{
_clusterPropertiesOutOfSyncCount.decrementAndGet();
}

/**
 * Atomically subtracts one from the URI properties out-of-sync counter.
 * No lower bound is enforced here; callers are expected to pair each decrement
 * with an earlier increment.
 */
public void decrementUriPropertiesOutOfSyncCount()
{
_uriPropertiesOutOfSyncCount.decrementAndGet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,28 @@

/**
 * MBean interface exposing the dual read monitoring counters (error, evict and
 * out-of-sync) for service, cluster and URI properties.
 */
public interface DualReadLoadBalancerJmxMBean
{
// Error count is incremented only when data of the same version is unequal.
int getServicePropertiesErrorCount();

int getClusterPropertiesErrorCount();

int getUriPropertiesErrorCount();

// Evict count is incremented when the cache grows to the max size and entries get evicted.
int getServicePropertiesEvictCount();

int getClusterPropertiesEvictCount();

int getUriPropertiesEvictCount();

// Entries are counted as out of sync when either:
//   1) data of the same version is unequal, or
//   2) data of a newer version is received in one cache before the other cache
//      receives the older version to compare.
// Entries in each cache are counted individually.
// For example: A1 != A2 is counted as TWO entries being out of sync.
int getServicePropertiesOutOfSyncCount();

int getClusterPropertiesOutOfSyncCount();

int getUriPropertiesOutOfSyncCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,26 @@ public void reportData(String propertyName, T property, String propertyVersion,
{
_rateLimitedLogger.warn("Received mismatched properties from dual read. Old LB: {}, New LB: {}",
entry, newEntry);
incrementEntryOutOfSyncCount(); // increment the out-of-sync count for the entry received later
}
else
{ // entries are in-sync, decrement the out-of-sync count for the entry received earlier
decrementEntryOutOfSyncCount();
}
cacheToCompare.invalidate(propertyName);
}
else
{
cacheToAdd.put(propertyName, new CacheEntry<>(propertyVersion, getTimestamp(), property));
// if version is different, entries of both the old version and the new version will increment the out-of-sync count
incrementEntryOutOfSyncCount();
}
}

// Hook implemented per property type to add one to that type's out-of-sync count.
// reportData calls it when a mismatched pair is detected and when an entry is cached
// without a counterpart to compare against.
abstract void incrementEntryOutOfSyncCount();

// Hook implemented per property type to subtract one from that type's out-of-sync count.
// reportData calls it when a newly reported entry turns out to be in sync with the
// entry received earlier.
abstract void decrementEntryOutOfSyncCount();

// Decides whether the entry from the old load balancer and the entry from the new
// load balancer are considered equal; the comparison criteria are defined by each subclass.
abstract boolean isEqual(CacheEntry<T> oldLbEntry, CacheEntry<T> newLbEntry);

// NOTE(review): presumably invoked when an entry is evicted from one of the caches,
// so the subclass can record it — the call site is not visible here; confirm.
abstract void onEvict();
Expand Down Expand Up @@ -145,6 +156,16 @@ public ClusterPropertiesDualReadMonitor(DualReadLoadBalancerJmx dualReadLoadBala
_dualReadLoadBalancerJmx = dualReadLoadBalancerJmx;
}

/**
 * Forwards an out-of-sync increment for a cluster properties entry to the dual read JMX bean.
 */
@Override
void incrementEntryOutOfSyncCount()
{
  _dualReadLoadBalancerJmx.incrementClusterPropertiesOutOfSyncCount();
}

/**
 * Forwards an out-of-sync decrement for a cluster properties entry to the dual read JMX bean.
 */
@Override
void decrementEntryOutOfSyncCount()
{
  _dualReadLoadBalancerJmx.decrementClusterPropertiesOutOfSyncCount();
}

@Override
boolean isEqual(CacheEntry<ClusterProperties> oldLbEntry, CacheEntry<ClusterProperties> newLbEntry)
{
Expand Down Expand Up @@ -174,6 +195,16 @@ public ServicePropertiesDualReadMonitor(DualReadLoadBalancerJmx dualReadLoadBala
_dualReadLoadBalancerJmx = dualReadLoadBalancerJmx;
}

/**
 * Forwards an out-of-sync increment for a service properties entry to the dual read JMX bean.
 */
@Override
void incrementEntryOutOfSyncCount()
{
  _dualReadLoadBalancerJmx.incrementServicePropertiesOutOfSyncCount();
}

/**
 * Forwards an out-of-sync decrement for a service properties entry to the dual read JMX bean.
 */
@Override
void decrementEntryOutOfSyncCount()
{
  _dualReadLoadBalancerJmx.decrementServicePropertiesOutOfSyncCount();
}

@Override
boolean isEqual(CacheEntry<ServiceProperties> oldLbEntry, CacheEntry<ServiceProperties> newLbEntry)
{
Expand Down Expand Up @@ -203,6 +234,16 @@ public UriPropertiesDualReadMonitor(DualReadLoadBalancerJmx dualReadLoadBalancer
_dualReadLoadBalancerJmx = dualReadLoadBalancerJmx;
}

/**
 * Forwards an out-of-sync increment for a URI properties entry to the dual read JMX bean.
 */
@Override
void incrementEntryOutOfSyncCount()
{
  _dualReadLoadBalancerJmx.incrementUriPropertiesOutOfSyncCount();
}

/**
 * Forwards an out-of-sync decrement for a URI properties entry to the dual read JMX bean.
 */
@Override
void decrementEntryOutOfSyncCount()
{
  _dualReadLoadBalancerJmx.decrementUriPropertiesOutOfSyncCount();
}

@Override
boolean isEqual(CacheEntry<UriProperties> oldLbEntry, CacheEntry<UriProperties> newLbEntry)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public class SimpleLoadBalancer implements LoadBalancer, HashRingProvider, Clien
private final LoadBalancerState _state;
private final Stats _serviceUnavailableStats;
private final Stats _serviceAvailableStats;
private final Stats _serviceNotFoundStats; // service is not present in service discovery system
private final Stats _clusterNotFoundStats; // cluster is not present in service discovery system
private final long _timeout;
private final TimeUnit _unit;
private final ScheduledExecutorService _executor;
Expand Down Expand Up @@ -137,6 +139,8 @@ public SimpleLoadBalancer(LoadBalancerState state,
_state = state;
_serviceUnavailableStats = serviceUnavailableStats;
_serviceAvailableStats = serviceAvailableStats;
_serviceNotFoundStats = new Stats(1000);
_clusterNotFoundStats = new Stats(1000);
_timeout = timeout;
_unit = unit;
_executor = executor;
Expand All @@ -151,6 +155,16 @@ public SimpleLoadBalancer(LoadBalancerState state,
}
}

/**
 * Returns the stats object that counts failed service properties lookups, i.e. cases
 * recorded as the service not being present in the service discovery system.
 *
 * @return the service-not-found stats held by this load balancer
 */
public Stats getServiceNotFoundStats()
{
return _serviceNotFoundStats;
}

/**
 * Returns the stats object that counts failed cluster properties lookups, i.e. cases
 * recorded as the cluster not being present in the service discovery system.
 *
 * @return the cluster-not-found stats held by this load balancer
 */
public Stats getClusterNotFoundStats()
{
return _clusterNotFoundStats;
}

public Stats getServiceUnavailableStats()
{
return _serviceUnavailableStats;
Expand Down Expand Up @@ -749,6 +763,7 @@ public void getLoadBalancedServiceProperties(String serviceName, Callback<Servic
@Override
public void onError(Throwable e)
{
  // Record the failed service lookup before handing the failure to the caller.
  // NOTE(review): every error reaching this callback is counted as "service not found",
  // whatever its underlying cause — confirm that is intended.
  _serviceNotFoundStats.inc();

  // Wrap the cause so the caller receives a ServiceUnavailableException with the original attached.
  final String message = "PEGA_1011. " + e.getMessage();
  final ServiceUnavailableException unavailable = new ServiceUnavailableException(serviceName, message, e);
  finalCallback.onError(unavailable);
}

Expand Down Expand Up @@ -808,6 +823,7 @@ public void getLoadBalancedClusterAndUriProperties(String clusterName,
@Override
public void onError(Throwable e)
{
  // Record the failed cluster lookup before handing the failure to the caller.
  // NOTE(review): every error reaching this callback is counted as "cluster not found",
  // whatever its underlying cause — confirm that is intended.
  _clusterNotFoundStats.inc();

  // Wrap the cause so the caller receives a ServiceUnavailableException with the original attached.
  final String message = "PEGA_1011. " + e.getMessage();
  final ServiceUnavailableException unavailable = new ServiceUnavailableException(clusterName, message, e);
  finalCallback.onError(unavailable);
}

Expand Down
10 changes: 10 additions & 0 deletions d2/src/main/java/com/linkedin/d2/jmx/SimpleLoadBalancerJmx.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ public long getClientNotFoundCount()
return _loadBalancer.getServiceUnavailableStats().getCount();
}

/**
 * Exposes the count from the load balancer's service-not-found stats.
 *
 * @return the count reported by the service-not-found stats
 */
@Override
public long getServiceNotFoundCount()
{
  return _loadBalancer.getServiceNotFoundStats().getCount();
}

/**
 * Exposes the count from the load balancer's cluster-not-found stats.
 *
 * @return the count reported by the cluster-not-found stats
 */
@Override
public long getClusterNotFoundCount()
{
  return _loadBalancer.getClusterNotFoundStats().getCount();
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ public interface SimpleLoadBalancerJmxMBean

long getClientNotFoundCount();

long getServiceNotFoundCount();

long getClusterNotFoundCount();

String toString();
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.46.1
version=29.46.2
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit f0d4b74

Please sign in to comment.