Skip to content

Commit

Permalink
support integer key offset using a PartitionFilter
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed Apr 11, 2021
1 parent d3d50e7 commit 280cec3
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 26 deletions.
7 changes: 7 additions & 0 deletions src/main/java/com/aerospike/jdbc/model/AerospikeQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,13 @@ public List<String> getColumns() {
return columns;
}

public String[] getBinNames() {
if (columns.size() == 1 && columns.get(0).equals("*")) {
return null;
}
return columns.toArray(new String[0]);
}

@Override
public String toString() {
try {
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/com/aerospike/jdbc/query/PolicyBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,25 @@

import java.util.Objects;

import static com.aerospike.jdbc.util.Constants.defaultQueryLimit;

public final class PolicyBuilder {

private PolicyBuilder() {
}

public static ScanPolicy buildScanPolicy(AerospikeQuery query) {
ScanPolicy scanPolicy = new ScanPolicy();
scanPolicy.maxRecords = Objects.isNull(query.getLimit()) ? defaultQueryLimit : query.getLimit();
scanPolicy.maxRecords = Objects.isNull(query.getLimit()) ? 0 : query.getLimit();
Exp expression = ExpressionBuilder.buildExp(query.getWhere());
scanPolicy.filterExp = Objects.isNull(expression) ? null : Exp.build(expression);
return scanPolicy;
}

public static ScanPolicy buildScanNoBinDataPolicy(AerospikeQuery query) {
ScanPolicy scanPolicy = buildScanPolicy(query);
scanPolicy.includeBinData = false;
return scanPolicy;
}

public static WritePolicy buildWritePolicy(AerospikeQuery query) {
WritePolicy writePolicy = new WritePolicy();
writePolicy.sendKey = true;
Expand Down
34 changes: 11 additions & 23 deletions src/main/java/com/aerospike/jdbc/query/SelectQueryHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
import com.aerospike.jdbc.model.AerospikeQuery;
import com.aerospike.jdbc.model.DataColumn;
import com.aerospike.jdbc.model.Pair;
import com.aerospike.jdbc.scan.EventLoopProvider;
import com.aerospike.jdbc.scan.PartitionScanHandler;
import com.aerospike.jdbc.scan.RecordSet;
import com.aerospike.jdbc.scan.ScanRecordSequenceListener;
import com.aerospike.jdbc.schema.AerospikeSchemaBuilder;
import com.aerospike.jdbc.sql.AerospikeRecordResultSet;
import com.aerospike.jdbc.util.IOUtils;
Expand All @@ -27,6 +26,7 @@
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static com.aerospike.jdbc.query.PolicyBuilder.buildScanNoBinDataPolicy;
import static com.aerospike.jdbc.query.PolicyBuilder.buildScanPolicy;
import static com.aerospike.jdbc.util.AerospikeUtils.getTableRecordsNumber;

Expand Down Expand Up @@ -63,14 +63,11 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
if (Objects.isNull(query.getWhere())) {
recordNumber = getTableRecordsNumber(client, query.getSchema(), query.getTable());
} else {
ScanRecordSequenceListener listener = new ScanRecordSequenceListener();
ScanPolicy scanPolicy = buildScanPolicy(query);
scanPolicy.includeBinData = false;
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
query.getTable());
ScanPolicy policy = buildScanNoBinDataPolicy(query);
RecordSet recordSet = PartitionScanHandler.create(client).scanPartition(policy, query);

final AtomicInteger count = new AtomicInteger();
listener.getRecordSet().forEach(r -> count.incrementAndGet());
recordSet.forEach(r -> count.incrementAndGet());
recordNumber = count.get();
}
com.aerospike.client.Record record = new com.aerospike.client.Record(Collections.singletonMap(
Expand All @@ -90,7 +87,7 @@ private Pair<ResultSet, Integer> executeCountQuery(AerospikeQuery query) {
private Pair<ResultSet, Integer> executeSelectByPrimaryKey(AerospikeQuery query, Value primaryKey) {
logger.info("SELECT PK");
Key key = new Key(query.getSchema(), query.getTable(), primaryKey);
com.aerospike.client.Record record = client.get(null, key, getBinNames(query));
com.aerospike.client.Record record = client.get(null, key, query.getBinNames());

RecordSet recordSet;
if (Objects.nonNull(record)) {
Expand All @@ -103,19 +100,17 @@ private Pair<ResultSet, Integer> executeSelectByPrimaryKey(AerospikeQuery query,
recordSet.end();

return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
query.getTable(), filterColumns(columns, getBinNames(query))), -1);
query.getTable(), filterColumns(columns, query.getBinNames())), -1);
}

private Pair<ResultSet, Integer> executeScanQuery(AerospikeQuery query) {
logger.info("SELECT scan");
ScanRecordSequenceListener listener = new ScanRecordSequenceListener();
ScanPolicy scanPolicy = buildScanPolicy(query);
client.scanAll(EventLoopProvider.getEventLoop(), listener, scanPolicy, query.getSchema(),
query.getTable(), getBinNames(query));
RecordSet recordSet = listener.getRecordSet();

ScanPolicy policy = buildScanPolicy(query);
RecordSet recordSet = PartitionScanHandler.create(client).scanPartition(policy, query);

return new Pair<>(new AerospikeRecordResultSet(recordSet, statement, query.getSchema(),
query.getTable(), filterColumns(columns, getBinNames(query))), -1);
query.getTable(), filterColumns(columns, query.getBinNames())), -1);
}

private boolean isCount(AerospikeQuery query) {
Expand All @@ -129,11 +124,4 @@ private List<DataColumn> filterColumns(List<DataColumn> columns, String[] select
return columns.stream().filter(c -> list.contains(c.getName())).collect(Collectors.toList());
}

private String[] getBinNames(AerospikeQuery query) {
List<String> columns = query.getColumns();
if (columns.size() == 1 && columns.get(0).equals("*")) {
return null;
}
return columns.toArray(new String[0]);
}
}
72 changes: 72 additions & 0 deletions src/main/java/com/aerospike/jdbc/scan/PartitionScanHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.aerospike.jdbc.scan;

import com.aerospike.client.IAerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.ScanCallback;
import com.aerospike.client.cluster.Node;
import com.aerospike.client.cluster.Partition;
import com.aerospike.client.policy.ScanPolicy;
import com.aerospike.client.query.PartitionFilter;
import com.aerospike.jdbc.model.AerospikeQuery;

import java.util.Objects;
import java.util.logging.Logger;

public class PartitionScanHandler {

private static final Logger logger = Logger.getLogger(PartitionScanHandler.class.getName());

private final IAerospikeClient client;

private ScanRecordSequenceListener listener;

private int currentPartition;
private int count;

public PartitionScanHandler(IAerospikeClient client) {
this.client = client;
this.listener = new ScanRecordSequenceListener();
}

public RecordSet scanPartition(ScanPolicy scanPolicy, AerospikeQuery query) {
if (Objects.nonNull(query.getOffset())) {
long maxRecords = scanPolicy.maxRecords;
PartitionFilter filter = getPartitionFilter(query);
while (isScanRequired(maxRecords)) {
client.scanPartitions(scanPolicy, filter, query.getSchema(), query.getTable(),
callback, query.getBinNames());
scanPolicy.maxRecords = maxRecords > 0 ? maxRecords - count : maxRecords;
filter = PartitionFilter.id(++currentPartition);
}
listener.onSuccess();
} else {
logger.info("scanAll");
client.scanAll(null, listener, scanPolicy, query.getSchema(),
query.getTable(), query.getBinNames());
}
return listener.getRecordSet();
}

private PartitionFilter getPartitionFilter(AerospikeQuery query) {
Key key = new Key(query.getSchema(), query.getTable(), query.getOffset());
currentPartition = Partition.getPartitionId(key.digest);
return PartitionFilter.after(key);
}

private boolean isScanRequired(final long maxRecords) {
return (maxRecords == 0 || count < maxRecords) && isValidPartition();
}

private boolean isValidPartition() {
return currentPartition >= 0 && currentPartition < Node.PARTITIONS;
}

private final ScanCallback callback = ((key, record) -> {
listener.onRecord(key, record);
count++;
});

public static PartitionScanHandler create(IAerospikeClient client) {
return new PartitionScanHandler(client);
}
}

0 comments on commit 280cec3

Please sign in to comment.