From 83e10380c5aaebb9997f61eb8c869cec414478a2 Mon Sep 17 00:00:00 2001 From: David Grossman Date: Fri, 18 Oct 2024 03:03:11 +0000 Subject: [PATCH] Revert "add kafka announcement only logic (#1027)" This reverts commit bd54a9835d262012402678aa9ead9813089aaf9d. --- CHANGELOG.md | 6 +- .../d2/balancer/LoadBalancerServer.java | 2 - .../balancer/servers/ConnectionManager.java | 101 ------------------ .../balancer/servers/ZooKeeperAnnouncer.java | 52 ++++----- .../servers/ZooKeeperConnectionManager.java | 73 +++++++++++-- gradle.properties | 2 +- 6 files changed, 86 insertions(+), 150 deletions(-) delete mode 100644 d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java diff --git a/CHANGELOG.md b/CHANGELOG.md index ec05a9b40d..2da2f25c37 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,9 +14,6 @@ and what APIs have changed, if applicable. ## [Unreleased] -## [29.59.0] - 2024-10-07 -- Add support for announcing/deannoucing service only to INDIS - ## [29.58.11] - 2024-10-03 - Add getters in ZookeeperAnnouncer @@ -5746,8 +5743,7 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.59.0...master -[29.59.0]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.59.0 +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master [29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11 [29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10 [29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java index 978b80fc3e..f8fe92ee48 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerServer.java @@ -69,6 +69,4 @@ void addUriSpecificProperty(String clusterName, void start(Callback callback); void shutdown(Callback callback); - - String getConnectString(); } diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java deleted file mode 100644 index 75a61b4ba9..0000000000 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java +++ /dev/null @@ -1,101 +0,0 @@ -package com.linkedin.d2.balancer.servers; - -import com.linkedin.common.callback.Callback; -import com.linkedin.common.callback.Callbacks; -import com.linkedin.common.util.None; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ConnectionManager is an abstract class responsible for managing connections to external systems. - * It can be extended to handle specific service registries (e.g., Zookeeper). - * For example, see {@link com.linkedin.d2.balancer.servers.ZooKeeperConnectionManager} for managing Zookeeper - * connections during D2 server announcements. - * This class provides basic functionalities such as start, shutdown, markDownAllServers, and markUpAllServers which - * is called during D2 server announcements/de-announcement. - */ -public abstract class ConnectionManager -{ - private final ZooKeeperAnnouncer[] _servers; - - private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class); - - protected ConnectionManager(ZooKeeperAnnouncer[] servers) - { - _servers = servers; - } - - abstract public void start(Callback callback); - - abstract public void shutdown(final Callback callback); - - abstract public String getAnnouncementTargetIdentifier(); - - public void markDownAllServers(final Callback callback) - { - Callback markDownCallback; - if (callback != null) - { - markDownCallback = callback; - } - else - { - markDownCallback = new Callback() - { - @Override - public void onError(Throwable e) - { - LOG.error("failed to mark down servers", e); - } - - @Override - public void onSuccess(None result) - { - LOG.info("mark down all servers successful"); - } - }; - } - Callback multiCallback = Callbacks.countDown(markDownCallback, _servers.length); - for (ZooKeeperAnnouncer server : _servers) - { - server.markDown(multiCallback); - } - } - - public void markUpAllServers(final Callback callback) - { - Callback markUpCallback; - if (callback != null) - { - markUpCallback = callback; - } - else - { - markUpCallback = new Callback() - { - @Override - public void onError(Throwable e) - { - LOG.error("failed to mark up servers", e); - } - - @Override - public void onSuccess(None result) - { - LOG.info("mark up all servers successful"); - } - }; - } - Callback multiCallback = Callbacks.countDown(markUpCallback, _servers.length); - for (ZooKeeperAnnouncer server : _servers) - { - server.markUp(multiCallback); - } - - } - - public ZooKeeperAnnouncer[] getAnnouncers() - { - return _servers; - } -} diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java index 0a85e8547b..29a71b9a2e 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperAnnouncer.java @@ -31,15 +31,14 @@ import com.linkedin.common.callback.Callback; import com.linkedin.common.util.None; -import com.linkedin.d2.balancer.LoadBalancerServer; import com.linkedin.d2.balancer.properties.PartitionData; import com.linkedin.d2.balancer.properties.PropertyKeys; import com.linkedin.d2.balancer.properties.UriProperties; import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor; -import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper; import com.linkedin.d2.discovery.event.LogOnlyServiceDiscoveryEventEmitter; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter; import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter.StatusUpdateActionType; +import com.linkedin.d2.discovery.event.D2ServiceDiscoveryEventHelper; import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore; import com.linkedin.util.ArgumentUtil; @@ -67,7 +66,7 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper public static final int DEFAULT_DARK_WARMUP_DURATION = 0; public static final String DEFAULT_DARK_WARMUP_CLUSTER_NAME = null; - private final LoadBalancerServer _server; + private final ZooKeeperServer _server; private static final Logger _log = LoggerFactory.getLogger(ZooKeeperAnnouncer.class); private volatile String _cluster; private volatile URI _uri; @@ -141,24 +140,24 @@ public class ZooKeeperAnnouncer implements D2ServiceDiscoveryEventHelper // Field to store the dark warm-up time duration in seconds, defaults to zero private int _warmupDuration; - public ZooKeeperAnnouncer(LoadBalancerServer server) + public ZooKeeperAnnouncer(ZooKeeperServer server) { this(server, true); } - public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp) + public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp) { this(server, initialIsUp, DEFAULT_DARK_WARMUP_ENABLED, DEFAULT_DARK_WARMUP_CLUSTER_NAME, DEFAULT_DARK_WARMUP_DURATION, (ScheduledExecutorService) null); } - public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, + public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp, boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService) { this(server, initialIsUp, isDarkWarmupEnabled, warmupClusterName, warmupDuration, executorService, new LogOnlyServiceDiscoveryEventEmitter()); // default to use log-only event emitter } - public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, + public ZooKeeperAnnouncer(ZooKeeperServer server, boolean initialIsUp, boolean isDarkWarmupEnabled, String warmupClusterName, int warmupDuration, ScheduledExecutorService executorService, ServiceDiscoveryEventEmitter eventEmitter) { _server = server; @@ -176,10 +175,7 @@ public ZooKeeperAnnouncer(LoadBalancerServer server, boolean initialIsUp, _executorService = executorService; _eventEmitter = eventEmitter; - if (server instanceof ZooKeeperServer) - { - ((ZooKeeperServer) server).setServiceDiscoveryEventHelper(this); - } + _server.setServiceDiscoveryEventHelper(this); } /** @@ -559,21 +555,18 @@ private void drain(Deque> callbacks, @Nullable Throwable t) public void setStore(ZooKeeperEphemeralStore store) { - if (_server instanceof ZooKeeperServer) - { - store.setZnodePathAndDataCallback((cluster, path, data) -> { - if (cluster.equals(_cluster)) { - _znodePathRef.set(path); - _znodeDataRef.set(data); - } else if (cluster.equals(_warmupClusterName)) { - _warmupClusterZnodePathRef.set(path); - _warmupClusterZnodeDataRef.set(data); - } else { - _log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data); - } - }); - ((ZooKeeperServer) _server).setStore(store); - } + store.setZnodePathAndDataCallback((cluster, path, data) -> { + if (cluster.equals(_cluster)) { + _znodePathRef.set(path); + _znodeDataRef.set(data); + } else if (cluster.equals(_warmupClusterName)) { + _warmupClusterZnodePathRef.set(path); + _warmupClusterZnodeDataRef.set(data); + } else { + _log.warn("znode path and data callback is called with unknown cluster: " + cluster + ", node path: " + path + ", and data: " + data); + } + }); + _server.setStore(store); } public synchronized void changeWeight(final Callback callback, boolean doNotSlowStart) @@ -725,13 +718,6 @@ public void setEventEmitter(ServiceDiscoveryEventEmitter emitter) { @Override public void emitSDStatusActiveUpdateIntentAndWriteEvents(String cluster, boolean isMarkUp, boolean succeeded, long startAt) { - // since SD event is sent in IndisAnnouncer for INDIS-write-only, inside ZookeeperAnnouncer, any calls to - // "emitSDStatusActiveUpdateIntentAndWriteEvents" should only happen when _server is an instance of - // ZooKeeperServer (which means it only emits the event when it's doing zk-only or dual write). - if (!(_server instanceof ZooKeeperServer)) - { - return; - } if (_eventEmitter == null) { _log.info("Service discovery event emitter in ZookeeperAnnouncer is null. Skipping emitting events."); return; diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java index 7d36bc2182..1eda85e193 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java @@ -46,7 +46,7 @@ * @version $Revision: $ */ -public class ZooKeeperConnectionManager extends ConnectionManager +public class ZooKeeperConnectionManager { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class); @@ -79,7 +79,6 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection, ZKStoreFactory> factory, ZooKeeperAnnouncer... servers) { - super(servers); _zkBasePath = zkBasePath; _zkConnection = zkConnection; _factory = factory; @@ -95,7 +94,6 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, ZKStoreFactory> factory, ZooKeeperAnnouncer... servers) { - super(servers); _zkConnectString = zkConnectString; _zkSessionTimeout = zkSessionTimeout; _zkBasePath = zkBasePath; @@ -134,7 +132,6 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers); } - @Override public void start(Callback callback) { _managerStarted = true; @@ -157,7 +154,6 @@ public void start(Callback callback) } } - @Override public void shutdown(final Callback callback) { _managerStarted = false; @@ -184,6 +180,68 @@ protected None convertResponse(None none) throws Exception } } + public void markDownAllServers(final Callback callback) + { + Callback markDownCallback; + if (callback != null) + { + markDownCallback = callback; + } + else + { + markDownCallback = new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.error("failed to mark down servers", e); + } + + @Override + public void onSuccess(None result) + { + LOG.info("mark down all servers successful"); + } + }; + } + Callback multiCallback = Callbacks.countDown(markDownCallback, _servers.length); + for (ZooKeeperAnnouncer server : _servers) + { + server.markDown(multiCallback); + } + } + + public void markUpAllServers(final Callback callback) + { + Callback markUpCallback; + if (callback != null) + { + markUpCallback = callback; + } + else + { + markUpCallback = new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.error("failed to mark up servers", e); + } + + @Override + public void onSuccess(None result) + { + LOG.info("mark up all servers successful"); + } + }; + } + Callback multiCallback = Callbacks.countDown(markUpCallback, _servers.length); + for (ZooKeeperAnnouncer server : _servers) + { + server.markUp(multiCallback); + } + } + private class Listener implements ZKPersistentConnection.EventListener { @Override @@ -295,10 +353,9 @@ public interface ZKStoreFactory> Z createStore(ZKConnection connection, String path); } - @Override - public String getAnnouncementTargetIdentifier() + public ZooKeeperAnnouncer[] getAnnouncers() { - return getZooKeeperConnectString(); + return _servers; } public boolean isSessionEstablished() diff --git a/gradle.properties b/gradle.properties index 87b5768bbd..7854292dbb 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.59.0 +version=29.58.11 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true