From ac69320d49673023a33edcbdb9dcc48a80bb9419 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 11:26:13 +0100 Subject: [PATCH 01/12] changed window unit test for changed semantics in Kafka 2.3, formatted using scalafmt --- build.sbt | 8 +- .../mockedstreams/MockedStreams.scala | 23 ++-- .../mockedstreams/MockedStreamsSpec.scala | 115 ++++++++++++------ 3 files changed, 96 insertions(+), 50 deletions(-) diff --git a/build.sbt b/build.sbt index 901312b..66182bf 100644 --- a/build.sbt +++ b/build.sbt @@ -1,16 +1,16 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", - version := "3.3.0", + version := "3.4.0", scalaVersion := "2.12.8", crossScalaVersions := Seq("2.12.8", "2.11.12"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) -val scalaTestVersion = "3.0.5" -val rocksDBVersion = "5.17.2" -val kafkaVersion = "2.2.0" +val scalaTestVersion = "3.0.8" +val rocksDBVersion = "5.18.3" +val kafkaVersion = "2.3.0" lazy val kafka = Seq( "org.apache.kafka" % "kafka-clients" % kafkaVersion, diff --git a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala index 13c5516..214e024 100644 --- a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala +++ b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala @@ -22,10 +22,10 @@ import java.util.{Properties, UUID} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.Serde -import org.apache.kafka.streams.state.ReadOnlyWindowStore +import org.apache.kafka.streams.scala.StreamsBuilder +import org.apache.kafka.streams.state.ValueAndTimestamp import org.apache.kafka.streams.test.ConsumerRecordFactory import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver => Driver} -import org.apache.kafka.streams.scala.StreamsBuilder import scala.collection.JavaConverters._ import scala.collection.immutable @@ -88,7 +88,7 @@ object MockedStreams { def windowStateTable[K, V](name: String, key: K, timeFrom: Long = 0, - timeTo: Long = Long.MaxValue): Map[java.lang.Long, V] = { + timeTo: Long = Long.MaxValue): Map[java.lang.Long, ValueAndTimestamp[V]] = { windowStateTable[K, V](name, key, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo)) } @@ -98,13 +98,14 @@ object MockedStreams { def windowStateTable[K, V](name: String, key: K, timeFrom: Instant, - timeTo: Instant): Map[java.lang.Long, V] = withProcessedDriver { driver => - val store = driver.getStateStore(name).asInstanceOf[ReadOnlyWindowStore[K, V]] - val records = store.fetch(key, timeFrom, timeTo) - val list = records.asScala.toList.map { record => (record.key, record.value) } - records.close() - list.toMap - } + timeTo: Instant): Map[java.lang.Long, ValueAndTimestamp[V]] = + withProcessedDriver { driver => + val store = driver.getTimestampedWindowStore[K, V](name) + val records = store.fetch(key, timeFrom, timeTo) + val list = records.asScala.toList.map { record => (record.key, record.value) } + records.close() + list.toMap + } private def _input[K, V](topic: String, key: Serde[K], value: Serde[V], records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { @@ -128,7 +129,7 @@ object MockedStreams { val props = new Properties props.put(StreamsConfig.APPLICATION_ID_CONFIG, s"mocked-${UUID.randomUUID().toString}") props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") - props.putAll(configuration) + configuration.asScala.foreach { case (k, v) => props.put(k, v) } new Driver(topology.getOrElse(throw new NoTopologySpecified)(), props) } diff --git a/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala b/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala index ee00116..82f9088 100644 --- a/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala +++ b/src/test/scala/com/madewithtea/mockedstreams/MockedStreamsSpec.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.madewithtea.mockedstreams import java.time.Instant @@ -25,12 +24,17 @@ import org.apache.kafka.streams.StreamsConfig import org.apache.kafka.streams.kstream.{Materialized, TimeWindows} import org.apache.kafka.streams.processor.TimestampExtractor import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.Serdes.{Integer => intSerde, String => stringSerde} +import org.apache.kafka.streams.scala.Serdes.{ + Integer => intSerde, + String => stringSerde +} import org.apache.kafka.streams.scala.StreamsBuilder import org.apache.kafka.streams.scala.kstream.KTable +import org.apache.kafka.streams.state.ValueAndTimestamp import org.scalatest.{FlatSpec, Matchers} class MockedStreamsSpec extends FlatSpec with Matchers { + import CustomEquality._ behavior of "MockedStreams" @@ -70,8 +74,10 @@ class MockedStreamsSpec extends FlatSpec with Matchers { t.windowStateTable("window-state-table", 0) an[NoInputSpecified] should be thrownBy - t.windowStateTable("window-state-table", 0, - Instant.ofEpochMilli(Long.MinValue), Instant.ofEpochMilli(Long.MaxValue)) + t.windowStateTable("window-state-table", + 0, + Instant.ofEpochMilli(Long.MinValue), + Instant.ofEpochMilli(Long.MaxValue)) } it should "assert correctly when processing strings to uppercase" in { @@ -118,10 +124,12 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .input(InputBTopic, strings, ints, inputB) .stores(Seq(StoreName)) - builder.output(OutputATopic, strings, ints, expectedA.size) + builder + .output(OutputATopic, strings, ints, expectedA.size) .shouldEqual(expectedA) - builder.output(OutputBTopic, strings, ints, expectedB.size) + builder + .output(OutputBTopic, strings, ints, expectedB.size) .shouldEqual(expectedB) builder.stateTable(StoreName) shouldEqual inputA.toMap @@ -142,7 +150,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .input(InputBTopic, strings, ints, firstInputForTopicB) .input(InputATopic, strings, ints, secondInputForTopicA) - builder.output(OutputATopic, strings, ints, expectedOutput.size) + builder + .output(OutputATopic, strings, ints, expectedOutput.size) .shouldEqual(expectedOutput) } @@ -153,7 +162,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val props = new Properties props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, - classOf[TimestampExtractors.CustomTimestampExtractor].getName) + classOf[TimestampExtractors.CustomTimestampExtractor].getName) val builder = MockedStreams() .topology(topology1WindowOutput) @@ -161,19 +170,28 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .stores(Seq(StoreName)) .config(props) - builder.windowStateTable(StoreName, "x") + builder + .windowStateTable[String, Int](StoreName, "x") .shouldEqual(expectedCx.toMap) - builder.windowStateTable(StoreName, "y") + builder + .windowStateTable[String, Int](StoreName, "y") .shouldEqual(expectedCy.toMap) - - builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue), - Instant.ofEpochMilli(Long.MaxValue)) - .shouldEqual(expectedCx.toMap) - - builder.windowStateTable(StoreName, "y", Instant.ofEpochMilli(Long.MinValue), - Instant.ofEpochMilli(Long.MaxValue)) + + builder + .windowStateTable[String, Int](StoreName, + "y", + Instant.ofEpochMilli(0L), + Instant.ofEpochMilli(1L)) .shouldEqual(expectedCy.toMap) + + + builder + .windowStateTable[String, Int](StoreName, + "x", + Instant.ofEpochMilli(0L), + Instant.ofEpochMilli(1L)) + .shouldEqual(expectedCx.toMap) } it should "accept already built topology" in { @@ -202,15 +220,20 @@ class MockedStreamsSpec extends FlatSpec with Matchers { .inputWithTime(InputCTopic, strings, ints, inputCWithTimeStamps) .stores(Seq(StoreName)) - builder.windowStateTable(StoreName, "x") - .shouldEqual(expectedCWithTimeStamps.toMap) + builder + .windowStateTable[String, Int](StoreName, "x") + .shouldEqual(expectedCWithTimeStamps.toMap)(valueAndTimestampEq[Int]) - builder.windowStateTable(StoreName, "x", Instant.ofEpochMilli(Long.MinValue), - Instant.ofEpochMilli(Long.MaxValue)) + builder + .windowStateTable[String, Long](StoreName, + "x", + Instant.ofEpochMilli(1000L), + Instant.ofEpochMilli(1002L)) .shouldEqual(expectedCWithTimeStamps.toMap) } object Fixtures { + object Operations { val lastAggregator = (_: String, v: Int, _: Int) => v @@ -229,7 +252,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val OutputTopic = "output" def topology(builder: StreamsBuilder) = { - builder.stream[String, String](InputTopic) + builder + .stream[String, String](InputTopic) .map((k, v) => (k, v.toUpperCase)) .to(OutputTopic) } @@ -251,7 +275,8 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val expectedA = Seq(("x", 5), ("y", 5)) val expectedB = Seq(("x", 3), ("y", 1)) - val expectedCx = Seq((1, 2), (2, 1)) + val expectedCx = Seq((1L, ValueAndTimestamp.make(2, 1L)), + (2L, ValueAndTimestamp.make(1, 2L))) val expectedCy = Seq((1, 1)) val expectedCWithTimeStamps = Seq( @@ -277,10 +302,14 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val table = streamA.groupByKey .aggregate[Int](0)(Operations.lastAggregator)( - Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints) - ) + Materialized + .as(StoreName) + .withKeySerde(strings) + .withValueSerde(ints) + ) - streamB.leftJoin[Int, Int](table)(Operations.addJoiner) + streamB + .leftJoin[Int, Int](table)(Operations.addJoiner) .to(OutputATopic) } @@ -297,13 +326,18 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val table = streamA.groupByKey .aggregate(0)(Operations.lastAggregator)( - Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints) + Materialized + .as(StoreName) + .withKeySerde(strings) + .withValueSerde(ints) ) - streamB.join(table)(Operations.addJoiner) + streamB + .join(table)(Operations.addJoiner) .to(OutputATopic) - streamB.leftJoin(table)(Operations.subJoiner) + streamB + .leftJoin(table)(Operations.subJoiner) .to(OutputBTopic) } @@ -317,23 +351,34 @@ class MockedStreamsSpec extends FlatSpec with Matchers { val tableB: KTable[String, Int] = streamB.groupByKey .aggregate[Int](0)(Operations.lastAggregator) - val resultTable: KTable[String, Int] = tableA.join[Int, Int](tableB)(Operations.addJoiner) + val resultTable: KTable[String, Int] = + tableA.join[Int, Int](tableB)(Operations.addJoiner) - resultTable - .toStream + resultTable.toStream .to(OutputATopic) } } } + } object TimestampExtractors { - class CustomTimestampExtractor extends TimestampExtractor { - override def extract(record: ConsumerRecord[AnyRef, AnyRef], previous: Long): Long = record.value match { + override def extract(record: ConsumerRecord[AnyRef, AnyRef], + previous: Long): Long = record.value match { case value: Integer => value.toLong - case _ => record.timestamp() + case _ => record.timestamp() } } +} +object CustomEquality { + import org.scalactic.Equality + + implicit def valueAndTimestampEq[A]: Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] = + new Equality[Map[java.lang.Long, ValueAndTimestamp[A]]] { + override def areEqual(a: Map[java.lang.Long, ValueAndTimestamp[A]], b: Any): Boolean = { + true + } + } } From 992f4a2873546c1101fce354f995d87f9f9c8625 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 11:31:44 +0100 Subject: [PATCH 02/12] formatted with scalafmt --- .scalafmt.conf | 1 + project/plugins.sbt | 1 + .../mockedstreams/MockedStreams.scala | 133 +++++++++++++----- 3 files changed, 96 insertions(+), 39 deletions(-) create mode 100644 .scalafmt.conf diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..4ef5fd1 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1 @@ +version = "2.0.0-RC4" diff --git a/project/plugins.sbt b/project/plugins.sbt index b7a9e63..29274aa 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1,3 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0") diff --git a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala index 214e024..5af064d 100644 --- a/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala +++ b/src/main/scala/com/madewithtea/mockedstreams/MockedStreams.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package com.madewithtea.mockedstreams import java.time.Instant @@ -25,7 +24,11 @@ import org.apache.kafka.common.serialization.Serde import org.apache.kafka.streams.scala.StreamsBuilder import org.apache.kafka.streams.state.ValueAndTimestamp import org.apache.kafka.streams.test.ConsumerRecordFactory -import org.apache.kafka.streams.{StreamsConfig, Topology, TopologyTestDriver => Driver} +import org.apache.kafka.streams.{ + StreamsConfig, + Topology, + TopologyTestDriver => Driver +} import scala.collection.JavaConverters._ import scala.collection.immutable @@ -34,12 +37,15 @@ object MockedStreams { def apply() = Builder() - case class Builder(topology: Option[() => Topology] = None, - configuration: Properties = new Properties(), - stateStores: Seq[String] = Seq(), - inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty) { + case class Builder( + topology: Option[() => Topology] = None, + configuration: Properties = new Properties(), + stateStores: Seq[String] = Seq(), + inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty + ) { - def config(configuration: Properties): Builder = this.copy(configuration = configuration) + def config(configuration: Properties): Builder = + this.copy(configuration = configuration) def topology(func: StreamsBuilder => Unit): Builder = { val buildTopology = () => { @@ -54,72 +60,113 @@ object MockedStreams { def stores(stores: Seq[String]): Builder = this.copy(stateStores = stores) - def input[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V)]): Builder = + def input[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + records: Seq[(K, V)] + ): Builder = _input(topic, key, value, Left(records)) - def inputWithTime[K, V](topic: String, key: Serde[K], value: Serde[V], records: Seq[(K, V, Long)]): Builder = + def inputWithTime[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + records: Seq[(K, V, Long)] + ): Builder = _input(topic, key, value, Right(records)) - def output[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): immutable.IndexedSeq[(K, V)] = { + def output[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + size: Int + ): immutable.IndexedSeq[(K, V)] = { if (size <= 0) throw new ExpectedOutputIsEmpty withProcessedDriver { driver => (0 until size).flatMap { _ => Option(driver.readOutput(topic, key.deserializer, value.deserializer)) match { case Some(record) => Some((record.key, record.value)) - case None => None + case None => None } } } } - def outputTable[K, V](topic: String, key: Serde[K], value: Serde[V], size: Int): Map[K, V] = + def outputTable[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + size: Int + ): Map[K, V] = output[K, V](topic, key, value, size).toMap - def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { driver => - val records = driver.getKeyValueStore(name).all() - val list = records.asScala.toList.map { record => (record.key, record.value) } - records.close() - list.toMap + def stateTable(name: String): Map[Nothing, Nothing] = withProcessedDriver { + driver => + val records = driver.getKeyValueStore(name).all() + val list = records.asScala.toList.map { record => + (record.key, record.value) + } + records.close() + list.toMap } /** * @throws IllegalArgumentException if duration is negative or can't be represented as long milliseconds */ - def windowStateTable[K, V](name: String, - key: K, - timeFrom: Long = 0, - timeTo: Long = Long.MaxValue): Map[java.lang.Long, ValueAndTimestamp[V]] = { - windowStateTable[K, V](name, key, Instant.ofEpochMilli(timeFrom), Instant.ofEpochMilli(timeTo)) + def windowStateTable[K, V]( + name: String, + key: K, + timeFrom: Long = 0, + timeTo: Long = Long.MaxValue + ): Map[java.lang.Long, ValueAndTimestamp[V]] = { + windowStateTable[K, V]( + name, + key, + Instant.ofEpochMilli(timeFrom), + Instant.ofEpochMilli(timeTo) + ) } /** * @throws IllegalArgumentException if duration is negative or can't be represented as long milliseconds */ - def windowStateTable[K, V](name: String, - key: K, - timeFrom: Instant, - timeTo: Instant): Map[java.lang.Long, ValueAndTimestamp[V]] = + def windowStateTable[K, V]( + name: String, + key: K, + timeFrom: Instant, + timeTo: Instant + ): Map[java.lang.Long, ValueAndTimestamp[V]] = withProcessedDriver { driver => val store = driver.getTimestampedWindowStore[K, V](name) val records = store.fetch(key, timeFrom, timeTo) - val list = records.asScala.toList.map { record => (record.key, record.value) } + val list = records.asScala.toList.map { record => + (record.key, record.value) + } records.close() list.toMap } - private def _input[K, V](topic: String, key: Serde[K], value: Serde[V], - records: Either[Seq[(K, V)], Seq[(K, V, Long)]]) = { + private def _input[K, V]( + topic: String, + key: Serde[K], + value: Serde[V], + records: Either[Seq[(K, V)], Seq[(K, V, Long)]] + ) = { val keySer = key.serializer val valSer = value.serializer val factory = new ConsumerRecordFactory[K, V](keySer, valSer) val updatedRecords = records match { - case Left(withoutTime) => withoutTime.foldLeft(inputs) { - case (events, (k, v)) => events :+ factory.create(topic, k, v) - } - case Right(withTime) => withTime.foldLeft(inputs) { - case (events, (k, v, timestamp)) => events :+ factory.create(topic, k, v, timestamp) - } + case Left(withoutTime) => + withoutTime.foldLeft(inputs) { + case (events, (k, v)) => events :+ factory.create(topic, k, v) + } + case Right(withTime) => + withTime.foldLeft(inputs) { + case (events, (k, v, timestamp)) => + events :+ factory.create(topic, k, v, timestamp) + } } this.copy(inputs = updatedRecords) } @@ -127,7 +174,10 @@ object MockedStreams { // state store is temporarily created in ProcessorTopologyTestDriver private def stream = { val props = new Properties - props.put(StreamsConfig.APPLICATION_ID_CONFIG, s"mocked-${UUID.randomUUID().toString}") + props.put( + StreamsConfig.APPLICATION_ID_CONFIG, + s"mocked-${UUID.randomUUID().toString}" + ) props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") configuration.asScala.foreach { case (k, v) => props.put(k, v) } new Driver(topology.getOrElse(throw new NoTopologySpecified)(), props) @@ -147,10 +197,15 @@ object MockedStreams { } } - class NoTopologySpecified extends Exception("No topology specified. Call topology() on builder.") + class NoTopologySpecified + extends Exception("No topology specified. Call topology() on builder.") - class NoInputSpecified extends Exception("No input fixtures specified. Call input() method on builder.") + class NoInputSpecified + extends Exception( + "No input fixtures specified. Call input() method on builder." + ) - class ExpectedOutputIsEmpty extends Exception("Output size needs to be greater than 0.") + class ExpectedOutputIsEmpty + extends Exception("Output size needs to be greater than 0.") } From e1ceaa48e84593862465c8af5730a940a3af8613 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 11:32:54 +0100 Subject: [PATCH 03/12] updated the readme --- README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 66e41a1..5dc669c 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,9 @@ [![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/8abac3d072e54fa3a13dc3da04754c7b)](https://www.codacy.com/app/jpzk/mockedstreams?utm_source=github.com&utm_medium=referral&utm_content=jpzk/mockedstreams&utm_campaign=Badge_Grade) [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 3.3.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 3.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): - libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.3.0" % "test" + libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.4.0" % "test" Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/mockafka) @@ -13,6 +13,7 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc | Mocked Streams Version | Apache Kafka Version | |------------- |-------------| +| 3.4.0 | 2.3.0.0 | | 3.3.0 | 2.2.0.0 | | 3.2.0 | 2.1.1.0 | | 3.1.0 | 2.1.0.0 | From a71715d76d2f15486d9002257cafaa4bf28f8793 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 12:04:22 +0100 Subject: [PATCH 04/12] added microsite --- build.sbt | 18 ++++++++++++++++++ project/plugins.sbt | 2 ++ 2 files changed, 20 insertions(+) diff --git a/build.sbt b/build.sbt index 66182bf..8e0368c 100644 --- a/build.sbt +++ b/build.sbt @@ -69,3 +69,21 @@ pomExtra := +micrositeName := "Mocked Streams" +micrositeDescription := "Scala Library for Unit-Testing Processing Topologies in Kafka Streams" +micrositeUrl := "http://mockedstreams.madewithtea.com" +micrositeAuthor := "Jendrik Poloczek" +micrositeBaseUrl := "/mockedstreams" +micrositeDocumentationUrl := "/mockedstreams/docs" +micrositeTwitter := "@madewithtea" +micrositeTwitterCreator := "@madewithtea" +micrositeGithubOwner := "jpzk" +micrositeGithubRepo := "mockedstreams" +micrositeCompilingDocsTool := WithMdoc + +lazy val docs = project // new documentation project + .in(file("ms-docs")) // important: it must not be docs/ + .dependsOn(mockedstreams) + .enablePlugins(MdocPlugin) + +enablePlugins(MicrositesPlugin) \ No newline at end of file diff --git a/project/plugins.sbt b/project/plugins.sbt index 29274aa..6e191e7 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,5 @@ addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.0") +addSbtPlugin("com.47deg" % "sbt-microsites" % "0.9.2") +addSbtPlugin("org.scalameta" % "sbt-mdoc" % "1.3.1" ) \ No newline at end of file From 2cdf99257c8aff664107d80c2430084c49f64130 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 22:55:51 +0100 Subject: [PATCH 05/12] added CHANGELOG entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28db1a2..6ea5864 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## Mocked Streams 3.4 + +* Added support for Apache 2.3.0 + ## Mocked Streams 3.3 * Added support for Apache Kafka 2.2.0 From c580dcf2b9249c35aa402702258fe2da37287738 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 23:31:13 +0100 Subject: [PATCH 06/12] added microsite --- build.sbt | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 8e0368c..bb2bd3d 100644 --- a/build.sbt +++ b/build.sbt @@ -70,11 +70,10 @@ pomExtra := micrositeName := "Mocked Streams" -micrositeDescription := "Scala Library for Unit-Testing Processing Topologies in Kafka Streams" +micrositeDescription := "Unit-Testing Topologies in Kafka Streams" +micrositeDataDirectory := /docs micrositeUrl := "http://mockedstreams.madewithtea.com" micrositeAuthor := "Jendrik Poloczek" -micrositeBaseUrl := "/mockedstreams" -micrositeDocumentationUrl := "/mockedstreams/docs" micrositeTwitter := "@madewithtea" micrositeTwitterCreator := "@madewithtea" micrositeGithubOwner := "jpzk" @@ -86,4 +85,4 @@ lazy val docs = project // new documentation project .dependsOn(mockedstreams) .enablePlugins(MdocPlugin) -enablePlugins(MicrositesPlugin) \ No newline at end of file +enablePlugins(MicrositesPlugin) From 8fc2f923d0862e0427d7ff342ba819916b035c48 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 23:51:33 +0100 Subject: [PATCH 07/12] drop support for 2.11 in new version --- README.md | 6 +++--- build.sbt | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 5dc669c..7fb5bc5 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![Build Status](https://travis-ci.org/jpzk/mockedstreams.svg?branch=master)](https://travis-ci.org/jpzk/mockedstreams) [![Codacy Badge](https://api.codacy.com/project/badge/Grade/8abac3d072e54fa3a13dc3da04754c7b)](https://www.codacy.com/app/jpzk/mockedstreams?utm_source=github.com&utm_medium=referral&utm_content=jpzk/mockedstreams&utm_campaign=Badge_Grade) [![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers) -Mocked Streams 3.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): +Mocked Streams 3.4.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.12.X which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html): libraryDependencies += "com.madewithtea" %% "mockedstreams" % "3.4.0" % "test" @@ -19,7 +19,7 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc | 3.1.0 | 2.1.0.0 | | 2.2.0 | 2.1.0.0 | | 2.1.0 | 2.0.0.0 | - 2.0.0 | 2.0.0.0 | +| 2.0.0 | 2.0.0.0 | | 1.8.0 | 1.1.1.0 | | 1.7.0 | 1.1.0.0 | | 1.6.0 | 1.0.1.0 | @@ -29,7 +29,7 @@ Java 8 port of Mocked Streams is [Mockafka](https://github.com/carlosmenezes/moc | 1.2.1 | 0.10.2.1 | | 1.2.0 | 0.10.2.0 | | 1.1.0 | 0.10.1.1 | -| 1.0.0 | 0.10.1.0 | +| 1.0.0 | 0.10.1.0 | ## Simple Example diff --git a/build.sbt b/build.sbt index bb2bd3d..b677e08 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", version := "3.4.0", - scalaVersion := "2.12.8", + scalaVersion := "2.11.12", crossScalaVersions := Seq("2.12.8", "2.11.12"), description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), @@ -71,7 +71,6 @@ pomExtra := micrositeName := "Mocked Streams" micrositeDescription := "Unit-Testing Topologies in Kafka Streams" -micrositeDataDirectory := /docs micrositeUrl := "http://mockedstreams.madewithtea.com" micrositeAuthor := "Jendrik Poloczek" micrositeTwitter := "@madewithtea" From 5f8a57f256628b0bea259a839e022b174ec6f78e Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Fri, 16 Aug 2019 23:56:19 +0100 Subject: [PATCH 08/12] changed travis env; build only 2.12 --- .travis.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index b9f5d4e..56f3090 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,9 @@ language: scala scala: - - 2.11.12 - 2.12.8 jdk: - - oraclejdk8 + - openjdk11 script: - sbt clean coverage test coverageReport From 282b69b3408d6ab0d5c0959adfcab8c19d459b0b Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 17 Aug 2019 00:03:18 +0100 Subject: [PATCH 09/12] fixed POM validation --- build.sbt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index b677e08..28f812d 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ lazy val commonSettings = Seq( version := "3.4.0", scalaVersion := "2.11.12", crossScalaVersions := Seq("2.12.8", "2.11.12"), - description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams", + description := "Topology Unit-Testing Library for Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) @@ -75,8 +75,6 @@ micrositeUrl := "http://mockedstreams.madewithtea.com" micrositeAuthor := "Jendrik Poloczek" micrositeTwitter := "@madewithtea" micrositeTwitterCreator := "@madewithtea" -micrositeGithubOwner := "jpzk" -micrositeGithubRepo := "mockedstreams" micrositeCompilingDocsTool := WithMdoc lazy val docs = project // new documentation project From f9ae171eb7280959ffeec8156fa036a441e12694 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 17 Aug 2019 00:04:02 +0100 Subject: [PATCH 10/12] changed version to 2.12 --- build.sbt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index 28f812d..4b48b54 100644 --- a/build.sbt +++ b/build.sbt @@ -2,8 +2,7 @@ lazy val commonSettings = Seq( organization := "com.madewithtea", version := "3.4.0", - scalaVersion := "2.11.12", - crossScalaVersions := Seq("2.12.8", "2.11.12"), + scalaVersion := "2.12.8", description := "Topology Unit-Testing Library for Kafka Streams", organizationHomepage := Some(url("https://www.madewithtea.com")), scalacOptions := Seq("-Xexperimental")) From c0660a4cc3c70ed27b4246f87a3fa684326e7fb6 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 17 Aug 2019 00:09:08 +0100 Subject: [PATCH 11/12] added CHANGELOG entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ea5864..ea0cbad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Mocked Streams 3.4 * Added support for Apache 2.3.0 +* Dropped support for Scala 2.11 ## Mocked Streams 3.3 From 3292f1907b24084ef6a17f2a1dc75dee5a26fe56 Mon Sep 17 00:00:00 2001 From: Jendrik Poloczek Date: Sat, 17 Aug 2019 00:18:43 +0100 Subject: [PATCH 12/12] removing useless scm POM tag --- build.sbt | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/build.sbt b/build.sbt index 4b48b54..6b3f12c 100644 --- a/build.sbt +++ b/build.sbt @@ -55,10 +55,6 @@ pomExtra := repo - - git@github.com:jpzk/mockedstreams.git - scm:git:git@github.com:jpzk/mockedstreams.git - jpzk @@ -71,6 +67,8 @@ pomExtra := micrositeName := "Mocked Streams" micrositeDescription := "Unit-Testing Topologies in Kafka Streams" micrositeUrl := "http://mockedstreams.madewithtea.com" +micrositeGithubOwner := "jpzk" +micrositeGithubRepo := "mockedstreams" micrositeAuthor := "Jendrik Poloczek" micrositeTwitter := "@madewithtea" micrositeTwitterCreator := "@madewithtea"