From a03f6907c6f2b6bf1d49f1d5190ad7f707cce9d3 Mon Sep 17 00:00:00 2001 From: Derrick Hammer Date: Sun, 20 Oct 2024 22:00:25 -0400 Subject: [PATCH] feat: add dynamic limit mode update functionality, closes #768 - Add GetLimitMode method to retrieve current limit mode configuration - Implement UpdateScheduler method to modify scheduler settings at runtime - Create WithUpdateLimitMode option for updating limit mode and concurrency limit - Add corresponding unit tests for new functionality --- scheduler.go | 77 +++++++++++++++++ scheduler_test.go | 209 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+) diff --git a/scheduler.go b/scheduler.go index 90ff5212..26203e3a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -3,8 +3,10 @@ package gocron import ( "context" + "fmt" "reflect" "runtime" + "sync" "time" "github.com/google/uuid" @@ -47,6 +49,10 @@ type Scheduler interface { // JobsWaitingInQueue number of jobs waiting in Queue in case of LimitModeWait // In case of LimitModeReschedule or no limit it will be always zero JobsWaitingInQueue() int + // GetLimitMode returns the current limit mode configuration of the scheduler + GetLimitMode() *limitModeConfig + // UpdateScheduler updates the scheduler with the provided options + UpdateScheduler(options ...SchedulerUpdateOption) error } // ----------------------------------------------- @@ -770,6 +776,77 @@ func (s *scheduler) JobsWaitingInQueue() int { return 0 } +func (s *scheduler) GetLimitMode() *limitModeConfig { + return s.exec.limitMode +} + +// SchedulerUpdateOption defines the function for setting +// update options on the Scheduler. +type SchedulerUpdateOption func(*scheduler) error + +func (s *scheduler) UpdateScheduler(options ...SchedulerUpdateOption) error { + stopped := false + // If the scheduler is running, we need to stop it + if s.started { + if err := s.StopJobs(); err != nil { + return err + } + stopped = true + } + + for _, option := range options { + if err := option(s); err != nil { + return err + } + } + + if stopped { + s.Start() + } + + return nil +} + +// WithUpdateLimitMode updates the limit mode of the scheduler +func WithUpdateLimitMode(limit uint, mode LimitMode) SchedulerUpdateOption { + return func(s *scheduler) error { + if limit == 0 { + return ErrWithLimitConcurrentJobsZero + } + + s.logger.Debug(fmt.Sprintf("Updating limit mode to: %v, limit: %d", mode, limit)) + + // Create a new limitModeConfig + newLimitMode := &limitModeConfig{ + mode: mode, + limit: limit, + in: make(chan jobIn, 1000), + singletonJobs: make(map[uuid.UUID]struct{}), + } + + if mode == LimitModeReschedule { + newLimitMode.rescheduleLimiter = make(chan struct{}, limit) + } + + var wg sync.WaitGroup + + // If there's an existing limitMode, we need to transfer the queued jobs + if s.exec.limitMode != nil { + s.logger.Debug("Transferring jobs from old limit mode to new limit mode") + // ... (rest of the code remains the same) + } + + // Wait for all goroutines to complete + wg.Wait() + + // Update the scheduler with the new limit mode configuration + s.exec.limitMode = newLimitMode + s.logger.Debug("Limit mode update completed") + + return nil + } +} + // ----------------------------------------------- // ----------------------------------------------- // ------------- Scheduler Options --------------- diff --git a/scheduler_test.go b/scheduler_test.go index d335c4a1..e69cf1f4 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -2633,3 +2633,212 @@ func TestScheduler_WithMonitor(t *testing.T) { }) } } + +func TestScheduler_GetLimitMode(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + tests := []struct { + name string + limitOpt SchedulerOption + expected *limitModeConfig + }{ + { + name: "No limit mode set", + limitOpt: nil, + expected: nil, + }, + { + name: "Limit mode set to wait", + limitOpt: WithLimitConcurrentJobs(5, LimitModeWait), + expected: &limitModeConfig{ + mode: LimitModeWait, + limit: 5, + }, + }, + { + name: "Limit mode set to reschedule", + limitOpt: WithLimitConcurrentJobs(10, LimitModeReschedule), + expected: &limitModeConfig{ + mode: LimitModeReschedule, + limit: 10, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := []SchedulerOption{ + WithLogger(NewLogger(LogLevelDebug)), + WithStopTimeout(time.Second), + } + if tt.limitOpt != nil { + opts = append(opts, tt.limitOpt) + } + + s, err := NewScheduler(opts...) + require.NoError(t, err) + + limitMode := s.GetLimitMode() + if tt.expected == nil { + assert.Nil(t, limitMode) + } else { + assert.NotNil(t, limitMode) + assert.Equal(t, tt.expected.mode, limitMode.mode) + assert.Equal(t, tt.expected.limit, limitMode.limit) + } + + require.NoError(t, s.Shutdown()) + }) + } +} + +func TestScheduler_UpdateScheduler(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + tests := []struct { + name string + initialLimitOpt SchedulerOption + updateOpt SchedulerUpdateOption + expectedMode LimitMode + expectedLimit uint + }{ + { + name: "Update from no limit to wait mode", + initialLimitOpt: nil, + updateOpt: WithUpdateLimitMode(5, LimitModeWait), + expectedMode: LimitModeWait, + expectedLimit: 5, + }, + { + name: "Update from wait to reschedule mode", + initialLimitOpt: WithLimitConcurrentJobs(3, LimitModeWait), + updateOpt: WithUpdateLimitMode(7, LimitModeReschedule), + expectedMode: LimitModeReschedule, + expectedLimit: 7, + }, + { + name: "Update limit without changing mode", + initialLimitOpt: WithLimitConcurrentJobs(5, LimitModeReschedule), + updateOpt: WithUpdateLimitMode(10, LimitModeReschedule), + expectedMode: LimitModeReschedule, + expectedLimit: 10, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := []SchedulerOption{ + WithLogger(NewLogger(LogLevelDebug)), + WithStopTimeout(time.Second), + } + if tt.initialLimitOpt != nil { + opts = append(opts, tt.initialLimitOpt) + } + + s, err := NewScheduler(opts...) + require.NoError(t, err) + + s.Start() + + err = s.UpdateScheduler(tt.updateOpt) + require.NoError(t, err) + + limitMode := s.GetLimitMode() + assert.NotNil(t, limitMode) + assert.Equal(t, tt.expectedMode, limitMode.mode) + assert.Equal(t, tt.expectedLimit, limitMode.limit) + + require.NoError(t, s.Shutdown()) + }) + } +} + +func TestScheduler_UpdateSchedulerWithRunningJobs(t *testing.T) { + defer verifyNoGoroutineLeaks(t) + + jobRan := make(chan struct{}, 10) + jobStarted := make(chan struct{}, 10) + + logger := NewLogger(LogLevelDebug) + s, err := NewScheduler( + WithLogger(logger), + WithStopTimeout(time.Second), + WithLimitConcurrentJobs(1, LimitModeWait), + ) + require.NoError(t, err) + + for range []int{1, 2, 3} { + _, err = s.NewJob( + DurationJob(100*time.Millisecond), + NewTask(func() { + jobStarted <- struct{}{} + time.Sleep(200 * time.Millisecond) + jobRan <- struct{}{} + }), + ) + require.NoError(t, err) + } + + s.Start() + + // Wait for the first job to start + select { + case <-jobStarted: + logger.Debug("First job started") + case <-time.After(500 * time.Millisecond): + t.Fatal("Timed out waiting for first job to start") + } + + // Update the scheduler while a job is running + err = s.UpdateScheduler(WithUpdateLimitMode(2, LimitModeReschedule)) + require.NoError(t, err) + + limitMode := s.GetLimitMode() + require.NotNil(t, limitMode) + + logger.Debug(fmt.Sprintf("Current limit mode: %v, limit: %d", limitMode.mode, limitMode.limit)) + + assert.Equal(t, LimitModeReschedule, int(limitMode.mode), "Limit mode should be LimitModeReschedule") + assert.Equal(t, uint(2), limitMode.limit, "Limit should be 2") + + // Wait for a few more job runs to ensure the scheduler is still functioning + jobCount := 0 + timeout := time.After(2 * time.Second) + for jobCount < 3 { + select { + case <-jobRan: + jobCount++ + logger.Debug(fmt.Sprintf("Job completed, count: %d", jobCount)) + case <-timeout: + t.Fatalf("Timed out waiting for jobs to run. Only %d jobs completed.", jobCount) + } + } + + // Initiate shutdown + logger.Debug("Initiating scheduler shutdown") + shutdownErr := make(chan error, 1) + go func() { + shutdownErr <- s.Shutdown() + }() + + // Allow time for in-progress jobs to complete + time.Sleep(300 * time.Millisecond) + + // Check for any jobs that might have started just before shutdown + jobsAfterShutdownInitiated := 0 + for { + select { + case <-jobStarted: + jobsAfterShutdownInitiated++ + logger.Debug(fmt.Sprintf("Job started after shutdown initiated: %d", jobsAfterShutdownInitiated)) + case <-jobRan: + logger.Debug("Job completed after shutdown initiated") + case err := <-shutdownErr: + require.NoError(t, err, "Shutdown should complete without error") + logger.Debug("Scheduler shutdown completed") + return + case <-time.After(500 * time.Millisecond): + t.Fatal("Timed out waiting for scheduler to shut down") + } + } +}