Skip to content

Commit

Permalink
use assertj
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Nov 8, 2024
1 parent 6889aaf commit 9e110e3
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 73 deletions.
6 changes: 3 additions & 3 deletions pulsar-inttest-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>${testng.version}</version>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj-core.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.messaging;

import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getPartitionedTopic;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -31,7 +32,6 @@
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.tests.integration.IntegTest;
import org.testng.Assert;

/**
* Delay messaging test.
Expand Down Expand Up @@ -77,14 +77,16 @@ public void delayMsgBlockTest() throws Exception {

// receive message at first time
Message<byte[]> message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
Assert.assertNotNull(message, "Can't receive message at the first time.");
assertThat(message).isNotNull().as("Can't receive message at the first time.");
consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);

// receive retry messages
for (int i = 0; i < redeliverCnt; i++) {
message = consumer.receive(delayTimeSeconds * 2, TimeUnit.SECONDS);
Assert.assertNotNull(message, "Consumer can't receive message in double delayTimeSeconds time "
+ delayTimeSeconds * 2 + "s");
assertThat(message)
.isNotNull()
.as("Consumer can't receive message in double delayTimeSeconds time "
+ delayTimeSeconds * 2 + "s");
log.info("receive msg. reConsumeTimes: {}", message.getProperty("RECONSUMETIMES"));
consumer.reconsumeLater(message, delayTimeSeconds, TimeUnit.SECONDS);
}
Expand All @@ -97,7 +99,7 @@ public void delayMsgBlockTest() throws Exception {
.subscribe();

message = dltConsumer.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(message, "Dead letter topic consumer can't receive message.");
assertThat(message).isNotNull().as("Dead letter topic consumer can't receive message.");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.tests.integration.messaging;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,7 +32,6 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.tests.integration.GeoRepIntegTest;
import org.awaitility.Awaitility;
import org.testng.Assert;

/**
* Geo replication test.
Expand All @@ -50,9 +51,9 @@ public void testTopicReplication(String domain) throws Exception {
adminA.topics().createPartitionedTopic(topic, 10);
} catch (Exception e) {
log.error("Failed to create partitioned topic {}.", topic, e);
Assert.fail("Failed to create partitioned topic " + topic);
fail("Failed to create partitioned topic " + topic);
}
Assert.assertEquals(adminA.topics().getPartitionedTopicMetadata(topic).partitions, 10);
assertThat(adminA.topics().getPartitionedTopicMetadata(topic).partitions).isEqualTo(10);
});
log.info("Test geo-replication produce and consume for topic {}.", topic);

Expand All @@ -76,7 +77,7 @@ public void testTopicReplication(String domain) throws Exception {

for (int i = 0; i < 10; i++) {
Message<byte[]> message = c.receive(10, TimeUnit.SECONDS);
Assert.assertNotNull(message);
assertThat(message).isNotNull();
}
log.info("Successfully consume message from cluster {} for topic {}.", "cluster2", topic);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
import static org.testng.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.stream.IntStream;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
Expand Down Expand Up @@ -62,7 +62,7 @@ public void testNonDurableConsumer() throws Exception {

for (int i = 0; i < numMessages; i++) {
Message<byte[]> msg = consumer.receive();
assertEquals(new String(msg.getValue(), UTF_8), "message-" + i);
assertThat(new String(msg.getValue(), UTF_8)).isEqualTo("message-" + i);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.pulsar.tests.integration.messaging;

import static org.apache.pulsar.tests.integration.utils.IntegTestUtils.getNonPartitionedTopic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -58,24 +57,24 @@ public void testReaderReconnectAndRead() throws Exception {
.create();
for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
assertNotNull(messageId);
assertThat(messageId).isNotNull();
}

for (int i = 0; i < messagesToSend; i++) {
Message<String> msg = reader.readNext();
assertEquals(msg.getValue(), "message-" + i);
assertThat(msg.getValue()).isEqualTo("message-" + i);
}

admin.topics().unload(topicName);

for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
assertNotNull(messageId);
assertThat(messageId).isNotNull();
}

for (int i = 0; i < messagesToSend; i++) {
Message<String> msg = reader.readNext();
assertEquals(msg.getValue(), "message-" + i);
assertThat(msg.getValue()).isEqualTo("message-" + i);
}

log.info("-- Exiting testReaderReconnectAndRead test --");
Expand Down Expand Up @@ -103,24 +102,24 @@ public void testReaderReconnectAndReadBatchMessages()

for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
assertNotNull(messageId);
assertThat(messageId).isNotNull();
}

for (int i = 0; i < messagesToSend; i++) {
Message<String> msg = reader.readNext();
assertEquals(msg.getValue(), "message-" + i);
assertThat(msg.getValue()).isEqualTo("message-" + i);
}

admin.topics().unload(topicName);

for (int i = 0; i < messagesToSend; i++) {
MessageId messageId = producer.newMessage().value("message-" + i).send();
assertNotNull(messageId);
assertThat(messageId).isNotNull();
}

for (int i = 0; i < messagesToSend; i++) {
Message<String> msg = reader.readNext();
assertEquals(msg.getValue(), "message-" + i);
assertThat(msg.getValue()).isEqualTo("message-" + i);
}

log.info("-- Exiting testReaderReconnectAndReadBatchMessages test --");
Expand Down
Loading

0 comments on commit 9e110e3

Please sign in to comment.