Skip to content

Commit

Permalink
Fix deadlock in remove stale batches (#15)
Browse files Browse the repository at this point in the history
* fix deadlock

* add test for removing stale batches

* remove fmt.Println

* use int64 for reading config

* add debug log
  • Loading branch information
jagonalez authored Feb 25, 2022
1 parent a5d76d4 commit cc50bd6
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
11 changes: 5 additions & 6 deletions batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,18 @@ func (m *batchManager) removeBatch(batch *batch) {
}

func (m *batchManager) removeStaleBatches() {
m.mu.Lock()
defer m.mu.Unlock()
util.Debugf("Checking for stale batches")
for _, b := range m.Batches {
createdAt, err := time.Parse(time.RFC3339Nano, b.Meta.CreatedAt)
if err != nil {
continue
}
remove := false
uncomittedTimeout := time.Now().Add(-time.Duration(m.Subsystem.Options.UncommittedTimeoutMinutes) * time.Minute)
comittedTimeout := time.Now().AddDate(0, 0, -m.Subsystem.Options.CommittedTimeoutDays)
if !b.Meta.Committed && createdAt.Before(uncomittedTimeout) {
uncommittedTimeout := time.Now().Add(-time.Duration(m.Subsystem.Options.UncommittedTimeoutMinutes) * time.Minute).UTC()
committedTimeout := time.Now().AddDate(0, 0, -m.Subsystem.Options.CommittedTimeoutDays).UTC()
if !b.Meta.Committed && createdAt.Before(uncommittedTimeout) {
remove = true
} else if b.Meta.Committed && createdAt.Before(comittedTimeout) {
} else if b.Meta.Committed && createdAt.Before(committedTimeout) {
remove = true
}

Expand Down
30 changes: 30 additions & 0 deletions batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package batch

import (
"errors"
"fmt"
"github.com/contribsys/faktory/util"
"math/rand"
"os"
"strconv"
Expand Down Expand Up @@ -379,6 +381,34 @@ func TestBatchBatchWithoutCallbacks(t *testing.T) {
})
}

func TestRemoveStaleBatches(t *testing.T) {
batchSystem := new(BatchSubsystem)

withServer(batchSystem, true, func(cl *client.Client) {
committedBatchId := fmt.Sprintf("b-%s", util.RandomJid())
meta := batchSystem.batchManager.newBatchMeta("testing", "", "", nil)
meta.CreatedAt = time.Now().UTC().Add(-time.Duration(1)*time.Minute).AddDate(0, 0, -batchSystem.Options.CommittedTimeoutDays).Format(time.RFC3339Nano)
batch, err := batchSystem.batchManager.newBatch(committedBatchId, meta)
assert.Nil(t, err)
err = batchSystem.batchManager.commit(batch)
assert.Nil(t, err)

uncommittedBatchId := fmt.Sprintf("b-%s", util.RandomJid())
uncommittedMeta := batchSystem.batchManager.newBatchMeta("testing", "", "", nil)
uncommittedMeta.CreatedAt = time.Now().UTC().Add(-time.Duration(batchSystem.Options.UncommittedTimeoutMinutes+1) * time.Minute).UTC().Format(time.RFC3339Nano)
_, err = batchSystem.batchManager.newBatch(uncommittedBatchId, uncommittedMeta)
assert.Nil(t, err)

batchSystem.batchManager.removeStaleBatches()

_, err = batchSystem.batchManager.getBatch(committedBatchId)
assert.EqualError(t, err, "getBatch: no batch found")

_, err = batchSystem.batchManager.getBatch(uncommittedBatchId)
assert.EqualError(t, err, "getBatch: no batch found")
})
}

func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *client.Client)) {
dir := "/tmp/batching_test.db"
defer os.RemoveAll(dir)
Expand Down
12 changes: 6 additions & 6 deletions batch/subsystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,28 @@ func (b *BatchSubsystem) getOptions(s *server.Server) *Options {
enabled = false
}
childSearchDepthValue := s.Options.Config("batch", "child_search_depth", 0)
childSearchDepth, ok := childSearchDepthValue.(int)
childSearchDepth, ok := childSearchDepthValue.(int64)
if !ok {
childSearchDepth = 0
}

uncommittedTimeoutValue := s.Options.Config("batch", "uncommitted_timeout_minutes", 120)
uncommittedTimeout, ok := uncommittedTimeoutValue.(int)
uncommittedTimeout, ok := uncommittedTimeoutValue.(int64)
if !ok {
uncommittedTimeout = 120
}

committedTimeoutValue := s.Options.Config("batch", "committed_timeout_days", 7)
committedTimeout, ok := committedTimeoutValue.(int)
committedTimeout, ok := committedTimeoutValue.(int64)
if !ok {
committedTimeout = 7
}

return &Options{
Enabled: enabled,
ChildSearchDepth: childSearchDepth,
UncommittedTimeoutMinutes: uncommittedTimeout,
CommittedTimeoutDays: committedTimeout,
ChildSearchDepth: int(childSearchDepth),
UncommittedTimeoutMinutes: int(uncommittedTimeout),
CommittedTimeoutDays: int(committedTimeout),
}
}

Expand Down

0 comments on commit cc50bd6

Please sign in to comment.