diff --git a/internal/confirmations/confirmed_block_listener.go b/internal/confirmations/confirmed_block_listener.go index a03b245..bd00450 100644 --- a/internal/confirmations/confirmed_block_listener.go +++ b/internal/confirmations/confirmed_block_listener.go @@ -194,12 +194,12 @@ func (cbl *confirmedBlockListener) processBlockNotification(block *apitypes.Bloc var dispatchHead *apitypes.BlockInfo if len(cbl.newHeadToAdd) > 0 { // we've snuck in multiple notifications while the dispatcher is busy... don't add indefinitely to this list - if len(cbl.newHeadToAdd) > 10 /* not considered worth adding/explaining a tuning property for this */ { + if len(cbl.newHeadToAdd) >= 10 /* not considered worth adding/explaining a tuning property for this */ { log.L(cbl.ctx).Infof("Block listener fell behind head of chain") - cbl.newHeadToAdd = nil - } else { - dispatchHead = cbl.newHeadToAdd[len(cbl.newHeadToAdd)-1] + // Nothing more we can do in this function until it catches up - we just have to discard the notification + return } + dispatchHead = cbl.newHeadToAdd[len(cbl.newHeadToAdd)-1] } if dispatchHead == nil && len(cbl.blocksSinceCheckpoint) > 0 { dispatchHead = cbl.blocksSinceCheckpoint[len(cbl.blocksSinceCheckpoint)-1] diff --git a/internal/confirmations/confirmed_block_listener_test.go b/internal/confirmations/confirmed_block_listener_test.go index 3c3f39c..65164e8 100644 --- a/internal/confirmations/confirmed_block_listener_test.go +++ b/internal/confirmations/confirmed_block_listener_test.go @@ -15,6 +15,7 @@ package confirmations import ( + "context" "fmt" "strings" "sync/atomic" @@ -338,38 +339,39 @@ func TestCBLDispatcherFallsBehindHead(t *testing.T) { mbiHash := mca.On("BlockInfoByHash", mock.Anything, mock.Anything) mbiHash.Run(func(args mock.Arguments) { mockBlockHashReturn(mbiHash, args, blocks) }) - // We'll fall back to this because we don't keep up - mbiNum := mca.On("BlockInfoByNumber", mock.Anything, mock.Anything) - mbiNum.Run(func(args mock.Arguments) { mockBlockNumberReturn(mbiNum, args, blocks) }) - bcm.requiredConfirmations = 5 + + // Start a CBL, but then cancel the dispatcher cbl, err := bcm.startConfirmedBlockListener(bcm.ctx, id, "", nil, esDispatch) assert.NoError(t, err) + cbl.cancelFunc() + <-cbl.dispatcherDone + <-cbl.processorDone + + // Restart just the processor + cbl.ctx, cbl.cancelFunc = context.WithCancel(bcm.ctx) + cbl.processorDone = make(chan struct{}) + go cbl.notificationProcessor() - // Notify all the blocks before we process any + // Notify all the blocks for i := 0; i < len(blocks); i++ { bcm.propagateBlockHashToCBLs(&ffcapi.BlockHashEvent{ BlockHashes: []string{blocks[i].BlockHash}, }) } - for i := 0; i < len(blocks)-bcm.requiredConfirmations; i++ { - // The dispatches should have been added, until it got too far ahead - // and then set to nil. - for cbl.newHeadToAdd != nil { - time.Sleep(1 * time.Millisecond) - } - b := <-esDispatch - assert.Equal(t, b.BlockEvent.BlockNumber.Uint64(), blocks[i].BlockNumber.Uint64()) - assert.Equal(t, b.BlockEvent.BlockInfo, blocks[i].BlockInfo) - } + // Wait until there's a first tap to the dispatcher, meaning + // dispatches should have been added to the newHeadToAdd list + <-cbl.dispatcherTap - time.Sleep(1 * time.Millisecond) - assert.Len(t, cbl.blocksSinceCheckpoint, bcm.requiredConfirmations) - select { - case <-esDispatch: - assert.Fail(t, "should not have received block in confirmation window") - default: // good - we should have the confirmations sat there, but no dispatch + // ... until it got too far ahead and then set to nil. + checkHeadLen := func() int { + cbl.stateLock.Lock() + defer cbl.stateLock.Unlock() + return len(cbl.newHeadToAdd) + } + for checkHeadLen() < 10 { + time.Sleep(1 * time.Millisecond) } bcm.Stop()