diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 37ed5564..322f663d 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -218,14 +218,19 @@ public boolean completeResponseFuture(final ApplyEntry task) { */ public void checkResponseFuturesTimeout(final long beginIndex) { final long term = this.memberState.currTerm(); + long maxIndex = this.memberState.getCommittedIndex() + dLedgerConfig.getMaxPendingRequestsNum() + 1; + if (maxIndex > this.memberState.getLedgerEndIndex()) { + maxIndex = this.memberState.getLedgerEndIndex() + 1; + } ConcurrentMap closureMap = this.pendingClosure.get(term); - if (closureMap != null) { - for (long i = beginIndex; i < Integer.MAX_VALUE; i++) { + if (closureMap != null && closureMap.size() > 0) { + for (long i = beginIndex; i < maxIndex; i++) { Closure closure = closureMap.get(i); if (closure == null) { - break; + // index may be removed for complete, we should continue scan } else if (closure.isTimeOut()) { closure.done(Status.error(DLedgerResponseCode.WAIT_QUORUM_ACK_TIMEOUT)); + closureMap.remove(i); } else { break; } @@ -254,6 +259,7 @@ private class QuorumAckChecker extends ShutdownAbleThread { private long lastPrintWatermarkTimeMs = System.currentTimeMillis(); private long lastCheckLeakTimeMs = System.currentTimeMillis(); + private long lastCheckTimeoutTimeMs = System.currentTimeMillis(); public QuorumAckChecker(Logger logger) { super("QuorumAckChecker-" + memberState.getSelfId(), logger); @@ -267,10 +273,7 @@ public void doWork() { memberState.getSelfId(), memberState.getRole(), memberState.currTerm(), dLedgerStore.getLedgerBeforeBeginIndex(), dLedgerStore.getLedgerEndIndex(), memberState.getCommittedIndex(), JSON.toJSONString(peerWaterMarksByTerm), memberState.getAppliedIndex()); lastPrintWatermarkTimeMs = System.currentTimeMillis(); } - if (!memberState.isLeader()) { - waitForRunning(1); - return; - } + long currTerm = memberState.currTerm(); checkTermForPendingMap(currTerm, "QuorumAckChecker"); checkTermForWaterMark(currTerm, "QuorumAckChecker"); @@ -303,9 +306,15 @@ public void doWork() { checkResponseFuturesElapsed(DLedgerEntryPusher.this.memberState.getAppliedIndex()); lastCheckLeakTimeMs = System.currentTimeMillis(); } - - // clear the timeout pending closure which index > appliedIndex - checkResponseFuturesTimeout(DLedgerEntryPusher.this.memberState.getAppliedIndex() + 1); + if (DLedgerUtils.elapsed(lastCheckTimeoutTimeMs) > 1000) { + // clear the timeout pending closure should check all since it can timeout for different index + checkResponseFuturesTimeout(DLedgerEntryPusher.this.memberState.getAppliedIndex() + 1); + lastCheckTimeoutTimeMs = System.currentTimeMillis(); + } + if (!memberState.isLeader()) { + waitForRunning(1); + return; + } // update peer watermarks of self updatePeerWaterMark(currTerm, memberState.getSelfId(), dLedgerStore.getLedgerEndIndex()); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java index af91d86f..f35c44c6 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndPushTest.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + +import org.awaitility.core.AssertionCondition; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -88,7 +90,7 @@ public void testPushNetworkOffline() throws Exception { futures.add(future); } Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 100); + Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 1000); for (int i = 0; i < futures.size(); i++) { CompletableFuture future = futures.get(i); Assertions.assertTrue(future.isDone()); @@ -96,7 +98,7 @@ public void testPushNetworkOffline() throws Exception { } boolean hasWait = false; - for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum(); i++) { + for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum()+2; i++) { AppendEntryRequest appendEntryRequest = new AppendEntryRequest(); appendEntryRequest.setGroup(group); appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId()); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java index 3fa5febc..07f7a55e 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/BatchPushTest.java @@ -126,7 +126,7 @@ public void testBatchPushNetworkOffline() throws Exception { futures.add(future); } Assertions.assertEquals(9, dLedgerServer0.getDLedgerStore().getLedgerEndIndex()); - Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 100); + Thread.sleep(dLedgerServer0.getDLedgerConfig().getMaxWaitAckTimeMs() + 1000); for (int i = 0; i < futures.size(); i++) { CompletableFuture future = futures.get(i); Assertions.assertTrue(future.isDone()); @@ -134,7 +134,7 @@ public void testBatchPushNetworkOffline() throws Exception { } boolean hasWait = false; - for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum(); i++) { + for (int i = 0; i < dLedgerServer0.getDLedgerConfig().getMaxPendingRequestsNum() + 2; i++) { AppendEntryRequest appendEntryRequest = new AppendEntryRequest(); appendEntryRequest.setGroup(group); appendEntryRequest.setRemoteId(dLedgerServer0.getMemberState().getSelfId()); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java b/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java index 2fcfd84f..c5a6879a 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java @@ -67,6 +67,7 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St config.setEnableLeaderElector(false); config.setEnableDiskForceClean(false); config.setDiskSpaceRatioToForceClean(0.90f); + config.setMaxPendingRequestsNum(1000); DLedgerServer dLedgerServer = new DLedgerServer(config); MemberState memberState = dLedgerServer.getMemberState(); memberState.setCurrTermForTest(0);