Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL implementation #17892

Merged
merged 12 commits into from
Nov 24, 2024

Conversation

ableegoldman
Copy link
Contributor

@ableegoldman ableegoldman commented Nov 21, 2024

This PR includes the API for KIP-1112 and a partial implementation, which wraps any processors added through the PAPI and the DSL processors that are written to the topology through the ProcessorParameters#addProcessorTo method.

Further PRs will complete the implementation by converting the remaining DSL operators to using the #addProcessorTo method, and future-proof the processor writing mechanism to prevent new DSL operators from being implemented incorrectly/without the wrapper

@ableegoldman ableegoldman changed the title KAFKA-18026: KIP-1112, ProcessorWrapper API and PAPI implementation KAFKA-18026: KIP-1112, ProcessorWrapper API with PAPI and partial DSL implementation Nov 21, 2024
@@ -714,8 +714,10 @@ public org.apache.kafka.streams.processor.api.Processor<Object, Object, Object,
public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier,
final String... parentNames) {
internalTopologyBuilder.addProcessor(name, supplier, parentNames);
final Set<StoreBuilder<?>> stores = supplier.stores();
final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = internalTopologyBuilder.wrapProcessorSupplier(name, supplier);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the entire implementation for PAPI apps

dslStoreSuppliers = Utils.newInstance(
config.getClass(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG),
DslStoreSuppliers.class);
dslStoreSuppliers = config.getConfiguredInstance(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

side fix: we weren't configuring this with the app configs

for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
ApiUtils.checkSupplier(processorSupplier);

final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is how the DSL implementation will work -- processors are wrapped when building the DSL into a Topology, specifically in this method where they are added to the InternalTopologyBuilder.

Note this PR is only a partial implementation since not all DSL operators go through this class, some just write their processors to the InternalTopologyBuilder directly. Followup PRs will tackle converting these operators and enforcing that any new ones added in the future will have to go through this method and be wrapped

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why we do this here instead of inside topologyBuilder.addProcessor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good question! that would definitely be simpler. however, to maximize the utility of the new wrapper, we want to allow users to wrap not only the Processor returned by the ProcessorSupplier#get method, but also the StoreBuilders returned from ProcessorSupplier#stores. Unfortunately, the stores are currently extracted and connected outside of the #addProcessor method, and since they aren't attached (via #addStateStore) until after #addProcessor is called, we can't even look up and wrap the stores for a processor from within #addProcessor

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately it might make sense to move the extraction of the stores from a ProcessorSupplier into a single InternalTopologyBuilder method (eg #addStatefulProcessor) that calls both #addProcessor and #addStateStore, and do the wrapping there. This will also help future-proof new DSL operators from being implemented incorrectly and missing the wrapper step.

But I think it makes sense to wait until we've completed all the followup PRs for the remaining DSL operators, in case there are any weird edge cases we haven't thought of/seen yet. Then we can do one final PR to clean up and future-proof the wrapping process

Copy link
Contributor

@agavra agavra left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! (non-binding, of course)

* topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method
* for DSL applications, or when creating a PAPI topology via the {@link Topology#Topology(TopologyConfig)} constructor.
* <p>
* Note that some configs that are only defined in the TopologyConfig and not in the StreamsConfig, such as the {@code processor.wrapper.class},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a possibility of having this be available (and respected) in StreamsConfig? People tend to forget to properly pipe things through TopolgoyConfig, though having it only there makes it a bit more obvious where it needs to be passed in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, good catch -- this comment is actually out of date, and I did end up having to add the config to StreamsConfig as well.

I definitely share your concerns about people forgetting to properly pipe things in to the Topology/StreamsBuilder through TopologyConfig though. I recently filed KAFKA-18053 for exactly this problem, with one subtask covering a way to improve the situation without a KIP by detecting and notifying users of a misconfiguration. Ideally we can include this fix in 4.0 as well

topologyConfigs.originals()
);
} catch (final Exception e) {
log.error("Unable to instantiate ProcessorWrapper from value of config {}. "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does it make sense to also include the error in the logged message in case the user swallows this up higher in the stack somewhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point

for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
ApiUtils.checkSupplier(processorSupplier);

final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why we do this here instead of inside topologyBuilder.addProcessor

Copy link
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If all the unit tests are already in place, could you update the description as well to remove the comment that it is still WIP?

* <p>
* Note that some configs, such as the {@code processor.wrapper.class} config, can only take effect while the
* topology is being built, which means they have to be passed in as a TopologyConfig to the
* {@link Topology#Topology(TopologyConfig)} constructor (PAPI) or the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding the docs here, would be helpful to users.

} catch (final Exception e) {
final String errorMessage = String.format(
"Unable to instantiate ProcessorWrapper from value of config %s. Please provide a valid class "
+ "that implements the ProcessorWrapper interface.", PROCESSOR_WRAPPER_CLASS_CONFIG);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: " that"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

space is added at the end of the line above (admittedly it's hard to see this way)

final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
) {
return ProcessorWrapper.asWrappedFixedKey(
processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking if this is correct? wrapFixedKeyProcessorSupplier func itself calls return ProcessorWrapper.asWrappedFixedKey(processorSupplier); already, and hence seems a circular calling?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, it's correct but I see how it's confusing -- basically the non-static #wrapFixedKeyProcessorSupplier is what does the actual wrapping via the configured ProcessorWrapper instance, whereas the static ProcessorWrapper#asWrapped method is what converts a regular ProcessorSupplier into the WrappedProcessorSupplier subclass that we use as a marker interface to distinguish between processors that are and aren't wrapped yet.

I will add a comment to the code to clarify for future readers, but perhaps it's also worth choosing a better name for the static #asWrapped utility methods. Maybe #toWrappedProcessorSupplier or something like that?

@guozhangwang @agavra thoughts?

(btw I am going to merge the PR as-is since we can address this in a followup PR, I'll make sure to add the code comments and/or change the name in another PR)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, yeah I think having a different name for the static utility function would be better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, it's correct but I see how it's confusing -- basically the non-static #wrapFixedKeyProcessorSupplier is what does the actual wrapping via the configured ProcessorWrapper instance, whereas the static ProcessorWrapper#asWrapped method is what converts a regular ProcessorSupplier into the WrappedProcessorSupplier subclass that we use as a marker interface to distinguish between processors that are and aren't wrapped yet.

I'm not following this. wrappedFixedKeyProcessorSupplier is already expected to return an instance that implements WrappedProcessorSupplier - so why does InternalTopologyBuilder also need to enclose it in a WrappedProcessorSupplierImpl? Do we explicitly check for the WrappedProcessorSupplierImpl class elsewhere?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to Rohan's question, I don't think we need to do this here since the WrappedProcessorSupplier already returns the right type.

Just on the naming discussion, though, I think markWrapped could be a good name for the static method.

final String name,
final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
) {
return ProcessorWrapper.asWrapped(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see above 🙂

@ableegoldman ableegoldman merged commit 87b902d into apache:trunk Nov 24, 2024
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants