Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LI-HOTFIX] Avoid assigning replicas to the preferred controllers or maintenance brokers #111

Open
wants to merge 6 commits into
base: 2.4-li
Choose a base branch
from
Open
22 changes: 13 additions & 9 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ class KafkaController(val config: KafkaConfig,
// between the moment this broker started and right now when it becomes controller again.
loadMinIsrForTopics(controllerContext.allTopics)

rearrangePartitionReplicaAssignmentForNewTopics(controllerContext.allTopics.toSet)
rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet)
registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq)
getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach {
case (topicPartition, replicaAssignment) =>
Expand Down Expand Up @@ -968,25 +968,28 @@ class KafkaController(val config: KafkaConfig,

// Rearrange partition and replica assignment for new topics that get assigned to
// maintenance brokers that do not take new partitions
private def rearrangePartitionReplicaAssignmentForNewTopics(topics: Set[String]) {
private def rearrangePartitionReplicaAssignmentForNewPartitions(topicsToCheck: Set[String]) {
try {
val noNewPartitionBrokers = partitionUnassignableBrokerIds
if (noNewPartitionBrokers.nonEmpty) {
val newTopics = zkClient.getPartitionNodeNonExistsTopics(topics.toSet)
val newTopicsToBeArranged = zkClient.getPartitionAssignmentForTopics(newTopics).filter {
case (_, partitionMap) =>
partitionMap.exists {
val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter {
case (topic, partitionMap) =>
val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
val newPartitions = partitionMap.filter{case (partitionId, _) => partitionId >= existingAssignment.size}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we add some safe check here to ensure the partition state znode doesn't exist for these new partitions. Although I don't see an issue in the current implementation, if this function is used incorrectly, it would be very dangerous. For example, if we use "rearrangePartitionReplicaAssignmentForNewPartitions(topics, false)" when initializing controller context, this may cause all topics to get reassigned since controllerContext.partitionAssignments is an empty set (this will also result in orphan partitions).
Again, I don't see the issue in this implementation, but I think it is worth checking.

In addition, this doesn't address the case when there is controller move right after partition expansion (before the rearrange actually happened). I think it is ok since we are not trying to handle 100% no replicas in the preferred controller. Maybe we can add some comments for this unhandled situations.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. It's true that they are all newPartitions given controllerContext.partitionAssignments is an empty set, but they shouldn't have a replica on the noNewPartitionBrokers, thus they shouldn't be rearranged.

It's a good point that the current implementation cannot handle controller switches. Given it's safe to scan all topics' partitions on controller switch, I think it should be done during a controller switch. Thoughts?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

manual assignment for existing topics can still assign partitions to noNewPartitionBrokers. I think we cannot guarantee that noNewPartitionBrokers won't get replicas. In addition, there are some small-time window that new replicas can still get assigned to preferred controllers due to the fact that preferred controller znode is emphermal znode (see the design doc for more detail).

I think a safer way is to rely on the existence of partition state znode when performing rearrangement.

scan all topics' partitions on controller switch => If we cannot guarantee 100% no replica in the preferred controllers, I think it is ok to not performing special handing during controller switch given the overhead/additional hacky code needed (up to you to make a decision).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon closer look, I find that there is already logic to handle the controller switch over by calling the rearrangePartitionReplicaAssignmentForNewPartitions method inside initializeControllerContext.

Regarding the preferred controller znodes being ephemeral, it's kinda an orthogonal design issue that we could address independently.

newPartitions.exists {
case (_, assignedReplicas) =>
assignedReplicas.replicas.intersect(noNewPartitionBrokers).nonEmpty
}
}
newTopicsToBeArranged.foreach {
topicsToBeRearranged.foreach {
case (topic, partitionMap) =>
val numPartitions = partitionMap.size
val numReplica = partitionMap.head._2.replicas.size
val brokers = controllerContext.liveOrShuttingDownBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }.toSeq

val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, numPartitions, numReplica)
val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
val partitionsToAdd = numPartitions - existingAssignment.size
val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, partitionsToAdd, numReplica, -1, existingAssignment.size)
adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment.mapValues(ReplicaAssignment(_)).toMap, true)
info(s"Rearrange partition and replica assignment for topic [$topic]")
}
Expand Down Expand Up @@ -1697,7 +1700,7 @@ class KafkaController(val config: KafkaConfig,
val newTopics = topics -- controllerContext.allTopics
val deletedTopics = controllerContext.allTopics -- topics
controllerContext.allTopics = topics
rearrangePartitionReplicaAssignmentForNewTopics(newTopics)
rearrangePartitionReplicaAssignmentForNewPartitions(newTopics)

registerPartitionModificationsHandlers(newTopics.toSeq)
val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics)
Expand Down Expand Up @@ -1755,6 +1758,7 @@ class KafkaController(val config: KafkaConfig,
}

if (!isActive) return
rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it conflict with partition reassignment?
say if a partition gets reassigned to replica ( 1, 2, 5, 6 ) ==> if one of this replica is maintenance brokers, would this reassignment complete if we automatically changing zk node to disallow placing replicas on maintenance brokers.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I've updated the PR to avoid assignment replicas to the undesirable hosts during partition reassignment. Please take another look. Thanks!

val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = {
val brokerMetadatas = getBrokerMetadatas(rackAwareMode)
val noNewPartitionBrokerIds = getMaintenanceBrokerList()
val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList
val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor)
createTopicWithAssignment(topic, topicConfig, replicaAssignment)
}
Expand Down Expand Up @@ -235,7 +235,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
numPartitions: Int = 1,
replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
val noNewPartitionBrokerIds = getMaintenanceBrokerList()
val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList
addPartitions(topic, existingAssignment, allBrokers, numPartitions, replicaAssignment, validateOnly, noNewPartitionBrokerIds.toSet)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.scalatest.Assertions.fail

import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.immutable

class PreferredControllerTest extends ZooKeeperTestHarness {

Expand Down Expand Up @@ -71,6 +72,33 @@ class PreferredControllerTest extends ZooKeeperTestHarness {
client.close()
}

@Test
def testPartitionCreatedByAdminZkClientShouldNotBeAssignedToPreferredControllers(): Unit = {
val brokerConfigs = Seq((0, false), (1, true), (2, false))
createBrokersWithPreferredControllers(brokerConfigs, true)

TestUtils.waitUntilControllerElected(zkClient)
// create topic using admin client
val topic = "topic1"
TestUtils.createTopic(zkClient, topic, 3, 2, brokers)

assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1)))

val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic)).map {
case (topicPartition, assignment) => topicPartition.partition -> assignment
}
val allBrokers = adminZkClient.getBrokerMetadatas()
val newPartitionsCount = 5
adminZkClient.addPartitions(topic, existingAssignment, allBrokers, 5)
(0 until newPartitionsCount).map { i =>
TestUtils.waitUntilMetadataIsPropagated(brokers, topic, i)
i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i)
}

assertTrue("topic1 should not be in broker 1 after increasing partition count",
ensureTopicNotInBrokers("topic1", Set(1)))
}

@Test
def testElectionWithoutPreferredControllersAndNoFallback(): Unit = {
val brokerConfigs = Seq((0, false), (1, false), (2, false))
Expand Down