Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIKAFKA-41423: Kafka federation proof of concept using dual-cluster integration test (ProxyBasedFederationTest) #326

Open
wants to merge 9 commits into
base: 2.4-li
Choose a base branch
from

Conversation

groelofs
Copy link

(This is an updated version of #275 with a key fix by Lucas to move the UMR-forwarding logic into the ControllerEvent thread, plus some log improvements, etc., and a small enhancement to EmbeddedZookeeper to allow tests to optionally specify the ZK port. This PR is mainly for reference; the next step is to port everything to the 3.0-li branch so development can continue from there.)

First, a quick design note (relevant to the MetadataCache): the topics/partitions and nodes content of UpdateMetadataRequests (UMRs) remains unchanged, i.e., any given update contains all the nodes for that controller's set of brokers (only); controllers don't include other clusters' nodes in their own requests. The relevance to MetadataCache is that both the original version and the new one assume this and completely replace all the cached nodes for the single cluster that originated the request (NOT all the nodes in the federation).

The main changes live in:

  • KafkaController - new clusterId argument, passed down to ControllerChannelManager and ControllerBrokerRequestBatch constructors; new "pass-through" methods getBrokerNode() and addRemoteController() (probably temporary/test-only?); new rewriteAndForwardRemoteUpdateMetadataRequest() and sendRemoteUpdateMetadataRequest() methods (involved when the "broker half" receives a fresh UMR from a remote cluster and hands it off to the "controller half" to be rewritten and rerouted to its local brokers)
  • ControllerChannelManager - new remoteControllerIds set of remote/foreign controller nodes' broker IDs (to allow picking just the remote controllers out of the existing brokerStateInfo map, which is now shared by both local brokers and remote controllers); new getBrokerNode() method to tell remote controller(s) about the local controller and addRemoteController() method to receive and store such info; modifications to addNewBroker() to support both local brokers (existing case) and remote controllers (new/federation case); modified sendUpdateMetadataRequests() method to support clusterId in requests and to send local UpdateMetadataRequests to remote controllers (if federation enabled); new sendRemoteRequestToBrokers() method to send remote topic metadata to local brokers
  • KafkaApis - "broker half" that receives both normal UMRs and (if we're the lead controller node) UMRs from remote clusters in the federation; has the new "mini-router" in doHandleUpdateMetadataRequest() to distinguish the two cases and either process the UMR normally or, if federation is enabled and the UMR is a remote one and it's not already rewritten, send it to the "controller half" to be rewritten and forwarded to local brokers
  • MetadataCache - conversion of MetadataSnapshot to a trait/interface plus a single-cluster implementation ("SingleClusterMetadataSnapshot"); refactor of updateMetadata() to pull common code out into two new helper methods for sharing with FederatedMetadataCache; conversion of various methods to protected for same reason
  • FederatedMetadataCache - new federated MetadataSnapshot variant ("MultiClusterMetadataSnapshot" with an extra level of hashmap to support aliveNodes and aliveBrokers for multiple clusters); modifications to updateMetadata() to update only the relevant cluster's data in the new multi-cluster hashmaps
  • UpdateMetadataRequest.java - new clusterId argument and getters; new WrappingBuilder inner class to support the rerouting of remote requests, which are already "built" and otherwise don't easily work with the existing networking stack, which assumes Builder objects; new rewrite method for incoming remote requests (prior to rerouting them to the local brokers)
  • UpdateMetadataRequest.json - new tagged (optional) fields "ClusterId" and "RoutingClusterId"

The majority of the other files have only trivial changes, mostly involving the addition of a clusterId argument to certain constructors (e.g., UpdateMetadataRequest.Builder). There are also a few log-message tweaks to make it easier to highlight and/or grep for the clusterId uniformly in test output or other logging. I also have some leftover comments about what I thought might be bugs in the LCCR code that Lucas and I have discussed, but I didn't get around to removing my comments yet (in KafkaApis).

Testing-wise, everything lives in a single new test case at the bottom of ProxyBasedFederationTest. (Apologies for the huge work-in-progress TODO/FIXME/etc. comment...) The test still needs real assertions related to the presence or absence of various topics, but it does at least verify that a consumer can connect to a broker in either physical cluster and successfully read a given topic. In the interim it still has the old assertions from the PlaintextConsumerTest or whatever it was called; these are placeholders only.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

groelofs and others added 9 commits February 9, 2022 16:13
…ntegration test (ProxyBasedFederationTest). Main changes live in KafkaController, ControllerChannelManager, KafkaApis, MetadataCache, and UpdateMetadataRequest itself (both .java and .json).
…ntegration test. (Fix for overlooked merge bug/typo.)
…ntegration test. (Fix for infinite loop in testBasicMultiClusterSetup(), plus indentation fix for ControllerChannelManager and cleanup of obsolete debug noise in KafkaController.)
…ntegration test. (Code-review fixes: refactor MetadataCache into legacy and federated-subclass variants, and merge ControllerChannelManager's brokerStateInfo and remoteControllerStateInfo maps in favor of simple remoteControllerIds hashset to mark remote controllers.)
…ntegration test. (Code-review fixes: rename UMR 'ClusterId' to 'OriginClusterId', and this time include new FederatedMetadataCache class, doh...)
…ntegration test. (Add integ-test support for non-random embedded ZK ports.)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants