Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROTON-2287 Improve Symbol decoding cache #39

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ public byte get(int index) {
throw new IndexOutOfBoundsException("The given index is not valid: " + index);
}

return _get(index);
}

/**
* Unchecked ie no bound-checks get
*/
private byte _get(int index) {
byte result = 0;

if (index == position) {
Expand Down Expand Up @@ -813,8 +820,8 @@ public int hashCode() {
int remaining = remaining();

if (currentArrayIndex < 0 || remaining <= currentArray.length - currentOffset) {
while (remaining > 0) {
hash = 31 * hash + currentArray[currentOffset + --remaining];
if (remaining > 0) {
hash = Hashing.byteBufferCompatibleHashCode(currentArray, currentOffset, currentOffset + remaining);
}
} else {
hash = hashCodeFromComponents();
Expand Down Expand Up @@ -875,7 +882,7 @@ public boolean equals(Object other) {
return true;
}

if (hasArray() || remaining <= currentArray.length - currentOffset) {
if (remaining <= currentArray.length - currentOffset || hasArray()) {
// Either there is only one array, or the span to compare is within a single chunk of this buffer,
// allowing the compare to directly access the underlying array instead of using slower get methods.
return equals(currentArray, currentOffset, remaining, buffer);
Expand All @@ -885,6 +892,40 @@ public boolean equals(Object other) {
}

private static boolean equals(byte[] buffer, int start, int length, ReadableBuffer other) {
if (other.hasArray()) {
// fast-path: jdk 11 has a vectorized Arrays::equals for ranged comparisons, but
// sadly JDK 8 nope so let's try to save at least bound checks
final int otherStart = other.arrayOffset() + other.position();
return equals(buffer, start, other.array(), otherStart, length);
} else if (other instanceof ByteBufferReader) {
return rawEquals(buffer, start, length, other.byteBuffer());
}
return rawEquals(buffer, start, length, other);
}

private static boolean uncheckedEquals(byte[] buffer, int start, int length, CompositeReadableBuffer other) {
final int position = other.position();
for (int i = 0; i < length; i++) {
if (buffer[start + i] != other._get(position + i)) {
return false;
}
}
return true;
}

private static boolean uncheckedEquals(CompositeReadableBuffer buffer, ByteBuffer other, int length) {
assert buffer.remaining() >= length;
final int otherPosition = other.position();
final int bufferPosition = buffer.position();
for (int i = 0; i < length; i++) {
if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) {
return false;
}
}
return true;
}

private static boolean rawEquals(byte[] buffer, int start, int length, ByteBuffer other) {
final int position = other.position();
for (int i = 0; i < length; i++) {
if (buffer[start + i] != other.get(position + i)) {
Expand All @@ -894,18 +935,47 @@ private static boolean equals(byte[] buffer, int start, int length, ReadableBuff
return true;
}

private static boolean equals(ReadableBuffer buffer, ReadableBuffer other) {
final int origPos = buffer.position();
try {
for (int i = other.position(); buffer.hasRemaining(); i++) {
if (!equals(buffer.get(), other.get(i))) {
return false;
}
private static boolean rawEquals(byte[] buffer, int start, int length, ReadableBuffer other) {
final int position = other.position();
for (int i = 0; i < length; i++) {
if (buffer[start + i] != other.get(position + i)) {
return false;
}
}
return true;
}

private static boolean equals(byte[] a, int aStart, byte[] b, int bStart, int length) {
for (int i = 0; i < length; i++) {
if (a[aStart + i] != b[bStart + i]) {
return false;
}
return true;
} finally {
buffer.position(origPos);
}
return true;
}

private static boolean equals(CompositeReadableBuffer buffer, ReadableBuffer other) {
final int bufferRemaining = buffer.remaining();
if (other.hasArray()) {
final int otherStart = other.arrayOffset() + other.position();
// check if otherEnd is beyond other limits, because the underline array is just limited by the capacity
if (other.limit() < otherStart + bufferRemaining) {
throw new BufferUnderflowException();
}
return uncheckedEquals(other.array(), otherStart, bufferRemaining, buffer);
}
if (other instanceof ByteBufferReader) {
return uncheckedEquals(buffer, other.byteBuffer(), bufferRemaining);
}
// slow path
final int bufferPosition = buffer.position();
final int otherPosition = other.position();
for (int i = 0; i < bufferRemaining; i++) {
if (buffer._get(bufferPosition + i) != other.get(otherPosition + i)) {
return false;
}
}
return true;
}

@Override
Expand All @@ -923,10 +993,6 @@ public String toString() {
return builder.toString();
}

private static boolean equals(byte x, byte y) {
return x == y;
}

private void maybeMoveToNextArray() {
if (currentArray.length == currentOffset) {
if (currentArrayIndex >= 0 && currentArrayIndex < (contents.size() - 1)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1075,8 +1075,18 @@ void readRaw(final byte[] data, final int offset, final int length)

<V> V readRaw(TypeDecoder<V> decoder, int size)
{
V decode = decoder.decode(this, _buffer.slice().limit(size));
_buffer.position(_buffer.position()+size);
final int originalLimit = _buffer.limit();
final int originalPosition = _buffer.position();
final V decode;
try {
decode = decoder.decode(this, _buffer.limit(originalPosition + size));
} catch (Throwable t) {
_buffer.position(originalPosition);
throw t;
} finally {
_buffer.limit(originalLimit);
}
_buffer.position(originalPosition + size);
return decode;
}

Expand Down
125 changes: 125 additions & 0 deletions proton-j/src/main/java/org/apache/qpid/proton/codec/Hashing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.codec;

import java.nio.ByteBuffer;

public class Hashing {

private Hashing() {

}

// this const propagation should be already handled by the JIT
// but we do this to make it more readable
private static final int PRIME_1 = 31;
private static final int PRIME_2 = PRIME_1 * PRIME_1;
private static final int PRIME_3 = PRIME_2 * PRIME_1;
private static final int PRIME_4 = PRIME_3 * PRIME_1;
private static final int PRIME_5 = PRIME_4 * PRIME_1;
private static final int PRIME_6 = PRIME_5 * PRIME_1;
private static final int PRIME_7 = PRIME_6 * PRIME_1;
private static final int PRIME_8 = PRIME_7 * PRIME_1;

public static int byteBufferCompatibleHashCode(ByteBuffer byteBuffer) {
if (byteBuffer.hasArray()) {
final int arrayOffset = byteBuffer.arrayOffset();
final int arrayPosition = arrayOffset + byteBuffer.position();
final int arrayLimit = arrayOffset + byteBuffer.limit();
return byteBufferCompatibleHashCode(byteBuffer.array(), arrayPosition, arrayLimit);
}
// direct ByteBuffers does have some heavy-weight bound checks and memory barriers that
// we just hope JIT to be better then us!
return byteBuffer.hashCode();
}

public static int byteBufferCompatibleHashCode(byte[] bytes, int position, int limit) {
int h = 1;
int remaining = limit - position;
if (remaining == 0) {
return h;
}
int index = limit - 1;
// unrolled version
final int bytesCount = remaining & 7;
if (bytesCount > 0) {
assert h == 1;
h = unrolledHashCode(bytes, index, bytesCount, 1);
index -= bytesCount;
}
final long longsCount = remaining >>> 3;
// let's break the data dependency of each per element hash code
// and save bound checks by manual unrolling 8 ops at time
for (int i = 0; i < longsCount; i++) {
final byte b7 = bytes[index];
final byte b6 = bytes[index - 1];
final byte b5 = bytes[index - 2];
final byte b4 = bytes[index - 3];
final byte b3 = bytes[index - 4];
final byte b2 = bytes[index - 5];
final byte b1 = bytes[index - 6];
final byte b0 = bytes[index - 7];
h = PRIME_8 * h +
PRIME_7 * b7 +
PRIME_6 * b6 +
PRIME_5 * b5 +
PRIME_4 * b4 +
PRIME_3 * b3 +
PRIME_2 * b2 +
PRIME_1 * b1 +
b0;
index -= Long.BYTES;
}
return h;
}

private static int unrolledHashCode(byte[] bytes, int index, int bytesCount, int h) {
// there is still the hash data dependency but is more friendly
// then a plain loop, given that we know no loop is needed here
assert bytesCount > 0 && bytesCount < 8;
h = PRIME_1 * h + bytes[index];
if (bytesCount == 1) {
return h;
}
h = PRIME_1 * h + bytes[index - 1];
if (bytesCount == 2) {
return h;
}
h = PRIME_1 * h + bytes[index - 2];
if (bytesCount == 3) {
return h;
}
h = PRIME_1 * h + bytes[index - 3];
if (bytesCount == 4) {
return h;
}
h = PRIME_1 * h + bytes[index - 4];
if (bytesCount == 5) {
return h;
}
h = PRIME_1 * h + bytes[index - 5];
if (bytesCount == 6) {
return h;
}
h = PRIME_1 * h + bytes[index - 6];
return h;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ public interface ReadableBuffer {

final class ByteBufferReader implements ReadableBuffer {

private ByteBuffer buffer;
private final ByteBuffer buffer;

public static ByteBufferReader allocate(int size) {
ByteBuffer allocated = ByteBuffer.allocate(size);
Expand Down Expand Up @@ -522,7 +522,7 @@ public String toString() {

@Override
public int hashCode() {
return buffer.hashCode();
return Hashing.byteBufferCompatibleHashCode(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class SymbolType extends AbstractPrimitiveType<Symbol>
private final SymbolEncoding _shortSymbolEncoding;

private final Map<ReadableBuffer, Symbol> _symbolCache = new HashMap<ReadableBuffer, Symbol>();
private DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
private final DecoderImpl.TypeDecoder<Symbol> _symbolCreator =
new DecoderImpl.TypeDecoder<Symbol>()
{
@Override
Expand All @@ -44,7 +44,7 @@ public Symbol decode(DecoderImpl decoder, ReadableBuffer buffer)
Symbol symbol = _symbolCache.get(buffer);
if (symbol == null)
{
byte[] bytes = new byte[buffer.limit()];
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);

String str = new String(bytes, ASCII_CHARSET);
Expand Down
2 changes: 1 addition & 1 deletion tests/performance-jmh/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<name>Proton-J JMH Performance Tests</name>

<properties>
<jmh-version>1.19</jmh-version>
<jmh-version>1.25.2</jmh-version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.qpid.proton.codec.ReadableBuffer.ByteBufferReader;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
Expand All @@ -43,15 +44,16 @@
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Warmup(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 5, time = 400, timeUnit = TimeUnit.MILLISECONDS)
@Fork(2)
public class CompositeReadableBufferEqualsBenchmark {

private CompositeReadableBuffer composite;
@Param({"8", "64", "1024"})
@Param({"8", "16", "64"})
private int size;
private ReadableBuffer.ByteBufferReader bufferReader;
@Param({"false", "true"})
@Param({ "false", "true" })
private boolean direct;
@Param({"1", "2"})
private int chunks;
Expand Down Expand Up @@ -97,11 +99,6 @@ public static void main(String[] args) throws RunnerException {
public static void runBenchmark(Class<?> benchmarkClass) throws RunnerException {
final Options opt = new OptionsBuilder()
.include(benchmarkClass.getSimpleName())
.addProfiler(GCProfiler.class)
.shouldDoGC(true)
.warmupIterations(5)
.measurementIterations(5)
.forks(1)
.build();
new Runner(opt).run();
}
Expand Down
Loading