Skip to content

Commit

Permalink
Add configs for xds timeout and executor
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamgupta1 committed Oct 10, 2023
1 parent d176e62 commit 52a418b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 7 deletions.
17 changes: 16 additions & 1 deletion d2/src/main/java/com/linkedin/d2/balancer/D2ClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ public D2Client build()
_config.failoutConfigProviderFactory,
_config.failoutRedirectStrategy,
_config.serviceDiscoveryEventEmitter,
_config.dualReadStateManager
_config.dualReadStateManager,
_config.xdsExecutorService,
_config.xdsStreamReadyTimeout
);

final LoadBalancerWithFacilitiesFactory loadBalancerFactory = (_config.lbWithFacilitiesFactory == null) ?
Expand Down Expand Up @@ -641,6 +643,19 @@ public D2ClientBuilder setDualReadStateManager(DualReadStateManager dualReadStat
return this;
}

/**
* Single-threaded executor service for xDS communication.
*/
public D2ClientBuilder setXdsExecutorService(ScheduledExecutorService xdsExecutorService) {
_config.xdsExecutorService = xdsExecutorService;
return this;
}

public D2ClientBuilder setXdsStreamReadyTimeout(long xdsStreamReadyTimeout) {
_config.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
return this;
}

private Map<String, TransportClientFactory> createDefaultTransportClientFactories()
{
final Map<String, TransportClientFactory> clientFactories = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ public class D2ClientConfig
public ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter = new LogOnlyServiceDiscoveryEventEmitter(); // default to use log-only emitter
public DualReadStateManager dualReadStateManager = null;

public ScheduledExecutorService xdsExecutorService = null;
public Long xdsStreamReadyTimeout = null;

public D2ClientConfig()
{
}
Expand Down Expand Up @@ -190,7 +193,9 @@ public D2ClientConfig()
FailoutConfigProviderFactory failoutConfigProviderFactory,
FailoutRedirectStrategy failoutRedirectStrategy,
ServiceDiscoveryEventEmitter serviceDiscoveryEventEmitter,
DualReadStateManager dualReadStateManager)
DualReadStateManager dualReadStateManager,
ScheduledExecutorService xdsExecutorService,
Long xdsStreamReadyTimeout)
{
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
Expand Down Expand Up @@ -254,5 +259,7 @@ public D2ClientConfig()
this.failoutRedirectStrategy = failoutRedirectStrategy;
this.serviceDiscoveryEventEmitter = serviceDiscoveryEventEmitter;
this.dualReadStateManager = dualReadStateManager;
this.xdsExecutorService = xdsExecutorService;
this.xdsStreamReadyTimeout = xdsStreamReadyTimeout;
}
}
2 changes: 1 addition & 1 deletion d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
public class XdsClientImpl extends XdsClient
{
private static final Logger _log = LoggerFactory.getLogger(XdsClientImpl.class);
private static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L;
public static final long DEFAULT_READY_TIMEOUT_MILLIS = 2000L;

private final Map<String, ResourceSubscriber> _d2NodeSubscribers = new HashMap<>();
private final Map<String, ResourceSubscriber> _d2SymlinkNodeSubscribers = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.linkedin.r2.util.NamedThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.lang3.ObjectUtils;


/**
Expand All @@ -47,11 +48,13 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
{
d2ClientJmxManager.registerDualReadLoadBalancerJmx(config.dualReadStateManager.getDualReadLoadBalancerJmx());
}
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
new NamedThreadFactory("D2 xDS PropertyEventExecutor"));

ScheduledExecutorService executorService = ObjectUtils.defaultIfNull(config.xdsExecutorService,
Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("D2 xDS PropertyEventExecutor")));
long xdsStreamReadyTimeout = ObjectUtils.defaultIfNull(config.xdsStreamReadyTimeout,
XdsClientImpl.DEFAULT_READY_TIMEOUT_MILLIS);
XdsClient xdsClient = new XdsClientImpl(new Node(config.hostName),
new XdsChannelFactory(config.grpcSslContext, config.xdsServer).createChannel(), executorService);
new XdsChannelFactory(config.grpcSslContext, config.xdsServer).createChannel(), executorService,
xdsStreamReadyTimeout);
XdsToD2PropertiesAdaptor adaptor = new XdsToD2PropertiesAdaptor(xdsClient, config.dualReadStateManager,
config.serviceDiscoveryEventEmitter);

Expand Down

0 comments on commit 52a418b

Please sign in to comment.