Skip to content

Commit

Permalink
review: add "scheduler" and "nopScheduler" types
Browse files Browse the repository at this point in the history
Instead of using the `dryRun` flag to control whether or not operations
are added to the (standard) scheduler, we now select the type of the
scheduler pass to the timelock worker service:

* if dryRun is false, use the standard scheduler
* if dryRun is true, use the new "nop" scheduler, which only logs the
  calls but does not do anything

In practice the "standard scheduler" is a new type + interface as well,
since the existing implementation defined a the schedule as a simple
data type which was associated with the timelock worker via implicit
composition (though all the schedule related methods were defined on the
timelock worker type).
  • Loading branch information
gustavogama-cll committed Nov 21, 2024
1 parent fc28be7 commit ba738cb
Show file tree
Hide file tree
Showing 7 changed files with 228 additions and 85 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22
require (
github.com/docker/go-connections v0.5.0
github.com/ethereum/go-ethereum v1.13.15
github.com/google/go-cmp v0.6.0
github.com/prometheus/client_golang v1.19.0
github.com/rs/zerolog v1.31.0
github.com/samber/lo v1.47.0
Expand Down
95 changes: 68 additions & 27 deletions pkg/timelock/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,39 @@ import (

type operationKey [32]byte

type Scheduler interface {
runScheduler(ctx context.Context) <-chan struct{}
addToScheduler(op *contract.TimelockCallScheduled)
delFromScheduler(op operationKey)
dumpOperationStore(now func() time.Time)
}

type executeFn func(context.Context, []*contract.TimelockCallScheduled)

// Scheduler represents a scheduler with an in memory store.
// Whenever accesing the map the mutex should be Locked, to prevent
// any race condition.
type scheduler struct {
mu sync.Mutex
ticker *time.Ticker
add chan *contract.TimelockCallScheduled
del chan operationKey
store map[operationKey][]*contract.TimelockCallScheduled
busy bool
mu sync.Mutex
ticker *time.Ticker
add chan *contract.TimelockCallScheduled
del chan operationKey
store map[operationKey][]*contract.TimelockCallScheduled
busy bool
logger *zerolog.Logger
executeFn executeFn
}

// newScheduler returns a new initialized scheduler.
func newScheduler(tick time.Duration) *scheduler {
func newScheduler(tick time.Duration, logger *zerolog.Logger, executeFn executeFn) *scheduler {
s := &scheduler{
ticker: time.NewTicker(tick),
add: make(chan *contract.TimelockCallScheduled),
del: make(chan operationKey),
store: make(map[operationKey][]*contract.TimelockCallScheduled),
busy: false,
ticker: time.NewTicker(tick),
add: make(chan *contract.TimelockCallScheduled),
del: make(chan operationKey),
store: make(map[operationKey][]*contract.TimelockCallScheduled),
busy: false,
logger: logger,
executeFn: executeFn,
}

return s
Expand All @@ -50,7 +63,7 @@ func newScheduler(tick time.Duration) *scheduler {
// call them this way so no process is allowd to add/delete from
// the store, which could cause race conditions like adding/deleting
// while the operation is being executed.
func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
func (tw *scheduler) runScheduler(ctx context.Context) <-chan struct{} {
done := make(chan struct{})

go func() {
Expand All @@ -67,7 +80,7 @@ func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
tw.logger.Debug().Msgf("new scheduler tick: operations in store")
tw.setSchedulerBusy()
for _, op := range tw.store {
tw.execute(ctx, op)
tw.executeFn(ctx, op)
}
tw.setSchedulerFree()
} else {
Expand Down Expand Up @@ -102,7 +115,7 @@ func (tw *Worker) runScheduler(ctx context.Context) <-chan struct{} {
}

// updateSchedulerDelay updates the internal ticker delay, so it can be reconfigured while running.
func (tw *Worker) updateSchedulerDelay(t time.Duration) {
func (tw *scheduler) updateSchedulerDelay(t time.Duration) {
if t <= 0 {
tw.logger.Debug().Msgf("internal min delay not changed, invalid duration: %v", t.String())
return
Expand All @@ -113,44 +126,38 @@ func (tw *Worker) updateSchedulerDelay(t time.Duration) {
}

// addToScheduler adds a new CallSchedule operation safely to the store.
func (tw *Worker) addToScheduler(op *contract.TimelockCallScheduled) {
tw.mu.Lock()
defer tw.mu.Unlock()
func (tw *scheduler) addToScheduler(op *contract.TimelockCallScheduled) {
tw.logger.Debug().Msgf("scheduling operation: %x", op.Id)
tw.add <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

// delFromScheduler deletes an operation safely from the store.
func (tw *Worker) delFromScheduler(op operationKey) {
tw.mu.Lock()
defer tw.mu.Unlock()
func (tw *scheduler) delFromScheduler(op operationKey) {
tw.logger.Debug().Msgf("de-scheduling operation: %v", op)
tw.del <- op
tw.logger.Debug().Msgf("operations in scheduler: %v", len(tw.store))
}

func (tw *Worker) setSchedulerBusy() {
func (tw *scheduler) setSchedulerBusy() {
tw.logger.Debug().Msgf("setting scheduler busy")
tw.mu.Lock()
tw.busy = true
tw.mu.Unlock()
}

func (tw *Worker) setSchedulerFree() {
func (tw *scheduler) setSchedulerFree() {
tw.logger.Debug().Msgf("setting scheduler free")
tw.mu.Lock()
tw.busy = false
tw.mu.Unlock()
}

func (tw *Worker) isSchedulerBusy() bool {
func (tw *scheduler) isSchedulerBusy() bool {
return tw.busy
}

// dumpOperationStore dumps to the logger and to the log file the current scheduled unexecuted operations.
// maps in go don't guarantee order, so that's why we have to find the earliest block.
func (tw *Worker) dumpOperationStore(now func() time.Time) {
func (tw *scheduler) dumpOperationStore(now func() time.Time) {
if len(tw.store) <= 0 {
tw.logger.Info().Msgf("no operations to dump")
return
Expand Down Expand Up @@ -253,3 +260,37 @@ func toEarliestRecord(op *contract.TimelockCallScheduled) string {
func toSubsequentRecord(op *contract.TimelockCallScheduled) string {
return fmt.Sprintf("CallSchedule pending ID: %x\tBlock Number: %v\n", op.Id, op.Raw.BlockNumber)
}

// ----- nop scheduler -----
// nopScheduler implements the Scheduler interface but doesn't not effectively trigger any operations.
type nopScheduler struct {
logger *zerolog.Logger
}

func newNopScheduler(logger *zerolog.Logger) *nopScheduler {
return &nopScheduler{logger: logger}
}

func (s *nopScheduler) runScheduler(ctx context.Context) <-chan struct{} {
s.logger.Info().Msg("nop.runScheduler")
ch := make(chan struct{})

go func() {
<-ctx.Done()
close(ch)
}()

return ch
}

func (s *nopScheduler) addToScheduler(op *contract.TimelockCallScheduled) {
s.logger.Info().Any("op", op).Msg("nop.addToScheduler")
}

func (s *nopScheduler) delFromScheduler(key operationKey) {
s.logger.Info().Any("key", key).Msg("nop.delFromScheduler")
}

func (s *nopScheduler) dumpOperationStore(now func() time.Time) {
s.logger.Info().Msg("nop.dumpOperationStore")
}
Loading

0 comments on commit ba738cb

Please sign in to comment.