diff --git a/VERSION b/VERSION index c006218557f..879be8a98fc 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.7.6 +0.7.7 diff --git a/core/adapters/eth_tx.go b/core/adapters/eth_tx.go index 440715693b8..89441da7894 100644 --- a/core/adapters/eth_tx.go +++ b/core/adapters/eth_tx.go @@ -93,7 +93,7 @@ func createTxRunResult( gasLimit, ) if IsClientRetriable(err) { - return models.NewRunOutputPendingConnection() + return pendingConfirmationsOrConnection(input) } else if err != nil { return models.NewRunOutputError(err) } @@ -191,6 +191,11 @@ func addReceiptToResult( } } + if receipt == nil { + err := errors.New("missing receipt for transaction") + return models.NewRunOutputError(err) + } + receipts = append(receipts, *receipt) var err error data, err = data.Add("ethereumReceipts", receipts) diff --git a/core/adapters/eth_tx_abi_encode.go b/core/adapters/eth_tx_abi_encode.go index c7371b6bffe..340f5df15ca 100644 --- a/core/adapters/eth_tx_abi_encode.go +++ b/core/adapters/eth_tx_abi_encode.go @@ -65,7 +65,7 @@ func (etx *EthTxABIEncode) UnmarshalJSON(data []byte) error { // the blockchain. func (etx *EthTxABIEncode) Perform(input models.RunInput, store *strpkg.Store) models.RunOutput { if !store.TxManager.Connected() { - return models.NewRunOutputPendingConnection() + return pendingConfirmationsOrConnection(input) } if !input.Status().PendingConfirmations() { data, err := etx.abiEncode(&input) diff --git a/core/adapters/eth_tx_internal_test.go b/core/adapters/eth_tx_internal_test.go new file mode 100644 index 00000000000..dd31a857607 --- /dev/null +++ b/core/adapters/eth_tx_internal_test.go @@ -0,0 +1,20 @@ +package adapters + +import ( + "chainlink/core/store/models" + "github.com/stretchr/testify/assert" + "testing" +) + +// In pathological cases, the receipt can be nil. +// Need to ensure we don't panic in this case and return errored output instead +func TestEthTxAdapter_addReceiptToResult(t *testing.T) { + t.Parallel() + + j := models.JSON{} + input := *models.NewRunInput(models.NewID(), j, models.RunStatusUnstarted) + + output := addReceiptToResult(nil, input, j) + assert.True(t, output.HasError()) + assert.EqualError(t, output.Error(), "missing receipt for transaction") +} diff --git a/core/services/synchronization/stats_pusher.go b/core/services/synchronization/stats_pusher.go index 3b67a4b7563..911cd0be443 100644 --- a/core/services/synchronization/stats_pusher.go +++ b/core/services/synchronization/stats_pusher.go @@ -125,7 +125,6 @@ func (sp *statsPusher) Close() error { // PushNow wakes up the stats pusher, asking it to push all queued events immediately. func (sp *statsPusher) PushNow() { - logger.Debug("PushNow") select { case sp.waker <- struct{}{}: default: diff --git a/core/store/tx_manager.go b/core/store/tx_manager.go index 09d2f640ccd..18ccdf172a5 100644 --- a/core/store/tx_manager.go +++ b/core/store/tx_manager.go @@ -6,6 +6,7 @@ import ( "math/big" "regexp" "sync" + "time" "github.com/pkg/errors" @@ -29,7 +30,12 @@ import ( // if updating DefaultGasLimit, be sure it matches with the // DefaultGasLimit specified in evm/test/Oracle_test.js const DefaultGasLimit uint64 = 500000 -const nonceReloadLimit int = 1 + +// Linear backoff is used so worst-case transaction time increases quadratically with this number +const nonceReloadLimit int = 3 + +// The base time for the backoff +const nonceReloadBackoffBaseTime = 3 * time.Second // ErrPendingConnection is the error returned if TxManager is not connected. var ErrPendingConnection = errors.New("Cannot talk to chain, pending connection") @@ -230,7 +236,7 @@ func (txm *EthTxManager) createTx( gasLimit uint64, value *assets.Eth) (*models.Tx, error) { - for nrc := 0; nrc <= nonceReloadLimit; nrc++ { + for nrc := 0; nrc < nonceReloadLimit+1; nrc++ { tx, err := txm.sendInitialTx(surrogateID, ma, to, data, gasPriceWei, gasLimit, value) if err == nil { return tx, nil @@ -241,8 +247,16 @@ func (txm *EthTxManager) createTx( } logger.Warnw( - "Tx #0: nonce too low, retrying with network nonce", - "nonce", tx.Nonce, "error", err.Error(), + "Tx #0: another tx with this nonce already exists, will retry with network nonce", + "nonce", tx.Nonce, "gasPriceWei", gasPriceWei, "gasLimit", gasLimit, "error", err.Error(), + ) + + // Linear backoff + time.Sleep(time.Duration(nrc+1) * nonceReloadBackoffBaseTime) + + logger.Warnw( + "Tx #0: another tx with this nonce already exists, retrying with network nonce", + "nonce", tx.Nonce, "gasPriceWei", gasPriceWei, "gasLimit", gasLimit, "error", err.Error(), ) err = ma.ReloadNonce(txm) @@ -313,9 +327,10 @@ func (txm *EthTxManager) sendInitialTx( } var ( - nonceTooLowRegex = regexp.MustCompile("(nonce .*too low|same hash was already imported)") + nonceTooLowRegex = regexp.MustCompile("(nonce .*too low|same hash was already imported|replacement transaction underpriced)") ) +// FIXME: There are probably other types of errors here that are symptomatic of a nonce that is too low func isNonceTooLowError(err error) bool { return err != nil && nonceTooLowRegex.MatchString(err.Error()) } @@ -553,6 +568,7 @@ func (txm *EthTxManager) processAttempt( attemptIndex int, blockHeight uint64, ) (*eth.TxReceipt, AttemptState, error) { + jobRunID := tx.SurrogateID.ValueOrZero() txAttempt := tx.Attempts[attemptIndex] receipt, state, err := txm.CheckAttempt(txAttempt, blockHeight) @@ -570,6 +586,7 @@ func (txm *EthTxManager) processAttempt( "receiptBlockNumber", receipt.BlockNumber.ToInt(), "currentBlockNumber", blockHeight, "receiptHash", receipt.Hash.Hex(), + "jobRunId", jobRunID, ) return receipt, state, nil @@ -582,6 +599,7 @@ func (txm *EthTxManager) processAttempt( "txAttemptLimit", attemptLimit, "txHash", txAttempt.Hash.String(), "txID", txAttempt.TxID, + "jobRunId", jobRunID, ) return receipt, state, nil } @@ -592,6 +610,7 @@ func (txm *EthTxManager) processAttempt( "txHash", txAttempt.Hash.String(), "txID", txAttempt.TxID, "currentBlockNumber", blockHeight, + "jobRunId", jobRunID, ) err = txm.bumpGas(tx, attemptIndex, blockHeight) } else { @@ -599,6 +618,7 @@ func (txm *EthTxManager) processAttempt( fmt.Sprintf("Tx #%d is %s", attemptIndex, state), "txHash", txAttempt.Hash.String(), "txID", txAttempt.TxID, + "jobRunId", jobRunID, ) } @@ -609,6 +629,7 @@ func (txm *EthTxManager) processAttempt( fmt.Sprintf("Tx #%d is %s, error fetching receipt", attemptIndex, state), "txHash", txAttempt.Hash.String(), "txID", txAttempt.TxID, + "jobRunId", jobRunID, "error", err, ) return nil, Unknown, errors.Wrap(err, "processAttempt CheckAttempt failed") diff --git a/core/store/tx_manager_test.go b/core/store/tx_manager_test.go index a642385f890..0e7bd946d30 100644 --- a/core/store/tx_manager_test.go +++ b/core/store/tx_manager_test.go @@ -270,6 +270,7 @@ func TestTxManager_CreateTx_NonceTooLowReloadSuccess(t *testing.T) { ethClientErrorMsg string }{ {"geth", "nonce too low"}, + {"geth", "replacement transaction underpriced"}, {"parity", "Transaction nonce is too low. Try incrementing the nonce"}, {"parity", "Transaction with the same hash was already imported"}, } @@ -338,12 +339,12 @@ func TestTxManager_CreateTx_NonceTooLowReloadLimit(t *testing.T) { err = manager.Connect(cltest.Head(nonce)) require.NoError(t, err) - ethClient.On("SendRawTx", mock.Anything).Twice().Return(nil, errors.New("nonce is too low")) + ethClient.On("SendRawTx", mock.Anything).Times(4).Return(nil, errors.New("nonce is too low")) to := cltest.NewAddress() data := hexutil.MustDecode("0x0000abcdef") _, err = manager.CreateTx(to, data) - assert.EqualError(t, err, "Transaction reattempt limit reached for 'nonce is too low' error. Limit: 1") + assert.EqualError(t, err, "Transaction reattempt limit reached for 'nonce is too low' error. Limit: 3") ethClient.AssertExpectations(t) }