Add this library to your dependencies:
<dependency>
    <groupId>com.jupiter-tools</groupId>
    <artifactId>spring-test-kafka</artifactId>
    <version>0.2</version>
</dependency>
Now you can start Kafka in Docker (Testcontainers) by using the @KafkaTestContainer annotation in tests:
@SpringBootTest
@KafkaTestContainer
class KafkaTestContainerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void sendAndReceiveTest() throws InterruptedException {
        // the template is configured against the started container
        assertThat(kafkaTemplate).isNotNull();
        kafkaTemplate.send("test-topic", "flight of a dragon");
        ...
    }
}
You can use this annotation to start a Kafka container in tests with both JUnit 5 and JUnit 4. The implementation does not depend on any particular test framework, only on the Spring Framework.
@SpringBootTest
@KafkaTestContainer (1)
@KafkaTestContainer(bootstrapServersPropertyName = "second.kafka.server") (2)
class MultipleContainerInOneTest {
    ...
}
(1) Starts the first Kafka test container and writes its bootstrap servers to the default Spring Boot property (spring.kafka.bootstrap-servers).
(2) Starts one more Kafka container and writes its bootstrap servers to the specified property. You can read the actual bootstrap servers value from this property once the application context is running.
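Code that talks to the second broker has to pick up its address from the custom property. The following is a minimal plain-Java sketch of that step, not part of the library: the class SecondKafkaProps and its method are hypothetical, and the map stands in for the properties resolved by the Spring environment. The keys bootstrap.servers, key.serializer and value.serializer are standard Kafka producer settings.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration; not part of spring-test-kafka.
class SecondKafkaProps {

    // Builds producer settings for the second container.
    // `resolved` stands in for the Spring environment after the context has started.
    static Properties producerProps(Map<String, String> resolved) {
        String servers = resolved.get("second.kafka.server");
        if (servers == null || servers.isEmpty()) {
            throw new IllegalStateException("second.kafka.server is not set");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", servers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

In a Spring bean the same value would typically be injected with @Value("${second.kafka.server}") instead of being looked up in a map.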
You can verify the messages received after test execution if they are sent in JSON format.
Just describe the expected dataset for a test case using the @ExpectedMessages annotation:
@SpringBootTest
@KafkaTestContainer
@EnableKafkaTest(topics = {"test-topic", "another-topic"})
class ExpectedMessagesTest {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Test
    @ExpectedMessages(topic = "test-topic", datasetFile = "/datasets/expected_event.json")
    void firstTopic() {
        kafkaTemplate.send("test-topic", new Foo("qwert"));
        kafkaTemplate.send("test-topic", new Bar("baaark"));
    }

    @Test
    @ExpectedMessages(topic = "another-topic", datasetFile = "/datasets/expected_another_event.json")
    void anotherTopic() {
        kafkaTemplate.send("another-topic", Bar.builder().time(new Date()).build());
    }
}

class Foo {
    String value;
}

class Bar {
    String name;
    Date time;
}
After the test method completes, the library waits for the messages declared in this JSON file to arrive. The content of the JSON dataset file:
{
  "com.kafkatest.example.events.Foo": [
    {
      "value": "qwert"
    }
  ],
  "com.kafkatest.example.events.Bar": [
    {
      "name": "baaark"
    }
  ]
}
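The dataset groups expected messages by their fully qualified class name. The Bar entry lists only name and omits time, which suggests that fields missing from the dataset are not compared. That is an inference from this example, not documented behaviour. A minimal sketch of such a partial match in plain Java, with messages represented as maps and a hypothetical PartialMatch helper (this is not the library's matching code):

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper for illustration; not the library's implementation.
class PartialMatch {

    // Returns true when every field listed in `expected` is present in `actual`
    // with an equal value. Fields that appear only in `actual` are ignored.
    static boolean matches(Map<String, Object> expected, Map<String, Object> actual) {
        for (Map.Entry<String, Object> field : expected.entrySet()) {
            if (!actual.containsKey(field.getKey())) {
                return false;
            }
            if (!Objects.equals(field.getValue(), actual.get(field.getKey()))) {
                return false;
            }
        }
        return true;
    }
}
```

Under this assumption, an expected entry containing only "name": "baaark" would match a received Bar regardless of the value of its time field.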
You can also use @NoMessagesExpected to check that nothing is sent.
Receiving any message fails the test case.
@Test
@NoMessagesExpected(timeout = 3000)
void silence() {
}
The library waits for the duration of the timeout for messages in the Kafka topics (set in @EnableKafkaTest); if anything is received, it throws an exception.
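The wait-then-fail behaviour can be pictured with a small plain-Java sketch. It is an illustration only, not the library's implementation: the SilenceCheck class is hypothetical, the queue stands in for messages consumed from the listened topics, and the timeout is assumed to be in milliseconds, which the text above does not state.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not the library's implementation.
class SilenceCheck {

    // Waits up to timeoutMillis for a message and fails if one arrives in that window.
    static void assertNoMessages(BlockingQueue<String> received, long timeoutMillis) {
        String message;
        try {
            // poll returns null when the timeout elapses with nothing received
            message = received.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for messages", e);
        }
        if (message != null) {
            throw new AssertionError("Expected no messages but received: " + message);
        }
    }
}
```

A check of this shape always costs the full timeout when the test passes, so a short timeout keeps the suite fast at the price of possibly missing a late message.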