Skip to content

Commit

Permalink
refactor(filesystem): remove local fs dependency from RowFileInputIte…
Browse files Browse the repository at this point in the history
…ratorBuilder
  • Loading branch information
fhussonnois committed Mar 28, 2023
1 parent 7313451 commit 4ad472c
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.nio.charset.Charset;
Expand All @@ -34,6 +36,8 @@
*/
public class RowFileInputIteratorBuilder {

private static final Logger LOG = LoggerFactory.getLogger(RowFileInputIteratorBuilder.class);

private Charset charset = StandardCharsets.UTF_8;
private int minNumReadRecords = 1;
private FileObjectMeta metadata;
Expand Down Expand Up @@ -91,15 +95,17 @@ public FileInputIterator<FileRecord<TypedStruct>> build() {
.setMaxWaitMs(waitMaxMs);

if (skipFooters > 0) {
LOG.debug("Decorate RowFileInputIterator with RowFileWithFooterInputIterator");
iterator = new RowFileWithFooterInputIterator(
skipFooters,
new File(metadata.uri()),
metadata.uri(),
charset,
iterator
);
}

if (skipHeaders > 0) {
LOG.debug("Decorate RowFileInputIterator with RowFileWithHeadersInputIterator");
iterator = new RowFileWithHeadersInputIterator(
skipHeaders,
readerSupplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -61,12 +62,12 @@ public class RowFileWithFooterInputIterator extends RowFileInputIteratorDecorato
private List<String> footersStrings;

public RowFileWithFooterInputIterator(final int skipFooters,
final File file,
final URI uri,
final Charset charset,
final FileInputIterator<FileRecord<TypedStruct>> iterator) {
super(iterator);
this.skipFooters = skipFooters;
this.file = file;
this.file = new File(uri);
this.charset = charset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
import java.util.List;

/**
* ReversedInputFileReader is attend to be used to read a fixed number of lines from bottom of a file.
*
* ReversedInputFileReader is attended to be used to read a fixed number of lines from bottom of a file.
* This class is not optimized for reading a whole file from bottom to top.
*/
public class ReversedInputFileReader implements AutoCloseable {
Expand Down Expand Up @@ -136,7 +135,7 @@ public List<TextBlock> readLines(int minRecords) throws IOException {

input.readFully(buffer, 0, nread);

// Reset buffer cursor to beginning before reading before.
// Reset buffer cursor to beginning before reading.
bufferOffset = 0;
TextBlock line;
do {
Expand Down

0 comments on commit 4ad472c

Please sign in to comment.