Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kv] Support index lookup for primary key table #222

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

swuferhong
Copy link
Collaborator

Purpose

Linked issue: #65

Index lookup is a feature that exposes lookup capabilities built on top of secondary indexes. By using secondary indexes, the required data can be located quickly, which can be utilized in conjunction with Flink to implement delta joins.
The purpose of this PR is to provide index lookup for kv tables. The implementation approach is to define the primary key of the kv storage as "secondary keys + primary key", and set the bucket key to the secondary keys. This way, when looking up data through the secondary keys, the corresponding bucket and server can be quickly identified, providing efficient point query capabilities.

Tests

API and Format

Documentation

@wuchong wuchong linked an issue Dec 18, 2024 that may be closed by this pull request
2 tasks
@swuferhong swuferhong force-pushed the index-lookup-1216 branch 2 times, most recently from 572a477 to b95540f Compare December 20, 2024 09:37
Copy link
Member

@wuchong wuchong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think our current index is not a general index, it is just a prefix of primary key index. So, actually, it is just a prefix scan/lookup for the prefix of primary key (the prefix should include bucket key). I don't want to call this indexLookup because it occupies the API for future possible index (index on arbitrary columns).

How about changing the API into prefixLookup? The parameter key should be the prefix of primary key and must include bucket key. For DDL, we don't need to introduce new options table.index.keys, we can just continue to use bucket.key.

As we don't have force checks for bucket key is a prefix of primary key. We have to add some best practices for Delta Join cases in the future documentation. For tables used for DeltaJoin queries, the best practice is putting columns of bucket key before other columns in the definition of primary key. Otherwise, the prefixLookup doesn't work when the parameter key only contains bucket join. For example, given a primary key table orders with schema user_id, item_id, order_id, col1, col2, col3 (order_id can be used as primary key as it is unique). If the join key is (user_id, item_id), the primary key of the table must be set to user_id, item_id, order_id and bucket key to user_id, item_id. The prefixLookup will not work if the primary key is set to order_id, user_id, item_id, because the join key is not a prefix of primary key.

@@ -490,6 +490,26 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
<td>The number of failed lookup requests to lookup value by key from this table per second.</td>
<td>Meter</td>
</tr>
<tr>
<td>totalLimitScanRequestsPerSecond</td>
<td>The number of limit scn requests to scan records with limit from this table per second.</td>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scn -> scan?

</tr>
<tr>
<td>failedLimitScanRequestsPerSecond</td>
<td>The number of failed limit scn requests to scan records with limit from this table per second.</td>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

return key;
}

public abstract LookupType lookupType();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Class 'LookupType' is exposed outside its defined visibility scope

Make LookupType public.


/** An abstract lookup batch. */
@Internal
public abstract class AbstractLookupBatch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only used by IndexLookupBatch, we don't need this abstraction.


public abstract LookupType lookupType();

public abstract CompletableFuture<List<byte[]>> future();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is strange and inefficient that Lookup returns a List of result. We can introduce a generic type T to AbstractLookup and allows Lookup and IndexLookup to define their own return type.

public abstract class AbstractLookup<T> {

    ...

    public abstract CompletableFuture<T> future();
}

public class Lookup extends AbstractLookup<byte[]> {
  ...
}

public class IndexLookup extends AbstractLookup<List<byte[]>> {
  ...
}

for (Map.Entry<TableBucket, List<byte[]>> entry : entriesPerBucket.entrySet()) {
TableBucket tb = entry.getKey();
PbIndexLookupRespForBucket respForBucket = new PbIndexLookupRespForBucket();
respForBucket.setBucketId(tb.getBucket());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set partition id as well .

@@ -456,6 +460,50 @@ public void multiLookupValues(
responseCallback.accept(lookupResultForBucketMap);
}

/** Lookup by index keys on kv store. */
public void indexLookup(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indexLookup -> indexLookups

and rename multiLookupValues to lookups

TableBucket tb = new TableBucket(tableId, 0);
makeKvTableAsLeader(tb.getBucket());

// TODO
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing implementation


public static void assertIndexLookupResponse(
IndexLookupResponse indexLookupResponse, List<List<byte[]>> expectedValues) {
checkArgument(indexLookupResponse.getBucketsRespsCount() == 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do not use checkArgument for assertion. Use assertThat instead!

Comment on lines +452 to +472
primaryKeyType,
rowType,
Arrays.asList(
Tuple2.of(
new Object[] {1, "a", 1L},
new Object[] {
1, "a", 1L, "value1"
}),
Tuple2.of(
new Object[] {1, "a", 2L},
new Object[] {
1, "a", 2L, "value2"
}),
Tuple2.of(
new Object[] {1, "a", 3L},
new Object[] {
1, "a", 3L, "value3"
}),
Tuple2.of(
new Object[] {2, "a", 4L},
new Object[] {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code is not readable, reformat it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature] Fluss support index lookup for primary key table
2 participants