Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: enable consumer auth tests #17885

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeUsingAssignWithNoAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -935,7 +935,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -955,7 +955,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -975,7 +975,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeWithoutTopicDescribeAccess(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -993,7 +993,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeWithTopicDescribe(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1012,7 +1012,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeWithTopicWrite(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1031,7 +1031,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testConsumeWithTopicAndGroupRead(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand Down Expand Up @@ -1076,7 +1076,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1094,7 +1094,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPatternSubscriptionWithTopicAndGroupRead(quorum: String, groupProtocol: String): Unit = {
val assignSemaphore = new Semaphore(0)
createTopicWithBrokerPrincipal(topic)
Expand Down Expand Up @@ -1135,7 +1135,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPatternSubscriptionMatchingInternalTopic(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand Down Expand Up @@ -1165,7 +1165,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1191,7 +1191,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testPatternSubscriptionNotMatchingInternalTopic(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1210,15 +1210,15 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCreatePermissionOnTopicToReadFromNonExistentTopic(quorum: String, groupProtocol: String): Unit = {
testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)),
TOPIC)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCreatePermissionOnClusterToReadFromNonExistentTopic(quorum: String, groupProtocol: String): Unit = {
testCreatePermissionNeededToReadFromNonExistentTopic("newTopic",
Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, CREATE, ALLOW)),
Expand Down Expand Up @@ -1280,15 +1280,15 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithNoTopicAccess(quorum: String, groupProtocol: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
val consumer = createConsumer()
assertThrows(classOf[TopicAuthorizationException], () => consumer.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava))
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithTopicWrite(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1299,7 +1299,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithTopicDescribe(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1318,7 +1318,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCommitWithTopicAndGroupRead(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
Expand Down Expand Up @@ -1346,7 +1346,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchWithNoTopicAccess(quorum: String, groupProtocol: String): Unit = {
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
val consumer = createConsumer()
Expand All @@ -1355,7 +1355,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchAllTopicPartitionsAuthorization(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand Down Expand Up @@ -1387,7 +1387,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchMultipleGroupsAuthorization(quorum: String, groupProtocol: String): Unit = {
val groups: Seq[String] = (1 to 5).map(i => s"group$i")
val groupResources = groups.map(group => new ResourcePattern(GROUP, group, LITERAL))
Expand Down Expand Up @@ -1543,7 +1543,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchTopicDescribe(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), groupResource)
Expand All @@ -1554,7 +1554,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testOffsetFetchWithTopicAndGroupRead(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, READ, ALLOW)), groupResource)
Expand All @@ -1565,14 +1565,14 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMetadataWithNoTopicAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
assertThrows(classOf[TopicAuthorizationException], () => consumer.partitionsFor(topic))
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testMetadataWithTopicDescribe(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource)
Expand All @@ -1588,7 +1588,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testListOffsetsWithTopicDescribe(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WILDCARD_HOST, DESCRIBE, ALLOW)), topicResource)
Expand All @@ -1614,7 +1614,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testListGroupApiWithAndWithoutListGroupAcls(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand Down Expand Up @@ -1663,7 +1663,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDeleteGroupApiWithDeleteGroupAcl(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1677,7 +1677,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDeleteGroupApiWithNoDeleteGroupAcl(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1698,7 +1698,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDeleteGroupOffsetsWithAcl(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1714,7 +1714,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDeleteGroupOffsetsWithoutDeleteAcl(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)

Expand All @@ -1729,7 +1729,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testDeleteGroupOffsetsWithDeleteAclWithoutTopicAcl(quorum: String, groupProtocol: String): Unit = {
createTopicWithBrokerPrincipal(topic)
// Create the consumer group
Expand Down Expand Up @@ -2458,7 +2458,7 @@ class AuthorizerIntegrationTest extends AbstractAuthorizerIntegrationTest {
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17696"))
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testCreateAndCloseConsumerWithNoAccess(quorum: String, groupProtocol: String): Unit = {
val consumer = createConsumer()
val closeConsumer: Executable = () => consumer.close()
Expand Down