diff --git a/modbus/src/main/java/com/digitalpetri/modbus/client/ModbusTcpClient.java b/modbus/src/main/java/com/digitalpetri/modbus/client/ModbusTcpClient.java index 17cfc2e..94d3758 100644 --- a/modbus/src/main/java/com/digitalpetri/modbus/client/ModbusTcpClient.java +++ b/modbus/src/main/java/com/digitalpetri/modbus/client/ModbusTcpClient.java @@ -19,7 +19,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,17 +33,27 @@ public class ModbusTcpClient extends ModbusClient { private final Logger logger = LoggerFactory.getLogger(getClass()); - private final TransactionSequence transactionSequence = new TransactionSequence(); private final Map promises = new ConcurrentHashMap<>(); - private final ModbusTcpClientTransport transport; private final ModbusClientConfig config; + private final ModbusTcpClientTransport transport; + private final TransactionSequence transactionSequence; public ModbusTcpClient(ModbusClientConfig config, ModbusTcpClientTransport transport) { + this(config, transport, new DefaultTransactionSequence()); + } + + public ModbusTcpClient( + ModbusClientConfig config, + ModbusTcpClientTransport transport, + TransactionSequence transactionSequence + ) { + super(transport); this.config = config; this.transport = transport; + this.transactionSequence = transactionSequence; transport.receive(this::onFrameReceived); } @@ -224,12 +234,44 @@ private record ResponsePromise( TimeoutHandle timeout ) {} - static class TransactionSequence { + public interface TransactionSequence { + + /** + * Return the next 2-byte transaction identifier. Range is [0, 65535] by default. + * + * @return the next 2-byte transaction identifier. + */ + int next(); + } + + public static class DefaultTransactionSequence implements TransactionSequence { - private final AtomicInteger transactionId = new AtomicInteger(0); + private final int low; + private final int high; - int next() { - return transactionId.getAndIncrement() & 0xFFFF; + private final AtomicReference transactionId = new AtomicReference<>(0); + + public DefaultTransactionSequence() { + this(0, 65535); + } + + public DefaultTransactionSequence(int low, int high) { + this.low = low; + this.high = high; + + transactionId.set(low); + } + + @Override + public int next() { + while (true) { + Integer id = transactionId.get(); + Integer nextId = id >= high ? low : id + 1; + + if (transactionId.compareAndSet(id, nextId)) { + return id; + } + } } } diff --git a/modbus/src/test/java/com/digitalpetri/modbus/client/TransactionSequenceTest.java b/modbus/src/test/java/com/digitalpetri/modbus/client/DefaultTransactionSequenceTest.java similarity index 68% rename from modbus/src/test/java/com/digitalpetri/modbus/client/TransactionSequenceTest.java rename to modbus/src/test/java/com/digitalpetri/modbus/client/DefaultTransactionSequenceTest.java index a430a17..8f6426a 100644 --- a/modbus/src/test/java/com/digitalpetri/modbus/client/TransactionSequenceTest.java +++ b/modbus/src/test/java/com/digitalpetri/modbus/client/DefaultTransactionSequenceTest.java @@ -2,14 +2,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; -import com.digitalpetri.modbus.client.ModbusTcpClient.TransactionSequence; +import com.digitalpetri.modbus.client.ModbusTcpClient.DefaultTransactionSequence; import org.junit.jupiter.api.Test; -class TransactionSequenceTest { +class DefaultTransactionSequenceTest { @Test void rollover() { - TransactionSequence sequence = new TransactionSequence(); + DefaultTransactionSequence sequence = new DefaultTransactionSequence(); // Assert that transactions are generated in the range [0, 65535] // and that they roll over back to 0.