Skip to content

Commit

Permalink
refactor the code structure and resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing committed Nov 11, 2023
1 parent 677929c commit b87968b
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -201,7 +202,8 @@ public D2Client build()
_config.serviceDiscoveryEventEmitter,
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout
_config.xdsStreamReadyTimeout,
_config.loadBalancerThreadPool
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -643,6 +645,11 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

public D2ClientBuilder setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) {
_config.loadBalancerThreadPool = loadBalancerThreadPool;
return this;
}

/**
* Single-threaded executor service for xDS communication.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
Expand Down Expand Up @@ -127,6 +128,7 @@ public class D2ClientConfig

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;
public ThreadPoolExecutor loadBalancerThreadPool = null;

public D2ClientConfig()
{
Expand Down Expand Up @@ -195,7 +197,8 @@ public D2ClientConfig()
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout)
Long xdsStreamReadyTimeout,
ThreadPoolExecutor loadBalancerThreadPool)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -261,5 +264,6 @@ public D2ClientConfig()
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
this.loadBalancerThreadPool = loadBalancerThreadPool;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
import com.linkedin.d2.discovery.event.PropertyEventThread;
import com.linkedin.d2.xds.LoadBalanceTaskPool.NewLoadBalancerTaskthreadPool;
import com.linkedin.d2.xds.LoadBalanceTaskPool.NewBalanceGetPropertiesTask;
import com.linkedin.r2.message.Request;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.transport.common.TransportClientFactory;
import com.linkedin.r2.transport.common.bridge.client.TransportClient;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,21 +57,20 @@
* In DUAL_READ mode, it reads from both the old and the new load balancer, but relies on the data from old
* load balancer only.
*/
public class DualReadLoadBalancer implements LoadBalancerWithFacilities
{
public class DualReadLoadBalancer implements LoadBalancerWithFacilities {
private static final Logger LOG = LoggerFactory.getLogger(DualReadLoadBalancer.class);
private final LoadBalancerWithFacilities _oldLb;
private final LoadBalancerWithFacilities _newLb;
private final DualReadStateManager _dualReadStateManager;
private final NewLoadBalancerTaskthreadPool _newNewLoadBalancerTaskThreadPool;
private ThreadPoolExecutor _loadBalancerThreadPool;
private boolean _isNewLbReady;

public DualReadLoadBalancer(LoadBalancerWithFacilities oldLb, LoadBalancerWithFacilities newLb,
@Nonnull DualReadStateManager dualReadStateManager, NewLoadBalancerTaskthreadPool newLoadBalancerTaskThreadPool) {
@Nonnull DualReadStateManager dualReadStateManager)
{
_oldLb = oldLb;
_newLb = newLb;
_dualReadStateManager = dualReadStateManager;
_newNewLoadBalancerTaskThreadPool = newLoadBalancerTaskThreadPool;
_isNewLbReady = false;
}

Expand Down Expand Up @@ -111,8 +112,31 @@ public void getClient(Request request, RequestContext requestContext, Callback<T
_newLb.getClient(request, requestContext, clientCallback);
break;
case DUAL_READ:
_newNewLoadBalancerTaskThreadPool.execute(
new NewBalanceGetPropertiesTask(_newLb, _dualReadStateManager, serviceName));
getLoadBalancerThreadPool().execute(
() -> _newLb.getLoadBalancedServiceProperties(serviceName, new Callback<ServiceProperties>() {
@Override
public void onError(Throwable e) {
LOG.error("Double read failure. Unable to read service properties from: " + serviceName, e);
}

@Override
public void onSuccess(ServiceProperties result) {
String clusterName = result.getClusterName();
_dualReadStateManager.updateCluster(clusterName, DualReadModeProvider.DualReadMode.DUAL_READ);
_newLb.getLoadBalancedClusterAndUriProperties(clusterName,
new Callback<Pair<ClusterProperties, UriProperties>>() {
@Override
public void onError(Throwable e) {
LOG.error("Dual read failure. Unable to read cluster properties from: " + clusterName, e);
}

@Override
public void onSuccess(Pair<ClusterProperties, UriProperties> result) {
LOG.debug("Dual read is successful. Get cluster and uri properties: " + result);
}
});
}
}));
_oldLb.getClient(request, requestContext, clientCallback);
break;
case OLD_LB_ONLY:
Expand Down Expand Up @@ -257,6 +281,29 @@ private DualReadModeProvider.DualReadMode getDualReadMode(String d2ServiceName)
return _dualReadStateManager.getServiceDualReadMode(d2ServiceName);
}


/**
* Get the thread pool for load balancer tasks. If not set, a default thread pool will be used.
*/
public ThreadPoolExecutor getLoadBalancerThreadPool() {
return _loadBalancerThreadPool;
}

/**
* Set the thread pool for load balancer tasks. If not set, a default thread pool will be used.
*/
public void setLoadBalancerThreadPool(ThreadPoolExecutor loadBalancerThreadPool) {
if (loadBalancerThreadPool != null) {
this._loadBalancerThreadPool = loadBalancerThreadPool;
} else {
LOG.info("LoadBalancerTaskThreadPool is null, using default thread pool");
this._loadBalancerThreadPool =
new ThreadPoolExecutor(2, 3, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), (r, executor) -> {
LOG.error("LoadBalancerTaskThreadPool rejected execution, isNewReady: " + _isNewLbReady);
});
}
}

@Override
public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback)
{
Expand All @@ -266,5 +313,6 @@ public void shutdown(PropertyEventThread.PropertyEventShutdownCallback callback)
});

_oldLb.shutdown(callback);
_loadBalancerThreadPool.shutdown();
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.linkedin.d2.balancer.dualread.DualReadLoadBalancer;
import com.linkedin.d2.balancer.dualread.DualReadModeProvider;
import com.linkedin.d2.balancer.dualread.DualReadStateManager;
import com.linkedin.d2.xds.LoadBalanceTaskPool.NewLoadBalancerTaskthreadPool;
import javax.annotation.Nonnull;


Expand All @@ -34,20 +33,19 @@ public class DualReadZkAndXdsLoadBalancerFactory implements LoadBalancerWithFaci
private final LoadBalancerWithFacilitiesFactory _zkLbFactory;
private final LoadBalancerWithFacilitiesFactory _xdsLbFactory;
private final DualReadStateManager _dualReadStateManager;
private final NewLoadBalancerTaskthreadPool _newNewLoadBalancerTaskThreadPool;

public DualReadZkAndXdsLoadBalancerFactory(@Nonnull DualReadStateManager dualReadStateManager)
{
_zkLbFactory = new ZKFSLoadBalancerWithFacilitiesFactory();
_xdsLbFactory = new XdsLoadBalancerWithFacilitiesFactory();
_dualReadStateManager = dualReadStateManager;
_newNewLoadBalancerTaskThreadPool = new NewLoadBalancerTaskthreadPool();
}

@Override
public LoadBalancerWithFacilities create(D2ClientConfig config)
{
return new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager,
_newNewLoadBalancerTaskThreadPool);
DualReadLoadBalancer loadBalancer = new DualReadLoadBalancer(_zkLbFactory.create(config), _xdsLbFactory.create(config), _dualReadStateManager);
loadBalancer.setLoadBalancerThreadPool(config.loadBalancerThreadPool);
return loadBalancer;
}
}

0 comments on commit b87968b

Please sign in to comment.