Skip to content

Commit

Permalink
Merge pull request #36 from jpzk/release/2.0
Browse files Browse the repository at this point in the history
Release/2.0
  • Loading branch information
jpzk authored Aug 10, 2018
2 parents 8e70254 + d218462 commit 2829cbb
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 61 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Changelog

## Mocked Streams 2.0

* Build against Apache Kafka 2.0
* Changes to support Kafka 2.0
* Replaced ProcessorTopologyTestDriver with TopologyTestDriver
* Removed Record class to use ConsumerRecord directly
* Added Michal Dziemianko to CONTRIBUTORS.md
* Thanks to Michal for Kafka 2.0 support

## Mocked Streams 1.8.0

* Bumping versions of dependencies
Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
* Jendrik Poloczek
* Svend Vanderveken
* Daniel Wojda
* Michal Dziemianko
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
[![codecov](https://codecov.io/gh/jpzk/mockedstreams/branch/master/graph/badge.svg)](https://codecov.io/gh/jpzk/mockedstreams) [![License](http://img.shields.io/:license-Apache%202-grey.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) [![GitHub stars](https://img.shields.io/github/stars/jpzk/mockedstreams.svg?style=flat)](https://github.com/jpzk/mockedstreams/stargazers)


Mocked Streams 1.8.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):
Mocked Streams 2.0.0 [(git)](https://github.com/jpzk/mockedstreams) is a library for Scala 2.11 and 2.12 which allows you to **unit-test processing topologies** of [Kafka Streams](https://kafka.apache.org/documentation#streams) applications (since Apache Kafka >=0.10.1) **without Zookeeper and Kafka Brokers**. Further, you can use your favourite Scala testing framework e.g. [ScalaTest](http://www.scalatest.org/) and [Specs2](https://etorreborre.github.io/specs2/). Mocked Streams is located at the Maven Central Repository, therefore you just have to add the following to your [SBT dependencies](http://www.scala-sbt.org/0.13/docs/Library-Dependencies.html):

libraryDependencies += "com.madewithtea" %% "mockedstreams" % "1.8.0" % "test"
libraryDependencies += "com.madewithtea" %% "mockedstreams" % "2.0.0" % "test"

## Apache Kafka Compatibility

| Mocked Streams Version | Apache Kafka Version |
|------------- |-------------|
| 2.0.0 | 2.0.0.0 |
| 1.8.0 | 1.1.1.0 |
| 1.7.0 | 1.1.0.0 |
| 1.6.0 | 1.0.1.0 |
Expand All @@ -26,7 +27,7 @@ Mocked Streams 1.8.0 [(git)](https://github.com/jpzk/mockedstreams) is a library

## Simple Example

It wraps the [org.apache.kafka.test.ProcessorTopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple:
It wraps the [org.apache.kafka.streams.TopologyTestDriver](https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java) class, but adds more syntactic sugar to keep your test code simple:

import com.madewithtea.mockedstreams.MockedStreams

Expand Down Expand Up @@ -95,7 +96,7 @@ When you define your state stores via .stores(stores: Seq[String]) since 1.2 and
import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
classOf[TimestampExtractors.CustomTimestampExtractor].getName)

val mstreams = MockedStreams()
Expand All @@ -114,7 +115,7 @@ Sometimes you need to pass a custom configuration to Kafka Streams:
import com.madewithtea.mockedstreams.MockedStreams

val props = new Properties
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, classOf[CustomExtractor].getName)

val mstreams = MockedStreams()
.topology { builder => builder.stream(...) [...] }
Expand Down
18 changes: 10 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

lazy val commonSettings = Seq(
organization := "com.madewithtea",
version := "1.8.0",
version := "2.0.0",
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.12.6","2.11.12"),
description := "Topology Unit-Testing Library for Apache Kafka / Kafka Streams",
Expand All @@ -10,15 +10,17 @@ lazy val commonSettings = Seq(

val scalaTestVersion = "3.0.5"
val rocksDBVersion = "5.14.2"
val kafkaVersion = "1.1.1"
val kafkaVersion = "2.0.0"

lazy val kafka = Seq(
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion classifier "test",
"org.apache.kafka" % "kafka-streams" % kafkaVersion,
"org.apache.kafka" % "kafka-streams" % kafkaVersion classifier "test",
"org.apache.kafka" %% "kafka" % kafkaVersion
)
"javax.ws.rs" % "javax.ws.rs-api" % "2.1" jar(),
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"org.apache.kafka" % "kafka-clients" % kafkaVersion classifier "test",
"org.apache.kafka" % "kafka-streams" % kafkaVersion,
"org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion,
"org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion,
"org.apache.kafka" %% "kafka" % kafkaVersion
)

lazy val scalaTest = "org.scalatest" %% "scalatest" % scalaTestVersion % "test"
lazy val rocksDB = "org.rocksdb" % "rocksdbjni" % rocksDBVersion % "test"
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.0.2
sbt.version=1.1.6
31 changes: 13 additions & 18 deletions src/main/scala/MockedStreams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.madewithtea.mockedstreams

import java.util.{Properties, UUID}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology}
import org.apache.kafka.streams.state.ReadOnlyWindowStore
import org.apache.kafka.test.{ProcessorTopologyTestDriver => Driver}
import org.apache.kafka.streams.test.ConsumerRecordFactory
import org.apache.kafka.streams.{StreamsBuilder, StreamsConfig, Topology, TopologyTestDriver => Driver}

import scala.collection.JavaConverters._

object MockedStreams {

def apply() = Builder()

case class Record(topic: String, key: Array[Byte], value: Array[Byte])

case class Builder(topology: Option[() => Topology] = None,
configuration: Properties = new Properties(),
stateStores: Seq[String] = Seq(),
inputs: List[Record] = List.empty) {
inputs: List[ConsumerRecord[Array[Byte], Array[Byte]]] = List.empty) {

def config(configuration: Properties) = this.copy(configuration = configuration)

def topology(func: (StreamsBuilder => Unit)) = {
def topology(func: StreamsBuilder => Unit) = {
val buildTopology = () => {
val builder = new StreamsBuilder()
func(builder)
Expand All @@ -55,9 +55,11 @@ object MockedStreams {
val keySer = key.serializer
val valSer = value.serializer

val factory = new ConsumerRecordFactory[K, V](keySer, valSer)

val updatedRecords = newRecords.foldLeft(inputs) {
case (events, (k, v)) =>
val newRecord = Record(topic, keySer.serialize(topic, k), valSer.serialize(topic, v))
val newRecord = factory.create(topic, k, v)
events :+ newRecord
}

Expand Down Expand Up @@ -97,22 +99,15 @@ object MockedStreams {

// state store is temporarily created in ProcessorTopologyTestDriver
private def stream = {
val props: java.util.Map[Object, Object] = new Properties
val props = new Properties
props.put(StreamsConfig.APPLICATION_ID_CONFIG, s"mocked-${UUID.randomUUID().toString}")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.putAll(configuration)

val t = topology.getOrElse(throw new NoTopologySpecified)

new Driver(new StreamsConfig(props), t())
new Driver(topology.getOrElse(throw new NoTopologySpecified)(), props)
}

private def produce(driver: Driver): Unit = {
inputs.foreach {
case Record(topic, key, value) =>
driver.process(topic, key, value)
}
}
private def produce(driver: Driver): Unit =
inputs.foreach(driver.pipeInput)

private def withProcessedDriver[T](f: Driver => T): T = {
if(inputs.isEmpty)
Expand Down
63 changes: 34 additions & 29 deletions src/test/scala/MockedStreamsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.madewithtea.mockedstreams

import org.apache.kafka.clients.consumer.ConsumerRecord
Expand Down Expand Up @@ -140,7 +141,7 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
import java.util.Properties

val props = new Properties
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
classOf[TimestampExtractors.CustomTimestampExtractor].getName)

val builder = MockedStreams()
Expand Down Expand Up @@ -197,14 +198,14 @@ class MockedStreamsSpec extends FlatSpec with Matchers {

val strings = Serdes.String()
val serdes = Consumed.`with`(strings, strings)

val InputTopic = "input"
val OutputTopic = "output"

def topology(builder: StreamsBuilder) = {
builder.stream(InputTopic, serdes)
.map[String, String]((k, v) => new KeyValue(k, v.toUpperCase))
.to(strings, strings, OutputTopic)
.to(OutputTopic, Produced.`with`(strings, strings))
}
}

Expand Down Expand Up @@ -236,59 +237,63 @@ class MockedStreamsSpec extends FlatSpec with Matchers {
val streamA = builder.stream(InputATopic, serdes)
val streamB = builder.stream(InputBTopic, serdes)

val table = streamA.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator, ints, StoreName)
val table = streamA.groupByKey(Serialized.`with`(strings, ints))
.aggregate(
new LastInitializer,
new LastAggregator,
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
)

streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints)
.to(strings, ints, OutputATopic)
streamB.leftJoin[Integer, Integer](table, new AddJoiner(), Joined.`with`(strings, ints, ints))
.to(OutputATopic, Produced.`with`(strings, ints))
}

def topology1WindowOutput(builder: StreamsBuilder) = {
val streamA = builder.stream(InputCTopic, serdes)
streamA.groupByKey(strings, ints).count(
TimeWindows.of(1),
StoreName)
streamA.groupByKey(Serialized.`with`(strings, ints))
.windowedBy(TimeWindows.of(1))
.count(Materialized.as(StoreName))
}

def topology2Output(builder: StreamsBuilder) = {
val streamA = builder.stream(InputATopic, serdes)
val streamB = builder.stream(InputBTopic, serdes)

val table = streamA.groupByKey(strings, ints).aggregate(
val table = streamA.groupByKey(Serialized.`with`(strings, ints)).aggregate(
new LastInitializer,
new LastAggregator,
ints,
StoreName)
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints))

streamB.leftJoin[Integer, Integer](table, new AddJoiner(), strings, ints)
.to(strings, ints, OutputATopic)
streamB.leftJoin[Integer, Integer](table, new AddJoiner(), Joined.`with`(strings, ints, ints))
.to(OutputATopic, Produced.`with`(strings, ints))

streamB.leftJoin[Integer, Integer](table, new SubJoiner(), strings, ints)
.to(strings, ints, OutputBTopic)
streamB.leftJoin[Integer, Integer](table, new SubJoiner(), Joined.`with`(strings, ints, ints))
.to(OutputBTopic, Produced.`with`(strings, ints))
}

def topologyTables(builder: StreamsBuilder) = {
val streamA = builder.stream(InputATopic, serdes)
val streamB = builder.stream(InputBTopic, serdes)

val tableA = streamA.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
StoreName)
val tableA = streamA.groupByKey(Serialized.`with`(strings, ints))
.aggregate(
new LastInitializer,
new LastAggregator,
Materialized.as(StoreName).withKeySerde(strings).withValueSerde(ints)
)

val tableB = streamB.groupByKey(strings, ints).aggregate(
new LastInitializer,
new LastAggregator,
ints,
Store2Name)
val tableB = streamB.groupByKey(Serialized.`with`(strings, ints))
.aggregate(
new LastInitializer,
new LastAggregator,
Materialized.as(Store2Name).withKeySerde(strings).withValueSerde(ints)
)

val resultTable = tableA.join[Integer,Integer](tableB, new AddJoiner)

resultTable
.toStream
.to(strings, ints, OutputATopic)
.to(OutputATopic, Produced.`with`(strings, ints))
}
}

Expand Down

0 comments on commit 2829cbb

Please sign in to comment.