Skip to content

Commit

Permalink
Discard changes to tests/kafkatest/tests/core/transactions_upgrade_te…
Browse files Browse the repository at this point in the history
…st.py
  • Loading branch information
kirktrue authored Nov 22, 2024
1 parent 8992790 commit 1f4ec65
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions tests/kafkatest/tests/core/transactions_upgrade_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.kafka.quorum import isolated_kraft
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer
Expand Down Expand Up @@ -75,19 +75,20 @@ def seed_messages(self, topic, num_seed_messages):
(self.num_seed_messages, seed_timeout_sec))
return seed_producer.acked

def get_messages_from_topic(self, topic, num_messages):
consumer = self.start_consumer(topic, group_id="verifying_consumer")
def get_messages_from_topic(self, topic, num_messages, group_protocol):
consumer = self.start_consumer(topic, group_id="verifying_consumer", group_protocol=group_protocol)
return self.drain_consumer(consumer, num_messages)

def start_consumer(self, topic_to_read, group_id):
def start_consumer(self, topic_to_read, group_id, group_protocol):
consumer = ConsoleConsumer(context=self.test_context,
num_nodes=1,
kafka=self.kafka,
topic=topic_to_read,
group_id=group_id,
message_validator=is_int,
from_beginning=True,
isolation_level="read_committed")
isolation_level="read_committed",
consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol))
consumer.start()
# ensure that the consumer is up.
wait_until(lambda: (len(consumer.messages_consumed[1]) > 0) == True,
Expand Down Expand Up @@ -143,7 +144,7 @@ def perform_upgrade(self, from_kafka_version):

def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic,
num_copiers, num_messages_to_copy,
use_group_metadata,
use_group_metadata, group_protocol,
from_kafka_version):
"""Copies messages transactionally from the seeded input topic to the
output topic while an rolling upgrade occurs.
Expand All @@ -155,13 +156,16 @@ def copy_messages_transactionally_during_upgrade(self, input_topic, output_topic
"""
copiers = create_and_start_copiers(test_context=self.test_context,
kafka=self.kafka,
consumer_group=self.consumer_group,
input_topic=input_topic,
output_topic=output_topic,
transaction_size=self.transaction_size,
transaction_timeout=self.transaction_timeout,
num_copiers=num_copiers,
use_group_metadata=use_group_metadata)
concurrent_consumer = self.start_consumer(output_topic, group_id="concurrent_consumer")
concurrent_consumer = self.start_consumer(output_topic,
group_id="concurrent_consumer",
group_protocol=group_protocol)

self.perform_upgrade(from_kafka_version)

Expand Down Expand Up @@ -197,9 +201,10 @@ def setup_topics(self):
@matrix(
from_kafka_version=[str(LATEST_3_9), str(LATEST_3_8), str(LATEST_3_7), str(LATEST_3_6), str(LATEST_3_5), str(LATEST_3_4), str(LATEST_3_3), str(LATEST_3_2), str(LATEST_3_1)],
metadata_quorum=[isolated_kraft],
use_new_coordinator=[False]
use_new_coordinator=[False],
group_protocol=[None]
)
def test_transactions_upgrade(self, from_kafka_version, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False):
def test_transactions_upgrade(self, from_kafka_version, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
fromKafkaVersion = KafkaVersion(from_kafka_version)
self.kafka = KafkaService(self.test_context,
num_nodes=self.num_brokers,
Expand All @@ -220,9 +225,9 @@ def test_transactions_upgrade(self, from_kafka_version, metadata_quorum=quorum.i
input_messages = self.seed_messages(self.input_topic, self.num_seed_messages)
concurrently_consumed_messages = self.copy_messages_transactionally_during_upgrade(
input_topic=self.input_topic, output_topic=self.output_topic, num_copiers=self.num_input_partitions,
num_messages_to_copy=self.num_seed_messages, use_group_metadata=True,
num_messages_to_copy=self.num_seed_messages, use_group_metadata=True, group_protocol=group_protocol,
from_kafka_version=from_kafka_version)
output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages)
output_messages = self.get_messages_from_topic(self.output_topic, self.num_seed_messages, group_protocol)

concurrently_consumed_message_set = set(concurrently_consumed_messages)
output_message_set = set(output_messages)
Expand Down

0 comments on commit 1f4ec65

Please sign in to comment.