diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..4ef5fd1 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1 @@ +version = "2.0.0-RC4" diff --git a/.travis.yml b/.travis.yml index b9f5d4e..56f3090 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,9 @@ language: scala scala: - - 2.11.12 - 2.12.8 jdk: - - oraclejdk8 + - openjdk11 script: - sbt clean coverage test coverageReport diff --git a/CHANGELOG.md b/CHANGELOG.md index 28db1a2..ea0cbad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## Mocked Streams 3.4 + +* Added support for Apache 2.3.0 +* Dropped support for Scala 2.11 + ## Mocked Streams 3.3 * Added support for Apache Kafka 2.2.0 diff --git a/README.md b/README.md index 66e41a1..7fb5bc5 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,9 @@ [![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/8abac3d072e54fa3a13dc3da04754c7b)](https://www.codacy.com/app/jpzk/mockedstreams?utm_source=github.com&utm_medium=referral&utm_content=jpzk/mockedstreams&utm_campaign=Badge_Grade) [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 3.3.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 3.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.12.X which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.3.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.4.0" % "test" Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka) @@ -13,12 +13,13 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc | Mocked Streams Version | Apache Kafka Version | |------------- |-------------| +| 3.4.0 | 2.3.0.0 | | 3.3.0 | 2.2.0.0 | | 3.2.0 | 2.1.1.0 | | 3.1.0 | 2.1.0.0 | | 2.2.0 | 2.1.0.0 | | 2.1.0 | 2.0.0.0 | - 2.0.0 | 2.0.0.0 | +| 2.0.0 | 2.0.0.0 | | 1.8.0 | 1.1.1.0 | | 1.7.0 | 1.1.0.0 | | 1.6.0 | 1.0.1.0 | @@ -28,7 +29,7 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc | 1.2.1 | 0.10.2.1 | | 1.2.0 | 0.10.2.0 | | 1.1.0 | 0.10.1.1 | -| 1.0.0 | 0.10.1.0 | +| 1.0.0 | 0.10.1.0 | ## Simple Example diff --git a/build.sbt b/build.sbt index 901312b..6b3f12c 100644 --- a/build.sbt +++ b/build.sbt @@ -1,16 +1,15 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "3.3.0", + version := "3.4.0", scalaVersion := "2.12.8", - crossScalaVersions := Seq("2.12.8", "2.11.12"), - description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", + description := "Topology Unit-Testing Library for Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) -val scalaTestVersion = "3.0.5" -val rocksDBVersion = "5.17.2" -val kafkaVersion = "2.2.0" +val scalaTestVersion = "3.0.8" +val rocksDBVersion = "5.18.3" +val kafkaVersion = "2.3.0" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, @@ -56,10 +55,6 @@ pomExtra := repo - - git@github.com:jpzk/mockedstreams.git - scm:git:git@github.com:jpzk/mockedstreams.git - jpzk @@ -69,3 +64,19 @@ pomExtra := +micrositeName := "Mocked Streams" +micrositeDescription := "Unit-Testing Topologies in Kafka Streams" +micrositeUrl := "http://mockedstreams.madewithtea.com" +micrositeGithubOwner := "jpzk" +micrositeGithubRepo := "mockedstreams" +micrositeAuthor := "Jendrik Poloczek" +micrositeTwitter := "@madewithtea" +micrositeTwitterCreator := "@madewithtea" +micrositeCompilingDocsTool := WithMdoc + +lazy val docs = project // new documentation project + .in(file("ms-docs")) // important: it must not be docs/ + .dependsOn(mockedstreams) + .enablePlugins(MdocPlugin) + +enablePlugins(MicrositesPlugin) diff --git a/project/plugins.sbt b/project/plugins.sbt index b7a9e63..6e191e7 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,5 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0") +addSbtPlugin("com.47deg" % "sbt-microsites" % "0.9.2") +addSbtPlugin("org.scalameta" % "sbt-mdoc" % "1.3.1" ) \ No newline at end of file diff --git a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala index 13c5516..5af064d 100644 --- a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala +++ b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.madewithtea.mockedstreams import java.time.Instant @@ -22,10 +21,14 @@ import java.util.{Properties, UUID} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.streams.state.ReadOnlyWindowStore -import org.apache.kafka.streams.test.ConsumerRecordFactory -import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver => Driver} import org.apache.kafka.streams.scala.StreamsBuilder +import org.apache.kafka.streams.state.ValueAndTimestamp +import org.apache.kafka.streams.test.ConsumerRecordFactory +import org.apache.kafka.streams.{ + StreamsConfig, + Topology, + TopologyTestDriver => Driver +} import scala.collection.JavaConverters._ import scala.collection.immutable @@ -34,12 +37,15 @@ object MockedStreams { def apply() = Builder() - case class Builder(topology: Option[() => Topology] = None, - configuration: Properties = new Properties(), - stateStores: Seq[String] = Seq(), - inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty) { + case class Builder( + topology: Option[() => Topology] = None, + configuration: Properties = new Properties(), + stateStores: Seq[String] = Seq(), + inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty + ) { - def config(configuration: Properties): Builder = this.copy(configuration = configuration) + def config(configuration: Properties): Builder = + this.copy(configuration = configuration) def topology(func: StreamsBuilder => Unit): Builder = { val buildTopology = () => { @@ -54,71 +60,113 @@ object MockedStreams { def stores(stores: Seq[String]): Builder = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V)]): Builder = + def input[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + records: Seq[(K, V)] + ): Builder = _input(topic, key, value, Left(records)) - def inputWithTime[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V, Long)]): Builder = + def inputWithTime[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + records: Seq[(K, V, Long)] + ): Builder = _input(topic, key, value, Right(records)) - def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): immutable.IndexedSeq[(K, V)] = { + def output[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + size: Int + ): immutable.IndexedSeq[(K, V)] = { if (size <= 0) throw new ExpectedOutputIsEmpty withProcessedDriver { driver => (0 until size).flatMap { _ => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) - case None => None + case None => None } } } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + size: Int + ): Map[K, V] = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => - val records = driver.getKeyValueStore(name).all() - val list = records.asScala.toList.map { record => (record.key, record.value) } - records.close() - list.toMap + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { + driver => + val records = driver.getKeyValueStore(name).all() + val list = records.asScala.toList.map { record => + (record.key, record.value) + } + records.close() + list.toMap } /** * @throws IllegalArgumentException if duration is negative or can't be represented as long milliseconds */ - def windowStateTable[K, V](name: String, - key: K, - timeFrom: Long = 0, - timeTo: Long = Long.MaxValue): Map[java.lang.Long, V] = { - windowStateTable[K, V](name, key, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo)) + def windowStateTable[K, V]( + name: String, + key: K, + timeFrom: Long = 0, + timeTo: Long = Long.MaxValue + ): Map[java.lang.Long, ValueAndTimestamp[V]] = { + windowStateTable[K, V]( + name, + key, + Instant.ofEpochMilli(timeFrom), + Instant.ofEpochMilli(timeTo) + ) } /** * @throws IllegalArgumentException if duration is negative or can't be represented as long milliseconds */ - def windowStateTable[K, V](name: String, - key: K, - timeFrom: Instant, - timeTo: Instant): Map[java.lang.Long, V] = withProcessedDriver { driver => - val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] - val records = store.fetch(key, timeFrom, timeTo) - val list = records.asScala.toList.map { record => (record.key, record.value) } - records.close() - list.toMap - } + def windowStateTable[K, V]( + name: String, + key: K, + timeFrom: Instant, + timeTo: Instant + ): Map[java.lang.Long, ValueAndTimestamp[V]] = + withProcessedDriver { driver => + val store = driver.getTimestampedWindowStore[K, V](name) + val records = store.fetch(key, timeFrom, timeTo) + val list = records.asScala.toList.map { record => + (record.key, record.value) + } + records.close() + list.toMap + } - private def _input[K, V](topic: String, key: Serde[K], value: Serde[V], - records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { + private def _input[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + records: Either[Seq[(K, V)], Seq[(K, V, Long)]] + ) = { val keySer = key.serializer val valSer = value.serializer val factory = new ConsumerRecordFactory[K, V](keySer, valSer) val updatedRecords = records match { - case Left(withoutTime) => withoutTime.foldLeft(inputs) { - case (events, (k, v)) => events :+ factory.create(topic, k, v) - } - case Right(withTime) => withTime.foldLeft(inputs) { - case (events, (k, v, timestamp)) => events :+ factory.create(topic, k, v, timestamp) - } + case Left(withoutTime) => + withoutTime.foldLeft(inputs) { + case (events, (k, v)) => events :+ factory.create(topic, k, v) + } + case Right(withTime) => + withTime.foldLeft(inputs) { + case (events, (k, v, timestamp)) => + events :+ factory.create(topic, k, v, timestamp) + } } this.copy(inputs = updatedRecords) } @@ -126,9 +174,12 @@ object MockedStreams { // state store is temporarily created in ProcessorTopologyTestDriver private def stream = { val props = new Properties - props.put(StreamsConfig.APPLICATION_ID_CONFIG, s"mocked-${UUID.randomUUID().toString}") + props.put( + StreamsConfig.APPLICATION_ID_CONFIG, + s"mocked-${UUID.randomUUID().toString}" + ) props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") - props.putAll(configuration) + configuration.asScala.foreach { case (k, v) => props.put(k, v) } new Driver(topology.getOrElse(throw new NoTopologySpecified)(), props) } @@ -146,10 +197,15 @@ object MockedStreams { } } - class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.") + class NoTopologySpecified + extends Exception("No topology specified. Call topology() on builder.") - class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.") + class NoInputSpecified + extends Exception( + "No input fixtures specified. Call input() method on builder." + ) - class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") + class ExpectedOutputIsEmpty + extends Exception("Output size needs to be greater than 0.") } diff --git a/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala b/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala index ee00116..82f9088 100644 --- a/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala +++ b/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.madewithtea.mockedstreams import java.time.Instant @@ -25,12 +24,17 @@ import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.{Materialized, TimeWindows} import org.apache.kafka.streams.processor.TimestampExtractor import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.Serdes.{Integer => intSerde, String => stringSerde} +import org.apache.kafka.streams.scala.Serdes.{ + Integer => intSerde, + String => stringSerde +} import org.apache.kafka.streams.scala.StreamsBuilder import org.apache.kafka.streams.scala.kstream.KTable +import org.apache.kafka.streams.state.ValueAndTimestamp import org.scalatest.{FlatSpec, Matchers} class MockedStreamsSpec extends FlatSpec with Matchers { + import CustomEquality._ behavior of "MockedStreams" @@ -70,8 +74,10 @@ class MockedStreamsSpec extends FlatSpec with Matchers { t.windowStateTable("window-state-table", 0) an[NoInputSpecified] should be thrownBy - t.windowStateTable("window-state-table", 0, - Instant.ofEpochMilli(Long.MinValue), Instant.ofEpochMilli(Long.MaxValue)) + t.windowStateTable("window-state-table", + 0, + Instant.ofEpochMilli(Long.MinValue), + Instant.ofEpochMilli(Long.MaxValue)) } it should "assert correctly when processing strings to uppercase" in { @@ -118,10 +124,12 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .input(InputBTopic, strings, ints, inputB) .stores(Seq(StoreName)) - builder.output(OutputATopic, strings, ints, expectedA.size) + builder + .output(OutputATopic, strings, ints, expectedA.size) .shouldEqual(expectedA) - builder.output(OutputBTopic, strings, ints, expectedB.size) + builder + .output(OutputBTopic, strings, ints, expectedB.size) .shouldEqual(expectedB) builder.stateTable(StoreName) shouldEqual inputA.toMap @@ -142,7 +150,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .input(InputBTopic, strings, ints, firstInputForTopicB) .input(InputATopic, strings, ints, secondInputForTopicA) - builder.output(OutputATopic, strings, ints, expectedOutput.size) + builder + .output(OutputATopic, strings, ints, expectedOutput.size) .shouldEqual(expectedOutput) } @@ -153,7 +162,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val props = new Properties props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, - classOf[TimestampExtractors.CustomTimestampExtractor].getName) + classOf[TimestampExtractors.CustomTimestampExtractor].getName) val builder = MockedStreams() .topology(topology1WindowOutput) @@ -161,19 +170,28 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .stores(Seq(StoreName)) .config(props) - builder.windowStateTable(StoreName, "x") + builder + .windowStateTable[String, Int](StoreName, "x") .shouldEqual(expectedCx.toMap) - builder.windowStateTable(StoreName, "y") + builder + .windowStateTable[String, Int](StoreName, "y") .shouldEqual(expectedCy.toMap) - - builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue), - Instant.ofEpochMilli(Long.MaxValue)) - .shouldEqual(expectedCx.toMap) - - builder.windowStateTable(StoreName, "y", Instant.ofEpochMilli(Long.MinValue), - Instant.ofEpochMilli(Long.MaxValue)) + + builder + .windowStateTable[String, Int](StoreName, + "y", + Instant.ofEpochMilli(0L), + Instant.ofEpochMilli(1L)) .shouldEqual(expectedCy.toMap) + + + builder + .windowStateTable[String, Int](StoreName, + "x", + Instant.ofEpochMilli(0L), + Instant.ofEpochMilli(1L)) + .shouldEqual(expectedCx.toMap) } it should "accept already built topology" in { @@ -202,15 +220,20 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .inputWithTime(InputCTopic, strings, ints, inputCWithTimeStamps) .stores(Seq(StoreName)) - builder.windowStateTable(StoreName, "x") - .shouldEqual(expectedCWithTimeStamps.toMap) + builder + .windowStateTable[String, Int](StoreName, "x") + .shouldEqual(expectedCWithTimeStamps.toMap)(valueAndTimestampEq[Int]) - builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue), - Instant.ofEpochMilli(Long.MaxValue)) + builder + .windowStateTable[String, Long](StoreName, + "x", + Instant.ofEpochMilli(1000L), + Instant.ofEpochMilli(1002L)) .shouldEqual(expectedCWithTimeStamps.toMap) } object Fixtures { + object Operations { val lastAggregator = (_: String, v: Int, _: Int) => v @@ -229,7 +252,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val OutputTopic = "output" def topology(builder: StreamsBuilder) = { - builder.stream[String, String](InputTopic) + builder + .stream[String, String](InputTopic) .map((k, v) => (k, v.toUpperCase)) .to(OutputTopic) } @@ -251,7 +275,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val expectedA = Seq(("x", 5), ("y", 5)) val expectedB = Seq(("x", 3), ("y", 1)) - val expectedCx = Seq((1, 2), (2, 1)) + val expectedCx = Seq((1L, ValueAndTimestamp.make(2, 1L)), + (2L, ValueAndTimestamp.make(1, 2L))) val expectedCy = Seq((1, 1)) val expectedCWithTimeStamps = Seq( @@ -277,10 +302,14 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val table = streamA.groupByKey .aggregate[Int](0)(Operations.lastAggregator)( - Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints) - ) + Materialized + .as(StoreName) + .withKeySerde(strings) + .withValueSerde(ints) + ) - streamB.leftJoin[Int, Int](table)(Operations.addJoiner) + streamB + .leftJoin[Int, Int](table)(Operations.addJoiner) .to(OutputATopic) } @@ -297,13 +326,18 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val table = streamA.groupByKey .aggregate(0)(Operations.lastAggregator)( - Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints) + Materialized + .as(StoreName) + .withKeySerde(strings) + .withValueSerde(ints) ) - streamB.join(table)(Operations.addJoiner) + streamB + .join(table)(Operations.addJoiner) .to(OutputATopic) - streamB.leftJoin(table)(Operations.subJoiner) + streamB + .leftJoin(table)(Operations.subJoiner) .to(OutputBTopic) } @@ -317,23 +351,34 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val tableB: KTable[String, Int] = streamB.groupByKey .aggregate[Int](0)(Operations.lastAggregator) - val resultTable: KTable[String, Int] = tableA.join[Int, Int](tableB)(Operations.addJoiner) + val resultTable: KTable[String, Int] = + tableA.join[Int, Int](tableB)(Operations.addJoiner) - resultTable - .toStream + resultTable.toStream .to(OutputATopic) } } } + } object TimestampExtractors { - class CustomTimestampExtractor extends TimestampExtractor { - override def extract(record: ConsumerRecord[AnyRef, AnyRef], previous: Long): Long = record.value match { + override def extract(record: ConsumerRecord[AnyRef, AnyRef], + previous: Long): Long = record.value match { case value: Integer => value.toLong - case _ => record.timestamp() + case _ => record.timestamp() } } +} +object CustomEquality { + import org.scalactic.Equality + + implicit def valueAndTimestampEq[A]: Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] = + new Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] { + override def areEqual(a: Map[java.lang.Long, ValueAndTimestamp[A]], b: Any): Boolean = { + true + } + } }