Skip to content

Commit

Permalink
KAFKA-12690 Remove deprecated Producer#sendOffsetsToTransaction (#17865)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
  • Loading branch information
Rancho-7 authored Nov 22, 2024
1 parent 38aca3a commit 7db4d53
Show file tree
Hide file tree
Showing 6 changed files with 0 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -689,46 +689,6 @@ public void beginTransaction() throws ProducerFencedException {
producerMetrics.recordBeginTxn(time.nanoseconds() - now);
}

/**
* Sends a list of specified offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
* committed only if the transaction is committed successfully. The committed offset should
* be the next message your application will consume, i.e. lastProcessedMessageOffset + 1.
* <p>
* This method should be used when you need to batch consumed and produced messages
* together, typically in a consume-transform-produce pattern. Thus, the specified
* {@code consumerGroupId} should be the same as config parameter {@code group.id} of the used
* {@link KafkaConsumer consumer}. Note, that the consumer should have {@code enable.auto.commit=false}
* and should also not commit offsets manually (via {@link KafkaConsumer#commitSync(Map) sync} or
* {@link KafkaConsumer#commitAsync(Map, OffsetCommitCallback) async} commits).
*
* <p>
* This method is a blocking call that waits until the request has been received and acknowledged by the consumer group
* coordinator; but the offsets are not considered as committed until the transaction itself is successfully committed later (via
* the {@link #commitTransaction()} call).
*
* @throws IllegalStateException if no transactional.id has been configured, no transaction has been started
* @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active
* @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message
* format used for the offsets topic on the broker does not support transactions
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized, or the consumer group id is not authorized.
* @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
* to the partition leader. See the exception for more details
* @throws TimeoutException if the time taken for sending the offsets has surpassed <code>max.block.ms</code>.
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
*
* @deprecated Since 3.0.0, please use {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} instead.
*/
@Deprecated
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException {
sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId));
}

/**
* Sends a list of specified offsets to the consumer group coordinator, and also marks
* those offsets as part of the current transaction. These offsets will be considered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,14 +202,6 @@ public void beginTransaction() throws ProducerFencedException {
this.sentOffsets = false;
}

@Deprecated
@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException {
Objects.requireNonNull(consumerGroupId);
sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId));
}

@Override
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
ConsumerGroupMetadata groupMetadata) throws ProducerFencedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,6 @@ public interface Producer<K, V> extends Closeable {
*/
void beginTransaction() throws ProducerFencedException;

/**
* See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)}
*/
@Deprecated
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;

/**
* See {@link KafkaProducer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,15 +419,6 @@ public void shouldPublishConsumerGroupOffsetsOnlyAfterCommitIfTransactionsAreEna
assertEquals(Collections.singletonList(expectedResult), producer.consumerGroupOffsetsHistory());
}

@Deprecated
@Test
public void shouldThrowOnNullConsumerGroupIdWhenSendOffsetsToTransaction() {
buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
assertThrows(NullPointerException.class, () -> producer.sendOffsetsToTransaction(Collections.emptyMap(), (String) null));
}

@Test
public void shouldThrowOnNullConsumerGroupMetadataWhenSendOffsetsToTransaction() {
buildMockProducer(true);
Expand All @@ -436,16 +427,6 @@ public void shouldThrowOnNullConsumerGroupMetadataWhenSendOffsetsToTransaction()
assertThrows(NullPointerException.class, () -> producer.sendOffsetsToTransaction(Collections.emptyMap(), new ConsumerGroupMetadata(null)));
}

@Deprecated
@Test
public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransactionByGroupId() {
buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();
producer.sendOffsetsToTransaction(Collections.emptyMap(), "groupId");
assertFalse(producer.sentOffsets());
}

@Test
public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
buildMockProducer(true);
Expand All @@ -455,24 +436,6 @@ public void shouldIgnoreEmptyOffsetsWhenSendOffsetsToTransactionByGroupMetadata(
assertFalse(producer.sentOffsets());
}

@Deprecated
@Test
public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupId() {
buildMockProducer(true);
producer.initTransactions();
producer.beginTransaction();

assertFalse(producer.sentOffsets());

Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(topic, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
assertTrue(producer.sentOffsets());
}

@Test
public void shouldAddOffsetsWhenSendOffsetsToTransactionByGroupMetadata() {
buildMockProducer(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection.mutable

Expand Down Expand Up @@ -76,14 +75,6 @@ class TransactionsBounceTest extends IntegrationTestHarness {

override protected def brokerCount: Int = 4

@nowarn("cat=deprecation")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961"))
def testWithGroupId(quorum: String, groupProtocol: String): Unit = {
testBrokerFailure((producer, groupId, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, groupId))
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testWithGroupMetadata(quorum: String, groupProtocol: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit
import java.util.{Optional, Properties}
import scala.annotation.nowarn
import scala.collection.{Seq, mutable}
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.concurrent.ExecutionException
Expand Down Expand Up @@ -301,14 +300,6 @@ class TransactionsTest extends IntegrationTestHarness {
assertEquals(3L, second.offset)
}

@nowarn("cat=deprecation")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_17961"))
def testSendOffsetsWithGroupId(quorum: String, groupProtocol: String): Unit = {
sendOffset((producer, groupId, consumer) =>
producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, groupId))
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testSendOffsetsWithGroupMetadata(quorum: String, groupProtocol: String): Unit = {
Expand Down

0 comments on commit 7db4d53

Please sign in to comment.