From 0536dc7e7a1b608fb98a8e5efa297a398d9cccb7 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Wed, 6 Jan 2021 17:03:16 -0800 Subject: [PATCH 1/6] Fixing bug so that replicas won't be assigned to maintenance brokers or preferred controllers --- core/src/main/scala/kafka/zk/AdminZkClient.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 1d338f3f5e9f..3dbd8c11cbdc 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -53,7 +53,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { topicConfig: Properties = new Properties, rackAwareMode: RackAwareMode = RackAwareMode.Enforced): Unit = { val brokerMetadatas = getBrokerMetadatas(rackAwareMode) - val noNewPartitionBrokerIds = getMaintenanceBrokerList() + val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList val replicaAssignment = assignReplicasToAvailableBrokers(brokerMetadatas, noNewPartitionBrokerIds.toSet, partitions, replicationFactor) createTopicWithAssignment(topic, topicConfig, replicaAssignment) } @@ -235,7 +235,7 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { numPartitions: Int = 1, replicaAssignment: Option[Map[Int, Seq[Int]]] = None, validateOnly: Boolean = false): Map[Int, Seq[Int]] = { - val noNewPartitionBrokerIds = getMaintenanceBrokerList() + val noNewPartitionBrokerIds = getMaintenanceBrokerList() ++ zkClient.getPreferredControllerList addPartitions(topic, existingAssignment, allBrokers, numPartitions, replicaAssignment, validateOnly, noNewPartitionBrokerIds.toSet) } From 3f1698337ff60c7f479ed45cf15a0285fa515bc7 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Wed, 6 Jan 2021 17:34:02 -0800 Subject: [PATCH 2/6] Adding test to ensure the bug doesn't happen again --- .../server/PreferredControllerTest.scala | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala b/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala index 3a130a22baaf..d929170d4134 100644 --- a/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala +++ b/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala @@ -33,6 +33,7 @@ import org.scalatest.Assertions.fail import scala.collection.JavaConverters._ import scala.collection.Map +import scala.collection.immutable class PreferredControllerTest extends ZooKeeperTestHarness { @@ -71,6 +72,34 @@ class PreferredControllerTest extends ZooKeeperTestHarness { client.close() } + @Test + def testPartitionCreatedByAdminZkClientShouldNotBeAssignedToPreferredControllers(): Unit = { + val brokerConfigs = Seq((0, false), (1, true), (2, false)) + createBrokersWithPreferredControllers(brokerConfigs, true) + + val brokerList = TestUtils.bootstrapServers(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + TestUtils.waitUntilControllerElected(zkClient) + // create topic using admin client + val topic = "topic1" + TestUtils.createTopic(zkClient, topic, 3, 2, brokers) + + assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1))) + + val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic)).map { + case (topicPartition, assignment) => topicPartition.partition -> assignment + } + val allBrokers = adminZkClient.getBrokerMetadatas() + val newPartitionsCount = 5 + 
adminZkClient.addPartitions(topic, existingAssignment, allBrokers, 5) + (0 until newPartitionsCount).map { i => + TestUtils.waitUntilMetadataIsPropagated(brokers, topic, i) + i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) + } + + assertTrue("topic1 should not be in broker 1 after increasing partition count", + ensureTopicNotInBrokers("topic1", Set(1))) + } + @Test def testElectionWithoutPreferredControllersAndNoFallback(): Unit = { val brokerConfigs = Seq((0, false), (1, false), (2, false)) From caddfe111d8dfed7adb227dd342c1963670e5cb4 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Wed, 6 Jan 2021 19:28:36 -0800 Subject: [PATCH 3/6] Added test and fixes on the broker side to rearrange assignments during partition expansion --- .../kafka/controller/KafkaController.scala | 27 ++++++++++++------- .../server/PreferredControllerTest.scala | 1 - 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index af27d0a6f8fe..146dfc8f4665 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -880,7 +880,7 @@ class KafkaController(val config: KafkaConfig, // between the moment this broker started and right now when it becomes controller again. loadMinIsrForTopics(controllerContext.allTopics) - rearrangePartitionReplicaAssignmentForNewTopics(controllerContext.allTopics.toSet) + rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet, true) registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq) getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach { case (topicPartition, replicaAssignment) => @@ -968,25 +968,33 @@ class KafkaController(val config: KafkaConfig, // Rearrange partition and replica assignment for new topics that get assigned to // maintenance brokers that do not take new partitions - private def rearrangePartitionReplicaAssignmentForNewTopics(topics: Set[String]) { + private def rearrangePartitionReplicaAssignmentForNewPartitions(topics: Set[String], onlyNewTopics: Boolean) { try { val noNewPartitionBrokers = partitionUnassignableBrokerIds if (noNewPartitionBrokers.nonEmpty) { - val newTopics = zkClient.getPartitionNodeNonExistsTopics(topics.toSet) - val newTopicsToBeArranged = zkClient.getPartitionAssignmentForTopics(newTopics).filter { - case (_, partitionMap) => - partitionMap.exists { + val topicsToCheck = if (onlyNewTopics) + zkClient.getPartitionNodeNonExistsTopics(topics.toSet) + else + topics + + val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter { + case (topic, partitionMap) => + val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty) + val newPartitions = partitionMap.filter{case (partitionId, _) => partitionId >= existingAssignment.size} + newPartitions.exists { case (_, assignedReplicas) => assignedReplicas.replicas.intersect(noNewPartitionBrokers).nonEmpty } } - newTopicsToBeArranged.foreach { + topicsToBeRearranged.foreach { case (topic, partitionMap) => val numPartitions = partitionMap.size val numReplica = partitionMap.head._2.replicas.size val brokers = controllerContext.liveOrShuttingDownBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }.toSeq - val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, numPartitions, 
numReplica)
+            val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty)
+            val partitionsToAdd = numPartitions - existingAssignment.size
+            val replicaAssignment = adminZkClient.assignReplicasToAvailableBrokers(brokers, noNewPartitionBrokers.toSet, partitionsToAdd, numReplica, -1, existingAssignment.size)
             adminZkClient.writeTopicPartitionAssignment(topic, replicaAssignment.mapValues(ReplicaAssignment(_)).toMap, true)
             info(s"Rearrange partition and replica assignment for topic [$topic]")
         }
@@ -1697,7 +1705,7 @@ class KafkaController(val config: KafkaConfig,
       val newTopics = topics -- controllerContext.allTopics
       val deletedTopics = controllerContext.allTopics -- topics
       controllerContext.allTopics = topics
-      rearrangePartitionReplicaAssignmentForNewTopics(newTopics)
+      rearrangePartitionReplicaAssignmentForNewPartitions(newTopics, true)

       registerPartitionModificationsHandlers(newTopics.toSeq)
       val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics)
@@ -1755,6 +1763,7 @@ class KafkaController(val config: KafkaConfig,
       }

       if (!isActive) return
+      rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic), false)
       val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
       val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>
         controllerContext.partitionReplicaAssignment(topicPartition).isEmpty
diff --git a/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala b/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala
index d929170d4134..d7ef64eb1eb0 100644
--- a/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala
@@ -77,7 +77,6 @@ class PreferredControllerTest extends ZooKeeperTestHarness {
     val brokerConfigs = Seq((0, false), (1, true), (2, false))
     createBrokersWithPreferredControllers(brokerConfigs, true)

-    val brokerList = TestUtils.bootstrapServers(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     TestUtils.waitUntilControllerElected(zkClient)
     // create topic using admin client
     val topic = "topic1"

From 38e9116c8e04754271386d70db2de5518f0777ec Mon Sep 17 00:00:00 2001
From: Lucas Wang
Date: Thu, 7 Jan 2021 17:44:24 -0800
Subject: [PATCH 4/6] Check supplied topics regardless of whether they are new
 or not

---
 .../scala/kafka/controller/KafkaController.scala | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 146dfc8f4665..24eb5924fdd0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -880,7 +880,7 @@ class KafkaController(val config: KafkaConfig,
     // between the moment this broker started and right now when it becomes controller again.
loadMinIsrForTopics(controllerContext.allTopics) - rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet, true) + rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet) registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq) getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach { case (topicPartition, replicaAssignment) => @@ -968,15 +968,10 @@ class KafkaController(val config: KafkaConfig, // Rearrange partition and replica assignment for new topics that get assigned to // maintenance brokers that do not take new partitions - private def rearrangePartitionReplicaAssignmentForNewPartitions(topics: Set[String], onlyNewTopics: Boolean) { + private def rearrangePartitionReplicaAssignmentForNewPartitions(topicsToCheck: Set[String]) { try { val noNewPartitionBrokers = partitionUnassignableBrokerIds if (noNewPartitionBrokers.nonEmpty) { - val topicsToCheck = if (onlyNewTopics) - zkClient.getPartitionNodeNonExistsTopics(topics.toSet) - else - topics - val topicsToBeRearranged = zkClient.getPartitionAssignmentForTopics(topicsToCheck.toSet).filter { case (topic, partitionMap) => val existingAssignment = controllerContext.partitionAssignments.getOrElse(topic, mutable.Map.empty) @@ -1705,7 +1700,7 @@ class KafkaController(val config: KafkaConfig, val newTopics = topics -- controllerContext.allTopics val deletedTopics = controllerContext.allTopics -- topics controllerContext.allTopics = topics - rearrangePartitionReplicaAssignmentForNewPartitions(newTopics, true) + rearrangePartitionReplicaAssignmentForNewPartitions(newTopics) registerPartitionModificationsHandlers(newTopics.toSeq) val addedPartitionReplicaAssignment = getReplicaAssignmentPolicyCompliant(newTopics) @@ -1763,7 +1758,7 @@ class KafkaController(val config: KafkaConfig, } if (!isActive) return - rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic), false) + rearrangePartitionReplicaAssignmentForNewPartitions(immutable.Set(topic)) val partitionReplicaAssignment = zkClient.getReplicaAssignmentForTopics(immutable.Set(topic)) val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) => controllerContext.partitionReplicaAssignment(topicPartition).isEmpty From 05973423e1d93e9c1bf0937d7b61ef10e7223449 Mon Sep 17 00:00:00 2001 From: Lucas Wang Date: Thu, 27 May 2021 15:20:52 -0700 Subject: [PATCH 5/6] Reject reassigning partitions to unassignable brokers --- .../scala/kafka/controller/KafkaController.scala | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 24eb5924fdd0..a3dd585df5cb 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1876,9 +1876,9 @@ class KafkaController(val config: KafkaConfig, } else { val reassignmentResults = mutable.Map.empty[TopicPartition, ApiError] val partitionsToReassign = mutable.Map.empty[TopicPartition, ReplicaAssignment] - + val noNewPartitionBrokers = partitionUnassignableBrokerIds.toSet reassignments.foreach { case (tp, targetReplicas) => - if (replicasAreValid(tp, targetReplicas)) { + if (replicasAreValid(tp, targetReplicas, noNewPartitionBrokers)) { maybeBuildReassignment(tp, targetReplicas) match { case Some(context) => partitionsToReassign.put(tp, context) case None => reassignmentResults.put(tp, new 
ApiError(Errors.NO_REASSIGNMENT_IN_PROGRESS))
@@ -1897,7 +1897,8 @@ class KafkaController(val config: KafkaConfig,
     }
   }

-  private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]]): Boolean = {
+  private def replicasAreValid(topicPartition: TopicPartition, replicasOpt: Option[Seq[Int]],
+                               noNewPartitionBrokers: Set[Int]): Boolean = {
     replicasOpt match {
       case Some(replicas) =>
         val replicaSet = replicas.toSet
@@ -1905,7 +1906,10 @@ class KafkaController(val config: KafkaConfig,
           false
         else if (replicas.exists(_ < 0))
           false
-        else {
+        else if (!replicaSet.intersect(noNewPartitionBrokers).isEmpty) {
+          warn(s"reject reassignment of $topicPartition to unassignable hosts $noNewPartitionBrokers")
+          false
+        } else {
           // Ensure that any new replicas are among the live brokers
           val currentAssignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
           val newAssignment = currentAssignment.reassignTo(replicas)

From 821a23e7cd18e48d8e8419a60e1f1710629888d4 Mon Sep 17 00:00:00 2001
From: Lucas Wang
Date: Fri, 28 May 2021 13:33:47 -0700
Subject: [PATCH 6/6] Adding test to ensure partition reassignments don't land
 on maintenance brokers

---
 .../kafka/controller/KafkaController.scala   |  4 ++
 .../kafka/server/MaintenanceBrokerTest.scala | 71 ++++++++++++++++++-
 .../server/PreferredControllerTest.scala     | 28 --------
 3 files changed, 73 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a3dd585df5cb..a91383814a9a 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -880,7 +880,11 @@ class KafkaController(val config: KafkaConfig,
    // between the moment this broker started and right now when it becomes controller again.
loadMinIsrForTopics(controllerContext.allTopics) + // scan partitions of all topics and ensure they don't lie on partitionUnassignableBrokerIds + // the controllerContext.partitionAssignments is still not initialized yet + // thus every single partition will be checked inside rearrangePartitionReplicaAssignmentForNewPartitions rearrangePartitionReplicaAssignmentForNewPartitions(controllerContext.allTopics.toSet) + registerPartitionModificationsHandlers(controllerContext.allTopics.toSeq) getReplicaAssignmentPolicyCompliant(controllerContext.allTopics.toSet).foreach { case (topicPartition, replicaAssignment) => diff --git a/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala b/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala index c97740136623..336d053ce644 100644 --- a/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala +++ b/core/src/test/scala/unit/kafka/server/MaintenanceBrokerTest.scala @@ -18,13 +18,14 @@ package kafka.server import java.util.{Optional, Properties} - import kafka.server.KafkaConfig.fromProps import kafka.utils.CoreUtils._ import kafka.utils.TestUtils import kafka.utils.TestUtils._ import kafka.zk.ZooKeeperTestHarness import org.apache.kafka.clients.admin._ +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.InvalidReplicaAssignmentException import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.security.auth.SecurityProtocol @@ -32,7 +33,8 @@ import scala.collection.JavaConverters._ import org.junit.Assert._ import org.junit.{After, Test} -import scala.collection.Map +import scala.collection.{Map, Seq} +import scala.concurrent.ExecutionException /** * This is the main test which ensure maintenance broker work correctly. @@ -172,6 +174,71 @@ class MaintenanceBrokerTest extends ZooKeeperTestHarness { client.close() } + @Test + def testAddPartitionByAdminZkClientShouldHonorMaintenanceBrokers(): Unit = { + brokers = (0 to 2).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + + TestUtils.waitUntilControllerElected(zkClient) + // setting broker 1 to not take new topic partitions + setMaintenanceBrokers(Seq(1)) + + // create topic using admin client + val topic = "topic1" + TestUtils.createTopic(zkClient, topic, 3, 2, brokers) + + assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1))) + + val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(Set(topic)).map { + case (topicPartition, assignment) => topicPartition.partition -> assignment + } + val allBrokers = adminZkClient.getBrokerMetadatas() + val newPartitionsCount = 5 + adminZkClient.addPartitions(topic, existingAssignment, allBrokers, 5) + (0 until newPartitionsCount).map { i => + TestUtils.waitUntilMetadataIsPropagated(brokers, topic, i) + i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) + } + + assertTrue("topic1 should not be in broker 1 after increasing partition count", + ensureTopicNotInBrokers("topic1", Set(1))) + } + + @Test + def testPartitionReassignmentShouldHonorMaintenanceBrokers(): Unit = { + brokers = (0 to 2).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) } + + TestUtils.waitUntilControllerElected(zkClient) + // setting broker 1 to not take new topic partitions + setMaintenanceBrokers(Seq(1)) + + // create topic using admin client + val topic = "topic1" + TestUtils.createTopic(zkClient, topic, 1, 2, brokers) + assertTrue("topic1 should not be in broker 1", 
ensureTopicNotInBrokers("topic1", Set(1))) + + // get the admin client + val adminClientConfig = new Properties + val brokerList = TestUtils.bootstrapServers(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + val client = AdminClient.create(adminClientConfig) + + val reassignmentsResult = client.alterPartitionReassignments(Map(reassignmentEntry(new TopicPartition(topic, 0), Seq(0, 1))).asJava) + var reassignmentFailed = false + try { + reassignmentsResult.all().get() + } catch { + case e : ExecutionException => + assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException]) + reassignmentFailed = true + } + assertTrue("the partition reassignment should have failed", reassignmentFailed) + client.close() + } + + def reassignmentEntry(tp: TopicPartition, replicas: Seq[Int]): (TopicPartition, java.util.Optional[NewPartitionReassignment]) = + tp -> Optional.of(new NewPartitionReassignment((replicas.map(_.asInstanceOf[Integer]).asJava))) + + @Test def testTopicCreatedInZkShouldBeRearrangedForMaintenanceBrokers(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala b/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala index d7ef64eb1eb0..3a130a22baaf 100644 --- a/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala +++ b/core/src/test/scala/unit/kafka/server/PreferredControllerTest.scala @@ -33,7 +33,6 @@ import org.scalatest.Assertions.fail import scala.collection.JavaConverters._ import scala.collection.Map -import scala.collection.immutable class PreferredControllerTest extends ZooKeeperTestHarness { @@ -72,33 +71,6 @@ class PreferredControllerTest extends ZooKeeperTestHarness { client.close() } - @Test - def testPartitionCreatedByAdminZkClientShouldNotBeAssignedToPreferredControllers(): Unit = { - val brokerConfigs = Seq((0, false), (1, true), (2, false)) - createBrokersWithPreferredControllers(brokerConfigs, true) - - TestUtils.waitUntilControllerElected(zkClient) - // create topic using admin client - val topic = "topic1" - TestUtils.createTopic(zkClient, topic, 3, 2, brokers) - - assertTrue("topic1 should not be in broker 1", ensureTopicNotInBrokers("topic1", Set(1))) - - val existingAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic)).map { - case (topicPartition, assignment) => topicPartition.partition -> assignment - } - val allBrokers = adminZkClient.getBrokerMetadatas() - val newPartitionsCount = 5 - adminZkClient.addPartitions(topic, existingAssignment, allBrokers, 5) - (0 until newPartitionsCount).map { i => - TestUtils.waitUntilMetadataIsPropagated(brokers, topic, i) - i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i) - } - - assertTrue("topic1 should not be in broker 1 after increasing partition count", - ensureTopicNotInBrokers("topic1", Set(1))) - } - @Test def testElectionWithoutPreferredControllersAndNoFallback(): Unit = { val brokerConfigs = Seq((0, false), (1, false), (2, false))
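Note (not part of the patches above): the following is a minimal, self-contained Scala sketch of the invariant this series enforces -- newly created or newly added partitions are assigned round-robin over the brokers that remain after removing maintenance brokers and preferred controllers, so an excluded broker can never appear in a new replica list. The names below (AssignmentSketch, assignExcluding) are invented for illustration; the real logic lives in AdminZkClient.assignReplicasToAvailableBrokers and the controller changes above.

object AssignmentSketch {
  // Pick replicationFactor brokers for each new partition, round-robin over the brokers that
  // remain after dropping the excluded ids (maintenance brokers, preferred controllers).
  def assignExcluding(allBrokerIds: Seq[Int],
                      excludedBrokerIds: Set[Int],
                      nPartitions: Int,
                      replicationFactor: Int,
                      startPartitionId: Int = 0): Map[Int, Seq[Int]] = {
    val available = allBrokerIds.filterNot(excludedBrokerIds.contains).sorted
    require(available.size >= replicationFactor, "not enough assignable brokers")
    (0 until nPartitions).map { i =>
      // Stagger the first replica across partitions, then take the following brokers.
      val replicas = (0 until replicationFactor).map(r => available((i + r) % available.size))
      (startPartitionId + i) -> (replicas: Seq[Int])
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Brokers 0..2 with broker 1 excluded (in maintenance or a preferred controller);
    // five partitions are added after three existing ones, mirroring the tests above.
    val assignment = assignExcluding(Seq(0, 1, 2), Set(1), nPartitions = 5, replicationFactor = 2, startPartitionId = 3)
    assert(assignment.values.forall(replicas => !replicas.contains(1)))
    println(assignment) // e.g. Map(3 -> Vector(0, 2), 4 -> Vector(2, 0), ...)
  }
}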