From b95540fb3bc53811a246c7bb12ab78ed0fe1ee4a Mon Sep 17 00:00:00 2001 From: yunhong <337361684@qq.com> Date: Wed, 18 Dec 2024 20:46:42 +0800 Subject: [PATCH] [kv] Support index lookup for primary key table --- .../fluss/client/lookup/AbstractLookup.java | 41 ++++ .../client/lookup/AbstractLookupBatch.java | 47 +++++ .../fluss/client/lookup/IndexLookup.java | 70 +++++++ .../fluss/client/lookup/IndexLookupBatch.java | 65 ++++++ .../alibaba/fluss/client/lookup/Lookup.java | 15 +- .../fluss/client/lookup/LookupBatch.java | 15 +- .../fluss/client/lookup/LookupClient.java | 16 +- .../fluss/client/lookup/LookupQueue.java | 74 +++++-- .../{table => lookup}/LookupResult.java | 23 +-- .../fluss/client/lookup/LookupSender.java | 193 +++++++++++++++--- .../fluss/client/lookup/LookupType.java | 26 +++ .../fluss/client/table/FlussTable.java | 84 ++++++-- .../com/alibaba/fluss/client/table/Table.java | 16 ++ .../client/table/writer/UpsertWriter.java | 22 +- .../client/utils/ClientRpcMessageUtils.java | 19 ++ .../fluss/client/admin/FlussAdminITCase.java | 30 +++ .../table/FlussPartitionedTableITCase.java | 3 +- .../fluss/client/table/FlussTableITCase.java | 64 +++++- .../alibaba/fluss/config/ConfigOptions.java | 22 +- .../exception/InvalidIndexKeysException.java | 32 +++ .../fluss/metadata/TableDescriptor.java | 99 +++++++++ .../alibaba/fluss/metrics/MetricNames.java | 4 + .../alibaba/fluss/row/encode/KeyEncoder.java | 2 +- .../com/alibaba/fluss/utils/BytesUtils.java | 16 ++ .../fluss/testutils/DataTestUtils.java | 22 +- .../flink/FlinkConnectorOptions.java | 12 ++ .../connector/flink/catalog/FlinkCatalog.java | 16 +- .../flink/catalog/FlinkTableFactory.java | 21 +- .../flink/source/FlinkTableSource.java | 22 +- .../lookup/FlinkAsyncLookupFunction.java | 66 +++--- .../source/lookup/FlinkLookupFunction.java | 50 +++-- .../flink/source/lookup/LookupNormalizer.java | 116 +++++++++-- .../utils/FlinkConnectorOptionsUtils.java | 87 +++++++- .../flink/utils/FlinkConversions.java | 6 + .../rpc/gateway/TabletServerGateway.java | 10 + .../rpc/netty/server/RequestsMetrics.java | 2 + .../alibaba/fluss/rpc/protocol/ApiKeys.java | 3 +- .../alibaba/fluss/rpc/protocol/Errors.java | 5 +- fluss-rpc/src/main/proto/FlussApi.proto | 31 +++ .../coordinator/CoordinatorService.java | 6 + .../com/alibaba/fluss/server/kv/KvTablet.java | 9 + .../fluss/server/kv/rocksdb/RocksDBKv.java | 30 ++- .../group/PhysicalTableMetricGroup.java | 28 +++ .../alibaba/fluss/server/replica/Replica.java | 38 ++++ .../fluss/server/replica/ReplicaManager.java | 48 +++++ .../fluss/server/tablet/TabletService.java | 10 + .../fluss/server/utils/RpcMessageUtils.java | 24 +++ .../log/remote/RemoteLogManagerTest.java | 12 +- .../server/replica/ReplicaManagerTest.java | 34 ++- .../fluss/server/replica/ReplicaTestBase.java | 27 ++- .../server/tablet/TabletServiceITCase.java | 174 +++++++++++++++- .../tablet/TestTabletServerGateway.java | 7 + .../fluss/server/testutils/KvTestUtils.java | 22 ++ .../server/testutils/RpcMessageTestUtils.java | 13 ++ website/docs/maintenance/configuration.md | 3 +- website/docs/maintenance/monitor-metrics.md | 24 ++- 56 files changed, 1762 insertions(+), 214 deletions(-) create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java rename fluss-client/src/main/java/com/alibaba/fluss/client/{table => lookup}/LookupResult.java (72%) create mode 100644 fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java create mode 100644 fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java new file mode 100644 index 00000000..5f5c8537 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookup.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** Abstract Class to represent a lookup operation. */ +@Internal +public abstract class AbstractLookup { + + private final byte[] key; + + public AbstractLookup(byte[] key) { + this.key = key; + } + + public byte[] key() { + return key; + } + + public abstract LookupType lookupType(); + + public abstract CompletableFuture> future(); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java new file mode 100644 index 00000000..229fe826 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/AbstractLookupBatch.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +import java.util.ArrayList; +import java.util.List; + +/** An abstract lookup batch. */ +@Internal +public abstract class AbstractLookupBatch { + + protected final List lookups; + + public AbstractLookupBatch() { + this.lookups = new ArrayList<>(); + } + + public void addLookup(AbstractLookup lookup) { + lookups.add(lookup); + } + + public List lookups() { + return lookups; + } + + /** Complete the lookup operations using given values . */ + public abstract void complete(List> values); + + /** Complete the get operations with given exception. */ + public abstract void completeExceptionally(Exception exception); +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java new file mode 100644 index 00000000..f1bbea14 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookup.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +import java.util.List; +import java.util.concurrent.CompletableFuture; + +/** + * Class to represent an index lookup operation, it contains the table id, bucketNums and related + * CompletableFuture. + */ +@Internal +public class IndexLookup extends AbstractLookup { + private final int destination; + private final long tableId; + private final int bucketId; + private final String indexName; + private final CompletableFuture> future; + + public IndexLookup( + int destination, long tableId, int bucketId, String indexName, byte[] indexKey) { + super(indexKey); + this.destination = destination; + this.tableId = tableId; + this.bucketId = bucketId; + this.indexName = indexName; + this.future = new CompletableFuture<>(); + } + + public int destination() { + return destination; + } + + public long tableId() { + return tableId; + } + + public int bucketId() { + return bucketId; + } + + public String indexName() { + return indexName; + } + + public CompletableFuture> future() { + return future; + } + + @Override + public LookupType lookupType() { + return LookupType.INDEX_LOOKUP; + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java new file mode 100644 index 00000000..043a510f --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/IndexLookupBatch.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; +import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.metadata.TableBucket; + +import java.util.List; + +/** + * A batch that contains the index operations that send to same destination and some table together. + */ +@Internal +public class IndexLookupBatch extends AbstractLookupBatch { + + private final TableBucket tableBucket; + + public IndexLookupBatch(TableBucket tableBucket) { + super(); + this.tableBucket = tableBucket; + } + + public TableBucket tableBucket() { + return tableBucket; + } + + @Override + public void complete(List> values) { + if (values.size() != lookups.size()) { + completeExceptionally( + new FlussRuntimeException( + String.format( + "The number of values return by index lookup request is not equal to the number of " + + "index lookups send. Got %d values, but expected %d.", + values.size(), lookups.size()))); + } else { + for (int i = 0; i < values.size(); i++) { + AbstractLookup lookup = lookups.get(i); + lookup.future().complete(values.get(i)); + } + } + } + + @Override + public void completeExceptionally(Exception exception) { + for (AbstractLookup get : lookups) { + get.future().completeExceptionally(exception); + } + } +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java index b4dd1ada..49716559 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/Lookup.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.annotation.Internal; import com.alibaba.fluss.metadata.TableBucket; +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -26,15 +27,14 @@ * from, the bytes of the key, and a future for the lookup operation. */ @Internal -public class Lookup { +public class Lookup extends AbstractLookup { private final TableBucket tableBucket; - private final byte[] key; - private final CompletableFuture future; + private final CompletableFuture> future; Lookup(TableBucket tableBucket, byte[] key) { + super(key); this.tableBucket = tableBucket; - this.key = key; this.future = new CompletableFuture<>(); } @@ -42,11 +42,12 @@ public TableBucket tableBucket() { return tableBucket; } - public byte[] key() { - return key; + @Override + public LookupType lookupType() { + return LookupType.LOOKUP; } - public CompletableFuture future() { + public CompletableFuture> future() { return future; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java index 32bcb13f..fb7df277 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupBatch.java @@ -19,7 +19,6 @@ import com.alibaba.fluss.annotation.Internal; import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.metadata.TableBucket; -import com.alibaba.fluss.rpc.messages.PbValue; import java.util.ArrayList; import java.util.List; @@ -51,21 +50,21 @@ public TableBucket tableBucket() { } /** Complete the lookup operations using given values . */ - public void complete(List pbValues) { + public void complete(List> values) { // if the size of return values of lookup operation are not equal to the number of lookups, // should complete an exception. - if (pbValues.size() != lookups.size()) { + if (values.size() != lookups.size()) { completeExceptionally( new FlussRuntimeException( String.format( "The number of return values of lookup operation is not equal to the number of " + "lookups. Return %d values, but expected %d.", - pbValues.size(), lookups.size()))); + values.size(), lookups.size()))); } else { - for (int i = 0; i < pbValues.size(); i++) { - Lookup lookup = lookups.get(i); - PbValue pbValue = pbValues.get(i); - lookup.future().complete(pbValue.hasValues() ? pbValue.getValues() : null); + for (int i = 0; i < values.size(); i++) { + AbstractLookup lookup = lookups.get(i); + // single value. + lookup.future().complete(values.get(i)); } } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java index 836a4d98..ea7e4da8 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupClient.java @@ -29,6 +29,7 @@ import javax.annotation.concurrent.ThreadSafe; import java.time.Duration; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,10 +58,12 @@ public class LookupClient { private final ExecutorService lookupSenderThreadPool; private final LookupSender lookupSender; + private final MetadataUpdater metadataUpdater; public LookupClient(Configuration conf, MetadataUpdater metadataUpdater) { this.lookupQueue = new LookupQueue(conf); this.lookupSenderThreadPool = createThreadPool(); + this.metadataUpdater = metadataUpdater; this.lookupSender = new LookupSender( metadataUpdater, @@ -75,12 +78,23 @@ private ExecutorService createThreadPool() { return Executors.newFixedThreadPool(1, new ExecutorThreadFactory(LOOKUP_THREAD_PREFIX)); } - public CompletableFuture lookup(TableBucket tableBucket, byte[] keyBytes) { + public CompletableFuture> lookup(TableBucket tableBucket, byte[] keyBytes) { Lookup lookup = new Lookup(tableBucket, keyBytes); lookupQueue.appendLookup(lookup); return lookup.future(); } + public CompletableFuture> indexLookup( + long tableId, int bucketId, String indexName, byte[] keyBytes) { + // TODO index lookup support partition table. + + // TODO leader and buckets may change during the index lookup. need do retry send. + int leader = metadataUpdater.leaderFor(new TableBucket(tableId, bucketId)); + IndexLookup indexLookup = new IndexLookup(leader, tableId, bucketId, indexName, keyBytes); + lookupQueue.appendLookup(indexLookup); + return indexLookup.future(); + } + public void close(Duration timeout) { LOG.info("Closing lookup client and lookup sender."); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java index fec36a1c..3b9c0f24 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupQueue.java @@ -26,33 +26,55 @@ import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * A queue that buffers the pending lookup operations and provides a list of {@link Lookup} when - * call method {@link #drain()}. + * call method {@link #drain(LookupType)}. */ @ThreadSafe @Internal class LookupQueue { private volatile boolean closed; - private final ArrayBlockingQueue lookupQueue; + private final ArrayBlockingQueue lookupQueue; + private final ArrayBlockingQueue indexLookupQueue; + // next drain queue, 0 or 1, 0 means lookupQueue, 1 means indexLookupQueue. + private final AtomicInteger nextDrainQueue; private final int maxBatchSize; + private final long batchTimeoutMs; LookupQueue(Configuration conf) { this.lookupQueue = new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE)); + this.indexLookupQueue = + new ArrayBlockingQueue<>(conf.get(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE)); + this.nextDrainQueue = new AtomicInteger(0); this.maxBatchSize = conf.get(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE); + this.batchTimeoutMs = conf.get(ConfigOptions.CLIENT_LOOKUP_BATCH_TIMEOUT).toMillis(); this.closed = false; } - void appendLookup(Lookup lookup) { + LookupType nexDrainLookupType() { + int drainQueueId = nextDrainQueue.updateAndGet(i -> i == 0 ? 1 : 0); + if (drainQueueId == 0) { + return LookupType.LOOKUP; + } else { + return LookupType.INDEX_LOOKUP; + } + } + + void appendLookup(AbstractLookup lookup) { if (closed) { throw new IllegalStateException( "Can not append lookup operation since the LookupQueue is closed."); } try { - lookupQueue.put(lookup); + if (lookup.lookupType() == LookupType.LOOKUP) { + lookupQueue.put(lookup); + } else { + indexLookupQueue.put(lookup); + } } catch (InterruptedException e) { lookup.future().completeExceptionally(e); } @@ -63,21 +85,43 @@ boolean hasUnDrained() { } /** Drain a batch of {@link Lookup}s from the lookup queue. */ - List drain() throws Exception { - List lookups = new ArrayList<>(maxBatchSize); - Lookup firstLookup = lookupQueue.poll(300, TimeUnit.MILLISECONDS); - if (firstLookup != null) { - lookups.add(firstLookup); - lookupQueue.drainTo(lookups, maxBatchSize - 1); + List drain(LookupType lookupType) throws Exception { + long start = System.currentTimeMillis(); + List lookupOperations = new ArrayList<>(maxBatchSize); + ArrayBlockingQueue queue = getQueue(lookupType); + int count = 0; + while (true) { + if (System.currentTimeMillis() - start > batchTimeoutMs) { + break; + } + + AbstractLookup lookup = queue.poll(300, TimeUnit.MILLISECONDS); + if (lookup == null) { + break; + } + count++; + lookupOperations.add(lookup); + if (count >= maxBatchSize) { + break; + } } - return lookups; + return lookupOperations; } /** Drain all the {@link Lookup}s from the lookup queue. */ - List drainAll() { - List lookups = new ArrayList<>(lookupQueue.size()); - lookupQueue.drainTo(lookups); - return lookups; + List drainAll(LookupType lookupType) { + ArrayBlockingQueue queue = getQueue(lookupType); + List lookupOperations = new ArrayList<>(queue.size()); + queue.drainTo(lookupOperations); + return lookupOperations; + } + + private ArrayBlockingQueue getQueue(LookupType lookupType) { + if (lookupType == LookupType.LOOKUP) { + return this.lookupQueue; + } else { + return this.indexLookupQueue; + } } public void close() { diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/LookupResult.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java similarity index 72% rename from fluss-client/src/main/java/com/alibaba/fluss/client/table/LookupResult.java rename to fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java index 9d8bee92..d3c55437 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/LookupResult.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupResult.java @@ -14,14 +14,13 @@ * limitations under the License. */ -package com.alibaba.fluss.client.table; +package com.alibaba.fluss.client.lookup; import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.row.InternalRow; -import javax.annotation.Nullable; - -import java.util.Objects; +import java.util.List; /** * The result of {@link Table#lookup(InternalRow)}. @@ -30,14 +29,14 @@ */ @PublicEvolving public final class LookupResult { - private final @Nullable InternalRow row; + private final List rowList; - public LookupResult(@Nullable InternalRow row) { - this.row = row; + public LookupResult(List rowList) { + this.rowList = rowList; } - public @Nullable InternalRow getRow() { - return row; + public List getRowList() { + return rowList; } @Override @@ -50,16 +49,16 @@ public boolean equals(Object o) { } LookupResult lookupResult = (LookupResult) o; - return Objects.equals(row, lookupResult.row); + return rowList.equals(lookupResult.rowList); } @Override public int hashCode() { - return Objects.hash(row); + return rowList.hashCode(); } @Override public String toString() { - return "LookupResult{row=" + row + '}'; + return "LookupResult{" + "rowList=" + rowList + '}'; } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java index 87021397..b1e312dd 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java @@ -21,21 +21,27 @@ import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.rpc.gateway.TabletServerGateway; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.LookupRequest; import com.alibaba.fluss.rpc.messages.LookupResponse; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForKey; import com.alibaba.fluss.rpc.messages.PbLookupRespForBucket; -import com.alibaba.fluss.rpc.messages.PbValue; import com.alibaba.fluss.rpc.protocol.ApiError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; +import java.util.stream.Collectors; +import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeIndexLookupRequest; import static com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeLookupRequest; /** @@ -96,57 +102,83 @@ public void run() { /** Run a single iteration of sending. */ private void runOnce(boolean drainAll) throws Exception { - List lookups = drainAll ? lookupQueue.drainAll() : lookupQueue.drain(); - sendLookups(lookups); + if (drainAll) { + sendLookups(LookupType.LOOKUP, lookupQueue.drainAll(LookupType.LOOKUP)); + sendLookups(LookupType.INDEX_LOOKUP, lookupQueue.drainAll(LookupType.INDEX_LOOKUP)); + } else { + LookupType lookupType = lookupQueue.nexDrainLookupType(); + sendLookups(lookupType, lookupQueue.drain(lookupType)); + } } - private void sendLookups(List lookups) { + private void sendLookups(LookupType lookupType, List lookups) { if (lookups.isEmpty()) { return; } // group by to lookup batches - Map> lookupBatches = groupByLeader(lookups); + Map> lookupBatches = groupByLeader(lookups); // now, send the batches - lookupBatches.forEach(this::sendLookups); + lookupBatches.forEach((destination, batch) -> sendLookups(destination, lookupType, batch)); } - private Map> groupByLeader(List lookups) { + private Map> groupByLeader(List lookups) { // leader -> lookup batches - Map> lookupBatchesByLeader = new HashMap<>(); - for (Lookup lookup : lookups) { - // get the leader node - TableBucket tb = lookup.tableBucket(); - + Map> lookupBatchesByLeader = new HashMap<>(); + for (AbstractLookup abstractLookup : lookups) { int leader; - try { - // TODO this can be a re-triable operation. We should retry here instead of throwing - // exception. - leader = metadataUpdater.leaderFor(tb); - } catch (Exception e) { - lookup.future().completeExceptionally(e); - continue; + if (abstractLookup instanceof Lookup) { + Lookup lookup = (Lookup) abstractLookup; + // lookup the leader node + TableBucket tb = lookup.tableBucket(); + try { + // TODO this can be a re-triable operation. We should retry here instead of + // throwing exception. + leader = metadataUpdater.leaderFor(tb); + } catch (Exception e) { + lookup.future().completeExceptionally(e); + continue; + } + } else if (abstractLookup instanceof IndexLookup) { + IndexLookup indexLookup = (IndexLookup) abstractLookup; + leader = indexLookup.destination(); + } else { + throw new IllegalArgumentException("Unsupported lookup type: " + abstractLookup); } - - lookupBatchesByLeader.computeIfAbsent(leader, k -> new ArrayList<>()).add(lookup); + lookupBatchesByLeader + .computeIfAbsent(leader, k -> new ArrayList<>()) + .add(abstractLookup); } return lookupBatchesByLeader; } - private void sendLookups(int destination, List lookupBatches) { + private void sendLookups( + int destination, LookupType lookupType, List lookupBatches) { TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination); + if (lookupType == LookupType.LOOKUP) { + sendLookupRequest(gateway, lookupBatches); + } else if (lookupType == LookupType.INDEX_LOOKUP) { + sendIndexLookupRequest(gateway, lookupBatches); + } else { + throw new IllegalArgumentException("Unsupported lookup type: " + lookupType); + } + } + + private void sendLookupRequest( + TabletServerGateway gateway, List lookupBatches) { // table id -> (bucket -> lookups) - Map> lookupsByTableId = new HashMap<>(); - for (Lookup lookup : lookupBatches) { + Map> lookupByTableId = new HashMap<>(); + for (AbstractLookup abstractLookup : lookupBatches) { + Lookup lookup = (Lookup) abstractLookup; TableBucket tb = lookup.tableBucket(); long tableId = tb.getTableId(); - lookupsByTableId + lookupByTableId .computeIfAbsent(tableId, k -> new HashMap<>()) .computeIfAbsent(tb, k -> new LookupBatch(tb)) .addLookup(lookup); } - lookupsByTableId.forEach( + lookupByTableId.forEach( (tableId, lookupsByBucket) -> sendLookupRequestAndHandleResponse( gateway, @@ -155,6 +187,29 @@ private void sendLookups(int destination, List lookupBatches) { lookupsByBucket)); } + private void sendIndexLookupRequest( + TabletServerGateway gateway, List indexLookupBatches) { + // table id -> (bucket -> lookups) + Map> lookupByTableId = new HashMap<>(); + for (AbstractLookup abstractLookup : indexLookupBatches) { + IndexLookup indexLookup = (IndexLookup) abstractLookup; + TableBucket tb = new TableBucket(indexLookup.tableId(), indexLookup.bucketId()); + long tableId = tb.getTableId(); + lookupByTableId + .computeIfAbsent(tableId, k -> new HashMap<>()) + .computeIfAbsent(tb, k -> new IndexLookupBatch(tb)) + .addLookup(indexLookup); + } + + lookupByTableId.forEach( + (tableId, indexLookupBatch) -> + sendIndexLookupRequestAndHandleResponse( + gateway, + makeIndexLookupRequest(tableId, indexLookupBatch.values()), + tableId, + indexLookupBatch)); + } + private void sendLookupRequestAndHandleResponse( TabletServerGateway gateway, LookupRequest lookupRequest, @@ -186,6 +241,38 @@ private void sendLookupRequestAndHandleResponse( }); } + private void sendIndexLookupRequestAndHandleResponse( + TabletServerGateway gateway, + IndexLookupRequest indexLookupRequest, + long tableId, + Map lookupsByBucket) { + try { + maxInFlightReuqestsSemaphore.acquire(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new FlussRuntimeException("interrupted:", e); + } + gateway.indexLookup(indexLookupRequest) + .thenAccept( + indexLookupResponse -> { + try { + handleIndexLookupResponse( + tableId, indexLookupResponse, lookupsByBucket); + } finally { + maxInFlightReuqestsSemaphore.release(); + } + }) + .exceptionally( + e -> { + try { + handleIndexLookupException(e, lookupsByBucket); + return null; + } finally { + maxInFlightReuqestsSemaphore.release(); + } + }); + } + private void handleLookupResponse( long tableId, LookupResponse lookupResponse, @@ -208,9 +295,46 @@ private void handleLookupResponse( error.formatErrMsg()); lookupBatch.completeExceptionally(error.exception()); } else { - List pbValues = pbLookupRespForBucket.getValuesList(); - lookupBatch.complete(pbValues); + List> byteValues = + pbLookupRespForBucket.getValuesList().stream() + .map( + pbValue -> { + if (pbValue.hasValues()) { + return Collections.singletonList( + pbValue.getValues()); + } else { + return null; + } + }) + .collect(Collectors.toList()); + lookupBatch.complete(byteValues); + } + } + } + + private void handleIndexLookupResponse( + long tableId, + IndexLookupResponse indexLookupResponse, + Map indexLookupsByBucket) { + for (PbIndexLookupRespForBucket respForBucket : indexLookupResponse.getBucketsRespsList()) { + TableBucket tableBucket = + new TableBucket( + tableId, + respForBucket.hasPartitionId() ? respForBucket.getPartitionId() : null, + respForBucket.getBucketId()); + + // TODO If error, we need to retry send the request instead of throw exception. + IndexLookupBatch indexLookupBatch = indexLookupsByBucket.get(tableBucket); + List> result = new ArrayList<>(); + for (int i = 0; i < respForBucket.getKeysRespsCount(); i++) { + PbIndexLookupRespForKey respForKey = respForBucket.getKeysRespAt(i); + List keyResult = new ArrayList<>(); + for (int j = 0; j < respForKey.getValuesCount(); j++) { + keyResult.add(respForKey.getValueAt(j)); + } + result.add(keyResult); } + indexLookupBatch.complete(result); } } @@ -227,6 +351,19 @@ private void handleLookupRequestException( } } + private void handleIndexLookupException( + Throwable t, Map lookupsByBucket) { + ApiError error = ApiError.fromThrowable(t); + for (IndexLookupBatch lookupBatch : lookupsByBucket.values()) { + // TODO If error, we need to retry send the request instead of throw exception. + LOG.warn( + "Get error index lookup response on table bucket {}, fail. Error: {}", + lookupBatch.tableBucket(), + error.formatErrMsg()); + lookupBatch.completeExceptionally(error.exception()); + } + } + void forceClose() { forceClose = true; initiateClose(); diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java new file mode 100644 index 00000000..2e92d363 --- /dev/null +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupType.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.client.lookup; + +import com.alibaba.fluss.annotation.Internal; + +/** Enum to represent the type of lookup operation. */ +@Internal +enum LookupType { + LOOKUP, + INDEX_LOOKUP; +} diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java index 54dda698..3edd311d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.client.lakehouse.LakeTableBucketAssigner; import com.alibaba.fluss.client.lookup.LookupClient; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.metadata.MetadataUpdater; import com.alibaba.fluss.client.scanner.RemoteFileDownloader; import com.alibaba.fluss.client.scanner.ScanRecord; @@ -81,6 +82,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -103,15 +105,18 @@ public class FlussTable implements Table { private final TableInfo tableInfo; private final boolean hasPrimaryKey; private final int numBuckets; + private final @Nullable int[] indexKeyIndices; private final RowType keyRowType; - // encode the key bytes for kv lookups - private final KeyEncoder keyEncoder; // decode the lookup bytes to result row private final ValueDecoder kvValueDecoder; // a getter to extract partition from key row, null when it's not a partitioned primary key // table private final @Nullable PartitionGetter keyRowPartitionGetter; + private final KeyEncoder indexKeyEncoder; + private final KeyEncoder primaryKeyEncoder; + private final KeyEncoder lookupBucketKeyEncoder; + private final Supplier writerSupplier; private final Supplier lookupClientSupplier; private final AtomicBoolean closed; @@ -148,9 +153,6 @@ public FlussTable( this.hasPrimaryKey = tableDescriptor.hasPrimaryKey(); this.numBuckets = metadataUpdater.getBucketCount(tablePath); this.keyRowType = getKeyRowType(schema); - this.keyEncoder = - KeyEncoder.createKeyEncoder( - keyRowType, keyRowType.getFieldNames(), tableDescriptor.getPartitionKeys()); this.keyRowPartitionGetter = tableDescriptor.isPartitioned() && tableDescriptor.hasPrimaryKey() ? new PartitionGetter(keyRowType, tableDescriptor.getPartitionKeys()) @@ -161,6 +163,26 @@ public FlussTable( RowDecoder.create( tableDescriptor.getKvFormat(), schema.toRowType().getChildren().toArray(new DataType[0]))); + + this.primaryKeyEncoder = new KeyEncoder(keyRowType); + Map indexKeyIndexes = tableDescriptor.getIndexKeyIndexes(); + if (indexKeyIndexes.size() > 1) { + throw new FlussRuntimeException( + String.format("table %s only support one index key", tablePath)); + } else if (indexKeyIndexes.size() == 1) { + this.indexKeyIndices = new ArrayList<>(indexKeyIndexes.values()).get(0); + this.indexKeyEncoder = new KeyEncoder(getIndexRowType(schema, indexKeyIndices)); + int length = indexKeyIndices.length; + int[] lookupBucketKeyIndices = new int[length]; + for (int i = 0; i < length; i++) { + lookupBucketKeyIndices[i] = i; + } + this.lookupBucketKeyEncoder = new KeyEncoder(keyRowType, lookupBucketKeyIndices); + } else { + this.indexKeyIndices = null; + this.indexKeyEncoder = null; + this.lookupBucketKeyEncoder = primaryKeyEncoder; + } } @Override @@ -176,20 +198,48 @@ public CompletableFuture lookup(InternalRow key) { } // encoding the key row using a compacted way consisted with how the key is encoded when put // a row - byte[] keyBytes = keyEncoder.encode(key); + byte[] keyBytes = primaryKeyEncoder.encode(key); + byte[] lookupBucketKeyBytes = lookupBucketKeyEncoder.encode(key); Long partitionId = keyRowPartitionGetter == null ? null : getPartitionId(key); - int bucketId = getBucketId(keyBytes, key); + int bucketId = getBucketId(lookupBucketKeyBytes, key); TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId); return lookupClientSupplier .get() .lookup(tableBucket, keyBytes) .thenApply( valueBytes -> { - InternalRow row = + List rowList = valueBytes == null - ? null - : kvValueDecoder.decodeValue(valueBytes).row; - return new LookupResult(row); + ? Collections.emptyList() + : Collections.singletonList( + kvValueDecoder.decodeValue(valueBytes.get(0)) + .row); + return new LookupResult(rowList); + }); + } + + @Override + public CompletableFuture indexLookup(String indexName, InternalRow indexKey) { + if (!hasPrimaryKey) { + throw new FlussRuntimeException( + String.format("none-pk table %s not support lookup", tablePath)); + } + + byte[] indexKeyBytes = indexKeyEncoder.encode(indexKey); + int bucketId = HashBucketAssigner.bucketForRowKey(indexKeyBytes, numBuckets); + return lookupClientSupplier + .get() + .indexLookup(tableId, bucketId, indexName, indexKeyBytes) + .thenApply( + result -> { + List rowList = new ArrayList<>(); + for (byte[] valueBytes : result) { + rowList.add( + valueBytes == null + ? null + : kvValueDecoder.decodeValue(valueBytes).row); + } + return new LookupResult(rowList); }); } @@ -346,6 +396,15 @@ private RowType getKeyRowType(Schema schema) { return new RowType(keyRowFields); } + private RowType getIndexRowType(Schema schema, int[] indexIndexes) { + List keyRowFields = new ArrayList<>(indexIndexes.length); + List rowFields = schema.toRowType().getFields(); + for (int index : indexIndexes) { + keyRowFields.add(rowFields.get(index)); + } + return new RowType(keyRowFields); + } + @Override public AppendWriter getAppendWriter() { if (hasPrimaryKey) { @@ -367,7 +426,8 @@ public UpsertWriter getUpsertWriter(UpsertWrite upsertWrite) { tableInfo.getTableDescriptor(), upsertWrite, writerSupplier.get(), - metadataUpdater); + metadataUpdater, + indexKeyIndices); } @Override diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java index b2373f7b..1712fd1d 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java @@ -18,6 +18,7 @@ import com.alibaba.fluss.annotation.PublicEvolving; import com.alibaba.fluss.client.Connection; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScan; import com.alibaba.fluss.client.scanner.log.LogScanner; @@ -63,6 +64,21 @@ public interface Table extends AutoCloseable { */ CompletableFuture lookup(InternalRow key); + /** + * Lookup certain rows from the given table by index key. + * + *

Only available for Primary Key Table. Will throw exception when the table isn't a Primary + * Key Table. + * + *

The index need to be created while creating the table. If this table doesn't have an index + * belong to the given index key, the exception will throw. + * + * @param indexName the given table index name. + * @param indexKey the given table index key. + * @return the result of index lookup. + */ + CompletableFuture indexLookup(String indexName, InternalRow indexKey); + /** * Extracts limit number of rows from the given table bucket. * diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java index 01bcdfe7..9f4492ac 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/writer/UpsertWriter.java @@ -42,6 +42,7 @@ public class UpsertWriter extends TableWriter { private final KeyEncoder keyEncoder; + private final KeyEncoder bucketKeyEncoder; private final @Nullable int[] targetColumns; public UpsertWriter( @@ -49,7 +50,8 @@ public UpsertWriter( TableDescriptor tableDescriptor, UpsertWrite upsertWrite, WriterClient writerClient, - MetadataUpdater metadataUpdater) { + MetadataUpdater metadataUpdater, + @Nullable int[] indexKeyIndex) { super(tablePath, tableDescriptor, metadataUpdater, writerClient); Schema schema = tableDescriptor.getSchema(); sanityCheck(schema, upsertWrite.getPartialUpdateColumns()); @@ -61,6 +63,12 @@ public UpsertWriter( schema.toRowType(), schema.getPrimaryKey().get().getColumnNames(), tableDescriptor.getPartitionKeys()); + + if (indexKeyIndex == null) { + this.bucketKeyEncoder = keyEncoder; + } else { + this.bucketKeyEncoder = new KeyEncoder(schema.toRowType(), indexKeyIndex); + } } private static void sanityCheck(Schema schema, @Nullable int[] targetColumns) { @@ -111,8 +119,10 @@ private static void sanityCheck(Schema schema, @Nullable int[] targetColumns) { */ public CompletableFuture upsert(InternalRow row) { byte[] key = keyEncoder.encode(row); + byte[] bucketKey = bucketKeyEncoder.encode(row); return send( - new WriteRecord(getPhysicalPath(row), WriteKind.PUT, key, key, row, targetColumns)); + new WriteRecord( + getPhysicalPath(row), WriteKind.PUT, key, bucketKey, row, targetColumns)); } /** @@ -124,8 +134,14 @@ public CompletableFuture upsert(InternalRow row) { */ public CompletableFuture delete(InternalRow row) { byte[] key = keyEncoder.encode(row); + byte[] bucketKey = bucketKeyEncoder.encode(row); return send( new WriteRecord( - getPhysicalPath(row), WriteKind.DELETE, key, key, null, targetColumns)); + getPhysicalPath(row), + WriteKind.DELETE, + key, + bucketKey, + null, + targetColumns)); } } diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java index cadccb37..226bccdb 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.client.utils; import com.alibaba.fluss.client.admin.OffsetSpec; +import com.alibaba.fluss.client.lookup.IndexLookupBatch; import com.alibaba.fluss.client.lookup.LookupBatch; import com.alibaba.fluss.client.table.lake.LakeTableSnapshotInfo; import com.alibaba.fluss.client.table.snapshot.BucketSnapshotInfo; @@ -43,12 +44,14 @@ import com.alibaba.fluss.rpc.messages.GetKvSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetLakeTableSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetPartitionSnapshotResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; import com.alibaba.fluss.rpc.messages.ListOffsetsRequest; import com.alibaba.fluss.rpc.messages.ListPartitionInfosResponse; import com.alibaba.fluss.rpc.messages.LookupRequest; import com.alibaba.fluss.rpc.messages.MetadataRequest; import com.alibaba.fluss.rpc.messages.PbBucketSnapshot; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbKeyValue; import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket; import com.alibaba.fluss.rpc.messages.PbLakeStorageInfo; @@ -185,6 +188,22 @@ public static LookupRequest makeLookupRequest( return request; } + public static IndexLookupRequest makeIndexLookupRequest( + long tableId, Collection lookupBatches) { + IndexLookupRequest request = new IndexLookupRequest().setTableId(tableId); + lookupBatches.forEach( + (batch) -> { + TableBucket tb = batch.tableBucket(); + PbIndexLookupReqForBucket pbIndexLookupReqForBucket = + request.addBucketsReq().setBucketId(tb.getBucket()); + if (tb.getPartitionId() != null) { + pbIndexLookupReqForBucket.setPartitionId(tb.getPartitionId()); + } + batch.lookups().forEach(get -> pbIndexLookupReqForBucket.addKey(get.key())); + }); + return request; + } + public static FetchLogResultForBucket getFetchLogResultForBucket( TableBucket tb, TablePath tp, PbFetchLogRespForBucket respForBucket) { FetchLogResultForBucket fetchLogResultForBucket; diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java index f957e55c..e0d639c4 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java @@ -28,6 +28,7 @@ import com.alibaba.fluss.exception.DatabaseNotExistException; import com.alibaba.fluss.exception.InvalidConfigException; import com.alibaba.fluss.exception.InvalidDatabaseException; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.exception.InvalidReplicationFactorException; import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.exception.SchemaNotExistException; @@ -198,6 +199,35 @@ void testCreateTableWithInvalidProperty() { .cause() .isInstanceOf(InvalidConfigException.class) .hasMessage("'table.log.tiered.local-segments' must be greater than 0."); + + TableDescriptor t4 = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test table") + // invalid property value + .property("table.index.key", "") + .build(); + // should throw exception + assertThatThrownBy(() -> admin.createTable(tablePath, t4, false).get()) + .cause() + .isInstanceOf(InvalidIndexKeysException.class) + .hasMessageContaining( + "Option 'table.index.key' = '' is invalid: " + + "There is an index key not follow the format 'indexKeyName=indexKeyFields'"); + TableDescriptor t5 = + TableDescriptor.builder() + .schema(DEFAULT_SCHEMA) + .comment("test table") + // invalid property value + .property("table.index.key", "idx0=a;idx1=b") + .build(); + // should throw exception + assertThatThrownBy(() -> admin.createTable(tablePath, t5, false).get()) + .cause() + .isInstanceOf(InvalidIndexKeysException.class) + .hasMessageContaining( + "Option 'table.index.key' = 'idx0=a;idx1=b' is invalid: Currently, " + + "Fluss only support to define single index key, but there is more than one index key."); } @Test diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java index 09e16589..b6b4324f 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussPartitionedTableITCase.java @@ -83,7 +83,8 @@ void testPartitionedPrimaryKeyTable() throws Exception { InternalRow lookupRow = table.lookup(keyRow(schema, new Object[] {i, null, partition})) .get() - .getRow(); + .getRowList() + .get(0); assertThat(lookupRow).isEqualTo(actualRow); } } diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java index 9d26a9b3..3d39b8d9 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/table/FlussTableITCase.java @@ -19,6 +19,7 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; import com.alibaba.fluss.client.admin.ClientToServerITCaseBase; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.scanner.ScanRecord; import com.alibaba.fluss.client.scanner.log.LogScanner; import com.alibaba.fluss.client.scanner.log.ScanRecords; @@ -58,6 +59,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; @@ -215,6 +217,66 @@ void testPutAndLookup() throws Exception { verifyPutAndLookup(table2, schema, new Object[] {"a", 1}); } + @Test + void testPutAndIndexLookup() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_put_and_index_lookup_table"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .property("table.index.key", "idx0=a,b") + .build(); + createTable(tablePath, descriptor, false); + Table table = conn.getTable(tablePath); + verifyPutAndLookup(table, schema, new Object[] {1, "a", 1L, "value1"}); + verifyPutAndLookup(table, schema, new Object[] {1, "a", 2L, "value2"}); + verifyPutAndLookup(table, schema, new Object[] {1, "a", 3L, "value3"}); + verifyPutAndLookup(table, schema, new Object[] {2, "a", 4L, "value4"}); + RowType rowType = schema.toRowType(); + + // test index lookup. + Schema indexKeySchema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .build(); + CompletableFuture result = + table.indexLookup( + "idx0", compactedRow(indexKeySchema.toRowType(), new Object[] {1, "a"})); + LookupResult lookupResult = result.get(); + assertThat(lookupResult).isNotNull(); + List rowList = lookupResult.getRowList(); + assertThat(rowList.size()).isEqualTo(3); + for (int i = 0; i < rowList.size(); i++) { + assertRowValueEquals( + rowType, rowList.get(i), new Object[] {1, "a", i + 1L, "value" + (i + 1)}); + } + + result = + table.indexLookup( + "idx0", compactedRow(indexKeySchema.toRowType(), new Object[] {2, "a"})); + lookupResult = result.get(); + assertThat(lookupResult).isNotNull(); + rowList = lookupResult.getRowList(); + assertThat(rowList.size()).isEqualTo(1); + assertRowValueEquals(rowType, rowList.get(0), new Object[] {2, "a", 4L, "value4"}); + + result = + table.indexLookup( + "idx0", compactedRow(indexKeySchema.toRowType(), new Object[] {3, "a"})); + lookupResult = result.get(); + assertThat(lookupResult).isNotNull(); + rowList = lookupResult.getRowList(); + assertThat(rowList.size()).isEqualTo(0); + } + @Test void testLookupForNotReadyTable() throws Exception { TablePath tablePath = TablePath.of("test_db_1", "test_lookup_unready_table_t1"); @@ -349,7 +411,7 @@ void verifyPutAndLookup(Table table, Schema tableSchema, Object[] fields) throws private InternalRow lookupRow(Table table, IndexedRow keyRow) throws Exception { // lookup this key. - return table.lookup(keyRow).get().getRow(); + return table.lookup(keyRow).get().getRowList().get(0); } @Test diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 12f842a5..bc99173c 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -717,7 +717,7 @@ public class ConfigOptions { public static final ConfigOption CLIENT_LOOKUP_QUEUE_SIZE = key("client.lookup.queue-size") .intType() - .defaultValue(256) + .defaultValue(25600) .withDescription("The maximum number of pending lookup operations."); public static final ConfigOption CLIENT_LOOKUP_MAX_BATCH_SIZE = @@ -734,6 +734,14 @@ public class ConfigOptions { .withDescription( "The maximum number of unacknowledged lookup requests for lookup operations."); + public static final ConfigOption CLIENT_LOOKUP_BATCH_TIMEOUT = + key("client.lookup.batch-timeout") + .durationType() + .defaultValue(Duration.ofMillis(100)) + .withDescription( + "The maximum time to wait for the lookup batch to full, if this timeout is reached, " + + "the lookup batch will be closed to send."); + public static final ConfigOption CLIENT_SCANNER_REMOTE_LOG_PREFETCH_NUM = key("client.scanner.remote-log.prefetch-num") .intType() @@ -891,6 +899,18 @@ public class ConfigOptions { + "When this option is set to ture and the datalake tiering service is up," + " the table will be tiered and compacted into datalake format stored on lakehouse storage."); + public static final ConfigOption TABLE_INDEX_KEY = + key("table.index.key") + .stringType() + .noDefaultValue() + .withDescription( + "The index key is used to build non-key secondary indexes on a pk table, " + + "enabling fast data queries. You can define multiple index keys, separated by ';'." + + " Each index key can specify a index name, like indexName=indexKeyFields. " + + " indexKeyFields support multiple fields, which can be nullable filed," + + " and fields within a key are separated by ','." + + " For example: 'index1=num_orders;index2=num_orders,total_amount'"); + // ------------------------------------------------------------------------ // ConfigOptions for Kv // ------------------------------------------------------------------------ diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java new file mode 100644 index 00000000..3bc38606 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidIndexKeysException.java @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +import com.alibaba.fluss.annotation.PublicEvolving; + +/** + * Try to build a table with some invalid index keys. like the column in index key not exist or the + * column is repeated, like (a, b) and (b, a). + * + * @since 0.3 + */ +@PublicEvolving +public class InvalidIndexKeysException extends ApiException { + public InvalidIndexKeysException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java index d3737b93..c3ae9213 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.config.ConfigurationUtils; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.utils.AutoPartitionStrategy; import com.alibaba.fluss.utils.Preconditions; import com.alibaba.fluss.utils.json.JsonSerdeUtils; @@ -135,6 +136,78 @@ && getLogFormat() != LogFormat.ARROW) { } } + /** Validate the table descriptor. */ + public void validate() throws InvalidIndexKeysException { + validateIndexKeys(); + } + + private void validateIndexKeys() throws InvalidIndexKeysException { + Configuration conf = configuration(); + if (!conf.contains(ConfigOptions.TABLE_INDEX_KEY)) { + return; + } + + String indexKeysString = conf.get(ConfigOptions.TABLE_INDEX_KEY); + String indexKeysError = detectInvalidIndexKeys(indexKeysString); + if (indexKeysError != null) { + throw new InvalidIndexKeysException( + "Option 'table.index.key' = '" + + indexKeysString + + "' is invalid: " + + indexKeysError); + } + } + + private String detectInvalidIndexKeys(String indexKeysString) { + List columnNames = + schema.getColumns().stream() + .map(Schema.Column::getName) + .collect(Collectors.toList()); + String[] indexKeyStringList = indexKeysString.split(";"); + if (indexKeyStringList.length <= 0) { + return "Index key is empty or index key doesn't split by ';'."; + } else if (indexKeyStringList.length > 1) { + return "Currently, Fluss only support to define single index key, but there is more than one index key."; + } + + List orderedIndexKey = new ArrayList<>(); + for (String indexKeyFieldsStr : indexKeyStringList) { + String[] indexNameAndFields = indexKeyFieldsStr.split("="); + if (indexNameAndFields.length != 2) { + return "There is an index key not follow the format 'indexKeyName=indexKeyFields'."; + } + + String[] fieldStrings = indexNameAndFields[1].split(","); + if (fieldStrings.length <= 0) { + return "There is an index key is empty or column field in index key doesn't split by ','."; + } + + List indexKeyPosList = new ArrayList<>(); + for (String field : fieldStrings) { + int fieldIndex = columnNames.indexOf(field); + if (fieldIndex < 0) { + return "Index key '" + field + "' does not exist in the schema."; + } + indexKeyPosList.add(fieldIndex); + } + + Collections.sort(indexKeyPosList); + if (orderedIndexKey.stream() + .anyMatch( + f -> + Arrays.equals( + f, + indexKeyPosList.stream() + .mapToInt(Integer::intValue) + .toArray()))) { + return "There is an index key is duplicate."; + } + orderedIndexKey.add(indexKeyPosList.stream().mapToInt(Integer::intValue).toArray()); + } + + return null; + } + /** Creates a builder for building table descriptor. */ public static Builder builder() { return new Builder(); @@ -239,6 +312,32 @@ public int getTieredLogLocalSegments() { return configuration().get(ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS); } + /** Return a list of index key indexes in the table. */ + public Map getIndexKeyIndexes() { + Configuration conf = configuration(); + if (!conf.contains(ConfigOptions.TABLE_INDEX_KEY)) { + return Collections.emptyMap(); + } + + List columnNames = + schema.getColumns().stream() + .map(Schema.Column::getName) + .collect(Collectors.toList()); + + Map indexKeyIndexes = new HashMap<>(); + for (String indexKeyStr : conf.get(ConfigOptions.TABLE_INDEX_KEY).split(";")) { + String[] indexNameAndFields = indexKeyStr.split("="); + String indexKeyName = indexNameAndFields[0]; + List indexKeyList = new ArrayList<>(); + for (String field : indexNameAndFields[1].split(",")) { + indexKeyList.add(columnNames.indexOf(field)); + } + indexKeyIndexes.put( + indexKeyName, indexKeyList.stream().mapToInt(Integer::intValue).toArray()); + } + return indexKeyIndexes; + } + /** Whether the data lake is enabled. */ public boolean isDataLakeEnabled() { return configuration().get(ConfigOptions.TABLE_DATALAKE_ENABLED); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java index c92e64df..700d21b5 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metrics/MetricNames.java @@ -76,6 +76,10 @@ public class MetricNames { public static final String FAILED_PUT_KV_REQUESTS_RATE = "failedPutKvRequestsPerSecond"; public static final String TOTAL_LIMIT_SCAN_REQUESTS_RATE = "totalLimitScanRequestsPerSecond"; public static final String FAILED_LIMIT_SCAN_REQUESTS_RATE = "failedLimitScanRequestsPerSecond"; + public static final String TOTAL_INDEX_LOOKUP_REQUESTS_RATE = + "totalIndexLookupRequestsPerSecond"; + public static final String FAILED_INDEX_LOOKUP_REQUESTS_RATE = + "failedIndexLookupRequestsPerSecond"; // -------------------------------------------------------------------------------------------- // metrics for table bucket diff --git a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java index eed6b665..227da452 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/KeyEncoder.java @@ -54,7 +54,7 @@ public static KeyEncoder createKeyEncoder( return new KeyEncoder(rowType, encodeColIndexes); } - protected KeyEncoder(RowType rowType) { + public KeyEncoder(RowType rowType) { this(rowType, IntStream.range(0, rowType.getFieldCount()).toArray()); } diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java index 6a609d86..fa1d6459 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/BytesUtils.java @@ -58,4 +58,20 @@ public static byte[] toArray(ByteBuffer buffer, int offset, int size) { } return dest; } + + /** + * Check if the given two byte arrays have the same prefix. + * + * @param bytes1 The first byte array + * @param bytes2 The second byte array + * @return true if the given two byte arrays have the same prefix, false otherwise + */ + public static boolean isPrefixEquals(byte[] bytes1, byte[] bytes2) { + for (int i = 0; i < bytes1.length; i++) { + if (bytes1[i] != bytes2[i]) { + return false; + } + } + return true; + } } diff --git a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java index 8e8c67e6..eab22b9b 100644 --- a/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java +++ b/fluss-common/src/test/java/com/alibaba/fluss/testutils/DataTestUtils.java @@ -179,23 +179,33 @@ public static MemoryLogRecords genLogRecordsWithBaseOffsetAndTimestamp( public static KvRecordBatch genKvRecordBatch(List> keyAndValues) throws Exception { - return genKvRecordBatchWithWriterId(keyAndValues, NO_WRITER_ID, NO_BATCH_SEQUENCE); + return genKvRecordBatch(DATA1_KEY_TYPE, DATA1_ROW_TYPE, keyAndValues); + } + + public static KvRecordBatch genKvRecordBatch( + RowType keyType, RowType valueType, List> keyAndValues) + throws Exception { + return genKvRecordBatchWithWriterId( + keyAndValues, keyType, valueType, NO_WRITER_ID, NO_BATCH_SEQUENCE); } public static KvRecordBatch genKvRecordBatchWithWriterId( - List> keyAndValues, long writerId, int batchSequence) + List> keyAndValues, + RowType keyType, + RowType valueType, + long writerId, + int batchSequence) throws Exception { - KeyEncoder keyEncoder = new KeyEncoder(DATA1_ROW_TYPE, new int[] {0}); + KeyEncoder keyEncoder = new KeyEncoder(keyType); KvRecordTestUtils.KvRecordBatchFactory kvRecordBatchFactory = KvRecordTestUtils.KvRecordBatchFactory.of(DEFAULT_SCHEMA_ID); KvRecordTestUtils.KvRecordFactory kvRecordFactory = - KvRecordTestUtils.KvRecordFactory.of(DATA1_ROW_TYPE); + KvRecordTestUtils.KvRecordFactory.of(valueType); List records = new ArrayList<>(); for (Tuple2 keyAndValue : keyAndValues) { records.add( kvRecordFactory.ofRecord( - keyEncoder.encode(row(DATA1_KEY_TYPE, keyAndValue.f0)), - keyAndValue.f1)); + keyEncoder.encode(row(keyType, keyAndValue.f0)), keyAndValue.f1)); } return kvRecordBatchFactory.ofRecords(records, writerId, batchSequence); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java index 3bc76cd2..1b964dc3 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/FlinkConnectorOptions.java @@ -69,6 +69,18 @@ public class FlinkConnectorOptions { .defaultValue(true) .withDescription("Whether to set async lookup. Default is true."); + public static final ConfigOption INDEX_LOOKUP_KEY = + ConfigOptions.key("table.index.key") + .stringType() + .noDefaultValue() + .withDescription( + "The index key is used to build non-key secondary indexes on a pk table, " + + "enabling fast data queries. You can define multiple index keys, separated by ';'." + + " Each index key can specify a index name, like indexName=indexKeyFields. " + + " indexKeyFields support multiple fields, which can be nullable filed," + + " and fields within a key are separated by ','." + + " For example: 'index1=num_orders;index2=num_orders,total_amount'"); + // -------------------------------------------------------------------------------------------- // Scan specific options // -------------------------------------------------------------------------------------------- diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java index ddbc8a5f..671b8b86 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java @@ -76,13 +76,13 @@ public class FlinkCatalog implements Catalog { public static final String LAKE_TABLE_SPLITTER = "$lake"; - private final ClassLoader classLoader; + protected final ClassLoader classLoader; - private final String catalogName; - private final @Nullable String defaultDatabase; - private final String bootstrapServers; - private Connection connection; - private Admin admin; + protected final String catalogName; + protected final @Nullable String defaultDatabase; + protected final String bootstrapServers; + protected Connection connection; + protected Admin admin; private volatile @Nullable LakeCatalog lakeCatalog; @@ -272,7 +272,7 @@ public CatalogBaseTable getTable(ObjectPath objectPath) } } - private CatalogBaseTable getLakeTable(String databaseName, String tableName) + protected CatalogBaseTable getLakeTable(String databaseName, String tableName) throws TableNotExistException, CatalogException { mayInitLakeCatalogCatalog(); String[] tableComponents = tableName.split("\\" + LAKE_TABLE_SPLITTER); @@ -514,7 +514,7 @@ public void alterPartitionColumnStatistics( throw new UnsupportedOperationException(); } - private TablePath toTablePath(ObjectPath objectPath) { + protected TablePath toTablePath(ObjectPath objectPath) { return TablePath.of(objectPath.getDatabaseName(), objectPath.getObjectName()); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java index dbe033bc..a02761da 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkTableFactory.java @@ -48,10 +48,13 @@ import java.time.ZoneId; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import static com.alibaba.fluss.connector.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER; +import static com.alibaba.fluss.connector.flink.utils.FlinkConnectorOptionsUtils.getIndexKeys; import static org.apache.flink.configuration.ConfigOptions.key; /** Factory to create table source and table sink for Fluss. */ @@ -78,7 +81,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { == RuntimeExecutionMode.STREAMING; final ReadableConfig tableOptions = helper.getOptions(); - FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions); + RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); + FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions, tableOutputType); ZoneId timeZone = FlinkConnectorOptionsUtils.getLocalTimeZone( @@ -90,8 +94,6 @@ public DynamicTableSource createDynamicTableSource(Context context) { ResolvedCatalogTable resolvedCatalogTable = context.getCatalogTable(); int[] primaryKeyIndexes = resolvedSchema.getPrimaryKeyIndexes(); - RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType(); - // options for lookup LookupCache cache = null; @@ -106,6 +108,15 @@ public DynamicTableSource createDynamicTableSource(Context context) { throw new UnsupportedOperationException("Full lookup caching is not supported yet."); } + // get and validate index lookup key. + Map indexKeys = new HashMap<>(); + if (tableOptions.getOptional(FlinkConnectorOptions.INDEX_LOOKUP_KEY).isPresent()) { + indexKeys = + getIndexKeys( + tableOptions.get(FlinkConnectorOptions.INDEX_LOOKUP_KEY), + tableOutputType); + } + return new FlinkTableSource( toFlussTablePath(context.getObjectIdentifier()), toFlussClientConfig(helper.getOptions(), context.getConfiguration()), @@ -123,7 +134,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { tableOptions.get( key(ConfigOptions.TABLE_DATALAKE_ENABLED.key()) .booleanType() - .defaultValue(false))); + .defaultValue(false)), + indexKeys); } @Override @@ -166,6 +178,7 @@ public Set> optionalOptions() { FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP, FlinkConnectorOptions.SCAN_PARTITION_DISCOVERY_INTERVAL, FlinkConnectorOptions.LOOKUP_ASYNC, + FlinkConnectorOptions.INDEX_LOOKUP_KEY, LookupOptions.MAX_RETRIES, LookupOptions.CACHE_TYPE, LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java index 49ecb87f..bc6956c5 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/FlinkTableSource.java @@ -101,6 +101,7 @@ public class FlinkTableSource private final long scanPartitionDiscoveryIntervalMs; private final boolean isDataLakeEnabled; + private final Map indexKeys; // output type after projection pushdown private LogicalType producedDataType; @@ -130,7 +131,8 @@ public FlinkTableSource( boolean lookupAsync, @Nullable LookupCache cache, long scanPartitionDiscoveryIntervalMs, - boolean isDataLakeEnabled) { + boolean isDataLakeEnabled, + Map indexKeys) { this.tablePath = tablePath; this.flussConfig = flussConfig; this.tableOutputType = tableOutputType; @@ -146,6 +148,7 @@ public FlinkTableSource( this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs; this.isDataLakeEnabled = isDataLakeEnabled; + this.indexKeys = indexKeys; } @Override @@ -279,14 +282,20 @@ public boolean isBounded() { public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { LookupNormalizer lookupNormalizer = LookupNormalizer.validateAndCreateLookupNormalizer( - context.getKeys(), primaryKeyIndexes, tableOutputType, projectedFields); + context.getKeys(), + primaryKeyIndexes, + indexKeys, + tableOutputType, + projectedFields); if (lookupAsync) { AsyncLookupFunction asyncLookupFunction = new FlinkAsyncLookupFunction( flussConfig, tablePath, tableOutputType, - primaryKeyIndexes, + lookupNormalizer.getIndexName() == null + ? primaryKeyIndexes + : indexKeys.get(lookupNormalizer.getIndexName()), lookupMaxRetryTimes, lookupNormalizer, projectedFields); @@ -301,7 +310,9 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { flussConfig, tablePath, tableOutputType, - primaryKeyIndexes, + lookupNormalizer.getIndexName() == null + ? primaryKeyIndexes + : indexKeys.get(lookupNormalizer.getIndexName()), lookupMaxRetryTimes, lookupNormalizer, projectedFields); @@ -328,7 +339,8 @@ public DynamicTableSource copy() { lookupAsync, cache, scanPartitionDiscoveryIntervalMs, - isDataLakeEnabled); + isDataLakeEnabled, + indexKeys); source.producedDataType = producedDataType; source.projectedFields = projectedFields; source.singleRowFilter = singleRowFilter; diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java index 87f38d07..cc466c83 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkAsyncLookupFunction.java @@ -18,7 +18,7 @@ import com.alibaba.fluss.client.Connection; import com.alibaba.fluss.client.ConnectionFactory; -import com.alibaba.fluss.client.table.LookupResult; +import com.alibaba.fluss.client.lookup.LookupResult; import com.alibaba.fluss.client.table.Table; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.connector.flink.source.lookup.LookupNormalizer.RemainingFilter; @@ -40,8 +40,9 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; /** A flink async lookup function for fluss. */ @@ -55,7 +56,7 @@ public class FlinkAsyncLookupFunction extends AsyncLookupFunction { private final TablePath tablePath; private final int maxRetryTimes; private final RowType flinkRowType; - private final int[] pkIndexes; + private final int[] lookupKeyIndexes; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; @@ -68,7 +69,7 @@ public FlinkAsyncLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int[] pkIndexes, + int[] lookupKeyIndexes, int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { @@ -76,17 +77,17 @@ public FlinkAsyncLookupFunction( this.tablePath = tablePath; this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; - this.pkIndexes = pkIndexes; + this.lookupKeyIndexes = lookupKeyIndexes; this.lookupNormalizer = lookupNormalizer; this.projection = projection; } - private RowType toPkRowType(RowType rowType, int[] pkIndex) { - LogicalType[] types = new LogicalType[pkIndex.length]; - String[] names = new String[pkIndex.length]; - for (int i = 0; i < pkIndex.length; i++) { - types[i] = rowType.getTypeAt(pkIndex[i]); - names[i] = rowType.getFieldNames().get(pkIndex[i]); + private RowType toLookupKeyRowType(RowType rowType, int[] lookupKeyIndex) { + LogicalType[] types = new LogicalType[lookupKeyIndex.length]; + String[] names = new String[lookupKeyIndex.length]; + for (int i = 0; i < lookupKeyIndex.length; i++) { + types[i] = rowType.getTypeAt(lookupKeyIndex[i]); + names[i] = rowType.getFieldNames().get(lookupKeyIndex[i]); } return RowType.of(rowType.isNullable(), types, names); } @@ -99,7 +100,8 @@ public void open(FunctionContext context) { // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - toPkRowType(flinkRowType, pkIndexes), table.getDescriptor().getKvFormat()); + toLookupKeyRowType(flinkRowType, lookupKeyIndexes), + table.getDescriptor().getKvFormat()); flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(flinkRowType)); LOG.info("end open."); @@ -117,7 +119,7 @@ public CompletableFuture> asyncLookup(RowData keyRow) { InternalRow flussKeyRow = flinkRowToFlussRowConverter.toInternalRow(normalizedKeyRow); CompletableFuture> future = new CompletableFuture<>(); // fetch result - fetchResult(future, 0, flussKeyRow, remainingFilter); + fetchResult(future, 0, flussKeyRow, remainingFilter, lookupNormalizer.getIndexName()); return future; } @@ -128,13 +130,21 @@ public CompletableFuture> asyncLookup(RowData keyRow) { * @param currentRetry Current number of retries. * @param keyRow the key row to get. * @param remainingFilter the nullable remaining filter to filter the result. + * @param indexName the index name to get. */ private void fetchResult( CompletableFuture> resultFuture, int currentRetry, InternalRow keyRow, - @Nullable RemainingFilter remainingFilter) { - CompletableFuture responseFuture = table.lookup(keyRow); + @Nullable RemainingFilter remainingFilter, + @Nullable String indexName) { + CompletableFuture responseFuture; + if (indexName == null) { + responseFuture = table.lookup(keyRow); + } else { + responseFuture = table.indexLookup(indexName, keyRow); + } + responseFuture.whenComplete( (result, throwable) -> { if (throwable != null) { @@ -163,24 +173,24 @@ private void fetchResult( resultFuture.completeExceptionally(e1); } fetchResult( - resultFuture, currentRetry + 1, keyRow, remainingFilter); + resultFuture, + currentRetry + 1, + keyRow, + remainingFilter, + indexName); } } } else { - InternalRow row = result.getRow(); - if (row == null) { - resultFuture.complete(Collections.emptyList()); - } else { - // TODO: we can project fluss row first, - // to avoid deserialize unnecessary fields - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); - if (remainingFilter != null && !remainingFilter.isMatch(flinkRow)) { - resultFuture.complete(Collections.emptyList()); - } else { - resultFuture.complete( - Collections.singletonList(maybeProject(flinkRow))); + List projectedRow = new ArrayList<>(); + for (InternalRow row : result.getRowList()) { + if (row != null) { + RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); + if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { + projectedRow.add(maybeProject(flinkRow)); + } } } + resultFuture.complete(projectedRow); } }); } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java index 2e28e856..b52984cc 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/FlinkLookupFunction.java @@ -37,8 +37,10 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; /** A flink lookup function for fluss. */ public class FlinkLookupFunction extends LookupFunction { @@ -50,7 +52,7 @@ public class FlinkLookupFunction extends LookupFunction { private final TablePath tablePath; private final int maxRetryTimes; private final RowType flinkRowType; - private final int[] pkIndexes; + private final int[] lookupKeyIndexes; private final LookupNormalizer lookupNormalizer; @Nullable private final int[] projection; @@ -64,7 +66,7 @@ public FlinkLookupFunction( Configuration flussConfig, TablePath tablePath, RowType flinkRowType, - int[] pkIndexes, + int[] lookupKeyIndexes, int maxRetryTimes, LookupNormalizer lookupNormalizer, @Nullable int[] projection) { @@ -72,17 +74,17 @@ public FlinkLookupFunction( this.tablePath = tablePath; this.maxRetryTimes = maxRetryTimes; this.flinkRowType = flinkRowType; - this.pkIndexes = pkIndexes; + this.lookupKeyIndexes = lookupKeyIndexes; this.lookupNormalizer = lookupNormalizer; this.projection = projection; } - private RowType toPkRowType(RowType rowType, int[] pkIndex) { - LogicalType[] types = new LogicalType[pkIndex.length]; - String[] names = new String[pkIndex.length]; - for (int i = 0; i < pkIndex.length; i++) { - types[i] = rowType.getTypeAt(pkIndex[i]); - names[i] = rowType.getFieldNames().get(pkIndex[i]); + private RowType toLookupKeyRowType(RowType rowType, int[] lookupKeyIndex) { + LogicalType[] types = new LogicalType[lookupKeyIndex.length]; + String[] names = new String[lookupKeyIndex.length]; + for (int i = 0; i < lookupKeyIndex.length; i++) { + types[i] = rowType.getTypeAt(lookupKeyIndex[i]); + names[i] = rowType.getFieldNames().get(lookupKeyIndex[i]); } return RowType.of(rowType.isNullable(), types, names); } @@ -98,7 +100,8 @@ public void open(FunctionContext context) { // TODO: convert to Fluss GenericRow to avoid unnecessary deserialization flinkRowToFlussRowConverter = FlinkRowToFlussRowConverter.create( - toPkRowType(flinkRowType, pkIndexes), table.getDescriptor().getKvFormat()); + toLookupKeyRowType(flinkRowType, lookupKeyIndexes), + table.getDescriptor().getKvFormat()); flussRowToFlinkRowConverter = new FlussRowToFlinkRowConverter(FlinkConversions.toFlussRowType(flinkRowType)); LOG.info("end open."); @@ -115,6 +118,7 @@ public Collection lookup(RowData keyRow) { RowData normalizedKeyRow = lookupNormalizer.normalizeLookupKey(keyRow); LookupNormalizer.RemainingFilter remainingFilter = lookupNormalizer.createRemainingFilter(keyRow); + String indexName = lookupNormalizer.getIndexName(); // to lookup a key, we will need to do two data conversion, // first is converting from flink row to fluss row, // second is extracting key from the fluss row when calling method table.get(flussKeyRow) @@ -122,16 +126,26 @@ public Collection lookup(RowData keyRow) { InternalRow flussKeyRow = flinkRowToFlussRowConverter.toInternalRow(normalizedKeyRow); for (int retry = 0; retry <= maxRetryTimes; retry++) { try { - InternalRow row = table.lookup(flussKeyRow).get().getRow(); - if (row != null) { - // TODO: we can project fluss row first, to avoid deserialize unnecessary fields - RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); - if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { - return Collections.singletonList(maybeProject(flinkRow)); - } else { - return Collections.emptyList(); + List projectedRows = new ArrayList<>(); + List lookupRows; + if (indexName == null) { + lookupRows = table.lookup(flussKeyRow).get().getRowList(); + } else { + lookupRows = table.indexLookup(indexName, flussKeyRow).get().getRowList(); + } + + for (InternalRow row : lookupRows) { + if (row != null) { + // TODO: we can project fluss row first, to avoid deserialize unnecessary + // fields + RowData flinkRow = flussRowToFlinkRowConverter.toFlinkRowData(row); + if (remainingFilter == null || remainingFilter.isMatch(flinkRow)) { + projectedRows.add(maybeProject(flinkRow)); + } } } + + return projectedRows; } catch (Exception e) { LOG.error(String.format("Fluss lookup error, retry times = %d", retry), e); if (retry >= maxRetryTimes) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java index 21b1c120..75ed9c1c 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/source/lookup/LookupNormalizer.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; @@ -50,7 +51,11 @@ public class LookupNormalizer implements Serializable { private static final long serialVersionUID = 1L; - public static final LookupNormalizer NOOP_NORMALIZER = new LookupNormalizer(null, null, null); + public static final LookupNormalizer NOOP_NORMALIZER = + new LookupNormalizer(null, null, null, null); + + /** The index name. */ + @Nullable private final String indexName; /** Mapping from normalized key index to the lookup key index (in the lookup row). */ @Nullable private final FieldGetter[] normalizedKeyGetters; @@ -62,9 +67,11 @@ public class LookupNormalizer implements Serializable { @Nullable private final FieldGetter[] resultFieldGetters; private LookupNormalizer( + @Nullable String indexName, @Nullable FieldGetter[] normalizedKeyGetters, @Nullable FieldGetter[] conditionFieldGetters, @Nullable FieldGetter[] resultFieldGetters) { + this.indexName = indexName; this.normalizedKeyGetters = normalizedKeyGetters; this.conditionFieldGetters = conditionFieldGetters; this.resultFieldGetters = resultFieldGetters; @@ -76,6 +83,10 @@ private LookupNormalizer( } } + public @Nullable String getIndexName() { + return indexName; + } + public RowData normalizeLookupKey(RowData lookupKey) { if (normalizedKeyGetters == null) { return lookupKey; @@ -143,22 +154,16 @@ public boolean fieldMatches(RowData result) { public static LookupNormalizer validateAndCreateLookupNormalizer( int[][] lookupKeyIndexes, int[] primaryKeys, + Map indexKeys, RowType schema, @Nullable int[] projectedFields) { - if (primaryKeys.length == 0) { + if (primaryKeys.length == 0 && indexKeys == null) { throw new UnsupportedOperationException( - "Fluss lookup function only support lookup table with primary key."); + "Fluss lookup function only support lookup table with primary key or index key."); } - // we compare string names rather than int index for better error message and readability, - // the length of lookup key and primary key shouldn't be large, so the overhead is low. - String[] columnNames = schema.getFieldNames().toArray(new String[0]); - String[] primaryKeyNames = - Arrays.stream(primaryKeys).mapToObj(i -> columnNames[i]).toArray(String[]::new); - // get the lookup keys int[] lookupKeys = new int[lookupKeyIndexes.length]; - String[] lookupKeyNames = new String[lookupKeyIndexes.length]; - for (int i = 0; i < lookupKeyNames.length; i++) { + for (int i = 0; i < lookupKeys.length; i++) { int[] innerKeyArr = lookupKeyIndexes[i]; Preconditions.checkArgument( innerKeyArr.length == 1, "Do not support nested lookup keys"); @@ -169,25 +174,48 @@ public static LookupNormalizer validateAndCreateLookupNormalizer( } else { lookupKeys[i] = innerKeyArr[0]; } - lookupKeyNames[i] = columnNames[innerKeyArr[0]]; } - if (Arrays.equals(lookupKeys, primaryKeys)) { - return NOOP_NORMALIZER; + String indexName = getIndexName(lookupKeys, primaryKeys, indexKeys); + if (indexName == null) { + return createLookupNormalizer(lookupKeys, null, primaryKeys, schema); + } else { + return createLookupNormalizer(lookupKeys, indexName, indexKeys.get(indexName), schema); + } + } + + /** create a {@link LookupNormalizer}. */ + private static LookupNormalizer createLookupNormalizer( + int[] lookupKeys, @Nullable String indexName, int[] keys, RowType schema) { + // we compare string names rather than int index for better error message and readability, + // the length of lookup key and keys (primary key or index key) shouldn't be large, so the + // overhead is low. + String[] columnNames = schema.getFieldNames().toArray(new String[0]); + String[] keyNames = + Arrays.stream(keys).mapToObj(i -> columnNames[i]).toArray(String[]::new); + + // get the lookup keys + String[] lookupKeyNames = new String[lookupKeys.length]; + for (int i = 0; i < lookupKeyNames.length; i++) { + lookupKeyNames[i] = columnNames[lookupKeys[i]]; + } + + if (Arrays.equals(lookupKeys, keys)) { + return new LookupNormalizer(indexName, null, null, null); } - FieldGetter[] normalizedKeyGetters = new FieldGetter[primaryKeys.length]; - for (int i = 0; i < primaryKeyNames.length; i++) { - LogicalType fieldType = schema.getTypeAt(primaryKeys[i]); - int lookupKeyIndex = findIndex(lookupKeyNames, primaryKeyNames[i]); + FieldGetter[] normalizedKeyGetters = new FieldGetter[keys.length]; + for (int i = 0; i < keyNames.length; i++) { + LogicalType fieldType = schema.getTypeAt(keys[i]); + int lookupKeyIndex = findIndex(lookupKeyNames, keyNames[i]); normalizedKeyGetters[i] = RowData.createFieldGetter(fieldType, lookupKeyIndex); } - Set primaryKeySet = Arrays.stream(primaryKeys).boxed().collect(Collectors.toSet()); + Set keySet = Arrays.stream(keys).boxed().collect(Collectors.toSet()); List conditionFieldGetters = new ArrayList<>(); List resultFieldGetters = new ArrayList<>(); for (int i = 0; i < lookupKeys.length; i++) { - if (!primaryKeySet.contains(i)) { + if (!keySet.contains(i)) { LogicalType fieldType = schema.getTypeAt(lookupKeys[i]); conditionFieldGetters.add(RowData.createFieldGetter(fieldType, i)); resultFieldGetters.add(RowData.createFieldGetter(fieldType, lookupKeys[i])); @@ -195,6 +223,7 @@ public static LookupNormalizer validateAndCreateLookupNormalizer( } return new LookupNormalizer( + indexName, normalizedKeyGetters, conditionFieldGetters.toArray(new FieldGetter[0]), resultFieldGetters.toArray(new FieldGetter[0])); @@ -213,4 +242,51 @@ private static int findIndex(String[] columnNames, String key) { + "' in lookup keys " + Arrays.toString(columnNames)); } + + private static @Nullable String getIndexName( + int[] lookupKeys, int[] primaryKeys, Map indexKeys) { + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append("lookupKeys: " + Arrays.toString(lookupKeys) + "\n"); + stringBuilder.append("primaryKeys: " + Arrays.toString(primaryKeys) + "\n"); + for (Map.Entry entry : indexKeys.entrySet()) { + int[] copyIndexKey = entry.getValue(); + stringBuilder.append( + "indexKey and value: " + + entry.getKey() + + ":" + + Arrays.toString(copyIndexKey) + + "\n"); + } + + List lookupKeyList = + Arrays.stream(lookupKeys).boxed().collect(Collectors.toList()); + List primaryKeyList = + Arrays.stream(primaryKeys).boxed().collect(Collectors.toList()); + if (lookupKeyList.size() >= primaryKeyList.size()) { + boolean isSubset = true; + for (Integer primaryKey : primaryKeyList) { + if (!lookupKeyList.contains(primaryKey)) { + isSubset = false; + break; + } + } + if (isSubset) { + return null; + } + } + + for (Map.Entry entry : indexKeys.entrySet()) { + int[] indexKey = entry.getValue(); + int[] copyIndexKey = indexKey.clone(); + Arrays.sort(copyIndexKey); + Arrays.sort(lookupKeys); + if (Arrays.equals(lookupKeys, copyIndexKey)) { + return entry.getKey(); + } + } + + throw new UnsupportedOperationException( + "There is no index key or primary key that matches the lookup keys. info: " + + stringBuilder); + } } diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java index 4a17ac9a..a8ec9877 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConnectorOptionsUtils.java @@ -22,11 +22,19 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.flink.table.types.logical.RowType; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.INDEX_LOOKUP_KEY; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.SCAN_STARTUP_TIMESTAMP; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.ScanStartupMode.TIMESTAMP; @@ -43,8 +51,10 @@ public static ZoneId getLocalTimeZone(String timeZone) { : ZoneId.of(timeZone); } - public static void validateTableSourceOptions(ReadableConfig tableOptions) { + public static void validateTableSourceOptions( + ReadableConfig tableOptions, RowType tableOutputType) { validateScanStartupMode(tableOptions); + validateIndexKeys(tableOptions, tableOutputType); } public static StartupOptions getStartupOptions(ReadableConfig tableOptions, ZoneId timeZone) { @@ -73,6 +83,81 @@ private static void validateScanStartupMode(ReadableConfig tableOptions) { } } + private static void validateIndexKeys(ReadableConfig tableOptions, RowType tableOutputType) { + if (!tableOptions.getOptional(INDEX_LOOKUP_KEY).isPresent()) { + return; + } + + List columnNames = tableOutputType.getFieldNames(); + + String indexKeysString = tableOptions.get(INDEX_LOOKUP_KEY); + String indexKeysError = detectInvalidIndexKeys(indexKeysString, columnNames); + if (indexKeysError != null) { + throw new ValidationException( + "Option 'index.key' = '" + indexKeysString + "' is invalid: " + indexKeysError); + } + } + + private static String detectInvalidIndexKeys(String indexKeysString, List columnNames) { + String[] indexKeyStringList = indexKeysString.split(";"); + if (indexKeyStringList.length <= 0) { + return "Index key is empty or index key doesn't split by ';'."; + } + + List orderedIndexKey = new ArrayList<>(); + for (String indexKeyFieldsStr : indexKeyStringList) { + String[] indexNameAndFields = indexKeyFieldsStr.split("="); + if (indexNameAndFields.length != 2) { + return "There is an index key not follow the format 'indexKeyName=indexKeyFields'."; + } + + String[] fieldStrings = indexNameAndFields[1].split(","); + if (fieldStrings.length <= 0) { + return "There is an index key is empty or column field in index key doesn't split by ','."; + } + + List indexKeyPosList = new ArrayList<>(); + for (String field : fieldStrings) { + int fieldIndex = columnNames.indexOf(field); + if (fieldIndex < 0) { + return "Index key '" + field + "' does not exist in the schema."; + } + indexKeyPosList.add(fieldIndex); + } + + Collections.sort(indexKeyPosList); + if (orderedIndexKey.stream() + .anyMatch( + f -> + Arrays.equals( + f, + indexKeyPosList.stream() + .mapToInt(Integer::intValue) + .toArray()))) { + return "There is an index key is duplicate."; + } + orderedIndexKey.add(indexKeyPosList.stream().mapToInt(Integer::intValue).toArray()); + } + + return null; + } + + public static Map getIndexKeys(String indexKeyString, RowType rowType) { + List columnNames = rowType.getFieldNames(); + Map indexKeyIndexes = new HashMap<>(); + for (String indexKeyStr : indexKeyString.split(";")) { + String[] indexNameAndFields = indexKeyStr.split("="); + String indexKeyName = indexNameAndFields[0]; + List indexKeyList = new ArrayList<>(); + for (String field : indexNameAndFields[1].split(",")) { + indexKeyList.add(columnNames.indexOf(field)); + } + indexKeyIndexes.put( + indexKeyName, indexKeyList.stream().mapToInt(Integer::intValue).toArray()); + } + return indexKeyIndexes; + } + /** * Parses timestamp String to Long. * diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java index 0f038fd8..a739a71f 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java @@ -50,6 +50,7 @@ import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_KEY; import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.BUCKET_NUMBER; +import static com.alibaba.fluss.connector.flink.FlinkConnectorOptions.INDEX_LOOKUP_KEY; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; /** Utils for conversion between Flink and Fluss. */ @@ -213,6 +214,11 @@ public static TableDescriptor toFlussTable(ResolvedCatalogTable catalogTable) { // then set distributed by information List bucketKey; if (flinkTableConf.containsKey(BUCKET_KEY.key())) { + if (flinkTableConf.containsKey(INDEX_LOOKUP_KEY.key())) { + throw new CatalogException( + "Bucket key and table index key cannot be set at the same time."); + } + bucketKey = Arrays.stream(flinkTableConf.get(BUCKET_KEY).split(",")) .map(String::trim) diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java index 543dfab1..f77cee8c 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/TabletServerGateway.java @@ -19,6 +19,8 @@ import com.alibaba.fluss.rpc.RpcGateway; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.rpc.messages.FetchLogResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.InitWriterRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanRequest; @@ -99,6 +101,14 @@ CompletableFuture notifyLeaderAndIsr( @RPC(api = ApiKeys.LOOKUP) CompletableFuture lookup(LookupRequest request); + /** + * Index lookup to get value by index key. + * + * @return Index lookup response. + */ + @RPC(api = ApiKeys.INDEX_LOOKUP) + CompletableFuture indexLookup(IndexLookupRequest request); + /** * Get limit number of values from the specified table bucket. * diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java index 212f9536..9c3a01c5 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/RequestsMetrics.java @@ -93,6 +93,8 @@ private static String toRequestName(ApiKeys apiKeys, boolean isFromFollower) { return "putKv"; case LOOKUP: return "lookup"; + case INDEX_LOOKUP: + return "indexLookup"; case FETCH_LOG: return isFromFollower ? "fetchLogFollower" : "fetchLogClient"; default: diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java index 949917b2..a5c6160d 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java @@ -61,7 +61,8 @@ public enum ApiKeys { NOTIFY_LAKE_TABLE_OFFSET(1031, 0, 0, PRIVATE), DESCRIBE_LAKE_STORAGE(1032, 0, 0, PUBLIC), GET_LAKE_TABLE_SNAPSHOT(1033, 0, 0, PUBLIC), - LIMIT_SCAN(1034, 0, 0, PUBLIC); + LIMIT_SCAN(1034, 0, 0, PUBLIC), + INDEX_LOOKUP(1035, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index 11ad3195..1601276c 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -28,6 +28,7 @@ import com.alibaba.fluss.exception.InvalidConfigException; import com.alibaba.fluss.exception.InvalidCoordinatorException; import com.alibaba.fluss.exception.InvalidDatabaseException; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.exception.InvalidReplicationFactorException; import com.alibaba.fluss.exception.InvalidRequiredAcksException; import com.alibaba.fluss.exception.InvalidTableException; @@ -174,8 +175,8 @@ public enum Errors { INVALID_TIMESTAMP_EXCEPTION(38, "The timestamp is invalid.", InvalidTimestampException::new), INVALID_CONFIG_EXCEPTION(39, "The config is invalid.", InvalidConfigException::new), LAKE_STORAGE_NOT_CONFIGURED_EXCEPTION( - 40, "The lake storage is not configured.", LakeStorageNotConfiguredException::new); - ; + 40, "The lake storage is not configured.", LakeStorageNotConfiguredException::new), + INVALID_INDEX_KEYS_EXCEPTION(41, "The index keys is invalid", InvalidIndexKeysException::new); private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 4c68064a..96fc1e9d 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -202,6 +202,16 @@ message LookupResponse { repeated PbLookupRespForBucket buckets_resp = 1; } +// Index Lookup request and response +message IndexLookupRequest { + required int64 table_id = 1; + repeated PbIndexLookupReqForBucket buckets_req = 2; +} + +message IndexLookupResponse { + repeated PbIndexLookupRespForBucket buckets_resp = 1; +} + // limit scan request and response message LimitScanRequest { @@ -550,6 +560,27 @@ message PbValue { optional bytes values = 1; } +message PbIndexLookupReqForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + repeated bytes keys = 3; +} + +message PbIndexData { + required string index_name = 1; + required bytes index_key = 2; +} + +message PbIndexLookupRespForBucket { + optional int64 partition_id = 1; + required int32 bucket_id = 2; + repeated PbIndexLookupRespForKey keys_resp = 3; +} + +message PbIndexLookupRespForKey { + repeated bytes values = 1; +} + message PbTableBucket { required int64 table_id = 1; optional int64 partition_id = 2; diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 0271b5c2..321c5c8f 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.InvalidDatabaseException; +import com.alibaba.fluss.exception.InvalidIndexKeysException; import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.fs.FileSystem; import com.alibaba.fluss.metadata.Schema; @@ -133,6 +134,11 @@ public CompletableFuture createTable(CreateTableRequest req } TableDescriptor tableDescriptor = TableDescriptor.fromJsonBytes(request.getTableJson()); + try { + tableDescriptor.validate(); + } catch (InvalidIndexKeysException e) { + return FutureUtils.failedFuture(e); + } int bucketCount = defaultBucketNumber; // not set distribution diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java index 7dc740ce..3615f071 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvTablet.java @@ -470,6 +470,15 @@ public List multiGet(List keys) throws IOException { }); } + public List indexLookup(byte[] indexKey) throws IOException { + return inReadLock( + kvLock, + () -> { + rocksDBKv.checkIfRocksDBClosed(); + return rocksDBKv.indexLookup(indexKey); + }); + } + public List limitScan(int limit) throws IOException { return inReadLock( kvLock, diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java index aa71749e..61be998b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBKv.java @@ -35,6 +35,8 @@ import java.util.ArrayList; import java.util.List; +import static com.alibaba.fluss.utils.BytesUtils.isPrefixEquals; + /** A wrapper for the operation of {@link org.rocksdb.RocksDB}. */ public class RocksDBKv implements AutoCloseable { @@ -56,7 +58,7 @@ public class RocksDBKv implements AutoCloseable { * {@link RocksDB#open(String)} is different from that by {@link * RocksDB#getDefaultColumnFamily()}, probably it's a bug of RocksDB java API. */ - private final ColumnFamilyHandle defaultColumnFamily; + private final ColumnFamilyHandle defaultColumnFamilyHandle; /** Our RocksDB database. Currently, one kv tablet, one RocksDB instance. */ protected final RocksDB db; @@ -73,7 +75,7 @@ public RocksDBKv( this.db = db; this.rocksDBResourceGuard = rocksDBResourceGuard; this.writeOptions = optionsContainer.getWriteOptions(); - this.defaultColumnFamily = defaultColumnFamilyHandle; + this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; } public ResourceGuard getResourceGuard() { @@ -100,10 +102,28 @@ public List multiGet(List keys) throws IOException { } } + public List indexLookup(byte[] indexKey) throws IOException { + List pkList = new ArrayList<>(); + ReadOptions readOptions = new ReadOptions(); + RocksIterator iterator = db.newIterator(defaultColumnFamilyHandle, readOptions); + try { + iterator.seek(indexKey); + while (iterator.isValid() && isPrefixEquals(indexKey, iterator.key())) { + pkList.add(iterator.value()); + iterator.next(); + } + } finally { + readOptions.close(); + iterator.close(); + } + + return pkList; + } + public List limitScan(Integer limit) { List pkList = new ArrayList<>(); ReadOptions readOptions = new ReadOptions(); - RocksIterator iterator = db.newIterator(defaultColumnFamily, readOptions); + RocksIterator iterator = db.newIterator(defaultColumnFamilyHandle, readOptions); int count = 0; iterator.seekToFirst(); @@ -165,8 +185,8 @@ public void close() throws Exception { // Start with default CF ... List columnFamilyOptions = new ArrayList<>(); RocksDBOperationUtils.addColumnFamilyOptionsToCloseLater( - columnFamilyOptions, defaultColumnFamily); - IOUtils.closeQuietly(defaultColumnFamily); + columnFamilyOptions, defaultColumnFamilyHandle); + IOUtils.closeQuietly(defaultColumnFamilyHandle); // ... and finally close the DB instance ... IOUtils.closeQuietly(db); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java index 18b2ee6c..5deedab1 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/metrics/group/PhysicalTableMetricGroup.java @@ -201,6 +201,22 @@ public Counter failedLimitScanRequests() { } } + public Counter totalIndexLookupRequests() { + if (kvMetrics == null) { + return NoOpCounter.INSTANCE; + } else { + return kvMetrics.totalIndexLookupRequests; + } + } + + public Counter failedIndexLookupRequests() { + if (kvMetrics == null) { + return NoOpCounter.INSTANCE; + } else { + return kvMetrics.failedIndexLookupRequests; + } + } + // ------------------------------------------------------------------------ // bucket groups // ------------------------------------------------------------------------ @@ -326,6 +342,8 @@ private static class KvMetricGroup extends TabletMetricGroup { private final Counter failedPutKvRequests; private final Counter totalLimitScanRequests; private final Counter failedLimitScanRequests; + private final Counter totalIndexLookupRequests; + private final Counter failedIndexLookupRequests; public KvMetricGroup(PhysicalTableMetricGroup physicalTableMetricGroup) { super(physicalTableMetricGroup, TabletType.KV); @@ -349,6 +367,16 @@ public KvMetricGroup(PhysicalTableMetricGroup physicalTableMetricGroup) { meter( MetricNames.FAILED_LIMIT_SCAN_REQUESTS_RATE, new MeterView(failedLimitScanRequests)); + + // for index lookup request + totalIndexLookupRequests = new ThreadSafeSimpleCounter(); + meter( + MetricNames.TOTAL_INDEX_LOOKUP_REQUESTS_RATE, + new MeterView(totalIndexLookupRequests)); + failedIndexLookupRequests = new ThreadSafeSimpleCounter(); + meter( + MetricNames.FAILED_INDEX_LOOKUP_REQUESTS_RATE, + new MeterView(failedIndexLookupRequests)); } @Override diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java index 337b3f50..430a8420 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/Replica.java @@ -167,6 +167,7 @@ public final class Replica { private final long logTTLMs; private final boolean dataLakeEnabled; private final int tieredLogLocalSegments; + private final Map indexKeyIndexes; private final AtomicReference leaderReplicaIdOpt = new AtomicReference<>(); private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock(); @@ -228,6 +229,7 @@ public Replica( this.dataLakeEnabled = tableDescriptor.isDataLakeEnabled(); this.tieredLogLocalSegments = tableDescriptor.getTieredLogLocalSegments(); this.partitionKeys = tableDescriptor.getPartitionKeys(); + this.indexKeyIndexes = tableDescriptor.getIndexKeyIndexes(); this.snapshotContext = snapshotContext; // create a closeable registry for the replica this.closeableRegistry = new CloseableRegistry(); @@ -307,6 +309,10 @@ public long getLogTTLMs() { return logTTLMs; } + public boolean supportIndexLookup() { + return indexKeyIndexes != null && !indexKeyIndexes.isEmpty(); + } + public int writerIdCount() { return logTablet.getWriterIdCount(); } @@ -1028,6 +1034,38 @@ public List lookups(List keys) { }); } + public List indexLookup(byte[] indexKey) { + if (!isKvTable()) { + throw new NonPrimaryKeyTableException( + "Try to do index lookup on a non primary key table: " + getTablePath()); + } + + return inReadLock( + leaderIsrUpdateLock, + () -> { + try { + if (!isLeader()) { + throw new NotLeaderOrFollowerException( + String.format( + "Leader not local for bucket %s on tabletServer %d", + tableBucket, localTabletServerId)); + } + checkNotNull( + kvTablet, "KvTablet for the replica to get key shouldn't be null."); + // the index key is serialized by index name + index key fields, see + // IndexKeyEncoder. + return kvTablet.indexLookup(indexKey); + } catch (IOException e) { + String errorMsg = + String.format( + "Failed to do index lookup from local kv for table bucket %s, the cause is: %s", + tableBucket, e.getMessage()); + LOG.error(errorMsg, e); + throw new KvStorageException(errorMsg, e); + } + }); + } + public DefaultValueRecordBatch limitKvScan(int limit) { if (!isKvTable()) { throw new NonPrimaryKeyTableException( diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java index 22edc5cc..0236c373 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/replica/ReplicaManager.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.exception.FlussRuntimeException; import com.alibaba.fluss.exception.InvalidCoordinatorException; import com.alibaba.fluss.exception.InvalidRequiredAcksException; +import com.alibaba.fluss.exception.KvStorageException; import com.alibaba.fluss.exception.LogOffsetOutOfRangeException; import com.alibaba.fluss.exception.LogStorageException; import com.alibaba.fluss.exception.NotLeaderOrFollowerException; @@ -50,9 +51,12 @@ import com.alibaba.fluss.rpc.entity.PutKvResultForBucket; import com.alibaba.fluss.rpc.entity.WriteResultForBucket; import com.alibaba.fluss.rpc.gateway.CoordinatorGateway; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.NotifyKvSnapshotOffsetResponse; import com.alibaba.fluss.rpc.messages.NotifyLakeTableOffsetResponse; import com.alibaba.fluss.rpc.messages.NotifyRemoteLogOffsetsResponse; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForKey; import com.alibaba.fluss.rpc.protocol.ApiError; import com.alibaba.fluss.rpc.protocol.Errors; import com.alibaba.fluss.server.coordinator.CoordinatorContext; @@ -456,6 +460,50 @@ public void multiLookupValues( responseCallback.accept(lookupResultForBucketMap); } + /** Lookup by index keys on kv store. */ + public void indexLookup( + Map> entriesPerBucket, + Consumer responseCallback) { + IndexLookupResponse response = new IndexLookupResponse(); + PhysicalTableMetricGroup tableMetrics = null; + List resultForAll = new ArrayList<>(); + for (Map.Entry> entry : entriesPerBucket.entrySet()) { + TableBucket tb = entry.getKey(); + PbIndexLookupRespForBucket respForBucket = new PbIndexLookupRespForBucket(); + respForBucket.setBucketId(tb.getBucket()); + try { + Replica replica = getReplicaOrException(tb); + if (!replica.supportIndexLookup()) { + throw new KvStorageException( + "Table bucket " + tb + " does not support index lookup"); + } + + tableMetrics = replica.tableMetrics(); + tableMetrics.totalIndexLookupRequests().inc(); + List keyResultList = new ArrayList<>(); + for (byte[] indexKey : entry.getValue()) { + PbIndexLookupRespForKey pbIndexLookupRespForKey = new PbIndexLookupRespForKey(); + for (byte[] result : replica.indexLookup(indexKey)) { + pbIndexLookupRespForKey.addValue(result); + } + keyResultList.add(pbIndexLookupRespForKey); + } + respForBucket.addAllKeysResps(keyResultList); + } catch (Exception e) { + LOG.error("Error processing index lookup operation on replica {}", tb, e); + if (tableMetrics != null) { + tableMetrics.failedIndexLookupRequests().inc(); + } + throw e; + } + + resultForAll.add(respForBucket); + } + + response.addAllBucketsResps(resultForAll); + responseCallback.accept(response); + } + public void listOffsets( ListOffsetsParam listOffsetsParam, Set tableBuckets, diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java index 085c80b7..f246d59e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java @@ -25,6 +25,8 @@ import com.alibaba.fluss.rpc.gateway.TabletServerGateway; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.rpc.messages.FetchLogResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.InitWriterRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanRequest; @@ -62,6 +64,7 @@ import java.util.concurrent.CompletableFuture; import static com.alibaba.fluss.server.utils.RpcMessageUtils.makeLookupResponse; +import static com.alibaba.fluss.server.utils.RpcMessageUtils.toIndexLookupData; import static com.alibaba.fluss.server.utils.RpcMessageUtils.toLookupData; /** An RPC Gateway service for tablet server. */ @@ -142,6 +145,13 @@ public CompletableFuture lookup(LookupRequest request) { return response; } + @Override + public CompletableFuture indexLookup(IndexLookupRequest request) { + CompletableFuture response = new CompletableFuture<>(); + replicaManager.indexLookup(toIndexLookupData(request), response::complete); + return response; + } + @Override public CompletableFuture limitScan(LimitScanRequest request) { CompletableFuture response = new CompletableFuture<>(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java index 69d626a5..5a31b64d 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/RpcMessageUtils.java @@ -52,6 +52,7 @@ import com.alibaba.fluss.rpc.messages.GetKvSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetLakeTableSnapshotResponse; import com.alibaba.fluss.rpc.messages.GetPartitionSnapshotResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanResponse; import com.alibaba.fluss.rpc.messages.ListOffsetsRequest; @@ -72,6 +73,7 @@ import com.alibaba.fluss.rpc.messages.PbFetchLogReqForTable; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForTable; +import com.alibaba.fluss.rpc.messages.PbIndexLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbKeyValue; import com.alibaba.fluss.rpc.messages.PbLakeSnapshotForBucket; import com.alibaba.fluss.rpc.messages.PbLakeStorageInfo; @@ -602,6 +604,28 @@ public static Map> toLookupData(LookupRequest lookupRe return lookupEntryData; } + public static Map> toIndexLookupData( + IndexLookupRequest indexLookupRequest) { + long tableId = indexLookupRequest.getTableId(); + Map> lookupEntryData = new HashMap<>(); + for (PbIndexLookupReqForBucket lookupReqForBucket : + indexLookupRequest.getBucketsReqsList()) { + TableBucket tb = + new TableBucket( + tableId, + lookupReqForBucket.hasPartitionId() + ? lookupReqForBucket.getPartitionId() + : null, + lookupReqForBucket.getBucketId()); + List keys = new ArrayList<>(lookupReqForBucket.getKeysCount()); + for (int i = 0; i < lookupReqForBucket.getKeysCount(); i++) { + keys.add(lookupReqForBucket.getKeyAt(i)); + } + lookupEntryData.put(tb, keys); + } + return lookupEntryData; + } + public static @Nullable int[] getTargetColumns(PutKvRequest putKvRequest) { int[] targetColumns = putKvRequest.getTargetColumns(); return targetColumns.length == 0 ? null : targetColumns; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java index 168d9e6e..1a9a08c3 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/log/remote/RemoteLogManagerTest.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.server.log.remote; +import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.remote.RemoteLogFetchInfo; @@ -39,7 +40,9 @@ import java.util.stream.Collectors; import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH; +import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; +import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; import static com.alibaba.fluss.server.zk.data.LeaderAndIsr.INITIAL_LEADER_EPOCH; import static com.alibaba.fluss.utils.FlussPaths.remoteLogDir; import static com.alibaba.fluss.utils.FlussPaths.remoteLogTabletDir; @@ -412,7 +415,14 @@ void testCleanupLocalSegments(boolean partitionTable) throws Exception { @ValueSource(booleans = {true, false}) void testConfigureTieredLogLocalSegments(boolean partitionedTable) throws Exception { int tieredLogLocalSegments = 8; - long tableId = registerTableInZkClient(tieredLogLocalSegments); + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH, + DATA1_SCHEMA, + 200L, + Collections.singletonMap( + ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS.key(), + String.valueOf(tieredLogLocalSegments))); TableBucket tb = makeTableBucket(tableId, partitionedTable); // make leader, and then remote log tablet should be created. diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java index ac0c81a1..4068ee74 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaManagerTest.java @@ -16,6 +16,7 @@ package com.alibaba.fluss.server.replica; +import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.exception.InvalidRequiredAcksException; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.TableBucket; @@ -68,9 +69,11 @@ import static com.alibaba.fluss.record.TestData.DATA1; import static com.alibaba.fluss.record.TestData.DATA1_KEY_TYPE; import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE; +import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_ID_PK; import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH; +import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; import static com.alibaba.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; @@ -438,7 +441,10 @@ void testPutKvWithOutOfBatchSequence() throws Exception { replicaManager.putRecordsToKv( 20000, 1, - Collections.singletonMap(tb, genKvRecordBatchWithWriterId(data1, 100L, 0)), + Collections.singletonMap( + tb, + genKvRecordBatchWithWriterId( + data1, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 0)), null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 5)); @@ -475,7 +481,10 @@ void testPutKvWithOutOfBatchSequence() throws Exception { replicaManager.putRecordsToKv( 20000, 1, - Collections.singletonMap(tb, genKvRecordBatchWithWriterId(data2, 100L, 3)), + Collections.singletonMap( + tb, + genKvRecordBatchWithWriterId( + data2, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 3)), null, future::complete); PutKvResultForBucket putKvResultForBucket = future.get().get(0); @@ -508,7 +517,10 @@ void testPutKvWithOutOfBatchSequence() throws Exception { replicaManager.putRecordsToKv( 20000, 1, - Collections.singletonMap(tb, genKvRecordBatchWithWriterId(data3, 100L, 1)), + Collections.singletonMap( + tb, + genKvRecordBatchWithWriterId( + data3, DATA1_KEY_TYPE, DATA1_ROW_TYPE, 100L, 1)), null, future::complete); assertThat(future.get()).containsOnly(new PutKvResultForBucket(tb, 8)); @@ -563,7 +575,7 @@ void testLookup() throws Exception { byte[] key3Bytes = keyEncoder.encode(row(DATA1_KEY_TYPE, key3)); verifyLookup(tb, key3Bytes, null); - // Get key from none pk table. + // Lookup from none pk table. TableBucket tb2 = new TableBucket(DATA1_TABLE_ID, 1); makeLogTableAsLeader(tb2.getBucket()); replicaManager.multiLookupValues( @@ -578,6 +590,20 @@ void testLookup() throws Exception { }); } + @Test + void testIndexLookup() throws Exception { + long tableId = + registerTableInZkClient( + DATA1_TABLE_PATH_PK, + DATA1_SCHEMA_PK, + DATA1_TABLE_ID_PK, + Collections.singletonMap(ConfigOptions.TABLE_INDEX_KEY.key(), "idx0=b")); + TableBucket tb = new TableBucket(tableId, 0); + makeKvTableAsLeader(tb.getBucket()); + + // TODO + } + @Test void testLimitScanPrimaryKeyTable() throws Exception { TableBucket tb = new TableBucket(DATA1_TABLE_ID_PK, 1); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java index bbb78319..b88da6fd 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/ReplicaTestBase.java @@ -23,8 +23,10 @@ import com.alibaba.fluss.config.MemorySize; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.metadata.PhysicalTablePath; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.record.MemoryLogRecords; import com.alibaba.fluss.rpc.RpcClient; import com.alibaba.fluss.rpc.metrics.TestingClientMetricGroup; @@ -75,6 +77,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -229,22 +232,18 @@ private void registerTableInZkClient() throws Exception { zkClient.registerSchema(DATA2_TABLE_PATH, DATA2_SCHEMA); } - protected long registerTableInZkClient(int tieredLogLocalSegment) throws Exception { - long tableId = 200; - TableDescriptor tableDescriptor = - TableDescriptor.builder() - .schema(DATA1_SCHEMA) - .distributedBy(3) - .property( - ConfigOptions.TABLE_TIERED_LOG_LOCAL_SEGMENTS, - tieredLogLocalSegment) - .build(); + protected long registerTableInZkClient( + TablePath tablePath, Schema schema, long tableId, Map properties) + throws Exception { + TableDescriptor.Builder builder = TableDescriptor.builder().schema(schema).distributedBy(3); + properties.forEach(builder::property); + TableDescriptor tableDescriptor = builder.build(); // if exists, drop it firstly - if (zkClient.tableExist(DATA1_TABLE_PATH)) { - zkClient.deleteTable(DATA1_TABLE_PATH); + if (zkClient.tableExist(tablePath)) { + zkClient.deleteTable(tablePath); } - zkClient.registerTable(DATA1_TABLE_PATH, TableRegistration.of(tableId, tableDescriptor)); - zkClient.registerSchema(DATA1_TABLE_PATH, DATA1_SCHEMA); + zkClient.registerTable(tablePath, TableRegistration.of(tableId, tableDescriptor)); + zkClient.registerSchema(tablePath, schema); return tableId; } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java index af039c92..34987f70 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TabletServiceITCase.java @@ -17,7 +17,9 @@ package com.alibaba.fluss.server.tablet; import com.alibaba.fluss.exception.InvalidRequiredAcksException; +import com.alibaba.fluss.exception.KvStorageException; import com.alibaba.fluss.metadata.LogFormat; +import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; @@ -36,6 +38,10 @@ import com.alibaba.fluss.rpc.protocol.Errors; import com.alibaba.fluss.server.log.ListOffsetsParam; import com.alibaba.fluss.server.testutils.FlussClusterExtension; +import com.alibaba.fluss.types.DataField; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.types.RowType; +import com.alibaba.fluss.utils.types.Tuple2; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -43,6 +49,8 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; import static com.alibaba.fluss.record.TestData.ANOTHER_DATA1; @@ -56,12 +64,14 @@ import static com.alibaba.fluss.record.TestData.DATA1_TABLE_PATH_PK; import static com.alibaba.fluss.record.TestData.DATA_1_WITH_KEY_AND_VALUE; import static com.alibaba.fluss.record.TestData.DEFAULT_SCHEMA_ID; +import static com.alibaba.fluss.server.testutils.KvTestUtils.assertIndexLookupResponse; import static com.alibaba.fluss.server.testutils.KvTestUtils.assertLookupResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertFetchLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertLimitScanResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.assertProduceLogResponse; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.createTable; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newFetchLogRequest; +import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newIndexLookupRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newLimitScanRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newListOffsetsRequest; import static com.alibaba.fluss.server.testutils.RpcMessageTestUtils.newLookupRequest; @@ -305,7 +315,7 @@ void testPutKv() throws Exception { } @Test - void testGetKey() throws Exception { + void testLookup() throws Exception { long tableId = createTable( FLUSS_CLUSTER_EXTENSION, @@ -319,7 +329,7 @@ void testGetKey() throws Exception { TabletServerGateway leaderGateWay = FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); - // first get key without in table, key = 1. + // first lookup without in table, key = 1. Object[] key1 = DATA_1_WITH_KEY_AND_VALUE.get(0).f0; KeyEncoder keyEncoder = new KeyEncoder(DATA1_ROW_TYPE, new int[] {0}); byte[] key1Bytes = keyEncoder.encode(row(DATA1_KEY_TYPE, key1)); @@ -334,7 +344,7 @@ void testGetKey() throws Exception { tableId, 0, 1, genKvRecordBatch(DATA_1_WITH_KEY_AND_VALUE))) .get()); - // second get key in table, key = 1, value = 1, "a1". + // second lookup in table, key = 1, value = 1, "a1". Object[] value1 = DATA_1_WITH_KEY_AND_VALUE.get(3).f1; byte[] value1Bytes = ValueEncoder.encodeValue(DEFAULT_SCHEMA_ID, compactedRow(DATA1_ROW_TYPE, value1)); @@ -359,7 +369,7 @@ void testGetKey() throws Exception { Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION, "Unknown table or bucket: TableBucket{tableId=10005, bucket=6}"); - // Get key from a non-pk table. + // Lookup from a non-pk table. long logTableId = createTable( FLUSS_CLUSTER_EXTENSION, @@ -383,6 +393,162 @@ void testGetKey() throws Exception { "the primary key table not exists for TableBucket"); } + @Test + void testIndexLookup() throws Exception { + TablePath tablePath = TablePath.of("test_db_1", "test_index_lookup_t1"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .column("c", DataTypes.BIGINT()) + .column("d", DataTypes.STRING()) + .primaryKey("a", "b", "c") + .build(); + RowType rowType = schema.toRowType(); + RowType primaryKeyType = + DataTypes.ROW( + new DataField("a", DataTypes.INT()), + new DataField("b", DataTypes.STRING()), + new DataField("c", DataTypes.BIGINT())); + RowType indexKeyType = + DataTypes.ROW( + new DataField("a", DataTypes.INT()), + new DataField("b", DataTypes.STRING())); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .property("table.index.key", "idx0=a,b") + .build(); + long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, descriptor); + TableBucket tb = new TableBucket(tableId, 0); + + FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); + + int leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + // first index lookup without in table, index key = (1, "a"). + Object[] indexKey1 = new Object[] {1, "a"}; + KeyEncoder keyEncoder = new KeyEncoder(rowType, new int[] {0, 1}); + byte[] indexKey1Bytes = keyEncoder.encode(row(indexKeyType, indexKey1)); + assertIndexLookupResponse( + leaderGateWay + .indexLookup( + newIndexLookupRequest( + tableId, 0, Collections.singletonList(indexKey1Bytes))) + .get(), + Collections.singletonList(Collections.emptyList())); + + // send one batch kv. + assertPutKvResponse( + leaderGateWay + .putKv( + newPutKvRequest( + tableId, + 0, + 1, + genKvRecordBatch( + primaryKeyType, + rowType, + Arrays.asList( + Tuple2.of( + new Object[] {1, "a", 1L}, + new Object[] { + 1, "a", 1L, "value1" + }), + Tuple2.of( + new Object[] {1, "a", 2L}, + new Object[] { + 1, "a", 2L, "value2" + }), + Tuple2.of( + new Object[] {1, "a", 3L}, + new Object[] { + 1, "a", 3L, "value3" + }), + Tuple2.of( + new Object[] {2, "a", 4L}, + new Object[] { + 2, "a", 4L, "value4" + }))))) + .get()); + + // second index lookup in table, index key = (1, "a"). + assertIndexLookupResponse( + leaderGateWay + .indexLookup( + newIndexLookupRequest( + tableId, 0, Collections.singletonList(indexKey1Bytes))) + .get(), + Collections.singletonList( + Arrays.asList( + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 1L, "value1"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 2L, "value2"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow( + rowType, new Object[] {1, "a", 3L, "value3"}))))); + + // third index lookup in table for multi index key, index key = (1, "a") and (2, "a"). + Object[] indexKey2 = new Object[] {2, "a"}; + byte[] indexKey2Bytes = keyEncoder.encode(row(indexKeyType, indexKey2)); + assertIndexLookupResponse( + leaderGateWay + .indexLookup( + newIndexLookupRequest( + tableId, 0, Arrays.asList(indexKey1Bytes, indexKey2Bytes))) + .get(), + Arrays.asList( + Arrays.asList( + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 1L, "value1"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow(rowType, new Object[] {1, "a", 2L, "value2"})), + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow( + rowType, new Object[] {1, "a", 3L, "value3"}))), + Collections.singletonList( + ValueEncoder.encodeValue( + DEFAULT_SCHEMA_ID, + compactedRow( + rowType, new Object[] {2, "a", 4L, "value4"}))))); + + // index lookup an unsupported index table. + long tableId2 = + createTable( + FLUSS_CLUSTER_EXTENSION, + DATA1_TABLE_PATH_PK, + DATA1_TABLE_INFO_PK.getTableDescriptor()); + tb = new TableBucket(tableId2, 0); + FLUSS_CLUSTER_EXTENSION.waitUtilAllReplicaReady(tb); + leader = FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tb); + TabletServerGateway leaderGateWay2 = + FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leader); + assertThatThrownBy( + () -> + leaderGateWay2 + .indexLookup( + newIndexLookupRequest( + tableId2, + 0, + Collections.singletonList(indexKey1Bytes))) + .get()) + .cause() + .isInstanceOf(KvStorageException.class) + .hasMessageContaining( + "Table bucket TableBucket{tableId=" + + tableId2 + + ", bucket=0} does not support index lookup"); + } + @Test void testLimitScanPrimaryKeyTable() throws Exception { long tableId = diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java index cd9df1d5..81911810 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java @@ -41,6 +41,8 @@ import com.alibaba.fluss.rpc.messages.GetTableResponse; import com.alibaba.fluss.rpc.messages.GetTableSchemaRequest; import com.alibaba.fluss.rpc.messages.GetTableSchemaResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.InitWriterRequest; import com.alibaba.fluss.rpc.messages.InitWriterResponse; import com.alibaba.fluss.rpc.messages.LimitScanRequest; @@ -183,6 +185,11 @@ public CompletableFuture lookup(LookupRequest request) { return null; } + @Override + public CompletableFuture indexLookup(IndexLookupRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture limitScan(LimitScanRequest request) { return null; diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java index 4d611e95..088da43a 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/KvTestUtils.java @@ -19,7 +19,10 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.fs.FsPath; import com.alibaba.fluss.metadata.TableBucket; +import com.alibaba.fluss.rpc.messages.IndexLookupResponse; import com.alibaba.fluss.rpc.messages.LookupResponse; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForBucket; +import com.alibaba.fluss.rpc.messages.PbIndexLookupRespForKey; import com.alibaba.fluss.rpc.messages.PbLookupRespForBucket; import com.alibaba.fluss.rpc.messages.PbValue; import com.alibaba.fluss.server.kv.rocksdb.RocksDBKv; @@ -172,4 +175,23 @@ public static void assertLookupResponse( byte[] lookupValue = pbValue.hasValues() ? pbValue.getValues() : null; assertThat(lookupValue).isEqualTo(expectedValue); } + + public static void assertIndexLookupResponse( + IndexLookupResponse indexLookupResponse, List> expectedValues) { + checkArgument(indexLookupResponse.getBucketsRespsCount() == 1); + PbIndexLookupRespForBucket pbIndexLookupRespForBucket = + indexLookupResponse.getBucketsRespAt(0); + checkArgument(pbIndexLookupRespForBucket.getKeysRespsCount() == expectedValues.size()); + for (int i = 0; i < expectedValues.size(); i++) { + PbIndexLookupRespForKey pbIndexLookupRespForKey = + pbIndexLookupRespForBucket.getKeysRespAt(i); + List bytesResultForOneIndexKey = expectedValues.get(i); + checkArgument( + pbIndexLookupRespForKey.getValuesCount() == bytesResultForOneIndexKey.size()); + for (int j = 0; j < bytesResultForOneIndexKey.size(); j++) { + assertThat(pbIndexLookupRespForKey.getValueAt(j)) + .isEqualTo(bytesResultForOneIndexKey.get(j)); + } + } + } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java index 435eefe4..bdfb68fe 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/testutils/RpcMessageTestUtils.java @@ -34,6 +34,7 @@ import com.alibaba.fluss.rpc.messages.FetchLogResponse; import com.alibaba.fluss.rpc.messages.GetTableRequest; import com.alibaba.fluss.rpc.messages.GetTableResponse; +import com.alibaba.fluss.rpc.messages.IndexLookupRequest; import com.alibaba.fluss.rpc.messages.LimitScanRequest; import com.alibaba.fluss.rpc.messages.LimitScanResponse; import com.alibaba.fluss.rpc.messages.ListOffsetsRequest; @@ -44,6 +45,7 @@ import com.alibaba.fluss.rpc.messages.PbFetchLogReqForTable; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForBucket; import com.alibaba.fluss.rpc.messages.PbFetchLogRespForTable; +import com.alibaba.fluss.rpc.messages.PbIndexLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbLookupReqForBucket; import com.alibaba.fluss.rpc.messages.PbProduceLogReqForBucket; import com.alibaba.fluss.rpc.messages.PbProduceLogRespForBucket; @@ -215,6 +217,17 @@ public static LookupRequest newLookupRequest(long tableId, int bucketId, byte[] return lookupRequest; } + public static IndexLookupRequest newIndexLookupRequest( + long tableId, int bucketId, List indexKeys) { + IndexLookupRequest indexLookupRequest = new IndexLookupRequest().setTableId(tableId); + PbIndexLookupReqForBucket pbIndexLookupReqForBucket = indexLookupRequest.addBucketsReq(); + pbIndexLookupReqForBucket.setBucketId(bucketId); + for (byte[] indexKey : indexKeys) { + pbIndexLookupReqForBucket.addKey(indexKey); + } + return indexLookupRequest; + } + public static LimitScanRequest newLimitScanRequest(long tableId, int bucketId, int limit) { return new LimitScanRequest().setTableId(tableId).setBucketId(bucketId).setLimit(limit); } diff --git a/website/docs/maintenance/configuration.md b/website/docs/maintenance/configuration.md index 7f3ad146..5cb23663 100644 --- a/website/docs/maintenance/configuration.md +++ b/website/docs/maintenance/configuration.md @@ -216,9 +216,10 @@ Currently, we don't support alter table configuration by Flink. This will be sup | client.request-timeout | Duration | 30s | The timeout for a request to complete. If user set the write ack to -1, this timeout is the max time that delayed write try to complete. The default setting is 30 seconds. | | client.scanner.log.check-crc | Boolean | true | Automatically check the CRC3 of the read records for LogScanner. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. | | client.scanner.log.max-poll-records | Integer | 500 | The maximum number of records returned in a single call to poll() for LogScanner. Note that this config doesn't impact the underlying fetching behavior. The Scanner will cache the records from each fetch request and returns them incrementally from each poll. | -| client.lookup.queue-size | Integer | 256 | The maximum number of pending lookup operations. | +| client.lookup.queue-size | Integer | 25600 | The maximum number of pending lookup operations. | | client.lookup.max-batch-size | Integer | 128 | The maximum batch size of merging lookup operations to one lookup request. | | client.lookup.max-inflight-requests | Integer | 128 | The maximum number of unacknowledged lookup requests for lookup operations. | +| client.lookup.batch-timeout | Duration | 100ms | The maximum time to wait for the lookup batch to full, if this timeout is reached, the lookup batch will be closed to send. | | client.scanner.remote-log.prefetch-num | Integer | 4 | The number of remote log segments to keep in local temp file for LogScanner, which download from remote storage. The default setting is 4. | | client.scanner.io.tmpdir | String | System.getProperty("java.io.tmpdir") + "/fluss" | Local directory that is used by client for storing the data files (like kv snapshot, log segment files) to read temporarily | | client.remote-file.download-thread-num | Integer | 3 | The number of threads the client uses to download remote files. | diff --git a/website/docs/maintenance/monitor-metrics.md b/website/docs/maintenance/monitor-metrics.md index 3eeb93ff..27c239d1 100644 --- a/website/docs/maintenance/monitor-metrics.md +++ b/website/docs/maintenance/monitor-metrics.md @@ -434,8 +434,8 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM - tabletServer - table + tabletServer + table messagesInPerSecond The number of messages written per second to this table Meter @@ -490,6 +490,26 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM The number of failed lookup requests to lookup value by key from this table per second. Meter + + totalLimitScanRequestsPerSecond + The number of limit scn requests to scan records with limit from this table per second. + Meter + + + failedLimitScanRequestsPerSecond + The number of failed limit scn requests to scan records with limit from this table per second. + Meter + + + totalIndexLookupRequestsPerSecond + The number of index lookup requests to index lookup value by key from this table per second. + Meter + + + failedIndexLookupRequestsPerSecond + The number of failed index lookup requests to index lookup value by key from this table per second. + Meter + remoteLogCopyBytesPerSecond The bytes of log data copied to remote per second.