Skip to content

Commit

Permalink
Revert "cleanup"
Browse files Browse the repository at this point in the history
This reverts commit e2a259a.
  • Loading branch information
stevenlanders committed Nov 15, 2023
1 parent e2a259a commit 42c9e33
Showing 1 changed file with 21 additions and 21 deletions.
42 changes: 21 additions & 21 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type deliverTxTask struct {
ValidateCh chan struct{}
}

func (dt *deliverTxTask) Reset() {
func (dt *deliverTxTask) Increment() {
dt.Incarnation++
dt.Status = statusPending
dt.Response = nil
dt.Abort = nil
Expand All @@ -62,10 +63,6 @@ func (dt *deliverTxTask) Reset() {
dt.ValidateCh = make(chan struct{}, 1)
}

func (dt *deliverTxTask) Increment() {
dt.Incarnation++
}

// Scheduler processes tasks concurrently
type Scheduler interface {
ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error)
Expand Down Expand Up @@ -247,8 +244,8 @@ func (s *scheduler) validateTask(ctx sdk.Context, task *deliverTxTask) bool {
_, span := s.traceSpan(ctx, "SchedulerValidate", task)
defer span.End()

if s.shouldRerun(task) {
task.Reset()
if ok := s.shouldRerun(task); ok {
task.Increment()
return false
}
return true
Expand All @@ -275,8 +272,7 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del
wg.Add(1)
go func(task *deliverTxTask) {
defer wg.Done()
if !s.validateTask(ctx, task) {
task.Increment()
if ok := s.validateTask(ctx, task); !ok {
mx.Lock()
res = append(res, task)
mx.Unlock()
Expand Down Expand Up @@ -304,10 +300,9 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
workers = len(tasks)
}

// validationWg waits for all validations to complete
// validations happen in separate goroutines in order to wait on previous index
validationWg := &sync.WaitGroup{}
validationWg.Add(len(tasks))
wg := &sync.WaitGroup{}

wg.Add(len(tasks))
for i := 0; i < workers; i++ {
grp.Go(func() error {
for {
Expand All @@ -318,21 +313,27 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {
if !ok {
return nil
}
s.prepareAndRunTask(validationWg, ctx, task)
s.prepareAndRunTask(wg, ctx, task)
}
}
})
}

for _, task := range tasks {
ch <- task
}
close(ch)
grp.Go(func() error {
defer close(ch)
for _, task := range tasks {
select {
case <-gCtx.Done():
return gCtx.Err()
case ch <- task:
}
}
return nil
})

if err := grp.Wait(); err != nil {
return err
}
validationWg.Wait()
wg.Wait()

return nil
}
Expand All @@ -346,7 +347,6 @@ func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task
go func() {
defer wg.Done()
defer close(task.ValidateCh)
// wait on previous task to finish validation
if task.Index > 0 {
<-s.allTasks[task.Index-1].ValidateCh
}
Expand Down

0 comments on commit 42c9e33

Please sign in to comment.