Skip to content

Commit

Permalink
[log] Fix CRC error for Arrow log caused by memory modification after…
Browse files Browse the repository at this point in the history
… Arrow builder is closed (#153)
  • Loading branch information
swuferhong authored Dec 16, 2024
1 parent e8abfdf commit f0d800b
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class MemoryLogRecordsArrowBuilder implements AutoCloseable {
private int sizeInBytes;
private int recordCount;
private boolean isClosed;
private boolean reCalculateSizeInBytes = false;

private MemoryLogRecordsArrowBuilder(
long baseLogOffset,
Expand Down Expand Up @@ -203,6 +204,7 @@ public void append(RowKind rowKind, InternalRow row) throws Exception {

arrowWriter.writeRow(row);
rowKindWriter.writeRowKind(rowKind);
reCalculateSizeInBytes = true;
}

public long writerId() {
Expand Down Expand Up @@ -246,12 +248,14 @@ public void deallocate() {
}

public int getSizeInBytes() {
if (recordCount != arrowWriter.getRecordsCount()) {
if (reCalculateSizeInBytes) {
// make size in bytes up-to-date
sizeInBytes =
ARROW_ROWKIND_OFFSET + rowKindWriter.sizeInBytes() + arrowWriter.sizeInBytes();
recordCount = arrowWriter.getRecordsCount();
}

reCalculateSizeInBytes = false;
return sizeInBytes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void close() {
}

@VisibleForTesting
Map<String, Deque<ArrowWriter>> freeWriters() {
public Map<String, Deque<ArrowWriter>> freeWriters() {
return freeWriters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,46 @@ void testIllegalArgument() {
"The size of first segment of pagedOutputView is too small, need at least 44 bytes.");
}

@Test
void testClose() throws Exception {
int maxSizeInBytes = 1024;
ArrowWriter writer =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, 1024, DATA1_ROW_TYPE);
MemoryLogRecordsArrowBuilder builder = createMemoryLogRecordsArrowBuilder(writer, 10, 1024);
List<RowKind> rowKinds =
DATA1.stream().map(row -> RowKind.APPEND_ONLY).collect(Collectors.toList());
List<InternalRow> rows =
DATA1.stream()
.map(object -> row(DATA1_ROW_TYPE, object))
.collect(Collectors.toList());
while (!builder.isFull()) {
int rndIndex = RandomUtils.nextInt(0, DATA1.size());
builder.append(rowKinds.get(rndIndex), rows.get(rndIndex));
}
assertThat(builder.isFull()).isTrue();

String tableSchemaId = 1L + "-" + 1;
assertThat(provider.freeWriters().size()).isEqualTo(0);
int sizeInBytesBeforeClose = builder.getSizeInBytes();
builder.close();
builder.serialize();
builder.setWriterState(1L, 0);
MemoryLogRecords.pointToByteBuffer(builder.build().getByteBuf().nioBuffer());
assertThat(provider.freeWriters().get(tableSchemaId).size()).isEqualTo(1);
int sizeInBytes = builder.getSizeInBytes();
assertThat(sizeInBytes).isEqualTo(sizeInBytesBeforeClose);
// get writer again, writer will be initial.
ArrowWriter writer1 =
provider.getOrCreateWriter(1L, DEFAULT_SCHEMA_ID, maxSizeInBytes, DATA1_ROW_TYPE);
assertThat(provider.freeWriters().get(tableSchemaId).size()).isEqualTo(0);

// Even if the writer has re-initialized, the sizeInBytes should be the same.
assertThat(builder.getSizeInBytes()).isEqualTo(sizeInBytes);

writer.close();
writer1.close();
}

private MemoryLogRecordsArrowBuilder createMemoryLogRecordsArrowBuilder(
ArrowWriter writer, int maxPages, int pageSizeInBytes) {
conf.set(
Expand Down

0 comments on commit f0d800b

Please sign in to comment.