diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java
new file mode 100644
index 000000000000..efe82a02c239
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/PassThroughConsumerRecord.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.record.TimestampType;
+
+/**
+ * In case of passthrough, the value contains the raw message batch while the key is null.
+ * @param <K> the key type (always null in passthrough mode)
+ * @param <V> the value type (the raw batch of messages)
+ */
+public class PassThroughConsumerRecord<K, V> extends ConsumerRecord<K, V> {
+    private final byte magic;
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, K key, V value, byte magic) {
+        super(topic, partition, offset, key, value);
+        this.magic = magic;
+    }
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, long timestamp,
+        TimestampType timestampType, long checksum, int serializedKeySize, int serializedValueSize, K key, V value,
+        byte magic) {
+        super(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key,
+            value);
+        this.magic = magic;
+    }
+
+    public PassThroughConsumerRecord(String topic, int partition, long offset, long timestamp,
+        TimestampType timestampType, Long checksum, int serializedKeySize, int serializedValueSize, K key, V value,
+        Headers headers, byte magic) {
+        super(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key,
+            value, headers);
+        this.magic = magic;
+    }
+
+    public byte magic() {
+        return magic;
+    }
+}
\ No newline at end of file
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 93e9a0b30608..8602bc1a0aad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.PassThroughConsumerRecord;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
@@ -1147,6 +1148,13 @@ private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
                 (enableShallowIteration ? ((AbstractLegacyRecordBatch) record).outerRecord().buffer() : record.value());
             byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
             V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            if (enableShallowIteration) {
+                return new PassThroughConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp,
+                    timestampType, record.checksumOrNull(),
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : valueByteArray.length, key, value, headers,
+                    batch.magic());
+            }
             return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp,
                 timestampType, record.checksumOrNull(),
                 keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length,
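For orientation (this sketch is not part of the patch): with shallow iteration enabled, the Fetcher change above returns PassThroughConsumerRecord instances, so downstream code can branch on the record type and on the batch's magic byte. The class and helper names below are illustrative.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.PassThroughConsumerRecord;
    import org.apache.kafka.common.record.RecordBatch;

    public final class PassThroughRecords {
        private PassThroughRecords() { }

        // With enable.shallow.iterator=true, value() holds the raw batch and magic() reports its format version.
        public static boolean isOldFormatBatch(ConsumerRecord<byte[], byte[]> record) {
            return record instanceof PassThroughConsumerRecord
                && ((PassThroughConsumerRecord<byte[], byte[]>) record).magic() < RecordBatch.MAGIC_VALUE_V2;
        }
    }
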
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index a9bb29ce73f0..174937f75817 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -69,6 +69,8 @@
  */
 public final class RecordAccumulator {
 
+    private static final String PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue";
+
     private final Logger log;
     private volatile boolean closed;
     private final AtomicInteger flushesInProgress;
@@ -222,6 +224,20 @@ public RecordAppendResult append(TopicPartition tp,
                     return appendResult;
             }
 
+            // HOTFIX to enable mirroring data with passthrough compression across mixed message formats.
+            // In passthrough mode, KMM (Kafka MirrorMaker) sets a header with key "__passThroughMagicValue" whose value is the magic byte of the source message.
+            // This enables passthrough where the source is on the 0.10 message format and the destination is on the 2.0 format (not the other way around).
+            if (compression.equals(CompressionType.PASSTHROUGH)) {
+                for (Header header : headers) {
+                    if (header.key().equals(PASS_THROUGH_MAGIC_VALUE)) {
+                        byte srcMagicValue = header.value()[0];
+                        if (maxUsableMagic > srcMagicValue) {
+                            maxUsableMagic = srcMagicValue;
+                        }
+                    }
+                }
+            }
+
             MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
             ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
             FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 54e25ec10d11..b80c32fac799 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -792,8 +792,12 @@ public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Head
 
         // For passthrough V2, ensure one producerBatch only has one DefaultRecordBatch, and since
         // in this case, DefaultRecordBatch is the value part of a DefaultRecord, so we only allow one record
-        if (magic >= RecordBatch.MAGIC_VALUE_V2 && usePassthrough)
+        // For passthrough V1, we could in principle append multiple passthrough records to the same batch, but
+        // that breaks when migrating from the V1 to the V2 message format because V1 and V2 messages cannot be
+        // appended to the same batch.
+        if (usePassthrough) {
             return false;
+        }
 
         final int recordSize;
         if (magic < RecordBatch.MAGIC_VALUE_V2) {
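To make the header handshake above concrete, here is a standalone sketch (not part of the patch) of the clamping rule applied in RecordAccumulator.append: the destination batch is never built with a newer magic than the source batch advertises via the header. Class and method names are illustrative.

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;
    import org.apache.kafka.common.record.RecordBatch;

    public final class PassThroughMagicClamp {
        static final String PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue";

        // Returns the magic the destination batch should use: never newer than the source batch's format.
        static byte clampMagic(byte maxUsableMagic, Header[] headers) {
            for (Header header : headers) {
                if (header.key().equals(PASS_THROUGH_MAGIC_VALUE) && maxUsableMagic > header.value()[0])
                    return header.value()[0];
            }
            return maxUsableMagic;
        }

        public static void main(String[] args) {
            Header[] headers = {new RecordHeader(PASS_THROUGH_MAGIC_VALUE, new byte[]{RecordBatch.MAGIC_VALUE_V1})};
            // A destination on the 2.0 format (magic v2) is clamped down to the source's v1 format.
            System.out.println(clampMagic(RecordBatch.CURRENT_MAGIC_VALUE, headers)); // prints 1
        }
    }
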
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala b/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala
index 7628b6b65168..01408a744d17 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumerRecord.scala
@@ -30,4 +30,5 @@ case class BaseConsumerRecord(topic: String,
                               timestampType: TimestampType = TimestampType.NO_TIMESTAMP_TYPE,
                               key: Array[Byte],
                               value: Array[Byte],
-                              headers: Headers = new RecordHeaders())
+                              headers: Headers = new RecordHeaders(),
+                              magic: Byte = -1)
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 24d799d92274..5d7ce16cef1b 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -28,13 +28,15 @@ import joptsimple.OptionParser
 import kafka.consumer.BaseConsumerRecord
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist}
-import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, PassThroughConsumerRecord}
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.errors.WakeupException
+import org.apache.kafka.common.header.Headers
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
 import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.JavaConverters._
@@ -69,6 +71,22 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
   private var offsetCommitIntervalMs = 0
   private var abortOnSendFailure: Boolean = true
   @volatile private var exitingOnSendFailure: Boolean = false
+  private var passThroughEnabled: Boolean = false
+  private val PASS_THROUGH_MAGIC_VALUE = "__passThroughMagicValue"
+
+  val recordHeadersV1: Headers = {
+    val magicValueV1 = Array[Byte] {
+      RecordBatch.MAGIC_VALUE_V1
+    }
+    new RecordHeaders().add(new RecordHeader(PASS_THROUGH_MAGIC_VALUE, magicValueV1))
+  }
+
+  val recordHeadersV2: Headers = {
+    val magicValueV2 = Array[Byte] {
+      RecordBatch.MAGIC_VALUE_V2
+    }
+    new RecordHeaders().add(new RecordHeader(PASS_THROUGH_MAGIC_VALUE, magicValueV2))
+  }
 
   // If a message send failed after retries are exhausted. The offset of the messages will also be removed from
   // the unacked offset list to avoid offset commit being stuck on that offset. In this case, the offset of that
@@ -214,6 +232,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
     producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
     if (options.has(passthroughCompressionOpt)) {
+      passThroughEnabled = true
       consumerProps.setProperty("enable.shallow.iterator", "true")
       producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "passthrough")
     }
@@ -253,7 +272,11 @@
           else CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass)
         } else {
-          defaultMirrorMakerMessageHandler
+          if (passThroughEnabled) {
+            passThroughMirrorMakerMessageHandler
+          } else {
+            defaultMirrorMakerMessageHandler
+          }
         }
       }
     } catch {
@@ -362,6 +385,17 @@
       record.value,
       record.headers)
 
+    private def toBaseConsumerRecordWithPassThrough(record: PassThroughConsumerRecord[Array[Byte], Array[Byte]]): BaseConsumerRecord =
+      BaseConsumerRecord(record.topic,
+        record.partition,
+        record.offset,
+        record.timestamp,
+        record.timestampType,
+        record.key,
+        record.value,
+        record.headers,
+        record.magic)
+
     override def run() {
       info("Starting mirror maker thread " + threadName)
       try {
@@ -377,7 +411,13 @@
           } else {
             trace("Sending message with null value and offset %d.".format(data.offset))
           }
-          val records = messageHandler.handle(toBaseConsumerRecord(data))
+          val records = {
+            if (passThroughEnabled) {
+              messageHandler.handle(toBaseConsumerRecordWithPassThrough(data.asInstanceOf[PassThroughConsumerRecord[Array[Byte], Array[Byte]]]))
+            } else {
+              messageHandler.handle(toBaseConsumerRecord(data))
+            }
+          }
           records.asScala.foreach(producer.send)
           maybeFlushAndCommitOffsets()
         }
@@ -587,6 +627,20 @@
     }
   }
 
+  private[tools] object passThroughMirrorMakerMessageHandler extends MirrorMakerMessageHandler {
+    override def handle(record: BaseConsumerRecord): util.List[ProducerRecord[Array[Byte], Array[Byte]]] = {
+      val timestamp: java.lang.Long = if (record.timestamp == RecordBatch.NO_TIMESTAMP) null else record.timestamp
+      // It is assumed that message format V0 is no longer in use at LinkedIn
+      if (record.magic.equals(RecordBatch.MAGIC_VALUE_V1)) {
+        Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, recordHeadersV1))
+      } else if (record.magic.equals(RecordBatch.MAGIC_VALUE_V2)) {
+        Collections.singletonList(new ProducerRecord(record.topic, null, timestamp, record.key, record.value, recordHeadersV2))
+      } else {
+        throw new IllegalArgumentException("Record batch with magic value " + record.magic + " is not supported in passthrough mode")
+      }
+    }
+  }
+
   private class NoRecordsException extends RuntimeException
 
 }
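For reference (not part of the patch), the two settings that the passthrough option above wires together, shown as a minimal standalone sketch; the bootstrap server addresses are illustrative placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public final class PassThroughMirrorConfig {
        public static void main(String[] args) {
            // Consumer side: hand whole batches back to MirrorMaker instead of decompressed records.
            Properties consumerProps = new Properties();
            consumerProps.setProperty("bootstrap.servers", "source-cluster:9092"); // illustrative
            consumerProps.setProperty("enable.shallow.iterator", "true");

            // Producer side: forward those batches without recompressing them.
            Properties producerProps = new Properties();
            producerProps.setProperty("bootstrap.servers", "destination-cluster:9092"); // illustrative
            producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "passthrough");

            System.out.println(consumerProps + " -> " + producerProps);
        }
    }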