diff --git a/batch/batch.go b/batch/batch.go index 70e8407..fa01f41 100644 --- a/batch/batch.go +++ b/batch/batch.go @@ -438,16 +438,19 @@ func (m *batchManager) updateJobCallbackState(batch *batch, callbackType string, if err := m.rclient.Set(m.getCompleteJobStateKey(batch.Id), state, timeout).Err(); err != nil { return fmt.Errorf("updateJobCallbackState: could not set completed_st: %v", err) } + if _, areChildrenSucceeded := m.areChildrenFinished(batch); areChildrenSucceeded && batch.Meta.SuccessJob == "" && state == CallbackJobSucceeded { + m.removeBatch(batch) + } } return nil } func (m *batchManager) addJobToBatch(batch *batch) error { - batch.Meta.Total += 1 + batch.Meta.Total++ if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "total", 1).Err(); err != nil { return fmt.Errorf("addJobToBatch: unable to modify total: %v", err) } - batch.Meta.Pending += 1 + batch.Meta.Pending++ if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "pending", 1).Err(); err != nil { return fmt.Errorf("addJobToBatch: unable to modify pending: %v", err) } @@ -456,19 +459,19 @@ func (m *batchManager) addJobToBatch(batch *batch) error { func (m *batchManager) removeJobFromBatch(batch *batch, jobId string, success bool, isRetry bool) error { if !isRetry { - batch.Meta.Pending -= 1 + batch.Meta.Pending-- if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "pending", -1).Err(); err != nil { return fmt.Errorf("removeJobFromBatch: unable to modify pending: %v", err) } } if success { - batch.Meta.Succeeded += 1 + batch.Meta.Succeeded++ if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "succeeded", 1).Err(); err != nil { return fmt.Errorf("removeJobFromBatch: unable to modify succeeded: %v", err) } } else { - batch.Meta.Failed += 1 + batch.Meta.Failed++ if err := m.rclient.HIncrBy(m.getMetaKey(batch.Id), "failed", 1).Err(); err != nil { return fmt.Errorf("removeJobFromBatch: unable to modify failed: %v", err) } @@ -477,11 +480,11 @@ func (m *batchManager) removeJobFromBatch(batch *batch, jobId string, success bo } func (m *batchManager) areBatchJobsCompleted(batch *batch) bool { - return batch.Meta.Committed == true && batch.Meta.Pending == 0 + return batch.Meta.Committed && batch.Meta.Pending == 0 } func (m *batchManager) areBatchJobsSucceeded(batch *batch) bool { - return batch.Meta.Committed == true && batch.Meta.Succeeded == batch.Meta.Total + return batch.Meta.Committed && batch.Meta.Succeeded == batch.Meta.Total } func (m *batchManager) handleBatchJobsCompleted(batch *batch, parentsVisited map[string]bool) { diff --git a/batch/batch_test.go b/batch/batch_test.go index bf90d94..c44d264 100644 --- a/batch/batch_test.go +++ b/batch/batch_test.go @@ -73,6 +73,73 @@ func TestBatchSuccess(t *testing.T) { fetchedJob, err := cl.Fetch("default") assert.Nil(t, fetchedJob) + + _, err = batchSystem.batchManager.getBatch(b.Bid) + + // ensure batch is removed + assert.Error(t, err) + assert.EqualError(t, err, "getBatch: no batch found") + }) +} + +func TestBatchSuccessWithoutSuccessCallback(t *testing.T) { + batchSystem := new(BatchSubsystem) + withServer(batchSystem, true, func(cl *client.Client) { + b := client.NewBatch(cl) + + b.Complete = client.NewJob("batchDone", 1, "string", 3) + b.Description = "Test batch" + + err := b.Jobs(func() error { + err := b.Push(client.NewJob("JobOne", 1)) + assert.Nil(t, err) + err = b.Push(client.NewJob("JobTwo", 2)) + assert.Nil(t, err) + return nil + }) + assert.Nil(t, err) + assert.NotEqual(t, "", b.Bid) + + time.Sleep(1 * time.Second) + batchData, err := batchSystem.batchManager.getBatch(b.Bid) + assert.Nil(t, err) + assert.Equal(t, 2, batchData.Meta.Total) + + // job one + err = processJob(cl, true, func(job *client.Job) { + assert.Equal(t, 0, batchData.Meta.Succeeded) + }) + assert.Nil(t, err) + assert.Equal(t, 1, batchData.Meta.Succeeded) + assert.False(t, batchSystem.batchManager.areBatchJobsCompleted(batchData)) + + // job two + err = processJob(cl, true, func(job *client.Job) { + assert.Equal(t, batchData.Meta.Succeeded, 1) + assert.Equal(t, batchData.Meta.Failed, 0) + }) + + assert.Nil(t, err) + assert.Equal(t, 2, batchData.Meta.Succeeded) + assert.Equal(t, 0, batchData.Meta.Failed) + assert.True(t, batchSystem.batchManager.areBatchJobsCompleted(batchData)) + + assert.Equal(t, "1", batchData.Meta.CompleteJobState) + // completeJob + err = processJob(cl, true, func(job *client.Job) { + assert.Equal(t, "batchDone", job.Type) + }) + assert.Nil(t, err) + assert.Equal(t, "2", batchData.Meta.CompleteJobState) + + fetchedJob, err := cl.Fetch("default") + assert.Nil(t, fetchedJob) + + _, err = batchSystem.batchManager.getBatch(b.Bid) + + // ensure batch is removed + assert.Error(t, err) + assert.EqualError(t, err, "getBatch: no batch found") }) } @@ -288,6 +355,30 @@ func TestBatchOptions(t *testing.T) { }) } +func TestBatchBatchWithoutCallbacks(t *testing.T) { + batchSystem := new(BatchSubsystem) + withServer(batchSystem, true, func(cl *client.Client) { + b := client.NewBatch(cl) + + err := b.Jobs(func() error { + err := b.Push(client.NewJob("JobOne", 1)) + assert.Nil(t, err) + + err = b.Push(client.NewJob("JobTwo", 1)) + assert.Nil(t, err) + + err = b.Push(client.NewJob("JobThree", 1)) + assert.Nil(t, err) + + err = b.Push(client.NewJob("JobFour", 1)) + assert.Nil(t, err) + return nil + }) + assert.Error(t, err) + assert.EqualError(t, err, "cannot create new batch: ERR success and/or a complete job callback must be included in batch creation") + }) +} + func withServer(batchSystem *BatchSubsystem, enabled bool, runner func(cl *client.Client)) { dir := "/tmp/batching_test.db" defer os.RemoveAll(dir) diff --git a/batch/child_test.go b/batch/child_test.go index e8f81f4..e80edd2 100644 --- a/batch/child_test.go +++ b/batch/child_test.go @@ -397,6 +397,8 @@ func TestChildBatch(t *testing.T) { err := b.Push(client.NewJob("A", 1)) assert.Nil(t, err) batchA = client.NewBatch(cl) + batchA.Complete = client.NewJob("batchDone", 1, "string", 3) + batchA.Success = client.NewJob("batchSuccess", 2, "string", 4) _, err = cl.BatchNew(batchA) val, err := cl.Generic(fmt.Sprintf("BATCH CHILD %s %s", b.Bid, batchA.Bid)) assert.Nil(t, err) diff --git a/batch/commands.go b/batch/commands.go index 05f5ca5..516e00d 100644 --- a/batch/commands.go +++ b/batch/commands.go @@ -59,6 +59,11 @@ func (b *BatchSubsystem) batchCommand(c *server.Connection, s *server.Server, cm complete = string(completeData) } + if len(success) == 0 && len(complete) == 0 { + _ = c.Error(cmd, fmt.Errorf("success and/or a complete job callback must be included in batch creation")) + return + } + meta := b.batchManager.newBatchMeta(batchRequest.Description, success, complete, batchRequest.ChildSearchDepth) batch, err := b.batchManager.newBatch(batchId, meta) if err != nil {