+ * <p>Each implementation must override: equals(object) and hashCode(), and those implementation
+ * must catch Broker updates (e.g. it's not safe to compare only the Broker UID). It's recommended
+ * to not relying on equals(object) and hashCode() generated by Protocol Buffer compiler.
+ *
+ *
+ * <p>Testing equals(object) and hashCode() of newly added implementation is done by adding sources
+ * to parameterized tests in BrokerTest.
+ */
+public interface Broker {
+
+ /**
+ * Get broker identifier.
+ *
+ * @return identifier.
+ */
+ String id();
+
+ /**
+ * Get broker topic.
+ *
+ * @return topic.
+ */
+ String topic();
+
+ /**
+ * Get Broker dead letter sink.
+ *
+ * @return dead letter sink.
+ */
+ String deadLetterSink();
+}
diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/BrokerWrapper.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/BrokerWrapper.java
new file mode 100644
index 000000000..1343aebb7
--- /dev/null
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/BrokerWrapper.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Broker;
+import java.util.Objects;
+
+/**
+ * BrokerWrapper wraps a Broker for implementing the Broker interface.
+ *
+ *
+ * <p>The wrapped Broker must not be modified by callers.
+ */
+public class BrokerWrapper implements dev.knative.eventing.kafka.broker.core.Broker {
+
+ private final Broker broker;
+
+ /**
+ * All args constructor.
+ *
+ * @param broker broker (it must not be modified by callers)
+ */
+ public BrokerWrapper(final Broker broker) {
+ this.broker = broker;
+ }
+
+ @Override
+ public String id() {
+ return broker.getId();
+ }
+
+ @Override
+ public String topic() {
+ return broker.getTopic();
+ }
+
+ @Override
+ public String deadLetterSink() {
+ return broker.getDeadLetterSink();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BrokerWrapper that = (BrokerWrapper) o;
+
+ return broker.getId().equals(that.id())
+ && broker.getDeadLetterSink().equals(that.deadLetterSink())
+ && broker.getTopic().equals(that.topic());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(broker.getId(), broker.getDeadLetterSink(), broker.getTopic());
+ }
+}
diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/EventMatcher.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/EventMatcher.java
new file mode 100644
index 000000000..438234cec
--- /dev/null
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/EventMatcher.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import static java.time.format.DateTimeFormatter.ISO_INSTANT;
+
+import dev.knative.eventing.kafka.broker.core.Filter;
+import io.cloudevents.CloudEvent;
+import io.cloudevents.core.v1.ContextAttributes;
+import io.cloudevents.lang.Nullable;
+import java.net.URI;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+
+public class EventMatcher implements Filter<CloudEvent> {
+
+ private static final String DEFAULT_STRING = "";
+
+ static final Map<String, Function<CloudEvent, String>> attributesMapper = Map.of(
+ Constants.SPEC_VERSION, event -> event.getSpecVersion().toString(),
+ Constants.ID, CloudEvent::getId,
+ Constants.TYPE, CloudEvent::getType,
+ Constants.SOURCE, event -> event.getSource().toString(),
+ Constants.DATA_CONTENT_TYPE, CloudEvent::getDataContentType,
+ Constants.DATA_SCHEMA, event -> getOrDefault(event.getDataSchema(), URI::toString),
+ Constants.SCHEMA_URL, event -> getOrDefault(event.getDataSchema(), URI::toString),
+ Constants.SUBJECT, CloudEvent::getSubject,
+ Constants.TIME, event -> getOrDefault(event.getTime(), time -> time.format(ISO_INSTANT))
+ );
+
+ // the key represents the function to turn an event into a string value.
+ // the value represents the value to match.
+ // specversion -> 1.0
+ // f(event) -> event.getSpecVersion().toString() -> 1.0
+ private final List<Entry<Function<CloudEvent, String>, String>> attributes;
+
+ /**
+ * All args constructor.
+ *
+ * @param attributes attributes to match to pass filter.
+ */
+ public EventMatcher(final Map<String, String> attributes) {
+ this.attributes = attributes.entrySet().stream()
+ .map(entry -> new SimpleImmutableEntry<>(
+ attributesMapper.getOrDefault(
+ entry.getKey(),
+ event -> getOrDefault(event.getAttribute(entry.getKey()), Object::toString)
+ ),
+ entry.getValue()
+ ))
+ .collect(Collectors.toUnmodifiableList());
+ }
+
+ /**
+ * Attributes filters events by exact match on event context attributes. Each key in the map is
+ * compared with the equivalent key in the event context. An event passes the filter if all values
+ * are equal to the specified values. Nested context attributes are not supported as keys. Only
+ * string values are supported.
+ *
+ * @param event event to match
+ * @return true if event matches attributes, otherwise false.
+ */
+ @Override
+ public boolean match(final CloudEvent event) {
+
+ for (final var entry : attributes) {
+ if (!entry.getKey().apply(event).equals(entry.getValue())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private static <T> String getOrDefault(
+ @Nullable final T s,
+ final Function<T, String> stringProvider) {
+
+ if (s == null) {
+ return DEFAULT_STRING;
+ }
+ return stringProvider.apply(s);
+ }
+
+ static class Constants {
+
+ static final String TYPE = ContextAttributes.TYPE
+ .name()
+ .toLowerCase();
+
+ static final String SPEC_VERSION = ContextAttributes.SPECVERSION
+ .name()
+ .toLowerCase();
+
+ static final String ID = ContextAttributes.ID
+ .name()
+ .toLowerCase();
+
+ static final String SOURCE = ContextAttributes.SOURCE
+ .name()
+ .toLowerCase();
+
+ static final String DATA_CONTENT_TYPE = ContextAttributes.DATACONTENTTYPE
+ .name()
+ .toLowerCase();
+
+ static final String DATA_SCHEMA = ContextAttributes.DATASCHEMA
+ .name()
+ .toLowerCase();
+
+ static final String SCHEMA_URL = io.cloudevents.core.v03.ContextAttributes.SCHEMAURL
+ .name()
+ .toLowerCase();
+
+ static final String SUBJECT = ContextAttributes.SUBJECT
+ .name()
+ .toLowerCase();
+
+ static final String TIME = ContextAttributes.TIME
+ .name()
+ .toLowerCase();
+ }
+}
diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/Filter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/Filter.java
new file mode 100644
index 000000000..efbe878bd
--- /dev/null
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/Filter.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+/**
+ * Filter interface abstract the filtering logic.
+ *
+ * @param <T> type of objects to filter.
+ */
+@FunctionalInterface
+public interface Filter<T> {
+
+ boolean match(final T event);
+}
diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/ObjectsCreator.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/ObjectsCreator.java
new file mode 100644
index 000000000..d6288cb13
--- /dev/null
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/ObjectsCreator.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Brokers;
+import io.cloudevents.CloudEvent;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ObjectsCreator receives updates and converts protobuf objects to core objects often by wrapping
+ * protobuf objects by means of wrapper objects.
+ */
+public class ObjectsCreator implements Consumer<Brokers> {
+
+ private static final Logger logger = LoggerFactory.getLogger(ObjectsCreator.class);
+
+ private static final int WAIT_TIMEOUT = 1;
+
+ private final ObjectsReconciler<CloudEvent> objectsReconciler;
+
+ /**
+ * All args constructor.
+ *
+ * @param objectsReconciler brokers and triggers consumer.
+ */
+ public ObjectsCreator(final ObjectsReconciler<CloudEvent> objectsReconciler) {
+ Objects.requireNonNull(objectsReconciler, "provider objectsReconciler");
+
+ this.objectsReconciler = objectsReconciler;
+ }
+
+ /**
+ * Capture new changes.
+ *
+ * @param brokers new brokers config.
+ */
+ @Override
+ public void accept(final Brokers brokers) {
+
+ final Map<Broker, Set<Trigger<CloudEvent>>> objects = new HashMap<>();
+
+ for (final var broker : brokers.getBrokerList()) {
+ if (broker.getTriggersCount() <= 0) {
+ continue;
+ }
+
+ final var triggers = new HashSet<Trigger<CloudEvent>>(
+ broker.getTriggersCount()
+ );
+ for (final var trigger : broker.getTriggersList()) {
+ triggers.add(new TriggerWrapper(trigger));
+ }
+
+ objects.put(new BrokerWrapper(broker), triggers);
+ }
+
+ try {
+ final var latch = new CountDownLatch(1);
+ objectsReconciler.reconcile(objects).onComplete(result -> {
+ if (result.succeeded()) {
+ logger.debug("reconciled objects {}", brokers);
+ } else {
+ logger.error("failed to reconcile {}", brokers);
+ }
+ latch.countDown();
+ });
+
+ // wait the reconcilation
+ latch.await(WAIT_TIMEOUT, TimeUnit.MINUTES);
+
+ } catch (final Exception ex) {
+ logger.error("failed to reconcile objects - cause {} - objects {}", ex, objects);
+ }
+ }
+}
diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/ObjectsReconciler.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/ObjectsReconciler.java
new file mode 100644
index 000000000..29871df03
--- /dev/null
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/ObjectsReconciler.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import io.vertx.core.Future;
+import java.util.Map;
+import java.util.Set;
+
+@FunctionalInterface
+public interface ObjectsReconciler<T> {
+
+ Future<Void> reconcile(Map<Broker, Set<Trigger<T>>> objects) throws Exception;
+}
diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/Trigger.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/Trigger.java
new file mode 100644
index 000000000..8ece9611e
--- /dev/null
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/Trigger.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+/**
+ * Trigger interface represents the Trigger object.
+ *
+ *
+ * <p>Each implementation must override: equals(object) and hashCode(), and those implementation
+ * must catch Trigger updates (e.g. it's not safe to compare only the Trigger UID). It's recommended
+ * to not relying on equals(object) and hashCode() generated by Protocol Buffer compiler.
+ *
+ *
+ * <p>Testing equals(object) and hashCode() of newly added implementation is done by adding sources
+ * to parameterized tests in TriggerTest.
+ */
+public interface Trigger<T> {
+
+ /**
+ * Get trigger id.
+ *
+ * @return trigger identifier.
+ */
+ String id();
+
+ /**
+ * Get the filter.
+ *
+ * @return filter to use.
+ */
+ Filter<T> filter();
+
+ /**
+ * Get trigger destination URI.
+ *
+ * @return destination URI.
+ */
+ String destination();
+}
diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/TriggerWrapper.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/TriggerWrapper.java
new file mode 100644
index 000000000..f03f306d2
--- /dev/null
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/TriggerWrapper.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Trigger;
+import io.cloudevents.CloudEvent;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * TriggerWrapper wraps a Trigger for implementing the Trigger interface.
+ *
+ *
+ * <p>The wrapped Trigger must not be modified by callers.
+ */
+public class TriggerWrapper implements dev.knative.eventing.kafka.broker.core.Trigger<CloudEvent> {
+
+ private final Trigger trigger;
+
+ /**
+ * All args constructor.
+ *
+ * @param trigger trigger (it must not be modified by callers)
+ */
+ public TriggerWrapper(final Trigger trigger) {
+ this.trigger = trigger;
+ }
+
+ @Override
+ public String id() {
+ return trigger.getId();
+ }
+
+ @Override
+ public Filter<CloudEvent> filter() {
+ return new EventMatcher(trigger.getAttributesMap());
+ }
+
+ @Override
+ public String destination() {
+ return trigger.getDestination();
+ }
+
+ @Override
+ public boolean equals(Object object) {
+ if (!(object instanceof TriggerWrapper)) {
+ return false;
+ }
+ final var t = (TriggerWrapper) object;
+ return t.trigger.getId().equals(trigger.getId())
+ && t.trigger.getDestination().equals(trigger.getDestination())
+ && mapEquals(t.trigger.getAttributesMap(), trigger.getAttributesMap());
+ }
+
+ @Override
+ public int hashCode() {
+ final var hashAttributes = trigger.getAttributesMap().entrySet().stream()
+ .mapToInt(entry -> Objects.hash(entry.getKey(), entry.getValue()))
+ .sum();
+
+ return Objects.hash(
+ trigger.getId(),
+ trigger.getDestination(),
+ hashAttributes
+ );
+ }
+
+ // TODO re-evaluate hashcode and equals
+ private static boolean mapEquals(final Map<String, String> m1, final Map<String, String> m2) {
+ final var count = m1.entrySet().stream()
+ .map(entry -> m2.containsKey(entry.getKey())
+ && m2.get(entry.getKey()).equals(entry.getValue()))
+ .filter(Boolean::booleanValue)
+ .count();
+ return count == m1.size() && count == m2.size();
+ }
+
+ @Override
+ public String toString() {
+ return "TriggerWrapper{"
+ + "trigger=" + trigger
+ + '}';
+ }
+}
diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/BrokerWrapperTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/BrokerWrapperTest.java
new file mode 100644
index 000000000..be88f01c2
--- /dev/null
+++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/BrokerWrapperTest.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Broker;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class BrokerWrapperTest {
+
+ @Test
+ public void idCallShouldBeDelegatedToWrappedBroker() {
+ final var id = "123-42";
+ final var broker = new BrokerWrapper(
+ Broker.newBuilder().setId(id).build()
+ );
+
+ assertThat(broker.id()).isEqualTo(id);
+ }
+
+ @Test
+ public void deadLetterSinkCallShouldBeDelegatedToWrappedBroker() {
+ final var deadLetterSink = "http://localhost:9090/api";
+ final var broker = new BrokerWrapper(
+ Broker.newBuilder().setDeadLetterSink(deadLetterSink).build()
+ );
+
+ assertThat(broker.deadLetterSink()).isEqualTo(deadLetterSink);
+ }
+
+ @Test
+ public void topicCallShouldBeDelegatedToWrappedBroker() {
+ final var topic = "knative-topic";
+ final var broker = new BrokerWrapper(
+ Broker.newBuilder().setTopic(topic).build()
+ );
+
+ assertThat(broker.topic()).isEqualTo(topic);
+ }
+
+ @ParameterizedTest
+ @MethodSource(value = {"equalTriggersProvider"})
+ public void testTriggerEquality(
+ final dev.knative.eventing.kafka.broker.core.Broker b1,
+ final dev.knative.eventing.kafka.broker.core.Broker b2) {
+
+ assertThat(b1).isEqualTo(b2);
+ assertThat(b1.hashCode()).isEqualTo(b2.hashCode());
+ }
+
+ @ParameterizedTest
+ @MethodSource(value = {"differentTriggersProvider"})
+ public void testTriggerDifference(
+ final dev.knative.eventing.kafka.broker.core.Broker b1,
+ final dev.knative.eventing.kafka.broker.core.Broker b2) {
+
+ assertThat(b1).isNotEqualTo(b2);
+ assertThat(b1.hashCode()).isNotEqualTo(b2.hashCode());
+ }
+
+ public static Stream<Arguments> differentTriggersProvider() {
+ return Stream.of(
+ Arguments.of(
+ new BrokerWrapper(
+ Broker.newBuilder()
+ .setId("1234-id")
+ .build()
+ ),
+ new BrokerWrapper(
+ Broker.newBuilder().build()
+ )
+ ),
+ Arguments.of(
+ new BrokerWrapper(
+ Broker.newBuilder()
+ .setTopic("kantive-topic")
+ .build()
+ ),
+ new BrokerWrapper(
+ Broker.newBuilder().build()
+ )
+ ),
+ Arguments.of(
+ new BrokerWrapper(
+ Broker.newBuilder()
+ .setDeadLetterSink("http:/localhost:9090")
+ .build()
+ ),
+ new BrokerWrapper(
+ Broker.newBuilder().build()
+ )
+ )
+ );
+ }
+
+ public static Stream<Arguments> equalTriggersProvider() {
+ return Stream.of(
+ Arguments.of(
+ new BrokerWrapper(
+ Broker.newBuilder()
+ .build()
+ ),
+ new BrokerWrapper(
+ Broker.newBuilder().build()
+ )
+ ),
+ Arguments.of(
+ new BrokerWrapper(
+ Broker.newBuilder()
+ .setTopic("knative-topic")
+ .setId("1234-42")
+ .setDeadLetterSink("http://localhost:9090")
+ .build()
+ ),
+ new BrokerWrapper(
+ Broker.newBuilder()
+ .setId("1234-42")
+ .setTopic("knative-topic")
+ .setDeadLetterSink("http://localhost:9090")
+ .build()
+ )
+ )
+ );
+ }
+}
\ No newline at end of file
diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/EventMatcherTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/EventMatcherTest.java
new file mode 100644
index 000000000..b5373bbbb
--- /dev/null
+++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/EventMatcherTest.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import dev.knative.eventing.kafka.broker.core.EventMatcher.Constants;
+import io.cloudevents.CloudEvent;
+import java.net.URI;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class EventMatcherTest {
+
+ @ParameterizedTest
+ @MethodSource(value = {"testCases"})
+ public void match(
+ final Map<String, String> attributes,
+ final CloudEvent event,
+ final boolean shouldMatch) {
+
+ final var matcher = new EventMatcher(attributes);
+
+ final var match = matcher.match(event);
+
+ assertEquals(shouldMatch, match);
+ }
+
+ public static Stream<Arguments> testCases() {
+ return Stream.of(
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "1.0"
+ ),
+ new io.cloudevents.core.v1.CloudEventBuilder()
+ .withId("1234")
+ .withSource(URI.create("/source"))
+ .withType("type")
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "0.3"
+ ),
+ new io.cloudevents.core.v03.CloudEventBuilder()
+ .withId("1234")
+ .withSource(URI.create("/source"))
+ .withType("type")
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "0.3",
+ Constants.ID, "123-42"
+ ),
+ new io.cloudevents.core.v03.CloudEventBuilder()
+ .withId("123-42")
+ .withSource(URI.create("/source"))
+ .withType("type")
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "0.3",
+ Constants.ID, "123-42"
+ ),
+ new io.cloudevents.core.v03.CloudEventBuilder()
+ .withId("123-423")
+ .withSource(URI.create("/source"))
+ .withType("type")
+ .build(),
+ false
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "0.3",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json"
+ ),
+ new io.cloudevents.core.v03.CloudEventBuilder()
+ .withId("123-42")
+ .withSource(URI.create("/source"))
+ .withType("type")
+ .withDataContentType("application/cloudevents+json")
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "1.0",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json",
+ Constants.DATA_SCHEMA, "/api/schema"
+ ),
+ new io.cloudevents.core.v1.CloudEventBuilder()
+ .withId("123-42")
+ .withDataContentType("application/cloudevents+json")
+ .withDataSchema(URI.create("/api/schema"))
+ .withSource(URI.create("/source"))
+ .withType("type")
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "1.0",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json",
+ Constants.DATA_SCHEMA, "/api/schema",
+ Constants.SOURCE, "/api/some-source"
+ ),
+ new io.cloudevents.core.v1.CloudEventBuilder()
+ .withId("123-42")
+ .withDataContentType("application/cloudevents+json")
+ .withDataSchema(URI.create("/api/schema"))
+ .withSource(URI.create("/api/some-source"))
+ .withType("type")
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "1.0",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json",
+ Constants.SOURCE, "/api/schema",
+ Constants.DATA_SCHEMA, "/api/some-source"
+ ),
+ new io.cloudevents.core.v1.CloudEventBuilder()
+ .withId("123-42")
+ .withDataContentType("application/cloudevents+json")
+ .withDataSchema(URI.create("/api/schema"))
+ .withSource(URI.create("/api/some-source"))
+ .withType("type")
+ .build(),
+ false
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "1.0",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json",
+ Constants.DATA_SCHEMA, "/api/schema",
+ Constants.SOURCE, "/api/some-source",
+ Constants.SUBJECT, "a-subject-42",
+ Constants.TIME, "1985-04-12T23:20:50Z",
+ Constants.SCHEMA_URL, "/api/schema-url"
+ ),
+ new io.cloudevents.core.v1.CloudEventBuilder()
+ .withId("123-42")
+ .withDataContentType("application/cloudevents+json")
+ .withDataSchema(URI.create("/api/schema"))
+ .withSource(URI.create("/api/some-source"))
+ .withSubject("a-subject-42")
+ .withType("type")
+ .withTime(ZonedDateTime.of(
+ 1985, 4, 12,
+ 23, 20, 50, 0,
+ ZoneId.of("Z")
+ ))
+ .build(),
+ false
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "0.3",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json",
+ Constants.SOURCE, "/api/some-source",
+ Constants.SUBJECT, "a-subject-42",
+ Constants.TIME, "1985-04-12T23:20:50Z",
+ Constants.SCHEMA_URL, "/api/schema-url"
+ ),
+ new io.cloudevents.core.v03.CloudEventBuilder()
+ .withId("123-42")
+ .withDataContentType("application/cloudevents+json")
+ .withSchemaUrl(URI.create("/api/schema-url"))
+ .withSource(URI.create("/api/some-source"))
+ .withSubject("a-subject-42")
+ .withType("type")
+ .withTime(ZonedDateTime.of(
+ 1985, 4, 12,
+ 23, 20, 50, 0,
+ ZoneId.of("Z")
+ ))
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "0.3",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json",
+ Constants.SOURCE, "/api/some-source",
+ Constants.SUBJECT, "a-subject-42",
+ Constants.TIME, "1985-04-12T23:20:50Z",
+ Constants.SCHEMA_URL, "/api/schema-url",
+ Constants.TYPE, "dev.knative.eventing.create"
+ ),
+ new io.cloudevents.core.v03.CloudEventBuilder()
+ .withId("123-42")
+ .withDataContentType("application/cloudevents+json")
+ .withSchemaUrl(URI.create("/api/schema-url"))
+ .withSource(URI.create("/api/some-source"))
+ .withSubject("a-subject-42")
+ .withType("dev.knative.eventing.create")
+ .withTime(ZonedDateTime.of(
+ 1985, 4, 12,
+ 23, 20, 50, 0,
+ ZoneId.of("Z")
+ ))
+ .build(),
+ true
+ ),
+ Arguments.of(
+ Map.of(
+ Constants.SPEC_VERSION, "1.0",
+ Constants.ID, "123-42",
+ Constants.DATA_CONTENT_TYPE, "application/cloudevents+json",
+ Constants.SOURCE, "/api/some-source",
+ Constants.SUBJECT, "a-subject-42",
+ Constants.TIME, "1985-04-12T23:20:50Z",
+ Constants.DATA_SCHEMA, "/api/schema",
+ Constants.TYPE, "dev.knative.eventing.create"
+ ),
+ new io.cloudevents.core.v1.CloudEventBuilder()
+ .withId("123-42")
+ .withDataContentType("application/cloudevents+json")
+ .withDataSchema(URI.create("/api/schema"))
+ .withSource(URI.create("/api/some-source"))
+ .withSubject("a-subject-42")
+ .withType("dev.knative.eventing.create")
+ .withTime(ZonedDateTime.of(
+ 1985, 4, 12,
+ 23, 20, 50, 0,
+ ZoneId.of("Z")
+ ))
+ .build(),
+ true
+ )
+ );
+ }
+
+ @Test
+ public void shouldSetAllAttributes() {
+ final var size = Stream.concat(
+ io.cloudevents.core.v1.ContextAttributes.VALUES.stream(),
+ io.cloudevents.core.v03.ContextAttributes.VALUES.stream()
+ )
+ .collect(Collectors.toSet())
+ .size();
+
+ // DATACONTENTENCODING isn't usable, so +1
+ assertEquals(size, EventMatcher.attributesMapper.size() + 1);
+ }
+}
\ No newline at end of file
diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/ObjectsCreatorTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/ObjectsCreatorTest.java
new file mode 100644
index 000000000..093e34277
--- /dev/null
+++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/ObjectsCreatorTest.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.broker1;
+import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.broker2;
+import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.brokers;
+import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.trigger1;
+import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.trigger2;
+import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.trigger3;
+import static dev.knative.eventing.kafka.broker.core.testing.utils.CoreObjects.trigger4;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig;
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Brokers;
+import io.vertx.core.Future;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+/**
+ * Tests for ObjectsCreator: verifies which broker/trigger objects are handed
+ * to the downstream reconciler callback.
+ */
+@Execution(ExecutionMode.CONCURRENT)
+public class ObjectsCreatorTest {
+
+ // A broker with no triggers should not be passed downstream: the callback
+ // must receive an empty map.
+ @Test
+ public void shouldNotPassBrokerWithNoTrigger() {
+ final var called = new AtomicBoolean(false);
+
+ final var brokers = Map.of();
+
+ final var creator = new ObjectsCreator(objects -> {
+ called.set(true);
+ assertThat(objects).usingRecursiveComparison().isEqualTo(brokers);
+ return Future.succeededFuture();
+ });
+
+ creator.accept(Brokers.newBuilder()
+ .addBroker(BrokersConfig.Broker.newBuilder()
+ .setTopic("1234")
+ .setDeadLetterSink("http://localhost:9090")
+ .build())
+ .build()
+ );
+
+ // The assertions above run inside the callback; make sure it actually ran.
+ assertThat(called.get()).isTrue();
+ }
+
+ // Every broker that has triggers must be mapped to the full set of its
+ // (wrapped) triggers.
+ @Test
+ public void shouldPassAllTriggers() {
+ final var called = new AtomicBoolean(false);
+
+ final var brokers = Map.of(
+ broker1(), Set.of(trigger1(), trigger2()),
+ broker2(), Set.of(trigger3(), trigger4())
+ );
+
+ final var creator = new ObjectsCreator(objects -> {
+ called.set(true);
+ assertThat(objects).usingRecursiveComparison().isEqualTo(brokers);
+ return Future.succeededFuture();
+ });
+
+ creator.accept(brokers());
+
+ // The assertions above run inside the callback; make sure it actually ran.
+ assertThat(called.get()).isTrue();
+ }
+}
\ No newline at end of file
diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/TriggerWrapperTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/TriggerWrapperTest.java
new file mode 100644
index 000000000..fdc43d1a1
--- /dev/null
+++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/TriggerWrapperTest.java
@@ -0,0 +1,289 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Trigger;
+import io.cloudevents.CloudEvent;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@link TriggerWrapper}: equals/hashCode contract, delegation of
+ * id()/destination() to the wrapped protobuf Trigger, and agreement between
+ * filter() and EventMatcher.
+ *
+ * <p>NOTE(review): generic type parameters appear to have been stripped from
+ * this patch text (e.g. the raw {@code Map} parameter of testFilter) — confirm
+ * against the original source.
+ */
+@Execution(value = ExecutionMode.CONCURRENT)
+public class TriggerWrapperTest {
+
+ @ParameterizedTest
+ @MethodSource(value = {"equalTriggersProvider"})
+ public void testTriggerEquality(final TriggerWrapper t1, final TriggerWrapper t2) {
+ assertThat(t1).isEqualTo(t2);
+ assertThat(t1.hashCode()).isEqualTo(t2.hashCode());
+ }
+
+ // NOTE(review): asserting hashCode() *inequality* for unequal objects is not
+ // guaranteed by the hashCode contract (collisions are legal), so this can in
+ // principle fail spuriously for a correct implementation — confirm intended.
+ @ParameterizedTest
+ @MethodSource(value = {"differentTriggersProvider"})
+ public void testTriggerDifference(final TriggerWrapper t1, final TriggerWrapper t2) {
+ assertThat(t1).isNotEqualTo(t2);
+ assertThat(t1.hashCode()).isNotEqualTo(t2.hashCode());
+ }
+
+ @Test
+ public void idCallShouldBeDelegatedToWrappedTrigger() {
+ final var id = "123-42";
+ final var triggerWrapper = new TriggerWrapper(
+ Trigger.newBuilder().setId(id).build()
+ );
+
+ assertThat(triggerWrapper.id()).isEqualTo(id);
+
+ }
+
+ @Test
+ public void destinationCallShouldBeDelegatedToWrappedTrigger() {
+ final var destination = "destination-42";
+ final var triggerWrapper = new TriggerWrapper(
+ Trigger.newBuilder().setDestination(destination).build()
+ );
+
+ assertThat(triggerWrapper.destination()).isEqualTo(destination);
+ }
+
+ // test if filter returned by filter() agrees with EventMatcher
+ @ParameterizedTest
+ @MethodSource(value = "dev.knative.eventing.kafka.broker.core.EventMatcherTest#testCases")
+ public void testFilter(
+ final Map attributes,
+ final CloudEvent event,
+ final boolean shouldMatch) {
+ final var triggerWrapper = new TriggerWrapper(
+ Trigger.newBuilder()
+ .putAllAttributes(attributes)
+ .build()
+ );
+
+ final var filter = triggerWrapper.filter();
+
+ final var match = filter.match(event);
+
+ assertThat(match).isEqualTo(shouldMatch);
+ }
+
+ // Pairs of wrappers that must compare unequal: each pair differs in exactly
+ // one field (destination, attributes, or id), to catch updates to any field.
+ public static Stream differentTriggersProvider() {
+ return Stream.of(
+ // trigger's destination is different
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Collections.emptyMap())
+ .setDestination("this-is-my-destination1")
+ .setId("1234-hello")
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Collections.emptyMap())
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ )
+ ),
+ // trigger's attributes are different
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion1",
+ "1.0",
+ "type",
+ "type_value"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type_value"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ )
+ ),
+ // attribute value (not key) differs
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type_value1"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type_value"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ )
+ ),
+ // trigger's id is different
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type_value"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello1")
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type_value"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ )
+ )
+ );
+ }
+
+ // Pairs of wrappers built from identical protobuf Triggers (fully populated,
+ // empty, and each field in isolation) that must compare equal.
+ public static Stream equalTriggersProvider() {
+ return Stream.of(
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type_value"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type_value"
+ ))
+ .setDestination("this-is-my-destination")
+ .setId("1234-hello")
+ .build()
+ )
+ ),
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .build()
+ )
+ ),
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .setId("1234")
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .setId("1234")
+ .build()
+ )
+ ),
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .build()
+ )
+ ),
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .setDestination("dest")
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .setDestination("dest")
+ .build()
+ )
+ ),
+ Arguments.of(
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type1"
+ ))
+ .build()
+ ),
+ new TriggerWrapper(Trigger
+ .newBuilder()
+ .putAllAttributes(Map.of(
+ "specversion",
+ "1.0",
+ "type",
+ "type1"
+ ))
+ .build()
+ )
+ )
+ );
+ }
+}
\ No newline at end of file
diff --git a/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/testing/utils/CoreObjects.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/testing/utils/CoreObjects.java
new file mode 100644
index 000000000..9cd637451
--- /dev/null
+++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/testing/utils/CoreObjects.java
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.core.testing.utils;
+
+import dev.knative.eventing.kafka.broker.core.BrokerWrapper;
+import dev.knative.eventing.kafka.broker.core.TriggerWrapper;
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Broker;
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Brokers;
+import dev.knative.eventing.kafka.broker.core.config.BrokersConfig.Trigger;
+import io.cloudevents.CloudEvent;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * CoreObjects provides pre-built Broker/Trigger fixture objects shared by
+ * tests across modules.
+ *
+ * <p>Each factory method returns a fresh instance, so tests can compare
+ * fixtures by value (equals/hashCode) without sharing mutable state.
+ */
+public final class CoreObjects {
+
+  /** Destination URL used by every fixture ({@code http://localhost:44331}). */
+  public static final URL DESTINATION_URL;
+
+  static {
+    try {
+      DESTINATION_URL = new URL(
+          "http", "localhost", 44331, ""
+      );
+    } catch (final MalformedURLException e) {
+      // Fail fast: the URL above is hard-coded, so a MalformedURLException is a
+      // programming error. The previous behavior (printStackTrace + null field)
+      // would only resurface later as an obscure NullPointerException when
+      // DESTINATION is initialized.
+      throw new IllegalStateException("hard-coded destination URL is malformed", e);
+    }
+  }
+
+  /** String form of {@link #DESTINATION_URL}. */
+  public static final String DESTINATION = DESTINATION_URL.toString();
+
+  // Utility class: not instantiable.
+  private CoreObjects() {
+  }
+
+  /**
+   * Brokers config containing both fixture brokers.
+   *
+   * @return Brokers holding {@link #broker1Unwrapped()} and {@link #broker2Unwrapped()}.
+   */
+  public static Brokers brokers() {
+    return Brokers.newBuilder()
+        .addBroker(broker1Unwrapped())
+        .addBroker(broker2Unwrapped())
+        .build();
+  }
+
+  /**
+   * First fixture broker, wrapped.
+   *
+   * @return wrapped broker 1.
+   */
+  public static dev.knative.eventing.kafka.broker.core.Broker broker1() {
+    return new BrokerWrapper(
+        broker1Unwrapped()
+    );
+  }
+
+  /**
+   * First fixture broker (id {@code 1-1234}), carrying triggers 1 and 2.
+   *
+   * @return broker 1 protobuf object.
+   */
+  public static Broker broker1Unwrapped() {
+    return Broker.newBuilder()
+        .setDeadLetterSink(DESTINATION)
+        .setId("1-1234")
+        .setTopic("1-12345")
+        .addAllTriggers(Arrays.asList(
+            trigger11(),
+            trigger12()
+        ))
+        .build();
+  }
+
+  /**
+   * Second fixture broker, wrapped.
+   *
+   * @return wrapped broker 2.
+   */
+  public static dev.knative.eventing.kafka.broker.core.Broker broker2() {
+    return new BrokerWrapper(
+        broker2Unwrapped()
+    );
+  }
+
+  /**
+   * Second fixture broker (id {@code 2-1234}), carrying triggers 3 and 4.
+   *
+   * @return broker 2 protobuf object.
+   */
+  public static Broker broker2Unwrapped() {
+    return Broker.newBuilder()
+        .setDeadLetterSink(DESTINATION)
+        .setId("2-1234")
+        .setTopic("2-12345")
+        .addAllTriggers(Arrays.asList(
+            trigger13(),
+            trigger14()
+        ))
+        .build();
+  }
+
+  /** @return wrapped {@link #trigger11()}. */
+  public static dev.knative.eventing.kafka.broker.core.Trigger trigger1() {
+    return new TriggerWrapper(trigger11());
+  }
+
+  /** @return wrapped {@link #trigger12()}. */
+  public static dev.knative.eventing.kafka.broker.core.Trigger trigger2() {
+    return new TriggerWrapper(trigger12());
+  }
+
+  /** @return wrapped {@link #trigger13()}. */
+  public static dev.knative.eventing.kafka.broker.core.Trigger trigger3() {
+    return new TriggerWrapper(trigger13());
+  }
+
+  /** @return wrapped {@link #trigger14()}. */
+  public static dev.knative.eventing.kafka.broker.core.Trigger trigger4() {
+    return new TriggerWrapper(trigger14());
+  }
+
+  /** @return protobuf trigger with id {@code 1-1234567}. */
+  public static Trigger trigger11() {
+    return Trigger.newBuilder()
+        .setId("1-1234567")
+        .setDestination(DESTINATION)
+        .putAllAttributes(Map.of(
+            "type", "dev.knative"
+        ))
+        .build();
+  }
+
+  /** @return protobuf trigger with id {@code 2-1234567}. */
+  public static Trigger trigger12() {
+    return Trigger.newBuilder()
+        .setId("2-1234567")
+        .setDestination(DESTINATION)
+        .putAllAttributes(Map.of(
+            "type", "dev.knative"
+        ))
+        .build();
+  }
+
+  /** @return protobuf trigger with id {@code 3-1234567}. */
+  public static Trigger trigger13() {
+    return Trigger.newBuilder()
+        .setId("3-1234567")
+        .setDestination(DESTINATION)
+        .putAllAttributes(Map.of(
+            "type", "dev.knative"
+        ))
+        .build();
+  }
+
+  /** @return protobuf trigger with id {@code 4-1234567}. */
+  public static Trigger trigger14() {
+    return Trigger.newBuilder()
+        .setId("4-1234567")
+        .setDestination(DESTINATION)
+        .putAllAttributes(Map.of(
+            "type", "dev.knative"
+        ))
+        .build();
+  }
+}
diff --git a/data-plane/dispatcher/pom.xml b/data-plane/dispatcher/pom.xml
new file mode 100644
index 000000000..5f382b912
--- /dev/null
+++ b/data-plane/dispatcher/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+ data-plane
+ dev.knative.eventing.kafka.broker
+ 1.0-SNAPSHOT
+
+ 4.0.0
+
+ dispatcher
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ ${maven.shade.plugin.version}
+
+ true
+
+
+
+ package
+
+ shade
+
+
+
+
+
+ dev.knative.eventing.kafka.broker.dispatcher.Main
+
+
+
+
+
+
+
+
+
+
+
+
+ dev.knative.eventing.kafka.broker
+ core
+ 1.0-SNAPSHOT
+
+
+ dev.knative.eventing.kafka.broker
+ core
+ tests
+ test-jar
+ 1.0-SNAPSHOT
+ test
+
+
+
+
\ No newline at end of file
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/BrokersManager.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/BrokersManager.java
new file mode 100644
index 000000000..ef5ced9f3
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/BrokersManager.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import dev.knative.eventing.kafka.broker.core.Broker;
+import dev.knative.eventing.kafka.broker.core.ObjectsReconciler;
+import dev.knative.eventing.kafka.broker.core.Trigger;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Brokers manager manages Broker and Trigger objects by instantiating and starting verticles based
+ * on brokers configurations.
+ *
+ * <p>Note: {@link BrokersManager} is not thread-safe and it's not supposed to be shared between
+ * threads.
+ *
+ * @param <T> trigger type.
+ */
+// NOTE(review): generic type parameters appear to have been stripped from this patch text
+// (e.g. the brokers map below was presumably Map<Broker, Map<Trigger<T>, AbstractVerticle>>)
+// — confirm against the original source.
+public final class BrokersManager implements ObjectsReconciler {
+
+ private static final Logger logger = LoggerFactory.getLogger(BrokersManager.class);
+
+ // Broker -> Trigger -> AbstractVerticle
+ private final Map, AbstractVerticle>> brokers;
+
+ private final Vertx vertx;
+ private final ConsumerVerticleFactory consumerFactory;
+ private final int triggersInitialCapacity;
+
+ /**
+ * All args constructor.
+ *
+ * @param vertx vertx instance.
+ * @param consumerFactory consumer factory.
+ * @param brokersInitialCapacity brokers container initial capacity.
+ * @param triggersInitialCapacity triggers container initial capacity.
+ * @throws IllegalArgumentException if either capacity is not strictly positive.
+ */
+ public BrokersManager(
+ final Vertx vertx,
+ final ConsumerVerticleFactory consumerFactory,
+ final int brokersInitialCapacity,
+ final int triggersInitialCapacity) {
+
+ Objects.requireNonNull(vertx, "provide vertx instance");
+ Objects.requireNonNull(consumerFactory, "provide consumer factory");
+ if (brokersInitialCapacity <= 0) {
+ throw new IllegalArgumentException("brokersInitialCapacity cannot be negative or 0");
+ }
+ if (triggersInitialCapacity <= 0) {
+ throw new IllegalArgumentException("triggersInitialCapacity cannot be negative or 0");
+ }
+ this.vertx = vertx;
+ this.consumerFactory = consumerFactory;
+ this.triggersInitialCapacity = triggersInitialCapacity;
+ brokers = new HashMap<>(brokersInitialCapacity);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * <p>Reconciles the in-memory state against {@code newObjects}: verticles for
+ * brokers/triggers no longer present are undeployed, and verticles for new
+ * brokers/triggers are created and deployed. Equality of Broker/Trigger keys
+ * is what detects updates (an updated object compares unequal to the old one).
+ */
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Future reconcile(final Map>> newObjects) {
+ final List futures = new ArrayList<>(newObjects.size() * 2);
+
+ // diffing previous and new --> remove deleted objects
+ for (final var brokersIt = brokers.entrySet().iterator(); brokersIt.hasNext(); ) {
+
+ final var brokerTriggers = brokersIt.next();
+ final var broker = brokerTriggers.getKey();
+
+ // check if the old broker has been deleted or updated.
+ if (!newObjects.containsKey(broker)) {
+
+ // broker deleted or updated, so remove it
+ brokersIt.remove();
+
+ // undeploy all verticles associated with triggers of the deleted broker.
+ for (final var e : brokerTriggers.getValue().entrySet()) {
+ futures.add(undeploy(broker, e.getKey(), e.getValue()));
+ }
+
+ continue;
+ }
+
+ // broker is there, so check if some triggers have been deleted.
+ final var triggersVerticles = brokerTriggers.getValue();
+ for (final var triggersIt = triggersVerticles.entrySet().iterator(); triggersIt.hasNext(); ) {
+
+ final var triggerVerticle = triggersIt.next();
+
+ // check if the trigger has been deleted or updated.
+ if (!newObjects.get(broker).contains(triggerVerticle.getKey())) {
+
+ // trigger deleted or updated, so remove it
+ triggersIt.remove();
+
+ // undeploy verticle associated with the deleted trigger.
+ futures.add(undeploy(
+ broker,
+ triggerVerticle.getKey(),
+ triggerVerticle.getValue()
+ ));
+ }
+ }
+ }
+
+ // add all new objects.
+ // Note: addBroker() is a no-op for broker/trigger pairs already deployed,
+ // so re-adding surviving pairs here is safe.
+ for (final var entry : newObjects.entrySet()) {
+ final var broker = entry.getKey();
+ for (final var trigger : entry.getValue()) {
+
+ futures.add(addBroker(broker, trigger));
+
+ }
+ }
+
+ return CompositeFuture.all(futures).mapEmpty();
+ }
+
+ // Registers the broker (if unseen) and deploys a consumer verticle for the
+ // given trigger, unless that trigger is already deployed.
+ private Future addBroker(final Broker broker, final Trigger trigger) {
+ final Map, AbstractVerticle> triggers;
+
+ if (brokers.containsKey(broker)) {
+ triggers = brokers.get(broker);
+ } else {
+ triggers = new ConcurrentHashMap<>(triggersInitialCapacity);
+ brokers.put(broker, triggers);
+ }
+
+ if (trigger == null || triggers.containsKey(trigger)) {
+ // the trigger is already there and it hasn't been updated.
+ return Future.succeededFuture();
+ }
+
+ final Promise promise = Promise.promise();
+
+ consumerFactory.get(broker, trigger)
+ .compose(verticle -> startVerticle(verticle, broker, triggers, trigger, promise))
+ .onSuccess(ignored -> promise.tryComplete())
+ .onFailure(cause -> {
+ // probably configuration are wrong, so do not retry.
+ logger.error(
+ "potential control-plane bug: failed to get verticle: {} - cause: {}",
+ trigger,
+ cause
+ );
+ promise.tryFail(cause);
+ });
+
+ return promise.future();
+ }
+
+ // Records the trigger->verticle association and deploys the verticle,
+ // completing/failing the caller-supplied promise accordingly.
+ private Future startVerticle(
+ final AbstractVerticle verticle,
+ final Broker broker,
+ final Map, AbstractVerticle> triggers,
+ final Trigger trigger,
+ Promise promise) {
+
+ addTrigger(triggers, trigger, verticle)
+ .onSuccess(msg -> {
+ logger.info(
+ "verticle for trigger {} associated with broker {} deployed - message: {}",
+ trigger,
+ broker,
+ msg
+ );
+ promise.tryComplete();
+ })
+ .onFailure(cause -> {
+ // this is a bad state we cannot start the verticle for consuming messages.
+ logger.error(
+ "failed to start verticle for trigger {} associated with broker {} - cause {}",
+ trigger,
+ broker,
+ cause
+ );
+ promise.tryFail(cause);
+ });
+
+ return promise.future();
+ }
+
+ // Stores the association and asks Vert.x to deploy the verticle; the returned
+ // future carries the deployment id on success.
+ private Future addTrigger(
+ final Map, AbstractVerticle> triggers,
+ final Trigger trigger,
+ final AbstractVerticle verticle) {
+
+ triggers.put(trigger, verticle);
+ final Promise promise = Promise.promise();
+ vertx.deployVerticle(verticle, promise);
+ return promise.future();
+ }
+
+ // Undeploys the verticle backing the given broker/trigger pair, logging the
+ // outcome either way.
+ private Future undeploy(Broker broker, Trigger trigger, AbstractVerticle verticle) {
+ final Promise promise = Promise.promise();
+
+ vertx.undeploy(verticle.deploymentID(), result -> {
+ if (result.succeeded()) {
+ logger.info("removed trigger {} associated with broker {}", trigger, broker);
+ promise.tryComplete();
+ return;
+ }
+
+ logger.error(
+ "failed to un-deploy verticle for trigger {} associated with broker {} - cause",
+ trigger,
+ broker,
+ result.cause()
+ );
+ promise.tryFail(result.cause());
+ });
+
+ return promise.future();
+ }
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java
new file mode 100644
index 000000000..f3e41c034
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordHandler.java
@@ -0,0 +1,188 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import dev.knative.eventing.kafka.broker.core.Filter;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+import java.util.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ConsumerRecordHandler implements the core algorithm of the Dispatcher component (see {@link
+ * ConsumerRecordHandler#handle(KafkaConsumerRecord)}).
+ *
+ * @param type of records' key.
+ * @param type of records' value.
+ * @param type of the response of given senders.
+ * @see ConsumerRecordHandler#handle(KafkaConsumerRecord)
+ */
+public final class ConsumerRecordHandler implements
+ Handler> {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(ConsumerRecordHandler.class);
+
+ private static final String SUBSCRIBER = "subscriber";
+ private static final String DLQ = "dead letter queue";
+
+ private final Filter filter;
+ private final ConsumerRecordSender subscriberSender;
+ private final ConsumerRecordSender deadLetterQueueSender;
+ private final ConsumerRecordOffsetStrategy receiver;
+ private final SinkResponseHandler sinkResponseHandler;
+
+ /**
+ * All args constructor.
+ *
+ * @param subscriberSender sender to trigger subscriber
+ * @param filter event filter
+ * @param receiver hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to
+ * plug in custom offset management depending on the success/failure
+ * during the algorithm.
+ * @param sinkResponseHandler handler of the response from {@code subscriberSender}
+ * @param deadLetterQueueSender sender to DLQ
+ */
+ public ConsumerRecordHandler(
+ final ConsumerRecordSender subscriberSender,
+ final Filter filter,
+ final ConsumerRecordOffsetStrategy receiver,
+ final SinkResponseHandler sinkResponseHandler,
+ final ConsumerRecordSender deadLetterQueueSender) {
+
+ Objects.requireNonNull(filter, "provide filter");
+ Objects.requireNonNull(subscriberSender, "provide subscriberSender");
+ Objects.requireNonNull(deadLetterQueueSender, "provide deadLetterQueueSender");
+ Objects.requireNonNull(receiver, "provider receiver");
+ Objects.requireNonNull(sinkResponseHandler, "provider sinkResponseHandler");
+
+ this.subscriberSender = subscriberSender;
+ this.filter = filter;
+ this.receiver = receiver;
+ this.deadLetterQueueSender = deadLetterQueueSender;
+ this.sinkResponseHandler = sinkResponseHandler;
+ }
+
+ /**
+ * Call this constructor when there is no DLQ configured on the broker.
+ *
+ * @param subscriberSender sender to trigger subscriber
+ * @param filter event filter
+ * @param receiver hook receiver {@link ConsumerRecordOffsetStrategy}. It allows to
+ * plug in custom offset management depending on the success/failure
+ * during the algorithm.
+ * @param sinkResponseHandler handler of the response
+ */
+ public ConsumerRecordHandler(
+ final ConsumerRecordSender subscriberSender,
+ final Filter filter,
+ final ConsumerRecordOffsetStrategy receiver,
+ final SinkResponseHandler sinkResponseHandler) {
+
+ this(
+ subscriberSender,
+ filter,
+ receiver,
+ sinkResponseHandler,
+ // If there is no DLQ configured by default DLQ sender always fails, which means
+ // implementors will receive failedToSendToDLQ if the subscriber sender fails.
+ record -> Future.failedFuture("no DLQ configured")
+ );
+ }
+
+ /**
+ * Handle the given record.
+ *
+ * @param record record to handle.
+ */
+ @Override
+ public void handle(final KafkaConsumerRecord record) {
+
+ receiver.recordReceived(record);
+
+ if (filter.match(record.value())) {
+ subscriberSender.send(record)
+ .compose(sinkResponseHandler::handle)
+ .onSuccess(response -> onSuccessfullySentToSubscriber(record))
+ .onFailure(cause -> onFailedToSendToSubscriber(record, cause));
+ } else {
+ receiver.recordDiscarded(record);
+ }
+ }
+
+ private void onSuccessfullySentToSubscriber(final KafkaConsumerRecord record) {
+ logSuccessfulSendTo(SUBSCRIBER, record);
+
+ receiver.successfullySentToSubscriber(record);
+ }
+
+ private void onFailedToSendToSubscriber(
+ final KafkaConsumerRecord record,
+ final Throwable cause) {
+
+ logFailedSendTo(SUBSCRIBER, record, cause);
+
+ deadLetterQueueSender.send(record)
+ .compose(sinkResponseHandler::handle)
+ .onSuccess(ignored -> onSuccessfullySentToDLQ(record))
+ .onFailure(ex -> onFailedToSendToDLQ(record, ex));
+ }
+
+ private void onSuccessfullySentToDLQ(final KafkaConsumerRecord record) {
+ logSuccessfulSendTo(DLQ, record);
+
+ receiver.successfullySentToDLQ(record);
+ }
+
+ private void onFailedToSendToDLQ(KafkaConsumerRecord record, Throwable ex) {
+ logFailedSendTo(DLQ, record, ex);
+
+ receiver.failedToSendToDLQ(record, ex);
+ }
+
+ private static void logFailedSendTo(
+ final String component,
+ final KafkaConsumerRecord record,
+ final Throwable cause) {
+
+ logger.error(
+ "{} sender failed to send record - topic: {} - partition: {} - offset: {} - cause: {}",
+ component,
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ cause
+ );
+ }
+
+ private static void logSuccessfulSendTo(
+ final String component,
+ final KafkaConsumerRecord record) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "record successfully handled by {} - topic: {} - partition: {} - offset: {}",
+ component,
+ record.topic(),
+ record.partition(),
+ record.offset()
+ );
+ }
+ }
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordOffsetStrategy.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordOffsetStrategy.java
new file mode 100644
index 000000000..abbcf2534
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordOffsetStrategy.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+
+/**
+ * ConsumerRecordOffsetStrategy is a hook interface notified at each step of a
+ * record's handling life-cycle (received, delivered, dead-lettered, discarded),
+ * allowing implementations to plug in custom offset-commit policies.
+ *
+ * <p>NOTE(review): generic type parameters appear to have been stripped from
+ * this patch text (KafkaConsumerRecord is used raw) — confirm against the
+ * original source.
+ */
+public interface ConsumerRecordOffsetStrategy {
+
+ /**
+ * The given record has been received.
+ *
+ * @param record record received.
+ */
+ void recordReceived(KafkaConsumerRecord record);
+
+ /**
+ * The given record has been successfully sent to subscriber.
+ *
+ * @param record record sent to subscriber.
+ */
+ void successfullySentToSubscriber(KafkaConsumerRecord record);
+
+ /**
+ * The given record has been successfully sent to dead letter queue.
+ *
+ * @param record record sent to dead letter queue.
+ */
+ void successfullySentToDLQ(KafkaConsumerRecord record);
+
+ /**
+ * The given record cannot be delivered to dead letter queue.
+ *
+ * @param record record undeliverable to dead letter queue.
+ * @param ex exception occurred.
+ */
+ void failedToSendToDLQ(KafkaConsumerRecord record, Throwable ex);
+
+ /**
+ * The given event doesn't pass the filter.
+ *
+ * @param record record discarded.
+ */
+ void recordDiscarded(KafkaConsumerRecord record);
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordOffsetStrategyFactory.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordOffsetStrategyFactory.java
new file mode 100644
index 000000000..2a652d53c
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordOffsetStrategyFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import dev.knative.eventing.kafka.broker.core.Broker;
+import dev.knative.eventing.kafka.broker.core.Trigger;
+import io.vertx.kafka.client.consumer.KafkaConsumer;
+
+/**
+ * Factory of {@link ConsumerRecordOffsetStrategy} instances. The default
+ * implementation hands out {@link UnorderedConsumerRecordOffsetStrategy}.
+ */
+public interface ConsumerRecordOffsetStrategyFactory {
+
+ /**
+ * Create the default factory.
+ *
+ * @return a factory whose {@code get} returns an unordered offset strategy.
+ */
+ static ConsumerRecordOffsetStrategyFactory create() {
+ return new ConsumerRecordOffsetStrategyFactory<>() {
+ };
+ }
+
+ /**
+ * Get the offset strategy for the given consumer/broker/trigger.
+ *
+ * @param consumer Kafka consumer whose offsets are to be managed.
+ * @param broker broker the consumer is associated with (unused by the default).
+ * @param trigger trigger the consumer is associated with (unused by the default).
+ * @return the offset strategy ({@link UnorderedConsumerRecordOffsetStrategy} by default).
+ */
+ default ConsumerRecordOffsetStrategy get(
+ final KafkaConsumer consumer,
+ final Broker broker,
+ final Trigger trigger) {
+
+ return new UnorderedConsumerRecordOffsetStrategy<>(consumer);
+ }
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordSender.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordSender.java
new file mode 100644
index 000000000..35c774b6e
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerRecordSender.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import io.vertx.core.Future;
+import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+
+@FunctionalInterface
+public interface ConsumerRecordSender<K, V> {
+
+  /**
+   * Send the given record. (the record passed the filter)
+   *
+   * <p>NOTE(review): {@code <K, V>} and the wildcard return type were
+   * reconstructed from stripped generics - confirm against upstream.
+   *
+   * @param record record to send
+   * @return a successful future or a failed future.
+   */
+  Future<?> send(KafkaConsumerRecord<K, V> record);
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerVerticle.java
new file mode 100644
index 000000000..ea46d4667
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerVerticle.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Handler;
+import io.vertx.core.Promise;
+import io.vertx.kafka.client.consumer.KafkaConsumer;
+import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
+import java.util.Objects;
+
+/**
+ * ConsumerVerticle is responsible for manging the consumer lifecycle.
+ *
+ * @param record key type.
+ * @param record value type.
+ */
+public final class ConsumerVerticle extends AbstractVerticle {
+
+ private final KafkaConsumer consumer;
+ private final String topic;
+ private final Handler> recordHandler;
+
+ /**
+ * All args constructor.
+ *
+ * @param consumer Kafka consumer.
+ * @param topic topic to consume.
+ * @param recordHandler handler of consumed Kafka records.
+ */
+ public ConsumerVerticle(
+ final KafkaConsumer consumer,
+ final String topic,
+ final Handler> recordHandler) {
+
+ Objects.requireNonNull(consumer, "provide consumer");
+ Objects.requireNonNull(topic, "provide topic");
+ Objects.requireNonNull(recordHandler, "provide record handler");
+
+ this.recordHandler = recordHandler;
+ this.consumer = consumer;
+ this.topic = topic;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void start(Promise startPromise) {
+
+ consumer.handler(recordHandler);
+
+ consumer.subscribe(topic, startPromise);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void stop(Promise stopPromise) {
+ consumer.close(stopPromise);
+ }
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerVerticleFactory.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerVerticleFactory.java
new file mode 100644
index 000000000..784f01fef
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/ConsumerVerticleFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import dev.knative.eventing.kafka.broker.core.Broker;
+import dev.knative.eventing.kafka.broker.core.Trigger;
+import io.vertx.core.AbstractVerticle;
+import io.vertx.core.Future;
+
+/**
+ * ConsumerVerticleFactory is responsible for instantiating consumer verticles.
+ */
+@FunctionalInterface
+public interface ConsumerVerticleFactory {
+
+  /**
+   * Get a new consumer verticle.
+   *
+   * <p>NOTE(review): the {@code Future<AbstractVerticle>} return type was
+   * reconstructed - {@code AbstractVerticle} is imported but the generics were
+   * stripped during extraction. Confirm against upstream.
+   *
+   * @param broker  broker data.
+   * @param trigger trigger data.
+   * @return a future completing with a new consumer verticle.
+   */
+  Future<AbstractVerticle> get(final Broker broker, final Trigger<?> trigger);
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Main.java
new file mode 100644
index 000000000..4b26462b7
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Main.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import dev.knative.eventing.kafka.broker.core.ObjectsCreator;
+import dev.knative.eventing.kafka.broker.dispatcher.file.FileWatcher;
+import dev.knative.eventing.kafka.broker.dispatcher.http.HttpConsumerVerticleFactory;
+import io.cloudevents.CloudEvent;
+import io.vertx.config.ConfigRetriever;
+import io.vertx.config.ConfigRetrieverOptions;
+import io.vertx.config.ConfigStoreOptions;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Main {
+
+  private static final Logger logger = LoggerFactory.getLogger(Main.class);
+
+  // Names of the environment variables this process is configured through.
+  private static final String BROKERS_TRIGGERS_PATH = "BROKERS_TRIGGERS_PATH";
+  private static final String PRODUCER_CONFIG_FILE_PATH = "PRODUCER_CONFIG_FILE_PATH";
+  private static final String CONSUMER_CONFIG_FILE_PATH = "CONSUMER_CONFIG_FILE_PATH";
+  private static final String BROKERS_INITIAL_CAPACITY = "BROKERS_INITIAL_CAPACITY";
+  private static final String TRIGGERS_INITIAL_CAPACITY = "TRIGGERS_INITIAL_CAPACITY";
+
+  /**
+   * Dispatcher entry point.
+   *
+   * @param args command line arguments.
+   */
+  public static void main(final String[] args) {
+
+    final var vertx = Vertx.vertx();
+    Runtime.getRuntime().addShutdownHook(new Thread(vertx::close));
+
+    final JsonObject json;
+    try {
+      json = getConfigurations(vertx);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag before terminating.
+      Thread.currentThread().interrupt();
+      System.exit(1);
+      return;
+    }
+
+    final var producerConfigs = config(json.getString(PRODUCER_CONFIG_FILE_PATH));
+    final var consumerConfigs = config(json.getString(CONSUMER_CONFIG_FILE_PATH));
+
+    // Records are CloudEvents keyed by String.
+    // NOTE(review): generics reconstructed from the otherwise-unused
+    // CloudEvent import - confirm against upstream.
+    final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
+        consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.create();
+
+    final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
+        consumerRecordOffsetStrategyFactory,
+        consumerConfigs,
+        vertx.createHttpClient(),
+        vertx,
+        producerConfigs
+    );
+
+    final var brokersManager = new BrokersManager<>(
+        vertx,
+        consumerVerticleFactory,
+        Integer.parseInt(json.getString(BROKERS_INITIAL_CAPACITY)),
+        Integer.parseInt(json.getString(TRIGGERS_INITIAL_CAPACITY))
+    );
+
+    final var objectCreator = new ObjectsCreator(brokersManager);
+
+    try {
+      final var fw = new FileWatcher(
+          FileSystems.getDefault().newWatchService(),
+          objectCreator,
+          new File(json.getString(BROKERS_TRIGGERS_PATH))
+      );
+
+      fw.watch(); // block forever
+
+    } catch (InterruptedException | IOException ex) {
+      logger.error("failed during filesystem watch", ex);
+    }
+  }
+
+  /**
+   * Load a {@link Properties} file from the given path.
+   *
+   * @param path path of the properties file; may be null.
+   * @return the loaded properties; empty when path is null or unreadable.
+   */
+  private static Properties config(final String path) {
+    if (path == null) {
+      return new Properties();
+    }
+
+    final var consumerConfigs = new Properties();
+    // NOTE(review): FileReader uses the platform default charset (pre-Java 18);
+    // confirm the config files are written in that charset.
+    try (final var configReader = new FileReader(path)) {
+      consumerConfigs.load(configReader);
+    } catch (IOException e) {
+      // SLF4J: pass the throwable as the trailing argument without a
+      // placeholder so the full stack trace is logged.
+      logger.error("failed to load configurations from file {}", path, e);
+    }
+
+    return consumerConfigs;
+  }
+
+  /**
+   * Retrieve configurations from environment variables, blocking the caller
+   * until they become available.
+   *
+   * @param vertx Vert.x instance.
+   * @return the retrieved configuration.
+   * @throws InterruptedException if interrupted while waiting.
+   */
+  private static JsonObject getConfigurations(final Vertx vertx) throws InterruptedException {
+
+    final var envConfigs = new ConfigStoreOptions()
+        .setType("env")
+        .setOptional(false)
+        .setConfig(new JsonObject().put("raw-data", true));
+
+    final var configRetrieverOptions = new ConfigRetrieverOptions()
+        .addStore(envConfigs);
+
+    final var configRetriever = ConfigRetriever.create(vertx, configRetrieverOptions);
+
+    // Bridge the async retrieval to this synchronous startup path: the
+    // single-slot queue hands the config from the event loop to main().
+    final var waitConfigs = new ArrayBlockingQueue<JsonObject>(1);
+    Future.future(configRetriever::getConfig)
+        .onSuccess(waitConfigs::add)
+        .onFailure(cause -> {
+          logger.error("failed to retrieve configurations", cause);
+          vertx.close(ignored -> System.exit(1));
+        });
+
+    return waitConfigs.take();
+  }
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/SinkResponseHandler.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/SinkResponseHandler.java
new file mode 100644
index 000000000..b15c77670
--- /dev/null
+++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/SinkResponseHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2020 The Knative Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.knative.eventing.kafka.broker.dispatcher;
+
+import io.vertx.core.Future;
+
+@FunctionalInterface
+public interface SinkResponseHandler {
+
+ Future