Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing committed Oct 10, 2024
1 parent 71f30d0 commit 9c25b1b
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,6 @@ void addUriSpecificProperty(String clusterName,
void start(Callback<None> callback);

void shutdown(Callback<None> callback);

String getConnectString();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@

import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.LoadBalancerServer;
import com.linkedin.d2.balancer.properties.PartitionData;
import com.linkedin.d2.balancer.properties.PropertyKeys;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType;
import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.util.ArgumentUtil;

Expand Down Expand Up @@ -66,7 +67,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
public static final int DEFAULT_DARK_WARMUP_DURATION = 0;
public static final String DEFAULT_DARK_WARMUP_CLUSTER_NAME = null;

private final ZooKeeperServer _server;
private final LoadBalancerServer _server;
private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class);
private volatile String _cluster;
private volatile URI _uri;
Expand Down Expand Up @@ -140,24 +141,24 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper
// Field to store the dark warm-up time duration in seconds, defaults to zero
private int _warmupDuration;

public ZooKeeperAnnouncer(ZooKeeperServer server)
public ZooKeeperAnnouncer(LoadBalancerServer server)
{
this(server, true);
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp)
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp)
{
this(server, initialIsUp, DEFAULT_DARK_WARMUP_ENABLED, DEFAULT_DARK_WARMUP_CLUSTER_NAME, DEFAULT_DARK_WARMUP_DURATION, (ScheduledExecutorService) null);
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService)
{
this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService,
new LogOnlyServiceDiscoveryEventEmitter()); // default to use log-only event emitter
}

public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp,
boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter)
{
_server = server;
Expand All @@ -175,7 +176,9 @@ public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp,
_executorService = executorService;
_eventEmitter = eventEmitter;

_server.setServiceDiscoveryEventHelper(this);
if(server instanceof ZooKeeperServer){
((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this);
}
}

/**
Expand Down Expand Up @@ -555,18 +558,21 @@ private void drain(Deque<Callback<None>> callbacks, @Nullable Throwable t)

public void setStore(ZooKeeperEphemeralStore<UriProperties> store)
{
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
_server.setStore(store);
if (_server instanceof ZooKeeperServer)
{
store.setZnodePathAndDataCallback((cluster, path, data) -> {
if (cluster.equals(_cluster)) {
_znodePathRef.set(path);
_znodeDataRef.set(data);
} else if (cluster.equals(_warmupClusterName)) {
_warmupClusterZnodePathRef.set(path);
_warmupClusterZnodeDataRef.set(data);
} else {
_log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data);
}
});
((ZooKeeperServer) _server).setStore(store);
}
}

public synchronized void changeWeight(final Callback<None> callback, boolean doNotSlowStart)
Expand Down

0 comments on commit 9c25b1b

Please sign in to comment.