diff --git a/CHANGELOG.md b/CHANGELOG.md index 2da2f25c3..ec05a9b40 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.59.0] - 2024-10-07 +- Add support for announcing/deannoucing service only to INDIS + ## [29.58.11] - 2024-10-03 - Add getters in ZookeeperAnnouncer @@ -5743,7 +5746,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.59.0...master +[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0 [29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11 [29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10 [29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java index f8fe92ee4..978b80fc3 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java @@ -69,4 +69,6 @@ void addUriSpecificProperty(String clusterName, void start(Callback callback); void shutdown(Callback callback); + + String getConnectString(); } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java new file mode 100644 index 000000000..75a61b4ba --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java @@ -0,0 +1,101 @@ +package com.linkedin.d2.balancer.servers; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.callback.Callbacks; +import com.linkedin.common.util.None; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ConnectionManager is an abstract class responsible for managing connections to external systems. + * It can be extended to handle specific service registries (e.g., Zookeeper). + * For example, see {@link com.linkedin.d2.balancer.servers.ZooKeeperConnectionManager} for managing Zookeeper + * connections during D2 server announcements. + * This class provides basic functionalities such as start, shutdown, markDownAllServers, and markUpAllServers which + * is called during D2 server announcements/de-announcement. + */ +public abstract class ConnectionManager +{ + private final ZooKeeperAnnouncer[] _servers; + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class); + + protected ConnectionManager(ZooKeeperAnnouncer[] servers) + { + _servers = servers; + } + + abstract public void start(Callback callback); + + abstract public void shutdown(final Callback callback); + + abstract public String getAnnouncementTargetIdentifier(); + + public void markDownAllServers(final Callback callback) + { + Callback markDownCallback; + if (callback != null) + { + markDownCallback = callback; + } + else + { + markDownCallback = new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.error("failed to mark down servers", e); + } + + @Override + public void onSuccess(None result) + { + LOG.info("mark down all servers successful"); + } + }; + } + Callback multiCallback = Callbacks.countDown(markDownCallback, _servers.length); + for (ZooKeeperAnnouncer server : _servers) + { + server.markDown(multiCallback); + } + } + + public void markUpAllServers(final Callback callback) + { + Callback markUpCallback; + if (callback != null) + { + markUpCallback = callback; + } + else + { + markUpCallback = new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.error("failed to mark up servers", e); + } + + @Override + public void onSuccess(None result) + { + LOG.info("mark up all servers successful"); + } + }; + } + Callback multiCallback = Callbacks.countDown(markUpCallback, _servers.length); + for (ZooKeeperAnnouncer server : _servers) + { + server.markUp(multiCallback); + } + + } + + public ZooKeeperAnnouncer[] getAnnouncers() + { + return _servers; + } +} diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java index 29a71b9a2..0a85e8547 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java @@ -31,14 +31,15 @@ import com.linkedin.common.callback.Callback; import com.linkedin.common.util.None; +import com.linkedin.d2.balancer.LoadBalancerServer; import com.linkedin.d2.balancer.properties.PartitionData; import com.linkedin.d2.balancer.properties.PropertyKeys; import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor; +import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper; import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType; -import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper; import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.util.ArgumentUtil; @@ -66,7 +67,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper public static final int DEFAULT_DARK_WARMUP_DURATION = 0; public static final String DEFAULT_DARK_WARMUP_CLUSTER_NAME = null; - private final ZooKeeperServer _server; + private final LoadBalancerServer _server; private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class); private volatile String _cluster; private volatile URI _uri; @@ -140,24 +141,24 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper // Field to store the dark warm-up time duration in seconds, defaults to zero private int _warmupDuration; - public ZooKeeperAnnouncer(ZooKeeperServer server) + public ZooKeeperAnnouncer(LoadBalancerServer server) { this(server, true); } - public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp) + public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp) { this(server, initialIsUp, DEFAULT_DARK_WARMUP_ENABLED, DEFAULT_DARK_WARMUP_CLUSTER_NAME, DEFAULT_DARK_WARMUP_DURATION, (ScheduledExecutorService) null); } - public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp, + public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService) { this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, new LogOnlyServiceDiscoveryEventEmitter()); // default to use log-only event emitter } - public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp, + public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter) { _server = server; @@ -175,7 +176,10 @@ public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp, _executorService = executorService; _eventEmitter = eventEmitter; - _server.setServiceDiscoveryEventHelper(this); + if (server instanceof ZooKeeperServer) + { + ((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this); + } } /** @@ -555,18 +559,21 @@ private void drain(Deque> callbacks, @Nullable Throwable t) public void setStore(ZooKeeperEphemeralStore store) { - store.setZnodePathAndDataCallback((cluster, path, data) -> { - if (cluster.equals(_cluster)) { - _znodePathRef.set(path); - _znodeDataRef.set(data); - } else if (cluster.equals(_warmupClusterName)) { - _warmupClusterZnodePathRef.set(path); - _warmupClusterZnodeDataRef.set(data); - } else { - _log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data); - } - }); - _server.setStore(store); + if (_server instanceof ZooKeeperServer) + { + store.setZnodePathAndDataCallback((cluster, path, data) -> { + if (cluster.equals(_cluster)) { + _znodePathRef.set(path); + _znodeDataRef.set(data); + } else if (cluster.equals(_warmupClusterName)) { + _warmupClusterZnodePathRef.set(path); + _warmupClusterZnodeDataRef.set(data); + } else { + _log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data); + } + }); + ((ZooKeeperServer) _server).setStore(store); + } } public synchronized void changeWeight(final Callback callback, boolean doNotSlowStart) @@ -718,6 +725,13 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) { @Override public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) { + // since SD event is sent in IndisAnnouncer for INDIS-write-only, inside ZookeeperAnnouncer, any calls to + // "emitSDStatusActiveUpdateIntentAndWriteEvents" should only happen when _server is an instance of + // ZooKeeperServer (which means it only emits the event when it's doing zk-only or dual write). + if (!(_server instanceof ZooKeeperServer)) + { + return; + } if (_eventEmitter == null) { _log.info("Service discovery event emitter in ZookeeperAnnouncer is null. Skipping emitting events."); return; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java index 1eda85e19..7d36bc218 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java @@ -46,7 +46,7 @@ * @version $Revision: $ */ -public class ZooKeeperConnectionManager +public class ZooKeeperConnectionManager extends ConnectionManager { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class); @@ -79,6 +79,7 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection, ZKStoreFactory> factory, ZooKeeperAnnouncer... servers) { + super(servers); _zkBasePath = zkBasePath; _zkConnection = zkConnection; _factory = factory; @@ -94,6 +95,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, ZKStoreFactory> factory, ZooKeeperAnnouncer... servers) { + super(servers); _zkConnectString = zkConnectString; _zkSessionTimeout = zkSessionTimeout; _zkBasePath = zkBasePath; @@ -132,6 +134,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers); } + @Override public void start(Callback callback) { _managerStarted = true; @@ -154,6 +157,7 @@ public void start(Callback callback) } } + @Override public void shutdown(final Callback callback) { _managerStarted = false; @@ -180,68 +184,6 @@ protected None convertResponse(None none) throws Exception } } - public void markDownAllServers(final Callback callback) - { - Callback markDownCallback; - if (callback != null) - { - markDownCallback = callback; - } - else - { - markDownCallback = new Callback() - { - @Override - public void onError(Throwable e) - { - LOG.error("failed to mark down servers", e); - } - - @Override - public void onSuccess(None result) - { - LOG.info("mark down all servers successful"); - } - }; - } - Callback multiCallback = Callbacks.countDown(markDownCallback, _servers.length); - for (ZooKeeperAnnouncer server : _servers) - { - server.markDown(multiCallback); - } - } - - public void markUpAllServers(final Callback callback) - { - Callback markUpCallback; - if (callback != null) - { - markUpCallback = callback; - } - else - { - markUpCallback = new Callback() - { - @Override - public void onError(Throwable e) - { - LOG.error("failed to mark up servers", e); - } - - @Override - public void onSuccess(None result) - { - LOG.info("mark up all servers successful"); - } - }; - } - Callback multiCallback = Callbacks.countDown(markUpCallback, _servers.length); - for (ZooKeeperAnnouncer server : _servers) - { - server.markUp(multiCallback); - } - } - private class Listener implements ZKPersistentConnection.EventListener { @Override @@ -353,9 +295,10 @@ public interface ZKStoreFactory> Z createStore(ZKConnection connection, String path); } - public ZooKeeperAnnouncer[] getAnnouncers() + @Override + public String getAnnouncementTargetIdentifier() { - return _servers; + return getZooKeeperConnectString(); } public boolean isSessionEstablished() diff --git a/gradle.properties b/gradle.properties index 7854292db..87b5768bb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.11 +version=29.59.0 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true