Skip to content

Commit

Permalink
scheduler: Add d and next field (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunshineplan authored May 23, 2024
1 parent 0feb54c commit e27879f
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
67 changes: 54 additions & 13 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Scheduler struct {
fn []func(time.Time)

sched complexSched
d time.Duration
next time.Time

ctx context.Context
cancel context.CancelFunc
Expand All @@ -33,23 +35,35 @@ func NewScheduler() *Scheduler {
}

func (sched *Scheduler) At(schedules ...Schedule) *Scheduler {
if len(schedules) == 0 {
panic("no schedules")
}
sched.mu.Lock()
defer sched.mu.Unlock()
sched.sched = multiSched(schedules)
sched.d = sched.sched.TickerDuration()
return sched
}

func (sched *Scheduler) AtCondition(schedules ...Schedule) *Scheduler {
if len(schedules) == 0 {
panic("no schedules")
}
sched.mu.Lock()
defer sched.mu.Unlock()
sched.sched = condSched(schedules)
sched.d = sched.sched.TickerDuration()
return sched
}

func (sched *Scheduler) Clear() {
sched.mu.Lock()
defer sched.mu.Unlock()
sched.sched = nil
sched.d = 0
if sched.ctx != nil && sched.ctx.Err() == nil {
sched.Stop()
}
}

func (sched *Scheduler) Run(fn ...func(time.Time)) *Scheduler {
Expand All @@ -63,7 +77,7 @@ func (sched *Scheduler) Run(fn ...func(time.Time)) *Scheduler {
return sched
}

func (sched *Scheduler) init(d time.Duration) error {
func (sched *Scheduler) init() error {
sched.mu.Lock()
defer sched.mu.Unlock()
if len(sched.fn) == 0 {
Expand All @@ -78,8 +92,9 @@ func (sched *Scheduler) init(d time.Duration) error {
defer timer.Stop()
t := <-timer.C
sched.sched.init(t)
sched.next = sched.sched.Next(t)
subscribeNotify(sched.notify)
sched.newTimer(sched.sched.Next(t).Sub(t), d)
sched.newTimer(time.Now())
return nil
}
return ErrAlreadyRunning
Expand All @@ -88,26 +103,45 @@ func (sched *Scheduler) init(d time.Duration) error {
func (sched *Scheduler) checkMatched(t time.Time) {
sched.mu.Lock()
defer sched.mu.Unlock()
var matched time.Time
var notify bool
if sched.sched.IsMatched(t) {
sched.tc <- t
matched = t
} else if sched.sched.TickerDuration() >= time.Minute {
if minus1s := t.Add(-time.Second); sched.sched.IsMatched(minus1s) {
sched.tc <- minus1s
sched.notify <- t
matched = minus1s
notify = true
} else if plus1s := t.Add(time.Second); sched.sched.IsMatched(plus1s) {
sched.tc <- plus1s
matched = plus1s
time.Sleep(2 * time.Second)
sched.notify <- time.Now()
notify = true
}
}
if !matched.IsZero() && !notify {
if sched.next = sched.sched.Next(matched.Truncate(time.Second).Add(time.Second)); sched.next.IsZero() {
sched.Stop()
return
}
} else if t.After(sched.next) {
notify = true
}
if notify {
sched.notify <- time.Now()
}
}

func (sched *Scheduler) newTimer(first, duration time.Duration) {
func (sched *Scheduler) newTimer(t time.Time) {
if sched.next.IsZero() {
sched.Stop()
return
}
ctx, cancel := context.WithCancel(context.Background())
sched.timer = time.AfterFunc(first, func() {
sched.timer = time.AfterFunc(sched.next.Sub(t), func() {
cancel()
var now time.Time
sched.ticker, now = time.NewTicker(duration), time.Now()
sched.ticker = time.NewTicker(sched.d)
go func() {
for {
select {
Expand All @@ -117,23 +151,30 @@ func (sched *Scheduler) newTimer(first, duration time.Duration) {
sched.ticker.Stop()
sched.mu.Lock()
defer sched.mu.Unlock()
sched.newTimer(sched.sched.Next(t).Sub(t), duration)
sched.next = sched.sched.Next(t)
sched.newTimer(t)
return
case <-sched.ctx.Done():
sched.ticker.Stop()
return
}
}
}()
sched.checkMatched(now)
sched.checkMatched(time.Now())
})
go func() {
for {
select {
case t := <-sched.notify:
sched.mu.Lock()
if sched.timer.Stop() {
sched.timer.Reset(sched.sched.Next(t).Sub(t))
sched.next = sched.sched.Next(t)
if sched.next.IsZero() {
cancel()
sched.Stop()
return
}
sched.timer.Reset(sched.next.Sub(t))
}
sched.mu.Unlock()
case <-sched.ctx.Done():
Expand All @@ -148,7 +189,7 @@ func (sched *Scheduler) newTimer(first, duration time.Duration) {
}

func (sched *Scheduler) Start() error {
if err := sched.init(sched.sched.TickerDuration()); err != nil {
if err := sched.init(); err != nil {
return err
}
go func() {
Expand Down Expand Up @@ -200,7 +241,7 @@ func (sched *Scheduler) Immediately() <-chan error {
func (sched *Scheduler) Once() <-chan error {
done := make(chan error)
go func() {
if err := sched.init(sched.sched.TickerDuration()); err != nil {
if err := sched.init(); err != nil {
done <- err
return
}
Expand Down
8 changes: 6 additions & 2 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,20 @@ func TestTickerScheduler1(t *testing.T) {
s := NewScheduler().At(Every(time.Second))
defer s.Stop()
var a, b atomic.Int32
if err := s.Do(func(_ time.Time) { a.Store(1) }); err != nil {
if err := s.Do(func(_ time.Time) { a.Add(1) }); err != nil {
t.Fatal(err)
}
if err := s.Do(func(_ time.Time) { b.Store(1) }); err != nil {
if err := s.Do(func(_ time.Time) { b.Add(1) }); err != nil {
t.Fatal(err)
}
time.Sleep(1500 * time.Millisecond)
if a, b := a.Load(), b.Load(); a != 1 || b != 1 {
t.Errorf("expected 1 1; got %d %d", a, b)
}
time.Sleep(1600 * time.Millisecond)
if a, b := a.Load(), b.Load(); a != 3 || b != 3 {
t.Errorf("expected 3 3; got %d %d", a, b)
}
}

func TestTickerScheduler2(t *testing.T) {
Expand Down

0 comments on commit e27879f

Please sign in to comment.