From 7e7a3ca9f809acf3fbc82f8d15d705ea27f5b953 Mon Sep 17 00:00:00 2001 From: brycezhongqing Date: Mon, 7 Oct 2024 16:10:45 -0700 Subject: [PATCH] add kafka announcement only logic --- CHANGELOG.md | 5 +- .../balancer/servers/ConnectionManager.java | 114 ++++++++++++++++++ .../servers/ZooKeeperConnectionManager.java | 96 ++------------- deprecate.sh | 8 ++ gradle.properties | 2 +- 5 files changed, 139 insertions(+), 86 deletions(-) create mode 100644 d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java create mode 100644 deprecate.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 2da2f25c37..d2e9752ca0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and what APIs have changed, if applicable. ## [Unreleased] +## [29.58.12] - 2024-10-07 +- Add support for announcing/deannoucing service only to INDIS ## [29.58.11] - 2024-10-03 - Add getters in ZookeeperAnnouncer @@ -5743,7 +5745,8 @@ patch operations can re-use these classes for generating patch messages. ## [0.14.1] -[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master +[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.12...master +[29.58.12]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.58.12 [29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11 [29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10 [29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9 diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java new file mode 100644 index 0000000000..86e5f97d84 --- /dev/null +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ConnectionManager.java @@ -0,0 +1,114 @@ +package com.linkedin.d2.balancer.servers; + +import com.linkedin.common.callback.Callback; +import com.linkedin.common.callback.Callbacks; +import com.linkedin.common.util.None; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ConnectionManager +{ + private final ZooKeeperAnnouncer[] _servers; + private final String _zkConnectString; + private final int _zkSessionTimeout; + private final String _zkBasePath; + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class); + + protected ConnectionManager(ZooKeeperAnnouncer[] servers, String zkConnectString, int zkSessionTimeout, String zkBasePath) + { + _servers = servers; + _zkConnectString = zkConnectString; + _zkSessionTimeout = zkSessionTimeout; + _zkBasePath = zkBasePath; + } + + abstract public void start(Callback callback); + + abstract public void shutdown(final Callback callback); + + public void markDownAllServers(final Callback callback) + { + Callback markDownCallback; + if (callback != null) + { + markDownCallback = callback; + } + else + { + markDownCallback = new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.error("failed to mark down servers", e); + } + + @Override + public void onSuccess(None result) + { + LOG.info("mark down all servers successful"); + } + }; + } + Callback multiCallback = Callbacks.countDown(markDownCallback, _servers.length); + for (ZooKeeperAnnouncer server : _servers) + { + server.markDown(multiCallback); + } + } + + public void markUpAllServers(final Callback callback) + { + Callback markUpCallback; + if (callback != null) + { + markUpCallback = callback; + } + else + { + markUpCallback = new Callback() + { + @Override + public void onError(Throwable e) + { + LOG.error("failed to mark up servers", e); + } + + @Override + public void onSuccess(None result) + { + LOG.info("mark up all servers successful"); + } + }; + } + Callback multiCallback = Callbacks.countDown(markUpCallback, _servers.length); + for (ZooKeeperAnnouncer server : _servers) + { + server.markUp(multiCallback); + } + + } + + public ZooKeeperAnnouncer[] getAnnouncers() + { + return _servers; + } + + abstract public boolean isSessionEstablished(); + + public String getZooKeeperConnectString() + { + return _zkConnectString; + } + + public int getZooKeeperSessionTimeout() + { + return _zkSessionTimeout; + } + + public String getZooKeeperBasePath() + { + return _zkBasePath; + } +} diff --git a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java index 1eda85e193..f1d6073054 100644 --- a/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java +++ b/d2/src/main/java/com/linkedin/d2/balancer/servers/ZooKeeperConnectionManager.java @@ -46,7 +46,7 @@ * @version $Revision: $ */ -public class ZooKeeperConnectionManager +public class ZooKeeperConnectionManager extends ConnectionManager { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class); @@ -79,6 +79,10 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection, ZKStoreFactory> factory, ZooKeeperAnnouncer... servers) { + super(servers, + zkConnection.getZKConnection().getConnectString(), + zkConnection.getZKConnection().getTimeout(), + zkBasePath); _zkBasePath = zkBasePath; _zkConnection = zkConnection; _factory = factory; @@ -94,6 +98,10 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, ZKStoreFactory> factory, ZooKeeperAnnouncer... servers) { + super(servers, + zkConnectString, + zkSessionTimeout, + zkBasePath); _zkConnectString = zkConnectString; _zkSessionTimeout = zkSessionTimeout; _zkBasePath = zkBasePath; @@ -132,6 +140,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout, this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers); } + @Override public void start(Callback callback) { _managerStarted = true; @@ -154,6 +163,7 @@ public void start(Callback callback) } } + @Override public void shutdown(final Callback callback) { _managerStarted = false; @@ -179,69 +189,6 @@ protected None convertResponse(None none) throws Exception zkCloseCallback.onSuccess(None.none()); } } - - public void markDownAllServers(final Callback callback) - { - Callback markDownCallback; - if (callback != null) - { - markDownCallback = callback; - } - else - { - markDownCallback = new Callback() - { - @Override - public void onError(Throwable e) - { - LOG.error("failed to mark down servers", e); - } - - @Override - public void onSuccess(None result) - { - LOG.info("mark down all servers successful"); - } - }; - } - Callback multiCallback = Callbacks.countDown(markDownCallback, _servers.length); - for (ZooKeeperAnnouncer server : _servers) - { - server.markDown(multiCallback); - } - } - - public void markUpAllServers(final Callback callback) - { - Callback markUpCallback; - if (callback != null) - { - markUpCallback = callback; - } - else - { - markUpCallback = new Callback() - { - @Override - public void onError(Throwable e) - { - LOG.error("failed to mark up servers", e); - } - - @Override - public void onSuccess(None result) - { - LOG.info("mark up all servers successful"); - } - }; - } - Callback multiCallback = Callbacks.countDown(markUpCallback, _servers.length); - for (ZooKeeperAnnouncer server : _servers) - { - server.markUp(multiCallback); - } - } - private class Listener implements ZKPersistentConnection.EventListener { @Override @@ -353,28 +300,9 @@ public interface ZKStoreFactory> Z createStore(ZKConnection connection, String path); } - public ZooKeeperAnnouncer[] getAnnouncers() - { - return _servers; - } - + @Override public boolean isSessionEstablished() { return _sessionEstablished; } - - public String getZooKeeperConnectString() - { - return _zkConnectString; - } - - public int getZooKeeperSessionTimeout() - { - return _zkSessionTimeout; - } - - public String getZooKeeperBasePath() - { - return _zkBasePath; - } } diff --git a/deprecate.sh b/deprecate.sh new file mode 100644 index 0000000000..ad97da56c2 --- /dev/null +++ b/deprecate.sh @@ -0,0 +1,8 @@ +grep -o "include '[^']*'" settings.gradle | sed -e "s/^include '/com.linkedin.pegasus:/g" -e "s/'//g" | while read -r module_name ; do + if [ "$module_name" == "com.linkedin.pegasus:gradle-plugins" ] + then + echo "WARNING: $module_name cannot be deprecated due to MPPCX-7165. Skipping deprecation..." + else + mint catalog deprecate "$module_name" "$@" + fi +done diff --git a/gradle.properties b/gradle.properties index 7854292dbb..e7075f74d1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,4 @@ -version=29.58.11 +version=29.58.12 group=com.linkedin.pegasus org.gradle.configureondemand=true org.gradle.parallel=true