Skip to content

Commit

Permalink
migrate KTableSourceNode to use ProcesserSupplier#stores
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Nov 21, 2024
1 parent fd9de50 commit b0a3932
Show file tree
Hide file tree
Showing 18 changed files with 278 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(
Materialized.with(consumedInternal.keySerde(), consumedInternal.valueSerde()),
internalStreamsBuilder, topic + "-");
internalStreamsBuilder, topic + "-",
true /* force materializing global tables */);

return internalStreamsBuilder.globalTable(topic, consumedInternal, materializedInternal);
}
Expand Down Expand Up @@ -517,7 +518,7 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
*/
public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder) {
Objects.requireNonNull(builder, "builder can't be null");
internalStreamsBuilder.addStateStore(new StoreBuilderWrapper(builder));
internalStreamsBuilder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(builder));
return this;
}

Expand Down Expand Up @@ -556,7 +557,7 @@ public synchronized <KIn, VIn> StreamsBuilder addGlobalStore(final StoreBuilder<
Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
Objects.requireNonNull(consumed, "consumed can't be null");
internalStreamsBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder),
topic,
new ConsumedInternal<>(consumed),
stateUpdateSupplier,
Expand Down
8 changes: 3 additions & 5 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;
Expand Down Expand Up @@ -851,14 +851,13 @@ public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> sto
final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
sourceName,
null,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier,
new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)),
true
);
return this;
Expand Down Expand Up @@ -897,14 +896,13 @@ public synchronized <KIn, VIn> Topology addGlobalStore(final StoreBuilder<?> sto
final String processorName,
final ProcessorSupplier<KIn, VIn, Void, Void> stateUpdateSupplier) {
internalTopologyBuilder.addGlobalStore(
new StoreBuilderWrapper(storeBuilder),
sourceName,
timestampExtractor,
keyDeserializer,
valueDeserializer,
topic,
processorName,
stateUpdateSupplier,
new StoreDelegatingProcessorSupplier<>(stateUpdateSupplier, Set.of(storeBuilder)),
true
);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,14 @@ public <K, V> KTable<K, V> table(final String topic,
final String tableSourceName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);

final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName());
final KTableSource<K, V> tableSource = new KTableSource<>(materialized);
final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName);

final TableSourceNode<K, V> tableSourceNode = TableSourceNode.<K, V>tableSourceNodeBuilder()
.withTopic(topic)
.withSourceName(sourceName)
.withNodeName(tableSourceName)
.withConsumedInternal(consumed)
.withMaterializedInternal(materialized)
.withProcessorParameters(processorParameters)
.build();
tableSourceNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier);
Expand Down Expand Up @@ -186,9 +185,7 @@ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
final String processorName = named
.orElseGenerateWithPrefix(this, KTableImpl.SOURCE_NAME);

// enforce store name as queryable name to always materialize global table stores
final String storeName = materialized.storeName();
final KTableSource<K, V> tableSource = new KTableSource<>(storeName, storeName);
final KTableSource<K, V> tableSource = new KTableSource<>(materialized);

final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, processorName);

Expand All @@ -197,12 +194,12 @@ public <K, V> GlobalKTable<K, V> globalTable(final String topic,
.isGlobalKTable(true)
.withSourceName(sourceName)
.withConsumedInternal(consumed)
.withMaterializedInternal(materialized)
.withProcessorParameters(processorParameters)
.build();

addGraphNode(root, tableSourceNode);

final String storeName = materialized.storeName();
return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<>(storeName), materialized.queryableStoreName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,10 +660,7 @@ public KTable<K, V> toTable(final Named named,
subTopologySourceNodes = this.subTopologySourceNodes;
}

final KTableSource<K, V> tableSource = new KTableSource<>(
materializedInternal.storeName(),
materializedInternal.queryableStoreName()
);
final KTableSource<K, V> tableSource = new KTableSource<>(materializedInternal);
final ProcessorParameters<K, V, ?, ?> processorParameters = new ProcessorParameters<>(tableSource, name);
final GraphNode tableNode = new StreamToTableNode<>(
name,
Expand Down Expand Up @@ -1173,7 +1170,7 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
bufferStoreName = Optional.of(name + "-Buffer");
final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> storeBuilder =
new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joinedInternal.gracePeriod(), name);
builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
builder.addStateStore(StoreBuilderWrapper.wrapStoreBuilder(storeBuilder));
}

