-
Notifications
You must be signed in to change notification settings - Fork 205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[kv] Support index lookup for primary key table #222
base: main
Are you sure you want to change the base?
Conversation
572a477
to
b95540f
Compare
b95540f
to
90f6295
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think our current index is not a general index, it is just a prefix of primary key index. So, actually, it is just a prefix scan/lookup for the prefix of primary key (the prefix should include bucket key). I don't want to call this indexLookup
because it occupies the API for future possible index (index on arbitrary columns).
How about changing the API into prefixLookup
? The parameter key
should be the prefix of primary key and must include bucket key. For DDL, we don't need to introduce new options table.index.keys
, we can just continue to use bucket.key
.
As we don't have force checks for bucket key is a prefix of primary key. We have to add some best practices for Delta Join cases in the future documentation. For tables used for DeltaJoin queries, the best practice is putting columns of bucket key before other columns in the definition of primary key. Otherwise, the prefixLookup
doesn't work when the parameter key only contains bucket join. For example, given a primary key table orders
with schema user_id, item_id, order_id, col1, col2, col3
(order_id
can be used as primary key as it is unique). If the join key is (user_id, item_id)
, the primary key of the table must be set to user_id, item_id, order_id
and bucket key to user_id, item_id
. The prefixLookup
will not work if the primary key is set to order_id, user_id, item_id
, because the join key is not a prefix of primary key.
@@ -490,6 +490,26 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM | |||
<td>The number of failed lookup requests to lookup value by key from this table per second.</td> | |||
<td>Meter</td> | |||
</tr> | |||
<tr> | |||
<td>totalLimitScanRequestsPerSecond</td> | |||
<td>The number of limit scn requests to scan records with limit from this table per second.</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scn -> scan?
</tr> | ||
<tr> | ||
<td>failedLimitScanRequestsPerSecond</td> | ||
<td>The number of failed limit scn requests to scan records with limit from this table per second.</td> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
return key; | ||
} | ||
|
||
public abstract LookupType lookupType(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Class 'LookupType' is exposed outside its defined visibility scope
Make LookupType
public.
|
||
/** An abstract lookup batch. */ | ||
@Internal | ||
public abstract class AbstractLookupBatch { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only used by IndexLookupBatch
, we don't need this abstraction.
|
||
public abstract LookupType lookupType(); | ||
|
||
public abstract CompletableFuture<List<byte[]>> future(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is strange and inefficient that Lookup
returns a List of result. We can introduce a generic type T
to AbstractLookup
and allows Lookup
and IndexLookup
to define their own return type.
public abstract class AbstractLookup<T> {
...
public abstract CompletableFuture<T> future();
}
public class Lookup extends AbstractLookup<byte[]> {
...
}
public class IndexLookup extends AbstractLookup<List<byte[]>> {
...
}
for (Map.Entry<TableBucket, List<byte[]>> entry : entriesPerBucket.entrySet()) { | ||
TableBucket tb = entry.getKey(); | ||
PbIndexLookupRespForBucket respForBucket = new PbIndexLookupRespForBucket(); | ||
respForBucket.setBucketId(tb.getBucket()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
set partition id as well .
@@ -456,6 +460,50 @@ public void multiLookupValues( | |||
responseCallback.accept(lookupResultForBucketMap); | |||
} | |||
|
|||
/** Lookup by index keys on kv store. */ | |||
public void indexLookup( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indexLookup
-> indexLookups
and rename multiLookupValues
to lookups
TableBucket tb = new TableBucket(tableId, 0); | ||
makeKvTableAsLeader(tb.getBucket()); | ||
|
||
// TODO |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing implementation
|
||
public static void assertIndexLookupResponse( | ||
IndexLookupResponse indexLookupResponse, List<List<byte[]>> expectedValues) { | ||
checkArgument(indexLookupResponse.getBucketsRespsCount() == 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do not use checkArgument
for assertion. Use assertThat
instead!
primaryKeyType, | ||
rowType, | ||
Arrays.asList( | ||
Tuple2.of( | ||
new Object[] {1, "a", 1L}, | ||
new Object[] { | ||
1, "a", 1L, "value1" | ||
}), | ||
Tuple2.of( | ||
new Object[] {1, "a", 2L}, | ||
new Object[] { | ||
1, "a", 2L, "value2" | ||
}), | ||
Tuple2.of( | ||
new Object[] {1, "a", 3L}, | ||
new Object[] { | ||
1, "a", 3L, "value3" | ||
}), | ||
Tuple2.of( | ||
new Object[] {2, "a", 4L}, | ||
new Object[] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
code is not readable, reformat it.
Purpose
Linked issue: #65
Index lookup is a feature that exposes lookup capabilities built on top of secondary indexes. By using secondary indexes, the required data can be located quickly, which can be utilized in conjunction with Flink to implement delta joins.
The purpose of this PR is to provide index lookup for kv tables. The implementation approach is to define the primary key of the kv storage as "secondary keys + primary key", and set the bucket key to the secondary keys. This way, when looking up data through the secondary keys, the corresponding bucket and server can be quickly identified, providing efficient point query capabilities.
Tests
API and Format
Documentation