Skip to content

Commit

Permalink
[AENG 775] Complete and or success callback must be included in job c…
Browse files Browse the repository at this point in the history
…reation (#14)

Check that at least one callback is provided when creating a batch. Remove batch when only complete callback is provided, callback state is 2 and all children succeeded.
  • Loading branch information
sebamarucci authored Feb 17, 2022
1 parent a01c939 commit a5d76d4
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 7 deletions.
17 changes: 10 additions & 7 deletions batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,16 +438,19 @@ func (m *batchManager) updateJobCallbackState(batch *batch, callbackType string,
if err := m.rclient.Set(m.getCompleteJobStateKey(batch.Id), state, timeout).Err(); err != nil {
return fmt.Errorf("updateJobCallbackState: could not set completed_st: %v", err)
}
if _, areChildrenSucceeded := m.areChildrenFinished(batch); areChildrenSucceeded && batch.Meta.SuccessJob == "" && state == CallbackJobSucceeded {
m.removeBatch(batch)
}
}
return nil
}

func (m *batchManager) addJobToBatch(batch *batch) error {
batch.Meta.Total += 1
batch.Meta.Total++
if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "total", 1).Err(); err != nil {
return fmt.Errorf("addJobToBatch: unable to modify total: %v", err)
}
batch.Meta.Pending += 1
batch.Meta.Pending++
if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "pending", 1).Err(); err != nil {
return fmt.Errorf("addJobToBatch: unable to modify pending: %v", err)
}
Expand All @@ -456,19 +459,19 @@ func (m *batchManager) addJobToBatch(batch *batch) error {

func (m *batchManager) removeJobFromBatch(batch *batch, jobId string, success bool, isRetry bool) error {
if !isRetry {
batch.Meta.Pending -= 1
batch.Meta.Pending--
if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "pending", -1).Err(); err != nil {
return fmt.Errorf("removeJobFromBatch: unable to modify pending: %v", err)
}

}
if success {
batch.Meta.Succeeded += 1
batch.Meta.Succeeded++
if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "succeeded", 1).Err(); err != nil {
return fmt.Errorf("removeJobFromBatch: unable to modify succeeded: %v", err)
}
} else {
batch.Meta.Failed += 1
batch.Meta.Failed++
if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "failed", 1).Err(); err != nil {
return fmt.Errorf("removeJobFromBatch: unable to modify failed: %v", err)
}
Expand All @@ -477,11 +480,11 @@ func (m *batchManager) removeJobFromBatch(batch *batch, jobId string, success bo
}

func (m *batchManager) areBatchJobsCompleted(batch *batch) bool {
return batch.Meta.Committed == true && batch.Meta.Pending == 0
return batch.Meta.Committed && batch.Meta.Pending == 0
}

func (m *batchManager) areBatchJobsSucceeded(batch *batch) bool {
return batch.Meta.Committed == true && batch.Meta.Succeeded == batch.Meta.Total
return batch.Meta.Committed && batch.Meta.Succeeded == batch.Meta.Total
}

func (m *batchManager) handleBatchJobsCompleted(batch *batch, parentsVisited map[string]bool) {
Expand Down
91 changes: 91 additions & 0 deletions batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,73 @@ func TestBatchSuccess(t *testing.T) {

fetchedJob, err := cl.Fetch("default")
assert.Nil(t, fetchedJob)

_, err = batchSystem.batchManager.getBatch(b.Bid)

// ensure batch is removed
assert.Error(t, err)
assert.EqualError(t, err, "getBatch: no batch found")
})
}

