diff --git a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
index ed9fcc4e..869eea6e 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilder.java
@@ -66,6 +66,8 @@ public class MemoryLogRecordsArrowBuilder implements AutoCloseable {
     private int sizeInBytes;
     private int recordCount;
     private boolean isClosed;
+    // Set by append(); tells getSizeInBytes() that the cached sizeInBytes is stale.
+    private boolean reCalculateSizeInBytes = false;

     private MemoryLogRecordsArrowBuilder(
             long baseLogOffset,
@@ -203,6 +205,7 @@ public void append(RowKind rowKind, InternalRow row) throws Exception {

         arrowWriter.writeRow(row);
         rowKindWriter.writeRowKind(rowKind);
+        reCalculateSizeInBytes = true;
     }

     public long writerId() {
@@ -246,12 +249,15 @@ public void deallocate() {
     }

     public int getSizeInBytes() {
-        if (recordCount != arrowWriter.getRecordsCount()) {
+        // Recompute only after an append. Once the builder is closed the arrow writer may be
+        // recycled and re-initialized, so its counters would no longer describe this batch.
+        if (reCalculateSizeInBytes) {
             // make size in bytes up-to-date
             sizeInBytes =
                     ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes() + arrowWriter.sizeInBytes();
             recordCount = arrowWriter.getRecordsCount();
+            reCalculateSizeInBytes = false;
         }
         return sizeInBytes;
     }

diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java
index 23c1dc14..6ee77ffb 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/row/arrow/ArrowWriterPool.java
@@ -120,7 +120,8 @@ public void close() {
     }

+    /** Exposes the free writers map; public so that tests in other packages can inspect it. */
     @VisibleForTesting
-    Map<String, Deque<ArrowWriter>> freeWriters() {
+    public Map<String, Deque<ArrowWriter>> freeWriters() {
         return freeWriters;
     }
 }
diff --git a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
index 847e8c3f..93cceb33 100644
--- a/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
+++ b/fluss-common/src/test/java/com/alibaba/fluss/record/MemoryLogRecordsArrowBuilderTest.java
@@ -141,6 +141,50 @@ void testIllegalArgument() {
                         "The size of first segment of pagedOutputView is too small, need at least 44 bytes.");
     }

+    /**
+     * Verifies that a closed builder keeps reporting the same size after its arrow writer
+     * has been returned to the pool and handed out again.
+     */
+    @Test
+    void testClose() throws Exception {
+        int maxSizeInBytes = 1024;
+        ArrowWriter writer =
+                provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
+        MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
+        List<RowKind> rowKinds =
+                DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
+        List<InternalRow> rows =
+                DATA1.stream()
+                        .map(object -> row(DATA1_ROW_TYPE, object))
+                        .collect(Collectors.toList());
+        while (!builder.isFull()) {
+            int rndIndex = RandomUtils.nextInt(0, DATA1.size());
+            builder.append(rowKinds.get(rndIndex), rows.get(rndIndex));
+        }
+        assertThat(builder.isFull()).isTrue();
+
+        String tableSchemaId = 1L + "-" + 1;
+        assertThat(provider.freeWriters()).isEmpty();
+        int sizeInBytesBeforeClose = builder.getSizeInBytes();
+        builder.close();
+        builder.serialize();
+        builder.setWriterState(1L, 0);
+        MemoryLogRecords.pointToByteBuffer(builder.build().getByteBuf().nioBuffer());
+        assertThat(provider.freeWriters().get(tableSchemaId)).hasSize(1);
+        int sizeInBytes = builder.getSizeInBytes();
+        assertThat(sizeInBytes).isEqualTo(sizeInBytesBeforeClose);
+        // Get a writer again: the recycled writer leaves the pool and is re-initialized.
+        ArrowWriter writer1 =
+                provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
+        assertThat(provider.freeWriters().get(tableSchemaId)).isEmpty();
+
+        // Even if the writer has been re-initialized, the sizeInBytes should be the same.
+        assertThat(builder.getSizeInBytes()).isEqualTo(sizeInBytes);
+
+        writer.close();
+        writer1.close();
+    }
+
     private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
             ArrowWriter writer, int maxPages, int pageSizeInBytes) {
         conf.set(