Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18026: migrate KTableSource to use ProcesserSupplier#stores #17903

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from

Conversation

agavra
Copy link
Contributor

@agavra agavra commented Nov 21, 2024

Description

This PR is part of the implementation for KIP-1112 (KAFKA-18026). In order to have DSL operators be properly wrapped by the interface suggestion in 1112, we need to make sure they all use the ConnectedStoreProvider#stores method to connect stores instead of manually calling addStateStore.

Review Guide

  1. Checkout KTableSource which now implements the stores() method
  2. GlobalKTables no longer hack materialized by setting the source name on the builder, instead we pipe a forceQueryable into MaterializedInternal so that we directly indicate that it should be queryable/materialized instead of relying on a side effect of setting the store name
  3. StoreDelegatingProcessorSupplier is used only in the case of GlobalKTables because the stores are expected to be passed in separately in the public API -- this PR doesn't change that
  4. FactoryWrappingStoreBuilder is used so that already created store factories can be typecast as StoreBuilders.

NOTE: For point (3), I think this might be a bug in the current implementation that we may want to change in 4.0 since it's backwards incompatible. If you pass in a ProcessorSupplier to addGlobalTable, the stores() method on that will be ignored.

Testing

This is a refactor only, there is no new behaviors.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ableegoldman ableegoldman changed the title [KAFKA-18026] migrate KTableSource to use ProcesserSupplier#stores KAFKA-18026: migrate KTableSource to use ProcesserSupplier#stores Nov 22, 2024
Copy link
Contributor

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gentle reminder: it's "KAFKA-18026: ..." not "[KAFKA-18026] " when naming your PRs in AK 😉

public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix,
final boolean forceQueryable) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice refactor, this is way cleaner/easy to follow than randomly overriding the queryableStoreName for things like global stores

topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
tableSource.stores().forEach(store -> {
store.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw I noticed that this is actually called inside #addStateStore, at least as long as the parent processor names are passed in, which should always be the case except for with global tables/stores, and when adding stores to a Topology directly via #addStateStore and connecting them manually using #connectProcessorToStateStores (ie the alternative to implementing ProcessorSupplier#stores)

In both those cases #connectSourceStoreAndTopic is called directly, so AFAICT there's no reason to be invoking topologyBuilder.connectSourceStoreAndTopic all over the place including right here.

Granted, it's idempotent so calling it again is fine, but it makes the already-messy topology building code even more confusing. Might be nice to remove all these extraneous calls (can be done in a separate PR so we can make sure it doesn't break anything)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😅 I actually had removed this accidentally and a test failed, so I added it back. Let's do it in a follow up, may have just been a defunct test

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might've been a mock, but I'm all good with following up on this

@ableegoldman
Copy link
Contributor

btw @agavra I merged the first KIP-1112 PR so can you add a StreamsBuilderTest to verify the processor wrapping for this operator?

(also there's a question for you about method names in that PR)

@ableegoldman
Copy link
Contributor

@agavra also I enabled the PR build (apparently you now need a committer to approve this :/ ) and it looks like the spotless check failed due to import ordering . If you look at the output it should a command you can run to fix it automatically so you don't have to rearrange everything yourself

@agavra agavra force-pushed the ktablesource_supplier_stores branch from b0a3932 to 65fd4e5 Compare November 25, 2024 19:18
@agavra
Copy link
Contributor Author

agavra commented Nov 25, 2024

@ableegoldman I added the tests! I also changed them to record the processor names, not just the count, to make sure we are actually counting the processors we expect to be counting 😅

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants