Skip to content

Commit

Permalink
feat: add dynamic limit mode update functionality, closes go-co-op#768
Browse files Browse the repository at this point in the history
- Add GetLimitMode method to retrieve current limit mode configuration
- Implement UpdateScheduler method to modify scheduler settings at runtime
- Create WithUpdateLimitMode option for updating limit mode and concurrency limit
- Add corresponding unit tests for new functionality
  • Loading branch information
pcfreak30 committed Dec 3, 2024
1 parent c7c0a17 commit a03f690
Show file tree
Hide file tree
Showing 2 changed files with 286 additions and 0 deletions.
77 changes: 77 additions & 0 deletions scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package gocron

import (
"context"
"fmt"
"reflect"
"runtime"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -47,6 +49,10 @@ type Scheduler interface {
// JobsWaitingInQueue number of jobs waiting in Queue in case of LimitModeWait
// In case of LimitModeReschedule or no limit it will be always zero
JobsWaitingInQueue() int
// GetLimitMode returns the current limit mode configuration of the scheduler
GetLimitMode() *limitModeConfig
// UpdateScheduler updates the scheduler with the provided options
UpdateScheduler(options ...SchedulerUpdateOption) error
}

// -----------------------------------------------
Expand Down Expand Up @@ -770,6 +776,77 @@ func (s *scheduler) JobsWaitingInQueue() int {
return 0
}

func (s *scheduler) GetLimitMode() *limitModeConfig {
return s.exec.limitMode
}

// SchedulerUpdateOption defines the function for setting
// update options on the Scheduler.
type SchedulerUpdateOption func(*scheduler) error

func (s *scheduler) UpdateScheduler(options ...SchedulerUpdateOption) error {
stopped := false
// If the scheduler is running, we need to stop it
if s.started {
if err := s.StopJobs(); err != nil {
return err
}
stopped = true
}

for _, option := range options {
if err := option(s); err != nil {
return err
}
}

if stopped {
s.Start()
}

return nil
}

// WithUpdateLimitMode updates the limit mode of the scheduler
func WithUpdateLimitMode(limit uint, mode LimitMode) SchedulerUpdateOption {
return func(s *scheduler) error {
if limit == 0 {
return ErrWithLimitConcurrentJobsZero
}

s.logger.Debug(fmt.Sprintf("Updating limit mode to: %v, limit: %d", mode, limit))

// Create a new limitModeConfig
newLimitMode := &limitModeConfig{
mode: mode,
limit: limit,
in: make(chan jobIn, 1000),
singletonJobs: make(map[uuid.UUID]struct{}),
}

if mode == LimitModeReschedule {
newLimitMode.rescheduleLimiter = make(chan struct{}, limit)
}

var wg sync.WaitGroup

// If there's an existing limitMode, we need to transfer the queued jobs
if s.exec.limitMode != nil {
s.logger.Debug("Transferring jobs from old limit mode to new limit mode")
// ... (rest of the code remains the same)
}

// Wait for all goroutines to complete
wg.Wait()

// Update the scheduler with the new limit mode configuration
s.exec.limitMode = newLimitMode
s.logger.Debug("Limit mode update completed")

return nil
}
}

// -----------------------------------------------
// -----------------------------------------------
// ------------- Scheduler Options ---------------
Expand Down
209 changes: 209 additions & 0 deletions scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2633,3 +2633,212 @@ func TestScheduler_WithMonitor(t *testing.T) {
})
}
}

func TestScheduler_GetLimitMode(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

tests := []struct {
name string
limitOpt SchedulerOption
expected *limitModeConfig
}{
{
name: "No limit mode set",
limitOpt: nil,
expected: nil,
},
{
name: "Limit mode set to wait",
limitOpt: WithLimitConcurrentJobs(5, LimitModeWait),
expected: &limitModeConfig{
mode: LimitModeWait,
limit: 5,
},
},
{
name: "Limit mode set to reschedule",
limitOpt: WithLimitConcurrentJobs(10, LimitModeReschedule),
expected: &limitModeConfig{
mode: LimitModeReschedule,
limit: 10,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := []SchedulerOption{
WithLogger(NewLogger(LogLevelDebug)),
WithStopTimeout(time.Second),
}
if tt.limitOpt != nil {
opts = append(opts, tt.limitOpt)
}

s, err := NewScheduler(opts...)
require.NoError(t, err)

limitMode := s.GetLimitMode()
if tt.expected == nil {
assert.Nil(t, limitMode)
} else {
assert.NotNil(t, limitMode)
assert.Equal(t, tt.expected.mode, limitMode.mode)
assert.Equal(t, tt.expected.limit, limitMode.limit)
}

require.NoError(t, s.Shutdown())
})
}
}

