From 0b956f43164ea87ac7a8a5775fa5a193e9b1ed2a Mon Sep 17 00:00:00 2001 From: dakimura <34202807+dakimura@users.noreply.github.com> Date: Tue, 19 Oct 2021 08:23:45 +0900 Subject: [PATCH] feat(xignite): off_hours_schedule config (#521) --- contrib/xignitefeeder/README.md | 7 ++ contrib/xignitefeeder/configs/config.go | 7 ++ contrib/xignitefeeder/feed/schedule.go | 90 ++++++++++++++ contrib/xignitefeeder/feed/schedule_test.go | 131 ++++++++++++++++++++ contrib/xignitefeeder/xignitefeeder.go | 15 ++- 5 files changed, 249 insertions(+), 1 deletion(-) create mode 100644 contrib/xignitefeeder/feed/schedule.go create mode 100644 contrib/xignitefeeder/feed/schedule_test.go diff --git a/contrib/xignitefeeder/README.md b/contrib/xignitefeeder/README.md index 652c9884..21485651 100644 --- a/contrib/xignitefeeder/README.md +++ b/contrib/xignitefeeder/README.md @@ -43,6 +43,13 @@ bgworkers: # If a non-zero value is set for off_hours_interval, # the data-feeding is executed every off_hours_interval[minute] even when the market is closed. off_hours_interval: 5 + # The data-feeding is executed when 'minute' of the current time matches off_hours_schedule + # even when the market is cloded. Example: "10" -> execute at 00:10, 01:10, 02:10,...,23:10 + # Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes. + # Whitespaces are ignored. + # If both off_hours_interval and off_hours_schedule are specified at the same time, + # off_hours_interval will be ignored. + off_hours_schedule: "0,15,30,45" # XigniteFeeder runs from openTime ~ closeTime (UTC) openTime: "23:00:00" # 08:00 (JST) closeTime: "06:10:00" # 15:10 (JST) diff --git a/contrib/xignitefeeder/configs/config.go b/contrib/xignitefeeder/configs/config.go index 894c43e3..2e61c393 100644 --- a/contrib/xignitefeeder/configs/config.go +++ b/contrib/xignitefeeder/configs/config.go @@ -36,6 +36,13 @@ type DefaultConfig struct { // If a non-zero value is set for OffHoursInterval, // the data-feeding is executed every offHoursInterval[minute] even when the market is closed. OffHoursInterval int `json:"off_hours_interval"` + // The data-feeding is executed when 'minute' of the current time matches off_hours_schedule + // even when the market is cloded. Example: "10" -> execute at 00:10, 01:10, 02:10,...,23:10 + // Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes. + // Whitespaces are ignored. + // If both off_hours_interval and off_hours_schedule are specified at the same time, + // off_hours_interval will be ignored. + OffHoursSchedule string `json:"off_hours_schedule"` Backfill struct { Enabled bool `json:"enabled"` Since CustomDay `json:"since"` diff --git a/contrib/xignitefeeder/feed/schedule.go b/contrib/xignitefeeder/feed/schedule.go new file mode 100644 index 00000000..3589f183 --- /dev/null +++ b/contrib/xignitefeeder/feed/schedule.go @@ -0,0 +1,90 @@ +package feed + +import ( + "fmt" + "sort" + "strconv" + "strings" + "time" + + "github.com/alpacahq/marketstore/v4/utils/log" +) + +// ParseSchedule checks comma-separated numbers in a string format +// and returns the list of minutes that data feeding is executed. +// Examples: +// "0,15,30,45" -> [0,15,30,45] (the data feeding must be executed every 15 minutes) +// "50,10" -> [10, 50] +// " 20, 40 " -> [20, 40] (whitespaces are ignored) +// "20" -> [20] (00:10, 01:10, ..., 23:10) +// "100" -> error (minute must be between 0 and 59) +// "One" -> error (numbers must be used) +// "0-10" -> error (range is not supported) +func ParseSchedule(s string) ([]int, error) { + if s == "" { + log.Debug("[xignite] no schedule is set for off_hours") + return []int{}, nil + } + s = strings.ReplaceAll(s, " ", "") + strs := strings.Split(s, ",") + + ret := make([]int, len(strs)) + var err error + for i, m := range strs { + ret[i], err = strconv.Atoi(m) + if err != nil { + return nil, fmt.Errorf("parse %s for scheduling of xignite feeder: %w", m, err) + } + + if ret[i] < 0 || ret[i] >= 60 { + return nil, fmt.Errorf("off_hours_schedule[min] must be between 0 and 59: got=%d", ret[i]) + } + } + + sort.Ints(ret) + return ret, nil +} + +// ScheduledMarketTimeChecker is used where periodic processing is needed to run even when the market is closed. +type ScheduledMarketTimeChecker struct { + MarketTimeChecker + // LastTime holds the last time that IntervalTimeChceker.IsOpen returned true. + LastTime time.Time + ScheduleMin []int +} + +func NewScheduledMarketTimeChecker( + mtc MarketTimeChecker, + scheduleMin []int, +) *ScheduledMarketTimeChecker { + return &ScheduledMarketTimeChecker{ + MarketTimeChecker: mtc, + LastTime: time.Time{}, + ScheduleMin: scheduleMin, + } +} + +// IsOpen returns true when the market is open or the interval elapsed since LastTime. +func (c *ScheduledMarketTimeChecker) IsOpen(t time.Time) bool { + return c.MarketTimeChecker.IsOpen(t) || c.tick(t) +} + +func (c *ScheduledMarketTimeChecker) tick(t time.Time) bool { + m := t.Minute() + for _, sche := range c.ScheduleMin { + if m != sche { + continue + } + + // maximum frequency is once a minute + if t.Sub(c.LastTime) < 1*time.Minute { + continue + } + + log.Debug(fmt.Sprintf("[Xignite Feeder] run data feed based on the schedule: %v(min)", c.ScheduleMin)) + c.LastTime = t + return true + } + + return false +} diff --git a/contrib/xignitefeeder/feed/schedule_test.go b/contrib/xignitefeeder/feed/schedule_test.go new file mode 100644 index 00000000..b0b83f02 --- /dev/null +++ b/contrib/xignitefeeder/feed/schedule_test.go @@ -0,0 +1,131 @@ +package feed_test + +import ( + "reflect" + "testing" + "time" + + "github.com/alpacahq/marketstore/v4/contrib/xignitefeeder/feed" +) + +func TestParseSchedule(t *testing.T) { + t.Parallel() + tests := []struct { + name string + s string + want []int + wantErr bool + }{ + { + name: "\"0,15,30,45\" -> every 15 minutes", + s: "0,15,30,45", + want: []int{0, 15, 30, 45}, + wantErr: false, + }, + { + name: "\"50,10\" -> 10 and 50, numbers are sorted", + s: "50,10", + want: []int{10, 50}, + wantErr: false, + }, + { + name: "whitespaces are ignored", + s: " 20, 40 ", + want: []int{20, 40}, + wantErr: false, + }, + { + name: "no schedule is set", + s: "", + want: []int{}, + wantErr: false, + }, + { + name: "NG/minute must be between [0, 59]", + s: "100", + want: nil, + wantErr: true, + }, + { + name: "NG/range is not supported", + s: "0-10", + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got, err := feed.ParseSchedule(tt.s) + if (err != nil) != tt.wantErr { + t.Errorf("ParseSchedule() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParseSchedule() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestScheduledMarketTimeChecker_IsOpen(t *testing.T) { + t.Parallel() + tests := map[string]struct { + MarketTimeChecker feed.MarketTimeChecker + ScheduleMin []int + CurrentTime time.Time + LastTime time.Time + want bool + }{ + "ok: run - 00:15 matches {0} in the schedule": { + MarketTimeChecker: &mockMarketTimeChecker{isOpen: false}, + ScheduleMin: []int{0, 15, 30, 45}, + LastTime: time.Time{}, + CurrentTime: time.Date(2021, 8, 20, 0, 15, 0, 0, time.UTC), + want: true, + }, + "ok: not run - 00:01 does not match any of {0,15,30,45}": { + MarketTimeChecker: &mockMarketTimeChecker{isOpen: false}, + ScheduleMin: []int{0, 15, 30, 45}, + LastTime: time.Time{}, + CurrentTime: time.Date(2021, 8, 20, 0, 1, 0, 0, time.UTC), + want: false, + }, + "ok: not run - run only once per minute": { + MarketTimeChecker: &mockMarketTimeChecker{isOpen: false}, + ScheduleMin: []int{20}, + LastTime: time.Date(2021, 8, 20, 0, 19, 30, 0, time.UTC), + CurrentTime: time.Date(2021, 8, 20, 0, 20, 0, 0, time.UTC), + want: false, + }, + "ok: run - always run when the original market time checker's IsOpen=true": { + MarketTimeChecker: &mockMarketTimeChecker{isOpen: true}, + ScheduleMin: []int{20}, + LastTime: time.Time{}, + CurrentTime: time.Date(2021, 8, 20, 0, 0, 0, 0, time.UTC), + want: true, + }, + } + for name := range tests { + tt := tests[name] + t.Run(name, func(t *testing.T) { + t.Parallel() + // --- given --- + c := feed.NewScheduledMarketTimeChecker( + tt.MarketTimeChecker, + tt.ScheduleMin, + ) + c.LastTime = tt.LastTime + + // --- when --- + got := c.IsOpen(tt.CurrentTime) + + // --- then --- + if got != tt.want { + t.Errorf("IsOpen() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/contrib/xignitefeeder/xignitefeeder.go b/contrib/xignitefeeder/xignitefeeder.go index 5ca1966a..bf9acd04 100644 --- a/contrib/xignitefeeder/xignitefeeder.go +++ b/contrib/xignitefeeder/xignitefeeder.go @@ -40,7 +40,20 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) { config.ClosedDays, config.OpenTime, config.CloseTime) - if config.OffHoursInterval != 0 { + if config.OffHoursSchedule != "" { + scheduleMin, err := feed.ParseSchedule(config.OffHoursSchedule) + if err != nil { + return nil, fmt.Errorf("parse off_hours_schedule %s: %w", config.OffHoursSchedule, err) + } + log.Info(fmt.Sprintf("[Xignite Feeder] off_hours_schedule=%s[min] is set. "+ + "The data will be retrieved at %s [minute] even when the market is closed.", + config.OffHoursSchedule, config.OffHoursSchedule), + ) + timeChecker = feed.NewScheduledMarketTimeChecker( + timeChecker, + scheduleMin, + ) + } else if config.OffHoursInterval != 0 { log.Info(fmt.Sprintf("[Xignite Feeder] off_hours_interval=%dmin is set. "+ "The data will be retrieved every %d minutes even when the market is closed.", config.OffHoursInterval, config.OffHoursInterval),