From f1d376d961a80a26ce7805ccaa65e53f4232d6e3 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 24 Feb 2017 10:33:20 +0100 Subject: [PATCH 01/14] removed logging dependencies log4j and slf4j; fixed type parameter compatibility issue with 0.10.2; added Scala 2.12.1 to cross compilation; updated ScalaTest dependency; updated RocksDB dependency; changed version to 1.2.0 --- build.sbt | 18 +++++++----------- src/test/scala/MockedStreamsSpec.scala | 8 ++++---- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/build.sbt b/build.sbt index 1c7c200..01452bb 100644 --- a/build.sbt +++ b/build.sbt @@ -1,17 +1,16 @@ + lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.1.0", + version := "1.2.0", scalaVersion := "2.11.8", - crossScalaVersions := Seq("2.12.0", "2.11.8"), + crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) -val log4jVersion = "1.2.17" -val slf4jVersion = "1.7.21" -val scalaTestVersion = "2.2.6" -val rocksDBVersion = "4.11.2" -val kafkaVersion = "0.10.1.1" +val scalaTestVersion = "3.0.1" +val rocksDBVersion = "5.0.1" +val kafkaVersion = "0.10.2.0" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, @@ -23,9 +22,6 @@ lazy val kafka = Seq( lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test" lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test" -lazy val logging = Seq("log4j" % "log4j" % log4jVersion % "test", - "org.slf4j" % "slf4j-api" % slf4jVersion % "test", - "org.slf4j" % "slf4j-log4j12" % slf4jVersion % "test") lazy val mockedstreams = (project in file(".")). settings(commonSettings: _*).
libraryDependencies ++= Seq( scalaTest, rocksDB - ) ++ kafka ++ logging + ) ++ kafka ) publishTo := { diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 5b98e97..77e214c 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -136,7 +136,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { def topology(builder: KStreamBuilder) = { builder.stream(strings, strings, InputTopic) - .map((k, v) => new KeyValue(k, v.toUpperCase)) + .map[String, String]((k, v) => new KeyValue(k, v.toUpperCase)) .to(strings, strings, OutputTopic) } } @@ -167,7 +167,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { new LastInitializer, new LastAggregator, ints, StoreName) - streamB.leftJoin(table, new AddJoiner(), strings, ints) + streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) .to(strings, ints, OutputATopic) } @@ -181,10 +181,10 @@ class MockedStreamsSpec extends FlatSpec with Matchers { ints, StoreName) - streamB.leftJoin(table, new AddJoiner(), strings, ints) + streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints) .to(strings, ints, OutputATopic) - streamB.leftJoin(table, new SubJoiner(), strings, ints) + streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints) .to(strings, ints, OutputBTopic) } } From 3213397b9fcca79244c001660f08107d8d31d31d Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 24 Feb 2017 10:59:07 +0100 Subject: [PATCH 02/14] added change log entry; added CONTRIBUTORS file; updated README, minor change in spec --- CHANGELOG | 10 ++++++++++ CONTRIBUTORS | 5 +++++ README.md | 11 +++++++++++ src/test/scala/MockedStreamsSpec.scala | 3 +-- 4 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 CONTRIBUTORS diff --git a/CHANGELOG b/CHANGELOG index da271c4..64987b6 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,5 +1,15 @@ # Change Log +## Mocked Streams 1.2.0 + +* Build against Apache Kafka 0.10.2 +* Added support for Scala 2.12.1 +* Added .stateTable method for retrieving the contant of the state store as Map +* Added contributors file +* Removed dependencies to Log4j and Slf4j +* Updated RocksDB version to 5.0.1 +* Updated ScalaTest version to 3.0.1 + ## Mocked Streams 1.1.0 * Build against Apache Kafka 0.10.1.1 diff --git a/CONTRIBUTORS b/CONTRIBUTORS new file mode 100644 index 0000000..d6887f9 --- /dev/null +++ b/CONTRIBUTORS @@ -0,0 +1,5 @@ +# Contributors + +Hamidreza Afzali +Jendrik Poloczek + diff --git a/README.md b/README.md index 1d38764..1aa5eaa 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Mocked Streams 1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library f | Mocked Streams Version | Apache Kafka Version | | ------------- |-------------| +| 1.2.0 | 0.10.2 | | 1.1.0 | 0.10.1.1 | | 1.0.0 | 0.10.1.0 | @@ -44,6 +45,16 @@ It also allows you to have multiple input and output streams. If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) + +## Testing the State Store Content + + val mstreams = MockedStreams() + .topology { builder => builder.stream(...) [...] 
} + .input("in-a", strings, ints, inputA) + .input("in-b", strings, ints, inputB) + .stores(Seq("store-name")) + + mstreams.stateTable("store-name") shouldEqual Map('a' -> 1) ## Custom Streams Configuration diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 77e214c..90c43ae 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -102,8 +102,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { builder.output(OutputBTopic, strings, ints, expectedB.size) .shouldEqual(expectedB) - builder.stateTable(StoreName) - .shouldEqual(inputA.toMap) + builder.stateTable(StoreName) shouldEqual inputA.toMap } class LastInitializer extends Initializer[Integer] { From 90663aac8379596ad97b6558560a1201d0aadd95 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 24 Feb 2017 11:02:45 +0100 Subject: [PATCH 03/14] fixed minor things in README --- README.md | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 1aa5eaa..7d6009f 100644 --- a/README.md +++ b/README.md @@ -3,15 +3,15 @@ [![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 1.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 1.2.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala >= 2.11.8 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.1.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.2.0" % "test" ## Apache Kafka Compatibility | Mocked Streams Version | Apache Kafka Version | | ------------- |-------------| -| 1.2.0 | 0.10.2 | +| 1.2.0 | 0.10.2.0 | | 1.1.0 | 0.10.1.1 | | 1.0.0 | 0.10.1.0 | @@ -33,7 +33,7 @@ It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github. ## Multiple Input / Output Example and State -It also allows you to have multiple input and output streams. If your topology uses state stores you need to define them using .stores(Seq[String]): +It also allows you to have multiple input and output streams. 
If your topology uses state stores you need to define them using .stores(stores: Seq[String]): import com.madewithtea.mockedstreams.MockedStreams @@ -46,7 +46,11 @@ It also allows you to have multiple input and output streams. If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) -## Testing the State Store Content +## State Store Content + +When you define your state stores via .stores(stores: Seq[String]) since 1.2 you are able to verify the state store content: + + import com.madewithtea.mockedstreams.MockedStreams val mstreams = MockedStreams() .topology { builder => builder.stream(...) [...] } From 8491fb0ba4d21dea131248b83daabb0e6017c068 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 24 Feb 2017 11:05:09 +0100 Subject: [PATCH 04/14] CHANGELOG and CONTRIBUTORS as markdown --- CHANGELOG => CHANGELOG.md | 0 CONTRIBUTORS | 5 ----- CONTRIBUTORS.md | 5 +++++ 3 files changed, 5 insertions(+), 5 deletions(-) rename CHANGELOG => CHANGELOG.md (100%) delete mode 100644 CONTRIBUTORS create mode 100644 CONTRIBUTORS.md diff --git a/CHANGELOG b/CHANGELOG.md similarity index 100% rename from CHANGELOG rename to CHANGELOG.md diff --git a/CONTRIBUTORS b/CONTRIBUTORS deleted file mode 100644 index d6887f9..0000000 --- a/CONTRIBUTORS +++ /dev/null @@ -1,5 +0,0 @@ -# Contributors - -Hamidreza Afzali -Jendrik Poloczek - diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md new file mode 100644 index 0000000..ebe177f --- /dev/null +++ b/CONTRIBUTORS.md @@ -0,0 +1,5 @@ +# Contributors + +* Hamidreza Afzali +* Jendrik Poloczek + From 125294d03a4a19e7ca72f4250d60861cd7c13e1a Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 24 Feb 2017 11:06:29 +0100 Subject: [PATCH 05/14] Fixed typo in CHANGELOG.md --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 64987b6..3fb1eef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -# Change Log +# Changelog ## Mocked Streams 1.2.0 From 53e43547534458b574e6cd2da111d57317455a66 Mon Sep 17 00:00:00 2001 From: Hamidreza Afzali Date: Fri, 24 Feb 2017 16:49:58 +0100 Subject: [PATCH 06/14] Added support for reading windowed state --- src/main/scala/MockedStreams.scala | 16 ++++++++ src/test/scala/MockedStreamsSpec.scala | 54 ++++++++++++++++++++++++-- 2 files changed, 67 insertions(+), 3 deletions(-) diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 36f149b..a16406d 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -22,6 +22,7 @@ import java.util.{Properties, UUID} import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.KStreamBuilder +import org.apache.kafka.streams.state.ReadOnlyWindowStore import org.apache.kafka.test.ProcessorTopologyTestDriver object MockedStreams { @@ -85,6 +86,21 @@ object MockedStreams { list.toMap } + // FIXME: timeTo: Long = Long.MaxValue + def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, timeTo: Long = 100000000L) = { + if (inputs.isEmpty) + throw new NoInputSpecified + + val driver = stream + produce(driver) + + val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] + val records = store.fetch(key, timeFrom, timeTo) + val list = records.asScala.toList.map { record => (record.key, record.value) } + records.close() + list.toMap + } + // state store is 
temporarily created in ProcessorTopologyTestDriver private def stream = { val props = new Properties diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 5b98e97..f261f7e 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -16,9 +16,13 @@ */ package com.madewithtea.mockedstreams +import java.util.Properties + +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.KeyValue -import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStreamBuilder, ValueJoiner} +import org.apache.kafka.streams.kstream._ +import org.apache.kafka.streams.processor.TimestampExtractor +import org.apache.kafka.streams.{KeyValue, StreamsConfig} import org.scalatest.{FlatSpec, Matchers} class MockedStreamsSpec extends FlatSpec with Matchers { @@ -106,6 +110,26 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .shouldEqual(inputA.toMap) } + + it should "assert correctly when processing windowed state output topology" in { + import Fixtures.Multi._ + + val props = new Properties + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[TimestampExtractors.CustomTimestampExtractor].getName) + + val builder = MockedStreams() + .topology(topology3Output _) + .input(InputCTopic, strings, ints, inputC) + .stores(Seq(StoreName)) + .config(props) + + builder.windowStateTable(StoreName, "x") + .shouldEqual(expectedCx.toMap) + + builder.windowStateTable(StoreName, "y") + .shouldEqual(expectedCy.toMap) + } + class LastInitializer extends Initializer[Integer] { override def apply() = 0 } @@ -147,14 +171,18 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val inputA = Seq(("x", int(1)), ("y", int(2))) val inputB = Seq(("x", int(4)), ("y", int(3))) + val inputC = Seq(("x", int(1000)), ("x", int(1000)), ("x", int(2000)), ("y", int(1000))) val expectedA = Seq(("x", int(5)), ("y", int(5))) val expectedB = Seq(("x", int(3)), ("y", int(1))) + val expectedCx = Seq((1000, 2), (2000, 1)) + val expectedCy = Seq((1000, 1)) val strings = Serdes.String() val ints = Serdes.Integer() val InputATopic = "inputA" val InputBTopic = "inputB" + val InputCTopic = "inputC" val OutputATopic = "outputA" val OutputBTopic = "outputB" val StoreName = "store" @@ -187,10 +215,30 @@ class MockedStreamsSpec extends FlatSpec with Matchers { streamB.leftJoin(table, new SubJoiner(), strings, ints) .to(strings, ints, OutputBTopic) } + + def topology3Output(builder: KStreamBuilder) = { + + val streamA = builder.stream(strings, ints, InputCTopic) + + streamA.groupByKey(strings, ints).count( + TimeWindows.of(1000), + StoreName) + } } } } - +object TimestampExtractors { + class CustomTimestampExtractor extends TimestampExtractor { + override def extract(record: ConsumerRecord[AnyRef, AnyRef]): Long = { + record.value match { + case value: Integer => + value.toLong + case _ => + record.timestamp() + } + } + } +} From 3a717bdee4d667e1d8709ce9d142a154a293367f Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 10:51:22 +0100 Subject: [PATCH 07/14] fixed method parameters; shortened fixtures; renamed topology method --- src/test/scala/MockedStreamsSpec.scala | 39 ++++++++++++-------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index b128ec4..1d33b39 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ 
b/src/test/scala/MockedStreamsSpec.scala @@ -114,10 +114,11 @@ class MockedStreamsSpec extends FlatSpec with Matchers { import Fixtures.Multi._ val props = new Properties - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[TimestampExtractors.CustomTimestampExtractor].getName) + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + classOf[TimestampExtractors.CustomTimestampExtractor].getName) val builder = MockedStreams() - .topology(topology3Output _) + .topology(topology1WindowOutput _) .input(InputCTopic, strings, ints, inputC) .stores(Seq(StoreName)) .config(props) @@ -170,11 +171,11 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val inputA = Seq(("x", int(1)), ("y", int(2))) val inputB = Seq(("x", int(4)), ("y", int(3))) - val inputC = Seq(("x", int(1000)), ("x", int(1000)), ("x", int(2000)), ("y", int(1000))) + val inputC = Seq(("x", int(1)), ("x", int(1)), ("x", int(2)), ("y", int(1))) val expectedA = Seq(("x", int(5)), ("y", int(5))) val expectedB = Seq(("x", int(3)), ("y", int(1))) - val expectedCx = Seq((1000, 2), (2000, 1)) - val expectedCy = Seq((1000, 1)) + val expectedCx = Seq((1, 2), (2, 1)) + val expectedCy = Seq((1, 1)) val strings = Serdes.String() val ints = Serdes.Integer() @@ -198,6 +199,13 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .to(strings, ints, OutputATopic) } + def topology1WindowOutput(builder: KStreamBuilder) = { + val streamA = builder.stream(strings, ints, InputCTopic) + streamA.groupByKey(strings, ints).count( + TimeWindows.of(1), + StoreName) + } + def topology2Output(builder: KStreamBuilder) = { val streamA = builder.stream(strings, ints, InputATopic) val streamB = builder.stream(strings, ints, InputBTopic) @@ -214,30 +222,19 @@ class MockedStreamsSpec extends FlatSpec with Matchers { streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints) .to(strings, ints, OutputBTopic) } - - def topology3Output(builder: KStreamBuilder) = { - - val streamA = builder.stream(strings, ints, InputCTopic) - - streamA.groupByKey(strings, ints).count( - TimeWindows.of(1000), - StoreName) - } } - } - } object TimestampExtractors { + class CustomTimestampExtractor extends TimestampExtractor { - override def extract(record: ConsumerRecord[AnyRef, AnyRef]): Long = { + override def extract(record: ConsumerRecord[AnyRef, AnyRef], previous: Long) = { record.value match { - case value: Integer => - value.toLong - case _ => - record.timestamp() + case value: Integer => value.toLong + case _ => record.timestamp() } } } + } From 3c6159081310077e89ebc5c6a5c6c800f67c491f Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 11:37:53 +0100 Subject: [PATCH 08/14] refactored output and state methods; added validate function for input; added more assertions in test; use Long.MaxValue as top timestamp in window state function --- build.sbt | 2 +- src/main/scala/MockedStreams.scala | 48 +++++++++++------------ src/test/scala/MockedStreamsSpec.scala | 54 +++++++++++++++----------- 3 files changed, 56 insertions(+), 48 deletions(-) diff --git a/build.sbt b/build.sbt index 01452bb..d322baa 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.2.0", + version := "1.2.0-SNAPSHOT", scalaVersion := "2.11.8", crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", diff --git a/src/main/scala/MockedStreams.scala b/src/main/scala/MockedStreams.scala index 
a16406d..f119c85 100644 --- a/src/main/scala/MockedStreams.scala +++ b/src/main/scala/MockedStreams.scala @@ -16,7 +16,6 @@ */ package com.madewithtea.mockedstreams -import collection.JavaConverters._ import java.util.{Properties, UUID} import org.apache.kafka.common.serialization.Serde @@ -25,6 +24,8 @@ import org.apache.kafka.streams.kstream.KStreamBuilder import org.apache.kafka.streams.state.ReadOnlyWindowStore import org.apache.kafka.test.ProcessorTopologyTestDriver +import scala.collection.JavaConverters._ + object MockedStreams { def apply() = Builder() @@ -49,33 +50,25 @@ object MockedStreams { this.copy(inputs = inputs + (topic -> Input(in))) } - def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { - if (size <= 0) - throw new ExpectedOutputIsEmpty - if (inputs.isEmpty) - throw new NoInputSpecified + def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int) = withValidInput { + if (size <= 0) throw new ExpectedOutputIsEmpty - val driver = stream - produce(driver) + val driver = stream + produce(driver) - val keyDes = key.deserializer - val valDes = value.deserializer - (0 until size).flatMap { i => - Option(driver.readOutput(topic, keyDes, valDes)) match { - case Some(record) => Some((record.key, record.value)) - case None => None + (0 until size).flatMap { i => + Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { + case Some(record) => Some((record.key, record.value)) + case None => None + } } } - } - def outputTable[K,V](topic: String, key: Serde[K], value: Serde[V], size: Int) = { - output[K,V](topic, key, value, size).toMap + def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = { + output[K, V](topic, key, value, size).toMap } - def stateTable(name: String) = { - if (inputs.isEmpty) - throw new NoInputSpecified - + def stateTable(name: String): Map[Nothing, Nothing] = withValidInput { val driver = stream produce(driver) @@ -86,10 +79,8 @@ object MockedStreams { list.toMap } - // FIXME: timeTo: Long = Long.MaxValue - def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, timeTo: Long = 100000000L) = { - if (inputs.isEmpty) - throw new NoInputSpecified + def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, + timeTo: Long = Long.MaxValue) = withValidInput { val driver = stream produce(driver) @@ -125,6 +116,13 @@ object MockedStreams { } } } + + private def withValidInput[T](f: => T): T = { + if (inputs.isEmpty) + throw new NoInputSpecified + f + } + } class NoTopologySpecified extends Exception("No topology specified. 
Call topology() on builder.") diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index 1d33b39..a0ed5fe 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -29,38 +29,47 @@ class MockedStreamsSpec extends FlatSpec with Matchers { behavior of "MockedStreams" - it should "throw exception when expected size is <= 0" in { + it should "throw exception when expected size in output methods is <= 0" in { import Fixtures.Uppercase._ import MockedStreams.ExpectedOutputIsEmpty - an[ExpectedOutputIsEmpty] should be thrownBy - MockedStreams() - .topology(topology _) - .input(InputTopic, strings, strings, input) - .output(OutputTopic, strings, strings, 0) - - an[ExpectedOutputIsEmpty] should be thrownBy - MockedStreams() - .topology(topology _) - .input(InputTopic, strings, strings, input) - .output(OutputTopic, strings, strings, -1) + val spec = MockedStreams() + .topology(topology) + .input(InputTopic, strings, strings, input) + + Seq(-1, 0).foreach { size => + an[ExpectedOutputIsEmpty] should be thrownBy + spec.output(OutputTopic, strings, strings, size) + + an[ExpectedOutputIsEmpty] should be thrownBy + spec.outputTable(OutputTopic, strings, strings, size) + } } - it should "throw exception when no input specified" in { + it should "throw exception when no input specified for all output and state methods" in { import Fixtures.Uppercase._ import MockedStreams.NoInputSpecified + val t = MockedStreams().topology(topology) + + an[NoInputSpecified] should be thrownBy + t.output(OutputTopic, strings, strings, expected.size) + an[NoInputSpecified] should be thrownBy - MockedStreams() - .topology(topology _) - .output(OutputTopic, strings, strings, expected.size) + t.outputTable(OutputTopic, strings, strings, expected.size) + + an[NoInputSpecified] should be thrownBy + t.stateTable("state-table") + + an[NoInputSpecified] should be thrownBy + t.windowStateTable("window-state-table", 0) } it should "assert correctly when processing strings to uppercase" in { import Fixtures.Uppercase._ val output = MockedStreams() - .topology(topology _) + .topology(topology) .input(InputTopic, strings, strings, input) .output(OutputTopic, strings, strings, expected.size) @@ -71,7 +80,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { import Fixtures.Uppercase._ val output = MockedStreams() - .topology(topology _) + .topology(topology) .input(InputTopic, strings, strings, input) .outputTable(OutputTopic, strings, strings, expected.size) @@ -82,7 +91,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { import Fixtures.Multi._ val builder = MockedStreams() - .topology(topology1Output _) + .topology(topology1Output) .input(InputATopic, strings, ints, inputA) .input(InputBTopic, strings, ints, inputB) .stores(Seq(StoreName)) @@ -95,7 +104,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { import Fixtures.Multi._ val builder = MockedStreams() - .topology(topology2Output _) + .topology(topology2Output) .input(InputATopic, strings, ints, inputA) .input(InputBTopic, strings, ints, inputB) .stores(Seq(StoreName)) @@ -109,7 +118,6 @@ class MockedStreamsSpec extends FlatSpec with Matchers { builder.stateTable(StoreName) shouldEqual inputA.toMap } - it should "assert correctly when processing windowed state output topology" in { import Fixtures.Multi._ @@ -118,7 +126,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { classOf[TimestampExtractors.CustomTimestampExtractor].getName) val builder = MockedStreams() 
- .topology(topology1WindowOutput _) + .topology(topology1WindowOutput) .input(InputCTopic, strings, ints, inputC) .stores(Seq(StoreName)) .config(props) @@ -223,7 +231,9 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .to(strings, ints, OutputBTopic) } } + } + } object TimestampExtractors { From 12f2a7dc66736ea334c87b97ec32650c073881cd Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 11:56:04 +0100 Subject: [PATCH 09/14] removed type annotations; shortened method --- src/test/scala/MockedStreamsSpec.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/test/scala/MockedStreamsSpec.scala b/src/test/scala/MockedStreamsSpec.scala index a0ed5fe..5833238 100644 --- a/src/test/scala/MockedStreamsSpec.scala +++ b/src/test/scala/MockedStreamsSpec.scala @@ -143,15 +143,15 @@ class MockedStreamsSpec extends FlatSpec with Matchers { } class LastAggregator extends Aggregator[String, Integer, Integer] { - override def apply(k: String, v: Integer, t: Integer): Integer = v + override def apply(k: String, v: Integer, t: Integer) = v } class AddJoiner extends ValueJoiner[Integer, Integer, Integer] { - override def apply(v1: Integer, v2: Integer): Integer = v1 + v2 + override def apply(v1: Integer, v2: Integer) = v1 + v2 } class SubJoiner extends ValueJoiner[Integer, Integer, Integer] { - override def apply(v1: Integer, v2: Integer): Integer = v1 - v2 + override def apply(v1: Integer, v2: Integer) = v1 - v2 } object Fixtures { @@ -239,11 +239,9 @@ class MockedStreamsSpec extends FlatSpec with Matchers { object TimestampExtractors { class CustomTimestampExtractor extends TimestampExtractor { - override def extract(record: ConsumerRecord[AnyRef, AnyRef], previous: Long) = { - record.value match { - case value: Integer => value.toLong - case _ => record.timestamp() - } + override def extract(record: ConsumerRecord[AnyRef, AnyRef], previous: Long) = record.value match { + case value: Integer => value.toLong + case _ => record.timestamp() } } From e5e9c8cad7c01a6a1c0d60301569d44ccc07d3a6 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 12:35:05 +0100 Subject: [PATCH 10/14] added documentation for stateTable and windowStateTable --- README.md | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 7d6009f..b00d2b4 100644 --- a/README.md +++ b/README.md @@ -46,9 +46,9 @@ It also allows you to have multiple input and output streams. 
If your topology u mstreams.output("out-a", strings, ints, expA.size) shouldEqual(expectedA) mstreams.output("out-b", strings, ints, expB.size) shouldEqual(expectedB) -## State Store Content +## State Store -When you define your state stores via .stores(stores: Seq[String]) since 1.2 you are able to verify the state store content: +When you define your state stores via .stores(stores: Seq[String]) since 1.2, you are able to verify the state store content via the .stateTable(name: String) method: import com.madewithtea.mockedstreams.MockedStreams @@ -59,7 +59,26 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2 you .stores(Seq("store-name")) mstreams.stateTable("store-name") shouldEqual Map('a' -> 1) - + +## Window State Store + +When you define your state stores via .stores(stores: Seq[String]) since 1.2 and added the timestamp extractor to the config, you are able to verify the window state store content via the .windowStateTable(name: String, key: K) method: + + import com.madewithtea.mockedstreams.MockedStreams + + val props = new Properties + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + classOf[TimestampExtractors.CustomTimestampExtractor].getName) + + val builder = MockedStreams() + .topology(topology1WindowOutput) + .input("in-a", strings, ints, inputA) + .stores(Seq("store-name")) + .config(props) + + builder.windowStateTable("store-name", "x") shouldEqual someMapX + builder.windowStateTable("store-name", "y") shouldEqual someMapY + ## Custom Streams Configuration Sometimes you need to pass a custom configuration to Kafka Streams: From e87382a57b6302025de0d51d031f28cb6e69d5fb Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 12:37:21 +0100 Subject: [PATCH 11/14] updated the changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fb1eef..b251249 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,11 +4,12 @@ * Build against Apache Kafka 0.10.2 * Added support for Scala 2.12.1 -* Added .stateTable method for retrieving the contant of the state store as Map +* Added .stateTable and .windowStateTable method for retrieving the contant of the state store as Map * Added contributors file * Removed dependencies to Log4j and Slf4j * Updated RocksDB version to 5.0.1 * Updated ScalaTest version to 3.0.1 +* Added more assertions in the test for input validation ## Mocked Streams 1.1.0 From f5948f72b75f26e1e91f42692b013c48df38bc5a Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 12:47:20 +0100 Subject: [PATCH 12/14] changed version back to non-SNAPSHOT --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index d322baa..01452bb 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "1.2.0-SNAPSHOT", + version := "1.2.0", scalaVersion := "2.11.8", crossScalaVersions := Seq("2.12.1","2.12.0", "2.11.8"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", From 85ed23f3ae6a6825492efb4d6d6b6f5a798ce230 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 12:53:18 +0100 Subject: [PATCH 13/14] improved documentation for windowStateTable --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index b00d2b4..c534f85 100644 --- a/README.md +++ b/README.md @@ -70,14 +70,14 @@ When you define your state stores via 
.stores(stores: Seq[String]) since 1.2 and props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[TimestampExtractors.CustomTimestampExtractor].getName) - val builder = MockedStreams() - .topology(topology1WindowOutput) + val mstreams = MockedStreams() + .topology { builder => builder.stream(...) [...] } .input("in-a", strings, ints, inputA) .stores(Seq("store-name")) .config(props) - builder.windowStateTable("store-name", "x") shouldEqual someMapX - builder.windowStateTable("store-name", "y") shouldEqual someMapY + mstreams.windowStateTable("store-name", "x") shouldEqual someMapX + mstreams.windowStateTable("store-name", "y") shouldEqual someMapY ## Custom Streams Configuration From 1e14e511073bfdb28dc80d01bc794d665f01667c Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sun, 26 Feb 2017 12:54:58 +0100 Subject: [PATCH 14/14] fixed typo --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b251249..b3215bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ * Build against Apache Kafka 0.10.2 * Added support for Scala 2.12.1 -* Added .stateTable and .windowStateTable method for retrieving the contant of the state store as Map +* Added .stateTable and .windowStateTable method for retrieving the content of the state stores as Map * Added contributors file * Removed dependencies to Log4j and Slf4j * Updated RocksDB version to 5.0.1