Skip to content

Commit

Permalink
LIKAFKA-41423: Kafka federation proof of concept using dual-cluster i…
Browse files Browse the repository at this point in the history
…ntegration test. (Code-review fixes: rename UMR 'ClusterId' to 'OriginClusterId', and this time include new FederatedMetadataCache class, doh...)
  • Loading branch information
groelofs committed Feb 16, 2022
1 parent 91da79d commit b1718a4
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ public class UpdateMetadataRequest extends AbstractControlRequest {
public static class Builder extends AbstractControlRequest.Builder<UpdateMetadataRequest> {
private final List<UpdateMetadataPartitionState> partitionStates;
private final List<UpdateMetadataBroker> liveBrokers;
private final String clusterId;
private final String originClusterId;
private Lock buildLock = new ReentrantLock();

// LIKAFKA-18349 - Cache the UpdateMetadataRequest Objects to reduce memory usage
private final Map<Short, UpdateMetadataRequest> requestCache = new HashMap<>();

public Builder(short version, int controllerId, int controllerEpoch, long brokerEpoch, long maxBrokerEpoch,
List<UpdateMetadataPartitionState> partitionStates, List<UpdateMetadataBroker> liveBrokers,
String clusterId) {
String originClusterId) {
super(ApiKeys.UPDATE_METADATA, version, controllerId, controllerEpoch, brokerEpoch, maxBrokerEpoch);
this.partitionStates = partitionStates;
this.liveBrokers = liveBrokers;
this.clusterId = clusterId;
this.originClusterId = originClusterId;
}

@Override
Expand Down Expand Up @@ -108,10 +108,10 @@ public UpdateMetadataRequest build(short version) {
data.setUngroupedPartitionStates(partitionStates);
}

// clusterId == null implies federation is not enabled (though reverse may not be true); will be ignored
// during serialization (data.toStruct())
// originClusterId == null implies federation is not enabled (though reverse may not be true); will be
// ignored during serialization (data.toStruct())
if (version >= 7) {
data.setClusterId(clusterId);
data.setOriginClusterId(originClusterId);
}

updateMetadataRequest = new UpdateMetadataRequest(data, version);
Expand Down Expand Up @@ -178,7 +178,7 @@ public static class WrappingBuilder extends Builder {

public WrappingBuilder(UpdateMetadataRequest umr) {
super(umr.version(), umr.controllerId(), umr.controllerEpoch(), umr.brokerEpoch(), umr.maxBrokerEpoch(),
toList(umr.partitionStates()), umr.liveBrokers(), umr.clusterId());
toList(umr.partitionStates()), umr.liveBrokers(), umr.originClusterId());
this.updateMetadataRequest = umr;
}

Expand Down Expand Up @@ -251,8 +251,8 @@ public UpdateMetadataRequest(Struct struct, short version) {
}

// federation
public String clusterId() {
return data.clusterId();
public String originClusterId() {
return data.originClusterId();
}

// federation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
"validVersions": "0-7",
"flexibleVersions": "7+",
"fields": [
{ "name": "ClusterId", "type": "string", "versions": "7+", "nullableVersions": "7+", "default": "null",
{ "name": "OriginClusterId", "type": "string", "versions": "7+", "nullableVersions": "7+", "default": "null",
"taggedVersions": "7+", "tag": 0, "ignorable": true,
"about": "The clusterId if known. In federated clusters, this is the ID of the originating physical cluster, i.e., it matches the included broker info." },
{ "name": "RoutingClusterId", "type": "string", "versions": "7+", "nullableVersions": "7+", "default": "null",
"taggedVersions": "7+", "tag": 1, "ignorable": true,
"about": "The 'effective' or rewritten clusterId; for routing purposes, has precedence over ClusterId if present. In federated clusters, updates from other physical clusters must be modified by the local controller and then forwarded to local brokers, including to the broker half of the controller itself. This field allows the local controller to avoid infinite loops." },
"about": "The 'effective' or rewritten clusterId; for routing purposes, has precedence over OriginClusterId if present. In federated clusters, updates from other physical clusters must be modified by the local controller and then forwarded to local brokers, including to the broker half of the controller itself. This field allows the local controller to avoid infinite loops." },
{ "name": "ControllerId", "type": "int32", "versions": "0+", "entityType": "brokerId",
"about": "The controller id." },
{ "name": "ControllerEpoch", "type": "int32", "versions": "0+",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class ControllerChannelManager(controllerContext: ControllerContext,
// GRR FIXME: should we instrument the time spent waiting for this lock (and its other 5 call sites)?
// (would love to see histogram in active controller)
brokerLock synchronized {
var stateInfoOpt = brokerStateInfo.get(brokerId)
val stateInfoOpt = brokerStateInfo.get(brokerId)
stateInfoOpt match {
case Some(stateInfo) =>
stateInfo.messageQueue.put(QueueItem(request.apiKey, request, callback, time.milliseconds()))
Expand Down
41 changes: 22 additions & 19 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,58 +341,61 @@ class KafkaController(val config: KafkaConfig,
*/
def rewriteAndForwardRemoteUpdateMetadataRequest(umr: UpdateMetadataRequest): UpdateMetadataResponse = {

// Upstream caller has already verified that clusterId doesn't match our own AND routingClusterId doesn't match
// (null or mismatch), i.e., this is a remote, incoming request (or a bug...).
// Upstream caller has already verified that originClusterId doesn't match our own AND routingClusterId doesn't
// match (null or mismatch), i.e., this is a remote, incoming request (or a bug).

// OPERABILITY TODO (longer-term): would be good to associate a (persistent) color with each physical cluster
// and include it in logging (and probably also in some znode) => more human-memorable than random UUID strings
// [maybe do same thing for federation overall => refuse to talk to remote controller if federation-color (or
// flavor/fruit/star/name/etc.) doesn't match own: useful sanity check]
// (though might be tricky to consistently and persistently associate a _unique_ color with each cluster in the
// federation...hmmm)
// [LIKAFKA-42834: do same thing for federation overall => refuse to talk to remote controller if its federation
// ID-string doesn't match our own: useful sanity check]

info(s"GRR DEBUG: controller for clusterId=${clusterId} has received a remote, non-rewritten UpdateMetadataRequest "
+ s"(UMR clusterId=${umr.clusterId}, routingClusterId=${umr.routingClusterId}), and li.federation.enable="
+ s"(UMR clusterId=${umr.originClusterId}, routingClusterId=${umr.routingClusterId}), and li.federation.enable="
+ s"${config.liFederationEnable}: about to validate and rewrite it")

if (!config.liFederationEnable) {
if (!config.liFederationEnable) { // is this even possible? KafkaApis shouldn't call us unless federation == true
// GRR TODO: increment some (new?) error metric
// GRR FIXME: do we need to log the exception message separately? not sure what preferred pattern is
// GRR FIXME: should we return an error-UpdateMetadataResponse instead of throwing? what's correct pattern?
throw new IllegalStateException(s"Received rewritten UpdateMetadataRequest from clusterId=${umr.clusterId} " +
throw new IllegalStateException(s"Received rewritten UpdateMetadataRequest from clusterId=${umr.originClusterId} " +
s"with routingClusterId=${umr.routingClusterId}, but li.federation.enable=${config.liFederationEnable}")
}

if (umr.routingClusterId != null && umr.routingClusterId != clusterId) {
// GRR TODO: increment some (new?) error metric
// GRR FIXME: do we need to log the exception message separately? not sure what preferred pattern is
// GRR FIXME: should we return an error-UpdateMetadataResponse instead of throwing? what's correct pattern?
throw new IllegalStateException(s"Received rewritten UpdateMetadataRequest from clusterId=${umr.clusterId}, " +
throw new IllegalStateException(s"Received rewritten UpdateMetadataRequest from clusterId=${umr.originClusterId}, " +
s"but routingClusterId=${umr.routingClusterId} does not match us (clusterId=${clusterId})")
}
// upstream already handled routingClusterId == clusterId case, so at this point we know routingClusterId == null
// and we can safely rewrite it
// KafkaApis already handled the umr.routingClusterId == clusterId case, so at this point we know
// umr.routingClusterId == null and can safely rewrite it

// GRR FIXME: should we refresh the preferred-controller list first? presumably it's latency-expensive...
//controllerContext.setLivePreferredControllerIds(zkClient.getPreferredControllerList.toSet)
if (!config.preferredController) {
// GRR TODO: increment some (new?) error metric
// GRR FIXME: do we need to log the exception message separately? not sure what preferred pattern is
// GRR FIXME: should we return an error-UpdateMetadataResponse instead of throwing? what's correct pattern?
throw new IllegalStateException(s"Received UpdateMetadataRequest from clusterId=${umr.clusterId} " +
throw new IllegalStateException(s"Received UpdateMetadataRequest from clusterId=${umr.originClusterId} " +
s"(we're clusterId=${clusterId}), but we're not a preferred controller")
}

// At this point we know routingClusterId == null and we're a controller node (maybe not leader, but preferred).
// Both leaders and inactive preferred controllers (i.e., everybody who receives this request) need to cache
// At this point we know routingClusterId == null and we're a controller node (maybe not active, but preferred).
// Both active and inactive preferred controllers (i.e., everybody who receives this request) need to cache
// the remote data (in "firewalled" data structures) in order to serve it but not otherwise act on it.

//GRR WORKING:
// GRR TODO: cache metadata in some new data structures for remote physical clusters' info: FIGURE OUT WHAT KIND
// [huh...not seeing any such data structures in this class...all buried within ChannelMgr or EventMgr?]
// [ChannelMgr has brokerStateInfo hashmap...all brokers, keyed by brokerId (NEEDS brokerLock!)]
// GRR TODO: remote-controller metadata is cached within channel manager, but need to verify (or ensure) that
// admin operations requested of local controller don't result in action against remote controllers, only
// local brokers

// FIXME? is there a isActive race condition here such that a remote request could get dropped if
// FIXME? is there an isActive race condition here such that a remote request could get dropped if
// a non-active/leader controller becomes the leader right around the time both old and new leaders
// are checking this conditional?
// are checking this conditional? [no, should be safe: newly active controller will be proactively
// sent full metadata dumps by all remote controllers, even if they're newly active as well: see
// startup/failover section of design doc]
if (isActive) { // rewrite request and stuff it back into "the queue" (whatever/wherever that is)
// rewrite request (in place)
umr.rewriteRemoteRequest(
Expand Down
Loading

0 comments on commit b1718a4

Please sign in to comment.