Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8583: Optimization for SslTransportLayer#write(ByteBuffer) #28

Open
wants to merge 1 commit into
base: 2.0-li
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -630,37 +630,42 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
*/

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the commit message, warp or wrap?

@Override
public int write(ByteBuffer src) throws IOException {
int written = 0;
if (state == State.CLOSING)
throw closingException();
if (state != State.READY)
return written;

return 0;
if (!flush(netWriteBuffer))
return written;
return 0;

netWriteBuffer.clear();
SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
netWriteBuffer.flip();
int written = 0;
while (src.remaining() != 0) {
netWriteBuffer.clear();
SSLEngineResult wrapResult = sslEngine.wrap(src, netWriteBuffer);
netWriteBuffer.flip();

//handle ssl renegotiation
if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
throw renegotiationException();
//handle ssl renegotiation

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: whitespace between // and handle

if (wrapResult.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING && wrapResult.getStatus() == Status.OK)
throw renegotiationException();

if (wrapResult.getStatus() == Status.OK) {
written = wrapResult.bytesConsumed();
flush(netWriteBuffer);
} else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentNetWriteBufferSize = netWriteBufferSize();
netWriteBuffer.compact();
netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
netWriteBuffer.flip();
if (netWriteBuffer.limit() >= currentNetWriteBufferSize)
throw new IllegalStateException("SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")");
} else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
} else if (wrapResult.getStatus() == Status.CLOSED) {
throw new EOFException();
if (wrapResult.getStatus() == Status.OK) {
written += wrapResult.bytesConsumed();
if (!flush(netWriteBuffer)) {
// break if socketChannel can't accept all data in netWriteBuffer
break;
}
// otherwise, we are safe to go to next iteration.
} else if (wrapResult.getStatus() == Status.BUFFER_OVERFLOW) {
int currentNetWriteBufferSize = netWriteBufferSize();
netWriteBuffer.compact();
netWriteBuffer = Utils.ensureCapacity(netWriteBuffer, currentNetWriteBufferSize);
netWriteBuffer.flip();
if (netWriteBuffer.limit() >= currentNetWriteBufferSize) throw new IllegalStateException(
"SSL BUFFER_OVERFLOW when available data size (" + netWriteBuffer.limit() + ") >= network buffer size (" + currentNetWriteBufferSize + ")");
} else if (wrapResult.getStatus() == Status.BUFFER_UNDERFLOW) {
throw new IllegalStateException("SSL BUFFER_UNDERFLOW during write");
} else if (wrapResult.getStatus() == Status.CLOSED) {
throw new EOFException();
}
}
return written;
}
Expand Down