final ProcessorSupplier<K, V, K, ? extends VR> processorSupplier = new KStreamKTableJoin<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
private static <K, V> StoreFactory joinWindowStoreBuilderFromSupplier(final WindowBytesStoreSupplier storeSupplier,
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return new StoreBuilderWrapper(Stores.windowStoreBuilder(
return StoreBuilderWrapper.wrapStoreBuilder(Stores.windowStoreBuilder(
storeSupplier,
keySerde,
valueSerde
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ public KTable<K, V> suppress(final Suppressed<? super K> suppressed) {
final ProcessorGraphNode<K, Change<V>> node = new TableSuppressNode<>(
name,
new ProcessorParameters<>(suppressionSupplier, name),
new StoreBuilderWrapper(storeBuilder)
StoreBuilderWrapper.wrapStoreBuilder(storeBuilder)
);
node.setOutputVersioned(false);

Expand Down Expand Up @@ -1227,10 +1227,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
materializedInternal.withKeySerde(keySerde);
}

final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(
materializedInternal.storeName(),
materializedInternal.queryableStoreName()
);
final KTableSource<K, VR> resultProcessorSupplier = new KTableSource<>(materializedInternal);

final StoreFactory resultStore =
new KeyValueStoreMaterializer<>(materializedInternal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,24 @@
*/
package org.apache.kafka.streams.kstream.internals;

import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.KeyValueStoreWrapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.VersionedKeyValueStore.PUT_RETURN_CODE_NOT_PUT;
import static org.apache.kafka.streams.state.internals.KeyValueStoreWrapper.PUT_RETURN_CODE_IS_LATEST;
Expand All @@ -40,15 +43,16 @@ public class KTableSource<KIn, VIn> implements ProcessorSupplier<KIn, VIn, KIn,
private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);

private final String storeName;
private final StoreFactory storeFactory;
private String queryableName;
private boolean sendOldValues;

public KTableSource(final String storeName, final String queryableName) {
Objects.requireNonNull(storeName, "storeName can't be null");

this.storeName = storeName;
this.queryableName = queryableName;
public KTableSource(
final MaterializedInternal<KIn, VIn, KeyValueStore<Bytes, byte[]>> materialized) {
this.storeName = materialized.storeName();
this.queryableName = materialized.queryableStoreName();
this.sendOldValues = false;
this.storeFactory = new KeyValueStoreMaterializer<>(materialized);
}

public String queryableName() {
Expand All @@ -60,6 +64,15 @@ public Processor<KIn, VIn, KIn, Change<VIn>> get() {
return new KTableSourceProcessor();
}

@Override
public Set<StoreBuilder<?>> stores() {
if (materialized()) {
return Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeFactory));
} else {
return null;
}
}

// when source ktable requires sending old values, we just
// need to set the queryable name as the store name to enforce materialization
public void enableSendingOldValues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,19 @@ public MaterializedInternal(final Materialized<K, V, S> materialized) {
public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix) {
this(materialized, nameProvider, generatedStorePrefix, false);
}

public MaterializedInternal(final Materialized<K, V, S> materialized,
final InternalNameProvider nameProvider,
final String generatedStorePrefix,
final boolean forceQueryable) {
super(materialized);

// if storeName is not provided, the corresponding KTable would never be queryable;
// but we still need to provide an internal name for it in case we materialize.
queryable = storeName() != null;
if (!queryable && nameProvider != null) {
queryable = forceQueryable || storeName() != null;
if (storeName() == null && nameProvider != null) {
storeName = nameProvider.newStoreName(generatedStorePrefix);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package org.apache.kafka.streams.kstream.internals.graph;

import java.util.Set;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreDelegatingProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;

public class GlobalStoreNode<KIn, VIn, S extends StateStore> extends StateStoreNode<S> {
Expand Down Expand Up @@ -52,15 +54,16 @@ public GlobalStoreNode(final StoreFactory storeBuilder,
@Override
public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
storeBuilder.withLoggingDisabled();
topologyBuilder.addGlobalStore(storeBuilder,
sourceName,
topologyBuilder.addGlobalStore(sourceName,
consumed.timestampExtractor(),
consumed.keyDeserializer(),
consumed.valueDeserializer(),
topic,
processorName,
stateUpdateSupplier,
reprocessOnRestore);
new StoreDelegatingProcessorSupplier<>(
stateUpdateSupplier,
Set.of(new StoreFactory.FactoryWrappingStoreBuilder<>(storeBuilder))
), reprocessOnRestore);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,10 @@

package org.apache.kafka.streams.kstream.internals.graph;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Collections;
import java.util.Iterator;
Expand All @@ -36,7 +31,6 @@
*/
public class TableSourceNode<K, V> extends SourceGraphNode<K, V> {

private final MaterializedInternal<K, V, ?> materializedInternal;
private final ProcessorParameters<K, V, ?, ?> processorParameters;
private final String sourceName;
private final boolean isGlobalKTable;
Expand All @@ -46,7 +40,6 @@ private TableSourceNode(final String nodeName,
final String sourceName,
final String topic,
final ConsumedInternal<K, V> consumedInternal,
final MaterializedInternal<K, V, ?> materializedInternal,
final ProcessorParameters<K, V, ?, ?> processorParameters,
final boolean isGlobalKTable) {

Expand All @@ -57,7 +50,6 @@ private TableSourceNode(final String nodeName,
this.sourceName = sourceName;
this.isGlobalKTable = isGlobalKTable;
this.processorParameters = processorParameters;
this.materializedInternal = materializedInternal;
}


Expand All @@ -68,7 +60,6 @@ public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicFor
@Override
public String toString() {
return "TableSourceNode{" +
"materializedInternal=" + materializedInternal +
", processorParameters=" + processorParameters +
", sourceName='" + sourceName + '\'' +
", isGlobalKTable=" + isGlobalKTable +
Expand All @@ -93,12 +84,8 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
throw new IllegalStateException("A table source node must have a single topic as input");
}

final StoreFactory storeFactory =
new KeyValueStoreMaterializer<>((MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>) materializedInternal);

if (isGlobalKTable) {
topologyBuilder.addGlobalStore(
storeFactory,
sourceName,
consumedInternal().timestampExtractor(),
consumedInternal().keyDeserializer(),
Expand All @@ -116,16 +103,16 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
consumedInternal().valueDeserializer(),
topicName);

topologyBuilder.addProcessor(processorParameters.processorName(), processorParameters.processorSupplier(), sourceName);
processorParameters.addProcessorTo(topologyBuilder, new String[] {sourceName});

// only add state store if the source KTable should be materialized
// if the KTableSource should not be materialized, stores will be null or empty
final KTableSource<K, V> tableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
if (tableSource.materialized()) {
topologyBuilder.addStateStore(storeFactory, nodeName());

if (tableSource.stores() != null) {
if (shouldReuseSourceTopicForChangelog) {
storeFactory.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(storeFactory.name(), topicName);
tableSource.stores().forEach(store -> {
store.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(store.name(), topicName);
});
}
}
}
Expand All @@ -138,7 +125,6 @@ public static final class TableSourceNodeBuilder<K, V> {
private String sourceName;
private String topic;
private ConsumedInternal<K, V> consumedInternal;
private MaterializedInternal<K, V, ?> materializedInternal;
private ProcessorParameters<K, V, ?, ?> processorParameters;
private boolean isGlobalKTable = false;

Expand All @@ -155,11 +141,6 @@ public TableSourceNodeBuilder<K, V> withTopic(final String topic) {
return this;
}

public TableSourceNodeBuilder<K, V> withMaterializedInternal(final MaterializedInternal<K, V, ?> materializedInternal) {
this.materializedInternal = materializedInternal;
return this;
}

public TableSourceNodeBuilder<K, V> withConsumedInternal(final ConsumedInternal<K, V> consumedInternal) {
this.consumedInternal = consumedInternal;
return this;
Expand All @@ -185,7 +166,6 @@ public TableSourceNode<K, V> build() {
sourceName,
topic,
consumedInternal,
materializedInternal,
processorParameters,
isGlobalKTable);
}
Expand Down
Loading

0 comments on commit b0a3932

Please sign in to comment.