Skip to content

Commit

Permalink
feat(xignite): off_hours_schedule config (#521)
Browse files Browse the repository at this point in the history
  • Loading branch information
dakimura authored Oct 18, 2021
1 parent 00eee5f commit 0b956f4
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 1 deletion.
7 changes: 7 additions & 0 deletions contrib/xignitefeeder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ bgworkers:
# If a non-zero value is set for off_hours_interval,
# the data-feeding is executed every off_hours_interval[minute] even when the market is closed.
off_hours_interval: 5
# The data-feeding is executed when 'minute' of the current time matches off_hours_schedule
# even when the market is cloded. Example: "10" -> execute at 00:10, 01:10, 02:10,...,23:10
# Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes.
# Whitespaces are ignored.
# If both off_hours_interval and off_hours_schedule are specified at the same time,
# off_hours_interval will be ignored.
off_hours_schedule: "0,15,30,45"
# XigniteFeeder runs from openTime ~ closeTime (UTC)
openTime: "23:00:00" # 08:00 (JST)
closeTime: "06:10:00" # 15:10 (JST)
Expand Down
7 changes: 7 additions & 0 deletions contrib/xignitefeeder/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ type DefaultConfig struct {
// If a non-zero value is set for OffHoursInterval,
// the data-feeding is executed every offHoursInterval[minute] even when the market is closed.
OffHoursInterval int `json:"off_hours_interval"`
// The data-feeding is executed when 'minute' of the current time matches off_hours_schedule
// even when the market is cloded. Example: "10" -> execute at 00:10, 01:10, 02:10,...,23:10
// Numbers separated by commas are allowed. Example: "0,15,30,45" -> execute every 15 minutes.
// Whitespaces are ignored.
// If both off_hours_interval and off_hours_schedule are specified at the same time,
// off_hours_interval will be ignored.
OffHoursSchedule string `json:"off_hours_schedule"`
Backfill struct {
Enabled bool `json:"enabled"`
Since CustomDay `json:"since"`
Expand Down
90 changes: 90 additions & 0 deletions contrib/xignitefeeder/feed/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package feed

import (
"fmt"
"sort"
"strconv"
"strings"
"time"

"github.com/alpacahq/marketstore/v4/utils/log"
)

// ParseSchedule checks comma-separated numbers in a string format
// and returns the list of minutes that data feeding is executed.
// Examples:
// "0,15,30,45" -> [0,15,30,45] (the data feeding must be executed every 15 minutes)
// "50,10" -> [10, 50]
// " 20, 40 " -> [20, 40] (whitespaces are ignored)
// "20" -> [20] (00:10, 01:10, ..., 23:10)
// "100" -> error (minute must be between 0 and 59)
// "One" -> error (numbers must be used)
// "0-10" -> error (range is not supported)
func ParseSchedule(s string) ([]int, error) {
if s == "" {
log.Debug("[xignite] no schedule is set for off_hours")
return []int{}, nil
}
s = strings.ReplaceAll(s, " ", "")
strs := strings.Split(s, ",")

ret := make([]int, len(strs))
var err error
for i, m := range strs {
ret[i], err = strconv.Atoi(m)
if err != nil {
return nil, fmt.Errorf("parse %s for scheduling of xignite feeder: %w", m, err)
}

if ret[i] < 0 || ret[i] >= 60 {
return nil, fmt.Errorf("off_hours_schedule[min] must be between 0 and 59: got=%d", ret[i])
}
}

sort.Ints(ret)
return ret, nil
}

// ScheduledMarketTimeChecker is used where periodic processing is needed to run even when the market is closed.
type ScheduledMarketTimeChecker struct {
MarketTimeChecker
// LastTime holds the last time that IntervalTimeChceker.IsOpen returned true.
LastTime time.Time
ScheduleMin []int
}

func NewScheduledMarketTimeChecker(
mtc MarketTimeChecker,
scheduleMin []int,
) *ScheduledMarketTimeChecker {
return &ScheduledMarketTimeChecker{
MarketTimeChecker: mtc,
LastTime: time.Time{},
ScheduleMin: scheduleMin,
}
}

// IsOpen returns true when the market is open or the interval elapsed since LastTime.
func (c *ScheduledMarketTimeChecker) IsOpen(t time.Time) bool {
return c.MarketTimeChecker.IsOpen(t) || c.tick(t)
}

func (c *ScheduledMarketTimeChecker) tick(t time.Time) bool {
m := t.Minute()
for _, sche := range c.ScheduleMin {
if m != sche {
continue
}

// maximum frequency is once a minute
if t.Sub(c.LastTime) < 1*time.Minute {
continue
}

log.Debug(fmt.Sprintf("[Xignite Feeder] run data feed based on the schedule: %v(min)", c.ScheduleMin))
c.LastTime = t
return true
}

return false
}
131 changes: 131 additions & 0 deletions contrib/xignitefeeder/feed/schedule_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package feed_test

import (
"reflect"
"testing"
"time"

"github.com/alpacahq/marketstore/v4/contrib/xignitefeeder/feed"
)

func TestParseSchedule(t *testing.T) {
t.Parallel()
tests := []struct {
name string
s string
want []int
wantErr bool
}{
{
name: "\"0,15,30,45\" -> every 15 minutes",
s: "0,15,30,45",
want: []int{0, 15, 30, 45},
wantErr: false,
},
{
name: "\"50,10\" -> 10 and 50, numbers are sorted",
s: "50,10",
want: []int{10, 50},
wantErr: false,
},
{
name: "whitespaces are ignored",
s: " 20, 40 ",
want: []int{20, 40},
wantErr: false,
},
{
name: "no schedule is set",
s: "",
want: []int{},
wantErr: false,
},
{
name: "NG/minute must be between [0, 59]",
s: "100",
want: nil,
wantErr: true,
},
{
name: "NG/range is not supported",
s: "0-10",
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got, err := feed.ParseSchedule(tt.s)
if (err != nil) != tt.wantErr {
t.Errorf("ParseSchedule() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("ParseSchedule() got = %v, want %v", got, tt.want)
}
})
}
}

func TestScheduledMarketTimeChecker_IsOpen(t *testing.T) {
t.Parallel()
tests := map[string]struct {
MarketTimeChecker feed.MarketTimeChecker
ScheduleMin []int
CurrentTime time.Time
LastTime time.Time
want bool
}{
"ok: run - 00:15 matches {0} in the schedule": {
MarketTimeChecker: &mockMarketTimeChecker{isOpen: false},
ScheduleMin: []int{0, 15, 30, 45},
LastTime: time.Time{},
CurrentTime: time.Date(2021, 8, 20, 0, 15, 0, 0, time.UTC),
want: true,
},
"ok: not run - 00:01 does not match any of {0,15,30,45}": {
MarketTimeChecker: &mockMarketTimeChecker{isOpen: false},
ScheduleMin: []int{0, 15, 30, 45},
LastTime: time.Time{},
CurrentTime: time.Date(2021, 8, 20, 0, 1, 0, 0, time.UTC),
want: false,
},
"ok: not run - run only once per minute": {
MarketTimeChecker: &mockMarketTimeChecker{isOpen: false},
ScheduleMin: []int{20},
LastTime: time.Date(2021, 8, 20, 0, 19, 30, 0, time.UTC),
CurrentTime: time.Date(2021, 8, 20, 0, 20, 0, 0, time.UTC),
want: false,
},
"ok: run - always run when the original market time checker's IsOpen=true": {
MarketTimeChecker: &mockMarketTimeChecker{isOpen: true},
ScheduleMin: []int{20},
LastTime: time.Time{},
CurrentTime: time.Date(2021, 8, 20, 0, 0, 0, 0, time.UTC),
want: true,
},
}
for name := range tests {
tt := tests[name]
t.Run(name, func(t *testing.T) {
t.Parallel()
// --- given ---
c := feed.NewScheduledMarketTimeChecker(
tt.MarketTimeChecker,
tt.ScheduleMin,
)
c.LastTime = tt.LastTime

// --- when ---
got := c.IsOpen(tt.CurrentTime)

// --- then ---
if got != tt.want {
t.Errorf("IsOpen() = %v, want %v", got, tt.want)
}
})
}
}
15 changes: 14 additions & 1 deletion contrib/xignitefeeder/xignitefeeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,20 @@ func NewBgWorker(conf map[string]interface{}) (bgworker.BgWorker, error) {
config.ClosedDays,
config.OpenTime,
config.CloseTime)
if config.OffHoursInterval != 0 {
if config.OffHoursSchedule != "" {
scheduleMin, err := feed.ParseSchedule(config.OffHoursSchedule)
if err != nil {
return nil, fmt.Errorf("parse off_hours_schedule %s: %w", config.OffHoursSchedule, err)
}
log.Info(fmt.Sprintf("[Xignite Feeder] off_hours_schedule=%s[min] is set. "+
"The data will be retrieved at %s [minute] even when the market is closed.",
config.OffHoursSchedule, config.OffHoursSchedule),
)
timeChecker = feed.NewScheduledMarketTimeChecker(
timeChecker,
scheduleMin,
)
} else if config.OffHoursInterval != 0 {
log.Info(fmt.Sprintf("[Xignite Feeder] off_hours_interval=%dmin is set. "+
"The data will be retrieved every %d minutes even when the market is closed.",
config.OffHoursInterval, config.OffHoursInterval),
Expand Down

0 comments on commit 0b956f4

Please sign in to comment.