Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Common] Rename DefaultLogRecordBatch to MemoryLogRecordBatch #235

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.alibaba.fluss.record.DefaultKvRecord;
import com.alibaba.fluss.record.DefaultKvRecordBatch;
import com.alibaba.fluss.record.DefaultLogRecord;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.row.InternalRow;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -79,7 +79,7 @@ public WriteRecord(
// TODO: row maybe not IndexedRow, which can't be estimated size
// and the size maybe not accurate when the format is arrow.
: DefaultLogRecord.sizeOf(row)
+ DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
+ MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
}

public PhysicalTablePath getPhysicalTablePath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import com.alibaba.fluss.memory.MemorySegmentOutputView;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.DefaultLogRecord;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.LogRecord;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecordsIndexedBuilder;
import com.alibaba.fluss.record.RowKind;
import com.alibaba.fluss.record.bytesview.BytesView;
Expand Down Expand Up @@ -65,7 +65,7 @@ void testTryAppendWithWriteLimit() throws Exception {

for (int i = 0;
i
< (writeLimit - DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
< (writeLimit - MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE)
/ estimatedSizeInBytes;
i++) {
boolean appendResult =
Expand All @@ -89,7 +89,7 @@ void testToBytes() throws Exception {
logProducerBatch.close();
logProducerBatch.serialize();
BytesView bytesView = logProducerBatch.build();
DefaultLogRecordBatch recordBatch = new DefaultLogRecordBatch();
MemoryLogRecordBatch recordBatch = new MemoryLogRecordBatch();
MemorySegmentBytesView firstBytesView = (MemorySegmentBytesView) bytesView;
recordBatch.pointTo(firstBytesView.getMemorySegment(), firstBytesView.getPosition());
assertDefaultLogRecordBatchEquals(recordBatch);
Expand Down Expand Up @@ -164,7 +164,7 @@ private IndexedLogWriteBatch createLogWriteBatch(
new MemorySegmentOutputView(memorySegment)));
}

private void assertDefaultLogRecordBatchEquals(DefaultLogRecordBatch recordBatch) {
private void assertDefaultLogRecordBatchEquals(MemoryLogRecordBatch recordBatch) {
assertThat(recordBatch.getRecordCount()).isEqualTo(1);
assertThat(recordBatch.baseLogOffset()).isEqualTo(0L);
assertThat(recordBatch.schemaId()).isEqualTo((short) DATA1_TABLE_INFO.getSchemaId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

import java.io.IOException;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@
import java.nio.channels.FileChannel;
import java.util.Objects;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.MAGIC_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.MAGIC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.MAGIC_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.MAGIC_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
Expand Down Expand Up @@ -182,7 +182,7 @@ public int sizeInBytes() {
}

private LogRecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
DefaultLogRecordBatch records = new DefaultLogRecordBatch();
MemoryLogRecordBatch records = new MemoryLogRecordBatch();
records.pointTo(MemorySegment.wrap(buffer.array()), 0);
return records;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@
import java.util.List;
import java.util.Map;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORDS_COUNT_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORDS_COUNT_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.utils.FileUtils.readFullyOrFail;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
import static com.alibaba.fluss.utils.Preconditions.checkState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,8 @@
*
* @since 0.1
*/
// TODO rename to MemoryLogRecordBatch
@PublicEvolving
public class DefaultLogRecordBatch implements LogRecordBatch {
public class MemoryLogRecordBatch implements LogRecordBatch {
protected static final int BASE_OFFSET_LENGTH = 8;
public static final int LENGTH_LENGTH = 4;
static final int MAGIC_LENGTH = 1;
Expand Down Expand Up @@ -248,7 +247,7 @@ public boolean equals(Object o) {
return false;
}

DefaultLogRecordBatch that = (DefaultLogRecordBatch) o;
MemoryLogRecordBatch that = (MemoryLogRecordBatch) o;
int sizeInBytes = sizeInBytes();
return sizeInBytes == that.sizeInBytes()
&& segment.equalTo(that.segment, position, that.position, sizeInBytes);
Expand All @@ -262,7 +261,7 @@ public int hashCode() {
private CloseableIterator<LogRecord> rowRecordIterator(RowType rowType, long timestamp) {
DataType[] fieldTypes = rowType.getChildren().toArray(new DataType[0]);
return new LogRecordIterator() {
int position = DefaultLogRecordBatch.this.position + RECORD_BATCH_HEADER_SIZE;
int position = MemoryLogRecordBatch.this.position + RECORD_BATCH_HEADER_SIZE;
int rowId = 0;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,12 @@ public void clear() {
}

public void ensureValid() {
if (sizeInBytes < DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE) {
if (sizeInBytes < MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE) {
throw new RuntimeException(
"Record batch is corrupt (the size "
+ sizeInBytes
+ " is smaller than the minimum allowed overhead "
+ DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE
+ MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE
+ ")");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.utils.Preconditions.checkArgument;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@

import java.io.IOException;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.WRITE_CLIENT_ID_OFFSET;
import static com.alibaba.fluss.record.LogRecordBatch.CURRENT_LOG_MAGIC_VALUE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_BATCH_SEQUENCE;
import static com.alibaba.fluss.record.LogRecordBatch.NO_WRITER_ID;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.BASE_OFFSET_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.CRC_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_LENGTH;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.SCHEMA_ID_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.WRITE_CLIENT_ID_OFFSET;

/**
* Default builder for {@link MemoryLogRecords} of log records in {@link LogFormat#INDEXED} format.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

import com.alibaba.fluss.memory.MemorySegment;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LENGTH_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;

/**
* A byte buffer backed log input stream. This class avoids the need to copy records by returning
Expand All @@ -43,7 +43,7 @@ public LogRecordBatch nextBatch() {
return null;
}

DefaultLogRecordBatch logRecords = new DefaultLogRecordBatch();
MemoryLogRecordBatch logRecords = new MemoryLogRecordBatch();
logRecords.pointTo(memorySegment, currentPosition);

currentPosition += batchSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link DefaultLogRecordBatch}. */
public class DefaultLogRecordBatchTest extends LogTestBase {
/** Test for {@link MemoryLogRecordBatch}. */
public class MemoryLogRecordBatchTest extends LogTestBase {

@Test
void testRecordBatchSize() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import java.util.Iterator;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.LOG_OVERHEAD;
import static com.alibaba.fluss.record.TestData.DATA1;
import static org.assertj.core.api.Assertions.assertThat;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import java.util.Arrays;
import java.util.List;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.ARROW_ROWKIND_OFFSET;
import static com.alibaba.fluss.record.TestData.DATA1;
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
import static com.alibaba.fluss.testutils.DataTestUtils.row;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.FileLogRecords;
import com.alibaba.fluss.record.KvRecord;
import com.alibaba.fluss.record.KvRecordBatch;
Expand All @@ -34,6 +33,7 @@
import com.alibaba.fluss.record.LogRecordBatch;
import com.alibaba.fluss.record.LogRecordReadContext;
import com.alibaba.fluss.record.LogRecords;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecords;
import com.alibaba.fluss.record.MemoryLogRecordsArrowBuilder;
import com.alibaba.fluss.record.MemoryLogRecordsIndexedBuilder;
Expand Down Expand Up @@ -363,7 +363,7 @@ private static MemoryLogRecords createIndexedMemoryLogRecords(
MemoryLogRecords memoryLogRecords = builder.build();
memoryLogRecords.ensureValid();

((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next())
((MemoryLogRecordBatch) memoryLogRecords.batches().iterator().next())
.setCommitTimestamp(maxTimestamp);
return memoryLogRecords;
}
Expand Down Expand Up @@ -398,7 +398,7 @@ private static MemoryLogRecords createArrowMemoryLogRecords(
MemoryLogRecords memoryLogRecords =
MemoryLogRecords.pointToByteBuffer(bytesView.getByteBuf().nioBuffer());

((DefaultLogRecordBatch) memoryLogRecords.batches().iterator().next())
((MemoryLogRecordBatch) memoryLogRecords.batches().iterator().next())
.setCommitTimestamp(maxTimestamp);
memoryLogRecords.ensureValid();
return memoryLogRecords;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import java.io.IOException;
import java.util.Optional;

import static com.alibaba.fluss.record.DefaultLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.record.MemoryLogRecordBatch.RECORD_BATCH_HEADER_SIZE;
import static com.alibaba.fluss.utils.IOUtils.closeQuietly;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@
import com.alibaba.fluss.metrics.MeterView;
import com.alibaba.fluss.metrics.MetricNames;
import com.alibaba.fluss.metrics.groups.MetricGroup;
import com.alibaba.fluss.record.DefaultLogRecordBatch;
import com.alibaba.fluss.record.FileLogProjection;
import com.alibaba.fluss.record.FileLogRecords;
import com.alibaba.fluss.record.LogRecordBatch;
import com.alibaba.fluss.record.LogRecords;
import com.alibaba.fluss.record.MemoryLogRecordBatch;
import com.alibaba.fluss.record.MemoryLogRecords;
import com.alibaba.fluss.server.log.LocalLog.SegmentDeletionReason;
import com.alibaba.fluss.server.metrics.group.BucketMetricGroup;
Expand Down Expand Up @@ -667,10 +667,10 @@ private AssignResult assignOffsetAndTimestamp(
MemoryLogRecords records, long baseLogOffset, long commitTimestamp) {
long initialOffset = baseLogOffset;
for (LogRecordBatch batch : records.batches()) {
if (batch instanceof DefaultLogRecordBatch) {
DefaultLogRecordBatch defaultLogRecordBatch = (DefaultLogRecordBatch) batch;
defaultLogRecordBatch.setBaseLogOffset(initialOffset);
defaultLogRecordBatch.setCommitTimestamp(commitTimestamp);
if (batch instanceof MemoryLogRecordBatch) {
MemoryLogRecordBatch memoryLogRecordBatch = (MemoryLogRecordBatch) batch;
memoryLogRecordBatch.setBaseLogOffset(initialOffset);
memoryLogRecordBatch.setCommitTimestamp(commitTimestamp);
} else {
throw new FlussRuntimeException(
"Currently, we only support DefaultLogRecordBatch.");
Expand Down