func TestBatchSuccessWithoutSuccessCallback(t *testing.T) {
batchSystem := new(BatchSubsystem)
withServer(batchSystem, true, func(cl *client.Client) {
b := client.NewBatch(cl)

b.Complete = client.NewJob("batchDone", 1, "string", 3)
b.Description = "Test batch"

err := b.Jobs(func() error {
err := b.Push(client.NewJob("JobOne", 1))
assert.Nil(t, err)
err = b.Push(client.NewJob("JobTwo", 2))
assert.Nil(t, err)
return nil
})
assert.Nil(t, err)
assert.NotEqual(t, "", b.Bid)

time.Sleep(1 * time.Second)
batchData, err := batchSystem.batchManager.getBatch(b.Bid)
assert.Nil(t, err)
assert.Equal(t, 2, batchData.Meta.Total)

// job one
err = processJob(cl, true, func(job *client.Job) {
assert.Equal(t, 0, batchData.Meta.Succeeded)
})
assert.Nil(t, err)
assert.Equal(t, 1, batchData.Meta.Succeeded)
assert.False(t, batchSystem.batchManager.areBatchJobsCompleted(batchData))

// job two
err = processJob(cl, true, func(job *client.Job) {
assert.Equal(t, batchData.Meta.Succeeded, 1)
assert.Equal(t, batchData.Meta.Failed, 0)
})

assert.Nil(t, err)
assert.Equal(t, 2, batchData.Meta.Succeeded)
assert.Equal(t, 0, batchData.Meta.Failed)
assert.True(t, batchSystem.batchManager.areBatchJobsCompleted(batchData))

assert.Equal(t, "1", batchData.Meta.CompleteJobState)
// completeJob
err = processJob(cl, true, func(job *client.Job) {
assert.Equal(t, "batchDone", job.Type)
})
assert.Nil(t, err)
assert.Equal(t, "2", batchData.Meta.CompleteJobState)

fetchedJob, err := cl.Fetch("default")
assert.Nil(t, fetchedJob)

_, err = batchSystem.batchManager.getBatch(b.Bid)

// ensure batch is removed
assert.Error(t, err)
assert.EqualError(t, err, "getBatch: no batch found")
})
}

Expand Down Expand Up @@ -288,6 +355,30 @@ func TestBatchOptions(t *testing.T) {
})
}

func TestBatchBatchWithoutCallbacks(t *testing.T) {
batchSystem := new(BatchSubsystem)
withServer(batchSystem, true, func(cl *client.Client) {
b := client.NewBatch(cl)

err := b.Jobs(func() error {
err := b.Push(client.NewJob("JobOne", 1))
assert.Nil(t, err)

err = b.Push(client.NewJob("JobTwo", 1))
assert.Nil(t, err)

err = b.Push(client.NewJob("JobThree", 1))
assert.Nil(t, err)

err = b.Push(client.NewJob("JobFour", 1))
assert.Nil(t, err)
return nil
})
assert.Error(t, err)
assert.EqualError(t, err, "cannot create new batch: ERR success and/or a complete job callback must be included in batch creation")
})
}

func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *client.Client)) {
dir := "/tmp/batching_test.db"
defer os.RemoveAll(dir)
Expand Down
2 changes: 2 additions & 0 deletions batch/child_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,8 @@ func TestChildBatch(t *testing.T) {
err := b.Push(client.NewJob("A", 1))
assert.Nil(t, err)
batchA = client.NewBatch(cl)
batchA.Complete = client.NewJob("batchDone", 1, "string", 3)
batchA.Success = client.NewJob("batchSuccess", 2, "string", 4)
_, err = cl.BatchNew(batchA)
val, err := cl.Generic(fmt.Sprintf("BATCH CHILD %s %s", b.Bid, batchA.Bid))
assert.Nil(t, err)
Expand Down
5 changes: 5 additions & 0 deletions batch/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (b *BatchSubsystem) batchCommand(c *server.Connection, s *server.Server, cm
complete = string(completeData)
}

if len(success) == 0 && len(complete) == 0 {
_ = c.Error(cmd, fmt.Errorf("success and/or a complete job callback must be included in batch creation"))
return
}

meta := b.batchManager.newBatchMeta(batchRequest.Description, success, complete, batchRequest.ChildSearchDepth)
batch, err := b.batchManager.newBatch(batchId, meta)
if err != nil {
Expand Down

0 comments on commit a5d76d4

Please sign in to comment.