Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka announcement only logic #1027

Merged
merged 12 commits into from
Oct 15, 2024
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.58.12] - 2024-10-07
- Add support for announcing/deannoucing service only to INDIS
## [29.58.11] - 2024-10-03
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
- Add getters in ZookeeperAnnouncer

Expand Down Expand Up @@ -5743,7 +5745,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.11...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.58.12...master
[29.58.12]: https://github.com/linkedin/rest.li/compare/v29.58.11...v29.58.12
[29.58.11]: https://github.com/linkedin/rest.li/compare/v29.58.10...v29.58.11
[29.58.10]: https://github.com/linkedin/rest.li/compare/v29.58.9...v29.58.10
[29.58.9]: https://github.com/linkedin/rest.li/compare/v29.58.8...v29.58.9
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.linkedin.d2.balancer.servers;

import com.linkedin.common.callback.Callback;
import com.linkedin.common.callback.Callbacks;
import com.linkedin.common.util.None;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ConnectionManager
brycezhongqing marked this conversation as resolved.
Show resolved Hide resolved
{
private final ZooKeeperAnnouncer[] _servers;

private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class);

protected ConnectionManager(ZooKeeperAnnouncer[] servers)
{
_servers = servers;
}

abstract public void start(Callback<None> callback);

abstract public void shutdown(final Callback<None> callback);

abstract public String getAnnouncementTargetIdentifier();

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}

}

public ZooKeeperAnnouncer[] getAnnouncers()
{
return _servers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @version $Revision: $
*/

public class ZooKeeperConnectionManager
public class ZooKeeperConnectionManager extends ConnectionManager
{
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperConnectionManager.class);

Expand Down Expand Up @@ -79,6 +79,7 @@ public ZooKeeperConnectionManager(ZKPersistentConnection zkConnection,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkBasePath = zkBasePath;
_zkConnection = zkConnection;
_factory = factory;
Expand All @@ -94,6 +95,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
ZKStoreFactory<UriProperties,ZooKeeperEphemeralStore<UriProperties>> factory,
ZooKeeperAnnouncer... servers)
{
super(servers);
_zkConnectString = zkConnectString;
_zkSessionTimeout = zkSessionTimeout;
_zkBasePath = zkBasePath;
Expand Down Expand Up @@ -132,6 +134,7 @@ public ZooKeeperConnectionManager(String zkConnectString, int zkSessionTimeout,
this(zkConnectString, zkSessionTimeout, zkBasePath, factory, servers);
}

@Override
public void start(Callback<None> callback)
{
_managerStarted = true;
Expand All @@ -154,6 +157,7 @@ public void start(Callback<None> callback)
}
}

@Override
public void shutdown(final Callback<None> callback)
{
_managerStarted = false;
Expand All @@ -180,68 +184,6 @@ protected None convertResponse(None none) throws Exception
}
}

public void markDownAllServers(final Callback<None> callback)
{
Callback<None> markDownCallback;
if (callback != null)
{
markDownCallback = callback;
}
else
{
markDownCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark down servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark down all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markDownCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markDown(multiCallback);
}
}

public void markUpAllServers(final Callback<None> callback)
{
Callback<None> markUpCallback;
if (callback != null)
{
markUpCallback = callback;
}
else
{
markUpCallback = new Callback<None>()
{
@Override
public void onError(Throwable e)
{
LOG.error("failed to mark up servers", e);
}

@Override
public void onSuccess(None result)
{
LOG.info("mark up all servers successful");
}
};
}
Callback<None> multiCallback = Callbacks.countDown(markUpCallback, _servers.length);
for (ZooKeeperAnnouncer server : _servers)
{
server.markUp(multiCallback);
}
}

private class Listener implements ZKPersistentConnection.EventListener
{
@Override
Expand Down Expand Up @@ -353,9 +295,10 @@ public interface ZKStoreFactory<P, Z extends ZooKeeperStore<P>>
Z createStore(ZKConnection connection, String path);
}

public ZooKeeperAnnouncer[] getAnnouncers()
@Override
public String getAnnouncementTargetIdentifier()
{
return _servers;
return getZooKeeperConnectString();
}

public boolean isSessionEstablished()
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.58.11
version=29.58.12
group=com.linkedin.pegasus
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better bump the minor version since we changed the public API of ZooKeeperAnnouncer (although it's backward compatible).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@bohhyang bohhyang Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my other comment about emitting SD event in container PR. It will need some changes in the callbacks of various markup/down in ZookeeperAnnouncer.

Copy link
Collaborator Author

@brycezhongqing brycezhongqing Oct 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will need some changes in the callbacks of various markup/down in ZookeeperAnnouncer.

How about directly change emitSDStatusActiveUpdateIntentAndWriteEvents ? We don't need to care about where it called

org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down
Loading