Skip to content

Commit

Permalink
Polish and refactor the main timelock loop
Browse files Browse the repository at this point in the history
Signed-off-by: Francisco de Borja Aranda Castillejo <borja.aranda@smartcontract.com>
  • Loading branch information
Francisco de Borja Aranda Castillejo committed Apr 29, 2024
1 parent 05dce96 commit 65ca003
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 104 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/smartcontractkit/timelock-worker

go 1.20
go 1.22

require (
github.com/ethereum/go-ethereum v1.13.14
Expand Down
44 changes: 44 additions & 0 deletions go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/timelock/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

const (
defaultSchedulerDelay time.Duration = 15 * time.Minute
maxSubRetries int = 5

eventCallScheduled string = "CallScheduled"
eventCallExecuted string = "CallExecuted"
Expand Down
62 changes: 32 additions & 30 deletions pkg/timelock/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,43 +49,45 @@ func newScheduler(tick time.Duration) *scheduler {
// the store, which could cause race conditions like adding/deleting
// while the operation is being executed.
func (tw *Worker) runScheduler(ctx context.Context) {
for {
select {
case <-tw.ticker.C:
if len(tw.store) <= 0 {
tw.logger.Debug().Msgf("new scheduler tick: no operations in store")
continue
}

if !tw.isSchedulerBusy() {
tw.logger.Debug().Msgf("new scheduler tick: operations in store")
tw.setSchedulerBusy()
for _, op := range tw.store {
tw.execute(ctx, op)
go func() {
for {
select {
case <-tw.ticker.C:
if len(tw.store) <= 0 {
tw.logger.Debug().Msgf("new scheduler tick: no operations in store")
continue
}
tw.setSchedulerFree()
} else {
tw.logger.Debug().Msgf("new scheduler tick: scheduler is busy, skipping until next tick")
}

case op := <-tw.add:
tw.mu.Lock()
if len(tw.store[op.Id]) <= int(op.Index.Int64()) {
tw.store[op.Id] = append(tw.store[op.Id], op)
}
tw.store[op.Id][op.Index.Int64()] = op
tw.mu.Unlock()
tw.logger.Debug().Msgf("scheduled operation: %x", op.Id)
if !tw.isSchedulerBusy() {
tw.logger.Debug().Msgf("new scheduler tick: operations in store")
tw.setSchedulerBusy()
for _, op := range tw.store {
tw.execute(ctx, op)
}
tw.setSchedulerFree()
} else {
tw.logger.Debug().Msgf("new scheduler tick: scheduler is busy, skipping until next tick")
}

case op := <-tw.del:
if _, ok := tw.store[op]; ok {
case op := <-tw.add:
tw.mu.Lock()
delete(tw.store, op)
if len(tw.store[op.Id]) <= int(op.Index.Int64()) {
tw.store[op.Id] = append(tw.store[op.Id], op)
}
tw.store[op.Id][op.Index.Int64()] = op
tw.mu.Unlock()
tw.logger.Debug().Msgf("de-scheduled operation: %x", op)
tw.logger.Debug().Msgf("scheduled operation: %x", op.Id)

case op := <-tw.del:
if _, ok := tw.store[op]; ok {
tw.mu.Lock()
delete(tw.store, op)
tw.mu.Unlock()
tw.logger.Debug().Msgf("de-scheduled operation: %x", op)
}
}
}
}
}()
}

// updateSchedulerDelay updates the internal ticker delay, so it can be reconfigured while running.
Expand Down
138 changes: 87 additions & 51 deletions pkg/timelock/timelock.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Worker struct {
ethClient *ethclient.Client
contract *contract.Timelock
executeContract *contract.Timelock
ABI *abi.ABI
abi *abi.ABI
address []common.Address
fromBlock *big.Int
pollPeriod int64
Expand Down Expand Up @@ -113,7 +113,7 @@ func NewTimelockWorker(nodeURL, timelockAddress, callProxyAddress, privateKey st
ethClient: ethClient,
contract: timelockContract,
executeContract: executeContract,
ABI: timelockABI,
abi: timelockABI,
address: []common.Address{common.HexToAddress(timelockAddress)},
fromBlock: fromBlock,
pollPeriod: pollPeriod,
Expand All @@ -132,63 +132,81 @@ func (tw *Worker) Listen(ctx context.Context) error {
return err
}

tw.startLog()

// Handle OS signals to properly stop/kill the process.
stopCh := make(chan string)
logCh := make(chan types.Log)
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go handleOSSignal(stopCh)

// Update timelock-worker default scheduler delay.
tw.updateSchedulerDelay(time.Duration(tw.pollPeriod) * time.Second)

// Run the scheduler to add/del operations in a thread-safe way.
go tw.runScheduler(ctx)
tw.startLog()

// Shutdown gracefully based on OS interrupts.
go func() {
for {
handleOSSignal(<-sigCh, stopCh)
}
}()
// Initialize the subscription.
logCh := make(chan types.Log)
if err := tw.subscribeAndProcessLogs(ctx, logCh, stopCh); err != nil {
tw.logger.Error().Err(err).Msg("failed to subscribe and process logs.")
return err
}

defer close(stopCh)
defer close(logCh)
defer ctx.Done()
defer tw.dumpOperationStore(time.Now)

return nil
}

// FilterQuery to be feed to the subscription and FilterLogs.
query := ethereum.FilterQuery{
func (tw *Worker) setupFilterQuery() ethereum.FilterQuery {
return ethereum.FilterQuery{
Addresses: tw.address,
FromBlock: tw.fromBlock,
}
}

tw.logger.Info().Msgf("Starting subscription")
// Create the new subscription with the predefined query.
// subscribeAndProcessLogs creates a subscription and processes logs based on a filter query.
func (tw *Worker) subscribeAndProcessLogs(ctx context.Context, logCh chan types.Log, stopCh chan string) error {
query := tw.setupFilterQuery()

// SubscribeFilterLogs creates an asynchronous subscription to the events.
// It receives all the new events.
tw.logger.Info().Msg("starting subscription")
sub, err := tw.ethClient.SubscribeFilterLogs(ctx, query, logCh)
if err != nil {
return err
}
defer sub.Unsubscribe()

// Read events by FilterLogs. This method calls eth_getLogs under the hood.
// FilterLogs starts filtering the logs at fromBlock, gathering the historical data.
// This is needed to guarantee that all the events are gathered, even in scenarios where the worker crashes.
filter, err := tw.ethClient.FilterLogs(ctx, query)
if err != nil {
return err
}

// Process incoming historical logs in a separate goroutine.
wg.Add(1)
go func() {
for _, l := range filter {
logCh <- l
for _, log := range filter {
logCh <- log
}
wg.Done()
}()

// Setting readyStatus here because we want to make sure subscription is up.
tw.logger.Info().Msgf("Initial subscription complete")
SetReadyStatus(HealthStatusOK)

// This is the goroutine watching over the subscription.
// We want wg.Done() to cancel the whole execution, so don't add more than 1 to wg.
// Also, when receiving an event that creates an error, skip the event and
// continue processing the rest, as an external operator can cancel the faulty event.
loop := true
wg.Add(1)
go func() {
for loop {
MainLoop:
for {
select {
case log := <-logCh:
// Decode the log into an event using the ABI exposed in Timelock.go
event, err := tw.ABI.EventByID(log.Topics[0])
event, err := tw.abi.EventByID(log.Topics[0])
if err != nil {
continue
}
Expand Down Expand Up @@ -240,54 +258,72 @@ func (tw *Worker) Listen(ctx context.Context) error {
// Check if the error is not nil, because sub.Unsubscribe will
// signal the channel sub.Err() to close it, leading to false nil errors.
if err != nil {
tw.logger.Info().Msgf("subscription: %s", err.Error())
tw.logger.Warn().Msgf("subscription error: %s", err.Error())
SetReadyStatus(HealthStatusError)
loop = false
sub.Unsubscribe()

success := false
for try := range maxSubRetries {

Check failure on line 266 in pkg/timelock/timelock.go

View workflow job for this annotation

GitHub Actions / Golang Lint

cannot range over maxSubRetries (constant 5 of type int) (typecheck)
tw.logger.Warn().Msgf("trying to re-create subscription: %v/%v retry.", try+1, maxSubRetries)
sub, err = tw.ethClient.SubscribeFilterLogs(ctx, query, logCh)
if err == nil {
tw.logger.Info().Msg("subscription successfully recreated.")
SetReadyStatus(HealthStatusOK)
break
}

time.Sleep(time.Second * time.Duration(try))
}

if !success {
tw.logger.Error().Msg("failed to recreate subscription after retries: shutting down timelock-worker.")
break MainLoop
}
}

case signal := <-stopCh:
tw.logger.Info().Msgf("received OS signal %s", signal)
SetReadyStatus(HealthStatusError)
loop = false
break MainLoop
}
}
wg.Done()
}()
wg.Wait()

// Close in this specific order to avoid runtime panics,
// or memory leaks.
defer close(sigCh)
defer close(stopCh)
defer close(logCh)
defer sub.Unsubscribe()
defer ctx.Done()

tw.dumpOperationStore(time.Now)
wg.Wait()

return nil
}

// handleOSSignal handles SIGINT and SIGTERM OS signals, and signals the stopCh.
func handleOSSignal(signal os.Signal, stopCh chan string) {
switch signal {
case syscall.SIGINT:
stopCh <- syscall.SIGINT.String()
case syscall.SIGTERM:
stopCh <- syscall.SIGTERM.String()
}
}

func (tw *Worker) startLog() {
tw.logger.Info().Msgf("timelock-worker started")
tw.logger.Info().Msgf("\tTimelock contract address: %v", tw.address[0])

wallet, err := privateKeyToAddress(tw.privateKey)
if err != nil {
tw.logger.Info().Msgf("\tEOA address: unable to determine")
tw.logger.Error().Msgf("\tEOA address: unable to determine")
} else {
tw.logger.Info().Msgf("\tEOA address: %v", wallet)
}

tw.logger.Info().Msgf("\tEOA address: %v", wallet)
tw.logger.Info().Msgf("\tStarting from block: %v", tw.fromBlock)
tw.logger.Info().Msgf("\tPoll Period: %v", time.Duration(tw.pollPeriod*int64(time.Second)).String())
}

// handleOSSignal handles SIGINT and SIGTERM OS signals, and signals the stopCh.
func handleOSSignal(stopCh chan string) {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
defer close(sigCh)

// Block and wait until a system signal happens.
signal := <-sigCh

// In the future SIGHUP can be used to reload configuration.
switch signal {
case syscall.SIGINT:
stopCh <- syscall.SIGINT.String()
case syscall.SIGTERM:
stopCh <- syscall.SIGTERM.String()
}
}
22 changes: 0 additions & 22 deletions pkg/timelock/timelock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ package timelock

import (
"math/big"
"os"
"reflect"
"sync"
"syscall"
"testing"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -173,25 +170,6 @@ func TestNewTimelockWorker(t *testing.T) {
}
}

func Test_handleOSSignal(t *testing.T) {
stopCh := make(chan string)
sigCh := make(chan os.Signal, 1)
var testWg sync.WaitGroup

testWg.Add(1)
go func() {
for {
handleOSSignal(<-sigCh, stopCh)
}
}()

sigCh <- syscall.SIGTERM
assert.Equal(t, <-stopCh, "terminated", "send SIGTERM, receive terminated")

sigCh <- syscall.SIGINT
assert.Equal(t, <-stopCh, "interrupt", "send SIGINT, receive interrupt")
}

func TestWorker_startLog(t *testing.T) {
testWorker := newTestTimelockWorker(t, testNodeURL, testTimelockAddress, testCallProxyAddress, testPrivateKey, testFromBlock, int64(testPollPeriod), testLogger)

Expand Down

0 comments on commit 65ca003

Please sign in to comment.