Skip to content

Commit

Permalink
add kafka announcement only logic
Browse files Browse the repository at this point in the history
  • Loading branch information
brycezhongqing committed Oct 7, 2024
1 parent af7f497 commit 7e7a3ca
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 86 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.58.12] - 2024-10-07
- Add support for announcing/deannoucing service only to INDIS
## [29.58.11] - 2024-10-03
- Add getters in ZookeeperAnnouncer

Expand Down Expand Up @@ -5743,7 +5745,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.12...master
[29.58.12]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.58.12
[29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11
[29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10
[29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package com.linkedin.d2.balancer.servers;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConnectionManager
{
private final ZooKeeperAnnouncer[] _servers;
private final String _zkConnectString;
private final int _zkSessionTimeout;
private final String _zkBasePath;

private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);

protected ConnectionManager(ZooKeeperAnnouncer[] servers, String zkConnectString, int zkSessionTimeout, String zkBasePath)
{
_servers = servers;
_zkConnectString = zkConnectString;
_zkSessionTimeout = zkSessionTimeout;
_zkBasePath = zkBasePath;
}

abstract public void start(Callback<None> callback);

abstract public void shutdown(final Callback<None> callback);

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}

}

public ZooKeeperAnnouncer[] getAnnouncers()
{
return _servers;
}

abstract public boolean isSessionEstablished();

public String getZooKeeperConnectString()
{
return _zkConnectString;
}

public int getZooKeeperSessionTimeout()
{
return _zkSessionTimeout;
}

public String getZooKeeperBasePath()
{
return _zkBasePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @version $Revision: $
*/

public class ZooKeeperConnectionManager
public class ZooKeeperConnectionManager extends ConnectionManager
{
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class);

Expand Down Expand Up @@ -79,6 +79,10 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers,
zkConnection.getZKConnection().getConnectString(),
zkConnection.getZKConnection().getTimeout(),
zkBasePath);
_zkBasePath = zkBasePath;
_zkConnection = zkConnection;
_factory = factory;
Expand All @@ -94,6 +98,10 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers,
zkConnectString,
zkSessionTimeout,
zkBasePath);
_zkConnectString = zkConnectString;
_zkSessionTimeout = zkSessionTimeout;
_zkBasePath = zkBasePath;
Expand Down Expand Up @@ -132,6 +140,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers);
}

@Override
public void start(Callback<None> callback)
{
_managerStarted = true;
Expand All @@ -154,6 +163,7 @@ public void start(Callback<None> callback)
}
}

@Override
public void shutdown(final Callback<None> callback)
{
_managerStarted = false;
Expand All @@ -179,69 +189,6 @@ protected None convertResponse(None none) throws Exception
zkCloseCallback.onSuccess(None.none());
}
}

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}
}

private class Listener implements ZKPersistentConnection.EventListener
{
@Override
Expand Down Expand Up @@ -353,28 +300,9 @@ public interface ZKStoreFactory<P, Z extends ZooKeeperStore<P>>
Z createStore(ZKConnection connection, String path);
}

public ZooKeeperAnnouncer[] getAnnouncers()
{
return _servers;
}

@Override
public boolean isSessionEstablished()
{
return _sessionEstablished;
}

public String getZooKeeperConnectString()
{
return _zkConnectString;
}

public int getZooKeeperSessionTimeout()
{
return _zkSessionTimeout;
}

public String getZooKeeperBasePath()
{
return _zkBasePath;
}
}
8 changes: 8 additions & 0 deletions deprecate.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
grep -o "include '[^']*'" settings.gradle | sed -e "s/^include '/com.linkedin.pegasus:/g" -e "s/'//g" | while read -r module_name ; do
if [ "$module_name" == "com.linkedin.pegasus:gradle-plugins" ]
then
echo "WARNING: $module_name cannot be deprecated due to MPPCX-7165. Skipping deprecation..."
else
mint catalog deprecate "$module_name" "$@"
fi
done
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.58.11
version=29.58.12
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit 7e7a3ca

Please sign in to comment.