diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorBuilder.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorBuilder.java index fd37dd3f9..d865dd98e 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorBuilder.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileInputIteratorBuilder.java @@ -23,6 +23,8 @@ import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator; import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta; import io.streamthoughts.kafka.connect.filepulse.source.FileRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.nio.charset.Charset; @@ -34,6 +36,8 @@ */ public class RowFileInputIteratorBuilder { + private static final Logger LOG = LoggerFactory.getLogger(RowFileInputIteratorBuilder.class); + private Charset charset = StandardCharsets.UTF_8; private int minNumReadRecords = 1; private FileObjectMeta metadata; @@ -91,15 +95,17 @@ public FileInputIterator> build() { .setMaxWaitMs(waitMaxMs); if (skipFooters > 0) { + LOG.debug("Decorate RowFileInputIterator with RowFileWithFooterInputIterator"); iterator = new RowFileWithFooterInputIterator( skipFooters, - new File(metadata.uri()), + metadata.uri(), charset, iterator ); } if (skipHeaders > 0) { + LOG.debug("Decorate RowFileInputIterator with RowFileWithHeadersInputIterator"); iterator = new RowFileWithHeadersInputIterator( skipHeaders, readerSupplier, diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileWithFooterInputIterator.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileWithFooterInputIterator.java index ad8bf5799..33c51625b 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileWithFooterInputIterator.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/RowFileWithFooterInputIterator.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.net.URI; import java.nio.charset.Charset; import java.util.Collections; import java.util.List; @@ -61,12 +62,12 @@ public class RowFileWithFooterInputIterator extends RowFileInputIteratorDecorato private List footersStrings; public RowFileWithFooterInputIterator(final int skipFooters, - final File file, + final URI uri, final Charset charset, final FileInputIterator> iterator) { super(iterator); this.skipFooters = skipFooters; - this.file = file; + this.file = new File(uri); this.charset = charset; } diff --git a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/internal/ReversedInputFileReader.java b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/internal/ReversedInputFileReader.java index a7bf02baa..571308cec 100644 --- a/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/internal/ReversedInputFileReader.java +++ b/connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/text/internal/ReversedInputFileReader.java @@ -26,8 +26,7 @@ import java.util.List; /** - * ReversedInputFileReader is attend to be used to read a fixed number of lines from bottom of a file. - * + * ReversedInputFileReader is attended to be used to read a fixed number of lines from bottom of a file. * This class is not optimized for reading a whole file from bottom to top. */ public class ReversedInputFileReader implements AutoCloseable { @@ -136,7 +135,7 @@ public List readLines(int minRecords) throws IOException { input.readFully(buffer, 0, nread); - // Reset buffer cursor to beginning before reading before. + // Reset buffer cursor to beginning before reading. bufferOffset = 0; TextBlock line; do {