-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-18026: migrate KTableSource to use ProcesserSupplier#stores #17903
base: trunk
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gentle reminder: it's "KAFKA-18026: ..." not "[KAFKA-18026] " when naming your PRs in AK 😉
streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
Outdated
Show resolved
Hide resolved
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
Show resolved
Hide resolved
public MaterializedInternal(final Materialized<K, V, S> materialized, | ||
final InternalNameProvider nameProvider, | ||
final String generatedStorePrefix, | ||
final boolean forceQueryable) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice refactor, this is way cleaner/easy to follow than randomly overriding the queryableStoreName for things like global stores
topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName); | ||
tableSource.stores().forEach(store -> { | ||
store.withLoggingDisabled(); | ||
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw I noticed that this is actually called inside #addStateStore, at least as long as the parent processor names are passed in, which should always be the case except for with global tables/stores, and when adding stores to a Topology directly via #addStateStore and connecting them manually using #connectProcessorToStateStores (ie the alternative to implementing ProcessorSupplier#stores)
In both those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking topologyBuilder.connectSourceStoreAndTopic
all over the place including right here.
Granted, it's idempotent so calling it again is fine, but it makes the already-messy topology building code even more confusing. Might be nice to remove all these extraneous calls (can be done in a separate PR so we can make sure it doesn't break anything)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😅 I actually had removed this accidentally and a test failed, so I added it back. Let's do it in a follow up, may have just been a defunct test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might've been a mock, but I'm all good with following up on this
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreFactory.java
Outdated
Show resolved
Hide resolved
btw @agavra I merged the first KIP-1112 PR so can you add a StreamsBuilderTest to verify the processor wrapping for this operator? (also there's a question for you about method names in that PR) |
@agavra also I enabled the PR build (apparently you now need a committer to approve this :/ ) and it looks like the spotless check failed due to import ordering . If you look at the output it should a command you can run to fix it automatically so you don't have to rearrange everything yourself |
b0a3932
to
65fd4e5
Compare
@ableegoldman I added the tests! I also changed them to record the processor names, not just the count, to make sure we are actually counting the processors we expect to be counting 😅 |
Description
This PR is part of the implementation for KIP-1112 (KAFKA-18026). In order to have DSL operators be properly wrapped by the interface suggestion in 1112, we need to make sure they all use the
ConnectedStoreProvider#stores
method to connect stores instead of manually callingaddStateStore
.Review Guide
KTableSource
which now implements thestores()
methodforceQueryable
intoMaterializedInternal
so that we directly indicate that it should be queryable/materialized instead of relying on a side effect of setting the store nameStoreDelegatingProcessorSupplier
is used only in the case of GlobalKTables because the stores are expected to be passed in separately in the public API -- this PR doesn't change thatFactoryWrappingStoreBuilder
is used so that already created store factories can be typecast asStoreBuilder
s.NOTE: For point (3), I think this might be a bug in the current implementation that we may want to change in 4.0 since it's backwards incompatible. If you pass in a
ProcessorSupplier
toaddGlobalTable
, thestores()
method on that will be ignored.Testing
This is a refactor only, there is no new behaviors.
Committer Checklist (excluded from commit message)