Skip to content

Commit

Permalink
[LI-HOTFIX] Change federated topic znode structure (#495)
Browse files Browse the repository at this point in the history
This PR changes the federated topic znodes from
/kafka-tracking/federatedTopics/PageViewEvent(data = tracking) to
/kafka-tracking/federatedTopics/tracking/PageViewEvent

to save space in zk as well as simplify logic in mario side. Mario will make sure topic name uniqueness within same kafka cluster.

LI_DESCRIPTION =
To save space in zk as well as simplify logic in mario side, federated topic znodes structure are changed to encode namespace into the znode path instead of storing it in data part.

Added integ-test to verify list create/delete/list federated topic znode api behavior

EXIT_CRITERIA = We can deprecate this pr when all kafka clients have been migrated to xinfra clients and all topic CUDs go through xmd, then we don't need kafka broker to understand both old and new topic acl, it will only need to understand the new acl.
  • Loading branch information
kehuum authored Nov 9, 2023
1 parent a62822a commit 64c29e7
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ default ListTopicsResult listTopics() {
* @return all federated topic znodes, formatted /namespace/topic
*/
default ListFederatedTopicZnodesResult listFederatedTopicZnodes() {
return listFederatedTopicZnodes(Collections.emptyList(), new ListFederatedTopicZnodesOptions());
return listFederatedTopicZnodes(Collections.emptyMap(), new ListFederatedTopicZnodesOptions());
}

/**
Expand All @@ -326,7 +326,7 @@ default ListFederatedTopicZnodesResult listFederatedTopicZnodes() {
* @return empty list if the given topic names' znode don't exist; otherwise return the federated topics formatted
* /namespace/topic
*/
ListFederatedTopicZnodesResult listFederatedTopicZnodes(List<String> federatedTopics,
ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map<String, String> federatedTopics,
ListFederatedTopicZnodesOptions options);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2033,12 +2033,14 @@ void handleFailure(Throwable throwable) {
}

@Override
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List<String> federatedTopics,
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map<String, String> federatedTopics,
ListFederatedTopicZnodesOptions options) {
final KafkaFutureImpl<List<String>> federatedTopicZnodesListingFuture = new KafkaFutureImpl<>();
List<LiListFederatedTopicZnodesRequestData.FederatedTopics> topicsRequested = new ArrayList<>();
federatedTopics.forEach(topic ->
topicsRequested.add(new LiListFederatedTopicZnodesRequestData.FederatedTopics().setName(topic)));
federatedTopics.forEach((topic, namespace) ->
topicsRequested.add(
new LiListFederatedTopicZnodesRequestData.FederatedTopics().setName(topic).setNamespace(namespace)
));
final long now = time.milliseconds();
runnable.call(new Call("listFederatedTopicZnodes", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public ListTopicsResult listTopics(ListTopicsOptions options) {
}

@Override
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List<String> federatedTopics,
ListFederatedTopicZnodesOptions options) {
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map<String, String> federatedTopics,
ListFederatedTopicZnodesOptions options) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"fields": [
{ "name": "Topics", "type": "[]FederatedTopics", "versions": "0+", "about": "The simple name of the federated topics",
"fields": [
{"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"}
{"name": "Name", "type": "string", "versions": "0+", "about": "The topic name"},
{"name": "Namespace", "type": "string", "versions": "0+", "about": "The namespace of the topic"}
]}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,8 @@ synchronized public ListTopicsResult listTopics(ListTopicsOptions options) {
}

@Override
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(List<String> federatedTopics,
ListFederatedTopicZnodesOptions options) {
public ListFederatedTopicZnodesResult listFederatedTopicZnodes(Map<String, String> federatedTopics,
ListFederatedTopicZnodesOptions options) {
throw new UnsupportedOperationException("Not implemented yet");
}

Expand Down
23 changes: 13 additions & 10 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3801,9 +3801,16 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

val toDeleteZNodes = mutable.Map[String, String]()
federatedTopicZnodesDeleteRequest.data.topics.forEach { topic =>
if (results.find(topic.name).errorCode == Errors.NONE.code) {
toDeleteZNodes += topic.name -> topic.namespace
}
}

try {
toDelete.foreach(federatedTopic => {
zkSupport.zkClient.deleteFederatedTopicZNode(federatedTopic)
toDeleteZNodes.foreach(federatedTopic => {
zkSupport.zkClient.deleteFederatedTopicZNode(federatedTopic._1, federatedTopic._2)
})
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
LiDeleteFederatedTopicZnodesResponse.prepareResponse(Errors.NONE, requestThrottleMs,
Expand Down Expand Up @@ -3844,14 +3851,10 @@ class KafkaApis(val requestChannel: RequestChannel,
)
} else {
// if non-empty list passed, only list znode values for the given topics
val foundFederatedTopicZnodes = mutable.Set[String]()

requestedTopics.forEach(topic => {
val curFederatedTopicZnode = zkSupport.zkClient.getFederatedTopic(topic.name())
if (curFederatedTopicZnode != null) {
foundFederatedTopicZnodes.add(curFederatedTopicZnode)
}
})
val foundFederatedTopicZnodes = requestedTopics.asScala
// from a list of Options, flatten extracts value from Some, and nothing from None
.flatMap(topic => zkSupport.zkClient.getFederatedTopic(topic.name(), topic.namespace()))
.toSet

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new LiListFederatedTopicZnodesResponse(
Expand Down
73 changes: 42 additions & 31 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,37 +145,25 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient,
}.toMap
}

def getFederatedTopic(topic: String): String = {
val getDataRequest = GetDataRequest(FederatedTopicZnode.path(topic))
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK => "/" + FederatedTopicZnode.decode(getDataResponse.data) + "/" + topic
case Code.NONODE => null
case _ => throw getDataResponse.resultException.get
private[kafka] def getFederatedTopic(topic: String, namespace: String): Option[String] = {
if (pathExists(FederatedTopicZnode.path(topic, namespace))) {
Some(s"/$namespace/$topic")
} else {
None
}
}

def getAllFederatedTopics: Set[String] = {
val topics = getChildren(FederatedTopicsZNode.path)

val merge: ((String, String)) => String = {
case (key, value) => "/" + value + "/" + key
}

val getDataRequests = topics.map(topic => GetDataRequest(
FederatedTopicZnode.path(topic),
ctx = Some(topic)))

val getDataResponses = retryRequestsUntilConnected(getDataRequests)
getDataResponses.flatMap { getDataResponse =>
val topic = getDataResponse.ctx.get.asInstanceOf[String]
getDataResponse.resultCode match {
case Code.OK => Some(topic, FederatedTopicZnode.decode(getDataResponse.data))
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
}.toMap.map(merge)
}.toSet
val namespaces = getChildren(FederatedTopicsZNode.path)
namespaces
// For all topics, generate (topic -> namespace) tuple
.flatMap(namespace => getAllFederatedTopicsInNamespace(namespace).map(_ -> namespace))
// To map to merge potential duplicate of topic -> namespace
.toMap
// Serialize to znode paths
.map { case (topic: String, namespace: String) => s"/$namespace/$topic" }
.toSet
}

/**
* Registers a given broker in zookeeper as the controller and increments controller epoch.
Expand Down Expand Up @@ -629,6 +617,29 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient,
}
}

/**
* Gets all federated topics in the namespace.
* @param registerWatch indicates if a watch must be registered or not
* @return sequence of topics in the cluster.
*
*/
def getAllFederatedTopicsInNamespace(namespace: String, registerWatch: Boolean = false): Set[String] = {
val getChildrenResponse = retryRequestUntilConnected(
if (paginateTopics) {
debug(s"upgrading GetChildrenRequest to GetChildrenPaginatedRequest for " +
s"'${FederatedTopicZnode.namespacePath(namespace)}'")
GetChildrenPaginatedRequest(FederatedTopicZnode.namespacePath(namespace), registerWatch)
} else {
GetChildrenRequest(FederatedTopicZnode.namespacePath(namespace), registerWatch)
}
)
getChildrenResponse.resultCode match {
case Code.OK => getChildrenResponse.children.toSet
case Code.NONODE => Set.empty
case _ => throw getChildrenResponse.resultException.get
}
}

/**
* Checks the topic existence
* @param topicName
Expand Down Expand Up @@ -1896,12 +1907,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient,
}

def createFederatedTopicZNode(topic: String, namespace: String): Unit = {
val path = FederatedTopicZnode.path(topic)
createRecursive(path, FederatedTopicZnode.encode(namespace))
val path = FederatedTopicZnode.path(topic, namespace)
createRecursive(path)
}

def deleteFederatedTopicZNode(topic: String): Unit = {
deletePath(FederatedTopicZnode.path(topic), ZkVersion.MatchAnyVersion, false)
def deleteFederatedTopicZNode(topic: String, namespace: String): Unit = {
deletePath(FederatedTopicZnode.path(topic, namespace), ZkVersion.MatchAnyVersion, false)
}

private def setConsumerOffset(group: String, topicPartition: TopicPartition, offset: Long): SetDataResponse = {
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/zk/ZkData.scala
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,8 @@ object BrokerIdZNode {
}

object FederatedTopicZnode {
def path(topic: String) = s"${FederatedTopicsZNode.path}/$topic"
def encode(namespace: String): Array[Byte] = namespace.getBytes(UTF_8)
def decode(bytes: Array[Byte]): String = if (bytes != null) new String(bytes, UTF_8) else ""
def path(topic: String, namespace: String) = s"${FederatedTopicsZNode.path}/$namespace/$topic"
def namespacePath(namespace: String) = s"${FederatedTopicsZNode.path}/$namespace"
}

object TopicsZNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,11 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
assertEquals(1, allZnodes.size())
assertEquals(expectedFedTopicString, allZnodes.get(0))
// 2. test list specific success
val expectedSuccess = client.listFederatedTopicZnodes(Collections.singletonList("federated-test-topic"),
new ListFederatedTopicZnodesOptions()).topics().get()
val expectedSuccess = client.listFederatedTopicZnodes(federatedTopic, new ListFederatedTopicZnodesOptions()).topics().get()
assertEquals(1, expectedSuccess.size())
assertEquals(expectedFedTopicString, expectedSuccess.get(0))
// 3. test list specific fail
val expectedFail = client.listFederatedTopicZnodes(Collections.singletonList("non-exist-topic"),
val expectedFail = client.listFederatedTopicZnodes(Collections.singletonMap("non-exist-topic", "tracking"),
new ListFederatedTopicZnodesOptions()).topics().get()
assertEquals(0, expectedFail.size())

Expand Down

0 comments on commit 64c29e7

Please sign in to comment.