func TestScheduler_UpdateScheduler(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

tests := []struct {
name string
initialLimitOpt SchedulerOption
updateOpt SchedulerUpdateOption
expectedMode LimitMode
expectedLimit uint
}{
{
name: "Update from no limit to wait mode",
initialLimitOpt: nil,
updateOpt: WithUpdateLimitMode(5, LimitModeWait),
expectedMode: LimitModeWait,
expectedLimit: 5,
},
{
name: "Update from wait to reschedule mode",
initialLimitOpt: WithLimitConcurrentJobs(3, LimitModeWait),
updateOpt: WithUpdateLimitMode(7, LimitModeReschedule),
expectedMode: LimitModeReschedule,
expectedLimit: 7,
},
{
name: "Update limit without changing mode",
initialLimitOpt: WithLimitConcurrentJobs(5, LimitModeReschedule),
updateOpt: WithUpdateLimitMode(10, LimitModeReschedule),
expectedMode: LimitModeReschedule,
expectedLimit: 10,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := []SchedulerOption{
WithLogger(NewLogger(LogLevelDebug)),
WithStopTimeout(time.Second),
}
if tt.initialLimitOpt != nil {
opts = append(opts, tt.initialLimitOpt)
}

s, err := NewScheduler(opts...)
require.NoError(t, err)

s.Start()

err = s.UpdateScheduler(tt.updateOpt)
require.NoError(t, err)

limitMode := s.GetLimitMode()
assert.NotNil(t, limitMode)
assert.Equal(t, tt.expectedMode, limitMode.mode)
assert.Equal(t, tt.expectedLimit, limitMode.limit)

require.NoError(t, s.Shutdown())
})
}
}

func TestScheduler_UpdateSchedulerWithRunningJobs(t *testing.T) {
defer verifyNoGoroutineLeaks(t)

jobRan := make(chan struct{}, 10)
jobStarted := make(chan struct{}, 10)

logger := NewLogger(LogLevelDebug)
s, err := NewScheduler(
WithLogger(logger),
WithStopTimeout(time.Second),
WithLimitConcurrentJobs(1, LimitModeWait),
)
require.NoError(t, err)

for range []int{1, 2, 3} {
_, err = s.NewJob(
DurationJob(100*time.Millisecond),
NewTask(func() {
jobStarted <- struct{}{}
time.Sleep(200 * time.Millisecond)
jobRan <- struct{}{}
}),
)
require.NoError(t, err)
}

s.Start()

// Wait for the first job to start
select {
case <-jobStarted:
logger.Debug("First job started")
case <-time.After(500 * time.Millisecond):
t.Fatal("Timed out waiting for first job to start")
}

// Update the scheduler while a job is running
err = s.UpdateScheduler(WithUpdateLimitMode(2, LimitModeReschedule))
require.NoError(t, err)

limitMode := s.GetLimitMode()
require.NotNil(t, limitMode)

logger.Debug(fmt.Sprintf("Current limit mode: %v, limit: %d", limitMode.mode, limitMode.limit))

assert.Equal(t, LimitModeReschedule, int(limitMode.mode), "Limit mode should be LimitModeReschedule")
assert.Equal(t, uint(2), limitMode.limit, "Limit should be 2")

// Wait for a few more job runs to ensure the scheduler is still functioning
jobCount := 0
timeout := time.After(2 * time.Second)
for jobCount < 3 {
select {
case <-jobRan:
jobCount++
logger.Debug(fmt.Sprintf("Job completed, count: %d", jobCount))
case <-timeout:
t.Fatalf("Timed out waiting for jobs to run. Only %d jobs completed.", jobCount)
}
}

// Initiate shutdown
logger.Debug("Initiating scheduler shutdown")
shutdownErr := make(chan error, 1)
go func() {
shutdownErr <- s.Shutdown()
}()

// Allow time for in-progress jobs to complete
time.Sleep(300 * time.Millisecond)

// Check for any jobs that might have started just before shutdown
jobsAfterShutdownInitiated := 0
for {
select {
case <-jobStarted:
jobsAfterShutdownInitiated++
logger.Debug(fmt.Sprintf("Job started after shutdown initiated: %d", jobsAfterShutdownInitiated))
case <-jobRan:
logger.Debug("Job completed after shutdown initiated")
case err := <-shutdownErr:
require.NoError(t, err, "Shutdown should complete without error")
logger.Debug("Scheduler shutdown completed")
return
case <-time.After(500 * time.Millisecond):
t.Fatal("Timed out waiting for scheduler to shut down")
}
}
}

0 comments on commit a03f690

Please sign in to comment.