diff --git a/CHANGELOG.md b/CHANGELOG.md
index c80a957..25515ac 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,14 @@
 # Changelog

+## Mocked Streams 2.0
+
+* Build against Apache Kafka 2.0
+* Updated the Kafka Streams DSL calls to the 2.0 API
+* Replaced ProcessorTopologyTestDriver with TopologyTestDriver
+* Removed the Record class in favour of using ConsumerRecord directly
+* Added Michal Dziemianko to CONTRIBUTORS.md
+* Thanks to Michal Dziemianko for the Kafka 2.0 support
+
 ## Mocked Streams 1.8.0

 * Bumping versions of dependencies
diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md
index ba96900..2c677cf 100644
--- a/CONTRIBUTORS.md
+++ b/CONTRIBUTORS.md
@@ -4,3 +4,4 @@
 * Jendrik Poloczek
 * Svend Vanderveken
 * Daniel Wojda
+* Michal Dziemianko
diff --git a/README.md b/README.md
index 7cc6ee8..cc378a2 100644
--- a/README.md
+++ b/README.md
@@ -4,14 +4,15 @@ [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams)
 [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
 [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)

-Mocked Streams 1.8.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
+Mocked Streams 2.0.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework, e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is available on Maven Central, so you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

-    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.8.0" % "test"
+    libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.0.0" % "test"

 ## Apache Kafka Compatibility

 | Mocked Streams Version | Apache Kafka Version |
 |------------- |-------------|
+| 2.0.0 | 2.0.0.0 |
 | 1.8.0 | 1.1.1.0 |
 | 1.7.0 | 1.1.0.0 |
 | 1.6.0 | 1.0.1.0 |
@@ -26,7 +27,7 @@ Mocked Streams 1.8.0 [(git)](https://github.com/jpzk/mockedstreams) is a library

 ## Simple Example

-It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple:
+It wraps the [org.apache.kafka.streams.TopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple:

     import com.madewithtea.mockedstreams.MockedStreams

@@ -95,7 +96,7 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2 and

     import com.madewithtea.mockedstreams.MockedStreams

     val props = new Properties
-    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
       classOf[TimestampExtractors.CustomTimestampExtractor].getName)

     val mstreams = MockedStreams()
@@ -114,7 +115,7 @@ Sometimes you need to pass a custom configuration to Kafka Streams:

     import com.madewithtea.mockedstreams.MockedStreams

     val props = new Properties
-    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)
+    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)

     val mstreams = MockedStreams()
       .topology { builder => builder.stream(...) [...]
     }
diff --git a/build.sbt b/build.sbt
index 19175da..3c51c3d 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,7 +1,7 @@
 lazy val commonSettings = Seq(
   organization := "com.madewithtea",
-  version := "1.8.0",
+  version := "2.0.0",
   scalaVersion := "2.12.6",
   crossScalaVersions := Seq("2.12.6","2.11.12"),
   description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
@@ -10,15 +10,17 @@ lazy val commonSettings = Seq(

 val scalaTestVersion = "3.0.5"
 val rocksDBVersion = "5.14.2"
-val kafkaVersion = "1.1.1"
+val kafkaVersion = "2.0.0"

 lazy val kafka = Seq(
-  "org.apache.kafka" % "kafka-clients" % kafkaVersion,
-  "org.apache.kafka" % "kafka-clients" % kafkaVersion classifier "test",
-  "org.apache.kafka" % "kafka-streams" % kafkaVersion,
-  "org.apache.kafka" % "kafka-streams" % kafkaVersion classifier "test",
-  "org.apache.kafka" %% "kafka" % kafkaVersion
-  )
+  "javax.ws.rs" % "javax.ws.rs-api" % "2.1" jar(),
+  "org.apache.kafka" % "kafka-clients" % kafkaVersion,
+  "org.apache.kafka" % "kafka-clients" % kafkaVersion classifier "test",
+  "org.apache.kafka" % "kafka-streams" % kafkaVersion,
+  "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion,
+  "org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion,
+  "org.apache.kafka" %% "kafka" % kafkaVersion
+)

 lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
 lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test"
diff --git a/project/build.properties b/project/build.properties
index b7dd3cb..d6e3507 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.0.2
+sbt.version=1.1.6
diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala
index 8886251..0d1cb7a 100644
--- a/src/main/scala/MockedStreams.scala
+++ b/src/main/scala/MockedStreams.scala
@@ -14,14 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package com.madewithtea.mockedstreams

 import java.util.{Properties, UUID}
+import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization.Serde
-import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology}
 import org.apache.kafka.streams.state.ReadOnlyWindowStore
-import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver}
+import org.apache.kafka.streams.test.ConsumerRecordFactory
+import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology, TopologyTestDriver => Driver}

 import scala.collection.JavaConverters._

@@ -29,16 +31,14 @@ object MockedStreams {

   def apply() = Builder()

-  case class Record(topic: String, key: Array[Byte], value: Array[Byte])
-
   case class Builder(topology: Option[() => Topology] = None,
                      configuration: Properties = new Properties(),
                      stateStores: Seq[String] = Seq(),
-                     inputs: List[Record] = List.empty) {
+                     inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty) {

     def config(configuration: Properties) = this.copy(configuration = configuration)

-    def topology(func: (StreamsBuilder => Unit)) = {
+    def topology(func: StreamsBuilder => Unit) = {
       val buildTopology = () => {
         val builder = new StreamsBuilder()
         func(builder)
@@ -55,9 +55,11 @@
       val keySer = key.serializer
       val valSer = value.serializer

+      val factory = new ConsumerRecordFactory[K, V](keySer, valSer)
+
       val updatedRecords = newRecords.foldLeft(inputs) {
         case (events, (k, v)) =>
-          val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v))
+          val newRecord = factory.create(topic, k, v)
           events :+ newRecord
       }

@@ -97,22 +99,15 @@

-    // state store is temporarily created in ProcessorTopologyTestDriver
+    // state store is temporarily created in TopologyTestDriver
     private def stream = {
-      val props: java.util.Map[Object, Object] = new Properties
+      val props = new Properties
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, s"mocked-${UUID.randomUUID().toString}")
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
       props.putAll(configuration)
-
-      val t = topology.getOrElse(throw new NoTopologySpecified)
-
-      new Driver(new StreamsConfig(props), t())
+      new Driver(topology.getOrElse(throw new NoTopologySpecified)(), props)
     }

-    private def produce(driver: Driver): Unit = {
-      inputs.foreach {
-        case Record(topic, key, value) =>
-          driver.process(topic, key, value)
-      }
-    }
+    private def produce(driver: Driver): Unit =
+      inputs.foreach(driver.pipeInput)

     private def withProcessedDriver[T](f: Driver => T): T = {
       if(inputs.isEmpty)
diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala
index 4e15b62..91d2612 100644
--- a/src/test/scala/MockedStreamsSpec.scala
+++ b/src/test/scala/MockedStreamsSpec.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package com.madewithtea.mockedstreams

 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -140,7 +141,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
     import java.util.Properties

     val props = new Properties
-    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
       classOf[TimestampExtractors.CustomTimestampExtractor].getName)

     val builder = MockedStreams()
@@ -197,14 +198,14 @@

     val strings = Serdes.String()
     val serdes = Consumed.`with`(strings, strings)
-
+
     val InputTopic = "input"
     val OutputTopic = "output"

     def topology(builder: StreamsBuilder) = {
       builder.stream(InputTopic, serdes)
         .map[String, String]((k, v) => new KeyValue(k, v.toUpperCase))
-        .to(strings, strings, OutputTopic)
+        .to(OutputTopic, Produced.`with`(strings, strings))
     }
   }
@@ -236,59 +237,63 @@
     val streamA = builder.stream(InputATopic, serdes)
     val streamB = builder.stream(InputBTopic, serdes)

-    val table = streamA.groupByKey(strings, ints).aggregate(
-      new LastInitializer,
-      new LastAggregator, ints, StoreName)
+    val table = streamA.groupByKey(Serialized.`with`(strings, ints))
+      .aggregate(
+        new LastInitializer,
+        new LastAggregator,
+        Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
+      )

-    streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints)
-      .to(strings, ints, OutputATopic)
+    streamB.leftJoin[Integer, Integer](table, new AddJoiner(), Joined.`with`(strings, ints, ints))
+      .to(OutputATopic, Produced.`with`(strings, ints))
   }

   def topology1WindowOutput(builder: StreamsBuilder) = {
     val streamA = builder.stream(InputCTopic, serdes)
-    streamA.groupByKey(strings, ints).count(
-      TimeWindows.of(1),
-      StoreName)
+    streamA.groupByKey(Serialized.`with`(strings, ints))
+      .windowedBy(TimeWindows.of(1))
+      .count(Materialized.as(StoreName))
   }

   def topology2Output(builder: StreamsBuilder) = {
     val streamA = builder.stream(InputATopic, serdes)
     val streamB = builder.stream(InputBTopic, serdes)

-    val table = streamA.groupByKey(strings, ints).aggregate(
+    val table = streamA.groupByKey(Serialized.`with`(strings, ints)).aggregate(
       new LastInitializer,
       new LastAggregator,
-      ints,
-      StoreName)
+      Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints))

-    streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints)
-      .to(strings, ints, OutputATopic)
+    streamB.leftJoin[Integer, Integer](table, new AddJoiner(), Joined.`with`(strings, ints, ints))
+      .to(OutputATopic, Produced.`with`(strings, ints))

-    streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints)
-      .to(strings, ints, OutputBTopic)
+    streamB.leftJoin[Integer, Integer](table, new SubJoiner(), Joined.`with`(strings, ints, ints))
+      .to(OutputBTopic, Produced.`with`(strings, ints))
   }

   def topologyTables(builder: StreamsBuilder) = {
     val streamA = builder.stream(InputATopic, serdes)
     val streamB = builder.stream(InputBTopic, serdes)

-    val tableA = streamA.groupByKey(strings, ints).aggregate(
-      new LastInitializer,
-      new LastAggregator,
-      ints,
-      StoreName)
+    val tableA = streamA.groupByKey(Serialized.`with`(strings, ints))
+      .aggregate(
+        new LastInitializer,
+        new LastAggregator,
+        Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
+      )

-    val tableB = streamB.groupByKey(strings, ints).aggregate(
-      new LastInitializer,
-      new LastAggregator,
-      ints,
-      Store2Name)
+    val tableB = streamB.groupByKey(Serialized.`with`(strings, ints))
+      .aggregate(
+        new LastInitializer,
+        new LastAggregator,
+        Materialized.as(Store2Name).withKeySerde(strings).withValueSerde(ints)
+      )

     val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner)

     resultTable
       .toStream
-      .to(strings, ints, OutputATopic)
+      .to(OutputATopic, Produced.`with`(strings, ints))
   }
 }
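
For reviewers trying out the patch, here is how a test reads against Mocked Streams 2.0 — a minimal sketch, not part of the diff: the `UppercaseSpec` class, the topic names and the sample records are illustrative, while the `MockedStreams()` builder calls and the Kafka 2.0 `Consumed`/`Produced` overloads are the ones exercised above:

    import org.apache.kafka.common.serialization.Serdes
    import org.apache.kafka.streams.KeyValue
    import org.apache.kafka.streams.kstream.{Consumed, Produced}
    import org.scalatest.{FlatSpec, Matchers}

    import com.madewithtea.mockedstreams.MockedStreams

    class UppercaseSpec extends FlatSpec with Matchers {

      "a topology built with the Kafka 2.0 DSL" should "uppercase values" in {
        val strings = Serdes.String()

        val output = MockedStreams()
          .topology { builder =>
            // Kafka 2.0 moved serdes into Consumed/Produced; the old
            // .to(keySerde, valueSerde, topic) overload is gone.
            builder.stream("input", Consumed.`with`(strings, strings))
              .map[String, String]((k, v) => new KeyValue(k, v.toUpperCase))
              .to("output", Produced.`with`(strings, strings))
          }
          .input("input", strings, strings, Seq(("x", "hello")))
          .output("output", strings, strings, 1)

        output shouldEqual Seq(("x", "HELLO"))
      }
    }

For comparison, this is roughly what the new `stream` and `produce` methods delegate to in kafka-streams-test-utils — again a sketch, with the topology elided and the application id illustrative:

    import java.util.Properties

    import org.apache.kafka.common.serialization.{Serdes, StringDeserializer}
    import org.apache.kafka.streams.test.ConsumerRecordFactory
    import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, TopologyTestDriver}

    object RawDriverExample extends App {
      val strings = Serdes.String()

      val props = new Properties
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "raw-driver-example")
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

      val builder = new StreamsBuilder()
      // ... define the same uppercase topology on builder as above ...

      val driver = new TopologyTestDriver(builder.build(), props)
      try {
        // inputs are piped in as ConsumerRecords, mirroring produce() in the diff
        val factory = new ConsumerRecordFactory[String, String](
          "input", strings.serializer, strings.serializer)
        driver.pipeInput(factory.create("x", "hello"))

        val record = driver.readOutput("output",
          new StringDeserializer, new StringDeserializer)
        assert(record.value == "HELLO")
      } finally driver.close()
    }

Piping `ConsumerRecord`s through `TopologyTestDriver.pipeInput` replaces the removed `ProcessorTopologyTestDriver.process(topic, key, value)` call, which is why the internal `Record` wrapper class could be dropped.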