diff --git a/batch/batch.go b/batch/batch.go index fa01f41..b2d117d 100644 --- a/batch/batch.go +++ b/batch/batch.go @@ -154,19 +154,18 @@ func (m *batchManager) removeBatch(batch *batch) { } func (m *batchManager) removeStaleBatches() { - m.mu.Lock() - defer m.mu.Unlock() + util.Debugf("Checking for stale batches") for _, b := range m.Batches { createdAt, err := time.Parse(time.RFC3339Nano, b.Meta.CreatedAt) if err != nil { continue } remove := false - uncomittedTimeout := time.Now().Add(-time.Duration(m.Subsystem.Options.UncommittedTimeoutMinutes) * time.Minute) - comittedTimeout := time.Now().AddDate(0, 0, -m.Subsystem.Options.CommittedTimeoutDays) - if !b.Meta.Committed && createdAt.Before(uncomittedTimeout) { + uncommittedTimeout := time.Now().Add(-time.Duration(m.Subsystem.Options.UncommittedTimeoutMinutes) * time.Minute).UTC() + committedTimeout := time.Now().AddDate(0, 0, -m.Subsystem.Options.CommittedTimeoutDays).UTC() + if !b.Meta.Committed && createdAt.Before(uncommittedTimeout) { remove = true - } else if b.Meta.Committed && createdAt.Before(comittedTimeout) { + } else if b.Meta.Committed && createdAt.Before(committedTimeout) { remove = true } diff --git a/batch/batch_test.go b/batch/batch_test.go index c44d264..5a5ca97 100644 --- a/batch/batch_test.go +++ b/batch/batch_test.go @@ -2,6 +2,8 @@ package batch import ( "errors" + "fmt" + "github.com/contribsys/faktory/util" "math/rand" "os" "strconv" @@ -379,6 +381,34 @@ func TestBatchBatchWithoutCallbacks(t *testing.T) { }) } +func TestRemoveStaleBatches(t *testing.T) { + batchSystem := new(BatchSubsystem) + + withServer(batchSystem, true, func(cl *client.Client) { + committedBatchId := fmt.Sprintf("b-%s", util.RandomJid()) + meta := batchSystem.batchManager.newBatchMeta("testing", "", "", nil) + meta.CreatedAt = time.Now().UTC().Add(-time.Duration(1)*time.Minute).AddDate(0, 0, -batchSystem.Options.CommittedTimeoutDays).Format(time.RFC3339Nano) + batch, err := batchSystem.batchManager.newBatch(committedBatchId, meta) + assert.Nil(t, err) + err = batchSystem.batchManager.commit(batch) + assert.Nil(t, err) + + uncommittedBatchId := fmt.Sprintf("b-%s", util.RandomJid()) + uncommittedMeta := batchSystem.batchManager.newBatchMeta("testing", "", "", nil) + uncommittedMeta.CreatedAt = time.Now().UTC().Add(-time.Duration(batchSystem.Options.UncommittedTimeoutMinutes+1) * time.Minute).UTC().Format(time.RFC3339Nano) + _, err = batchSystem.batchManager.newBatch(uncommittedBatchId, uncommittedMeta) + assert.Nil(t, err) + + batchSystem.batchManager.removeStaleBatches() + + _, err = batchSystem.batchManager.getBatch(committedBatchId) + assert.EqualError(t, err, "getBatch: no batch found") + + _, err = batchSystem.batchManager.getBatch(uncommittedBatchId) + assert.EqualError(t, err, "getBatch: no batch found") + }) +} + func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *client.Client)) { dir := "/tmp/batching_test.db" defer os.RemoveAll(dir) diff --git a/batch/subsystem.go b/batch/subsystem.go index a332206..c4fdd4b 100644 --- a/batch/subsystem.go +++ b/batch/subsystem.go @@ -70,28 +70,28 @@ func (b *BatchSubsystem) getOptions(s *server.Server) *Options { enabled = false } childSearchDepthValue := s.Options.Config("batch", "child_search_depth", 0) - childSearchDepth, ok := childSearchDepthValue.(int) + childSearchDepth, ok := childSearchDepthValue.(int64) if !ok { childSearchDepth = 0 } uncommittedTimeoutValue := s.Options.Config("batch", "uncommitted_timeout_minutes", 120) - uncommittedTimeout, ok := uncommittedTimeoutValue.(int) + uncommittedTimeout, ok := uncommittedTimeoutValue.(int64) if !ok { uncommittedTimeout = 120 } committedTimeoutValue := s.Options.Config("batch", "committed_timeout_days", 7) - committedTimeout, ok := committedTimeoutValue.(int) + committedTimeout, ok := committedTimeoutValue.(int64) if !ok { committedTimeout = 7 } return &Options{ Enabled: enabled, - ChildSearchDepth: childSearchDepth, - UncommittedTimeoutMinutes: uncommittedTimeout, - CommittedTimeoutDays: committedTimeout, + ChildSearchDepth: int(childSearchDepth), + UncommittedTimeoutMinutes: int(uncommittedTimeout), + CommittedTimeoutDays: int(committedTimeout), } }