Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 27: ReadableStreamingData should implement bulk read methods #129

Merged
merged 3 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;

/**
* A buffer backed by a {@link ByteBuffer} that is a {@link BufferedSequentialData} (and therefore contains
Expand Down Expand Up @@ -759,56 +758,6 @@ public void writeBytes(@NonNull final RandomAccessData src) {
}
}

/**
* {@inheritDoc}
*/
@Override
public int writeBytes(@NonNull final InputStream src, final int maxLength) {
if (!buffer.hasArray()) {
return WritableSequentialData.super.writeBytes(src, maxLength);
}

// Check for a bad length or a null src
Objects.requireNonNull(src);
if (maxLength < 0) {
throw new IllegalArgumentException("The length must be >= 0");
}

// If the length is zero, then we have nothing to read
if (maxLength == 0) {
return 0;
}

// Since we have an inner array, we can just read from the input stream into that
// array over and over until either we read all the bytes we need to, or we hit
// the end of the stream, or we have read all that we can.
final var array = buffer.array();

// We are going to read from the input stream up to either "len" or the number of bytes
// remaining in this buffer, whichever is lesser.
final long numBytesToRead = Math.min(maxLength, remaining());
if (numBytesToRead == 0) {
return 0;
}

try {
int totalBytesRead = 0;
while (totalBytesRead < numBytesToRead) {
int numBytesRead = src.read(array, buffer.position() + buffer.arrayOffset(), (int) numBytesToRead - totalBytesRead);
if (numBytesRead == -1) {
return totalBytesRead;
}

buffer.position(buffer.position() + numBytesRead);
totalBytesRead += numBytesRead;
}

return totalBytesRead;
} catch (IOException e) {
throw new DataAccessException(e);
}
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.hedera.pbj.runtime.io.buffer;

import com.hedera.pbj.runtime.io.DataAccessException;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;

/**
* BufferedData subclass for instances backed by a byte array. Provides slightly more optimized
Expand Down Expand Up @@ -332,4 +336,46 @@ public void writeBytes(@NonNull final ByteBuffer src) {
src.position(Math.toIntExact(srcPos + len));
buffer.position(Math.toIntExact(pos + len));
}

/**
* {@inheritDoc}
*/
@Override
public int writeBytes(@NonNull final InputStream src, final int maxLength) {
// Check for a bad length or a null src
Objects.requireNonNull(src);
if (maxLength < 0) {
throw new IllegalArgumentException("The length must be >= 0");
}

// If the length is zero, then we have nothing to read
if (maxLength == 0) {
return 0;
}

// We are going to read from the input stream up to either "len" or the number of bytes
// remaining in this buffer, whichever is lesser.
final long numBytesToRead = Math.min(maxLength, remaining());
if (numBytesToRead == 0) {
return 0;
}

try {
int pos = buffer.position();
int totalBytesRead = 0;
while (totalBytesRead < numBytesToRead) {
int bytesRead = src.read(array, pos + arrayOffset, (int) numBytesToRead - totalBytesRead);
if (bytesRead == -1) {
buffer.position(pos);
return totalBytesRead;
}
pos += bytesRead;
totalBytesRead += bytesRead;
}
buffer.position(pos);
return totalBytesRead;
} catch (IOException e) {
throw new DataAccessException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import com.hedera.pbj.runtime.io.DataAccessException;
import com.hedera.pbj.runtime.io.ReadableSequentialData;
import com.hedera.pbj.runtime.io.buffer.BufferedData;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;

import static java.util.Objects.requireNonNull;

/**
Expand Down Expand Up @@ -96,12 +99,9 @@ public boolean hasRemaining() {
/** {@inheritDoc} */
@Override
public byte readByte() {
// It should NEVER be possible for position to exceed limit, but being extra safe here
// using >= instead of just ==
if (eof || position >= limit) {
if (!hasRemaining()) {
throw new BufferUnderflowException();
}

// We know result is a byte, because we've already checked for EOF, and we know that
// it will only ever be a byte, unless it is EOF, in which case it is a -1 int.
try {
Expand All @@ -112,7 +112,7 @@ public byte readByte() {
}
position++;
return (byte) result;
} catch (IOException e) {
} catch (final IOException e) {
throw new DataAccessException(e);
}
}
Expand All @@ -130,8 +130,70 @@ public long skip(final long n) {
long numSkipped = in.skip(clamped);
position += numSkipped;
return numSkipped;
} catch (IOException e) {
} catch (final IOException e) {
throw new DataAccessException(e);
}
}

/** {@inheritDoc} */
@Override
public long readBytes(@NonNull final byte[] dst, final int offset, final int maxLength) {
if (maxLength < 0) {
throw new IllegalArgumentException("Negative maxLength not allowed");
}
if ((offset < 0) || (maxLength > dst.length - offset)) {
throw new IndexOutOfBoundsException("Illegal read offset / maxLength");
}
final int len = Math.min(dst.length - offset, maxLength);
if ((len == 0) || !hasRemaining()) {
// Nothing to do
return 0;
}
try {
int bytesRead = in.readNBytes(dst, offset, len);
position += bytesRead;
if (bytesRead < len) {
eof = true;
}
return bytesRead;
} catch (final IOException e) {
throw new DataAccessException(e);
}
}

@Override
public long readBytes(@NonNull final ByteBuffer dst) {
if (!dst.hasArray()) {
return ReadableSequentialData.super.readBytes(dst);
}
if (!hasRemaining()) {
return 0;
}
final byte[] dstArr = dst.array();
final int dstArrOffset = dst.arrayOffset();
final int dstPos = dst.position();
final long len = Math.min(remaining(), dst.remaining());
try {
int bytesRead = in.readNBytes(dstArr, dstPos + dstArrOffset, Math.toIntExact(len));
position += bytesRead;
if (bytesRead < len) {
eof = true;
}
return bytesRead;
} catch (final IOException e) {
throw new DataAccessException(e);
}
}

@Override
public long readBytes(@NonNull final BufferedData dst) {
final long len = Math.min(remaining(), dst.remaining());
int bytesRead = dst.writeBytes(in, Math.toIntExact(len));
artemananiev marked this conversation as resolved.
Show resolved Hide resolved
position += bytesRead;
if (bytesRead < len) {
eof = true;
}
return bytesRead;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@
import com.hedera.pbj.runtime.io.DataAccessException;
import com.hedera.pbj.runtime.io.WritableSequentialData;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Objects;

/**
Expand Down Expand Up @@ -83,7 +81,7 @@ public long limit() {

/** {@inheritDoc} */
@Override
public void limit(long limit) {
public void limit(final long limit) {
// Any attempt to set the limit must be clamped between position on the low end and capacity on the high end.
this.limit = Math.min(capacity(), Math.max(position, limit));
}
Expand Down Expand Up @@ -136,7 +134,7 @@ public long skip(long count) {
* {@inheritDoc}
*/
@Override
public void writeByte(byte b) {
public void writeByte(final byte b) {
if (position >= limit) {
throw new BufferOverflowException();
}
Expand All @@ -153,7 +151,7 @@ public void writeByte(byte b) {
* {@inheritDoc}
*/
@Override
public void writeBytes(@NonNull byte[] src, int offset, int length) {
public void writeBytes(@NonNull final byte[] src, final int offset, final int length) {
if (length < 0) {
throw new IllegalArgumentException("length must be >= 0");
}
Expand All @@ -169,7 +167,7 @@ public void writeBytes(@NonNull byte[] src, int offset, int length) {
try {
out.write(src, offset, length);
position += length;
} catch (IOException e) {
} catch (final IOException e) {
throw new DataAccessException(e);
}
}
Expand All @@ -178,15 +176,43 @@ public void writeBytes(@NonNull byte[] src, int offset, int length) {
* {@inheritDoc}
*/
@Override
public void writeBytes(@NonNull byte[] src) {
public void writeBytes(@NonNull final byte[] src) {
if (src.length > remaining()) {
throw new BufferOverflowException();
}

try {
out.write(src);
position += src.length;
} catch (IOException e) {
} catch (final IOException e) {
throw new DataAccessException(e);
}
}

@Override
public void writeBytes(@NonNull final ByteBuffer src) {
if (!src.hasArray()) {
WritableSequentialData.super.writeBytes(src);
return;
}

if (remaining() < src.remaining()) {
throw new BufferOverflowException();
}

final long len = src.remaining();
if (len == 0) {
// Nothing to do
return;
}

final byte[] srcArr = src.array();
final int srcPos = src.position();
try {
out.write(srcArr, srcPos, Math.toIntExact(len));
position += len;
src.position(Math.toIntExact(srcPos + len));
} catch (final IOException e) {
throw new DataAccessException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import java.nio.BufferUnderflowException;

final class ReadableSequentialDataTest extends ReadableTestBase {
final class ReadableSequentialDataTest extends ReadableSequentialTestBase {

@NonNull
@Override
protected ReadableSequentialData emptySequence() {
Expand Down
Loading