Skip to content

Commit

Permalink
fix: race condition on async
Browse files Browse the repository at this point in the history
  • Loading branch information
marcobrotas-prowarehouse committed Oct 16, 2024
1 parent a315c35 commit 3d8389b
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
22 changes: 15 additions & 7 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ type Async struct {
data any
done *flag
pending chan bool
notify *flag
listener func(as *Async)
err error
}
Expand All @@ -19,6 +20,7 @@ func newAsync(hdl Handler, cmd Command) *Async {
hdl: hdl,
cmd: cmd,
done: newFlag(),
notify: newFlag(),
pending: make(chan bool, 1),
}
}
Expand All @@ -42,34 +44,40 @@ func (as *Async) await() {

func (as *Async) notifyDone() {
if as.done.enable() {
as.pending <- true
as.notifyListener()
as.pending <- true
}
}

func (as *Async) fail(err error) {
as.Lock()
as.err = err
as.notifyDone()
as.Unlock()
}

func (as *Async) success(data any) {
as.Lock()
as.data = data
as.notifyDone()
as.Unlock()
}

func (as *Async) setListener(listener func(as *Async)) {
as.Lock()
as.listener = listener
as.Unlock()
if as.done.enabled() {
as.notifyListener()
listener(as)
return
}
if as.notify.enable() {
as.listener = listener
}
as.Unlock()

}

func (as *Async) notifyListener() {
as.Lock()
if as.listener != nil {
if as.notify.disable() {
as.listener(as)
}
as.Unlock()
}
12 changes: 8 additions & 4 deletions async_list.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package command

import "errors"
import (
"errors"
"fmt"
)

// Async is the struct returned from async commands.
type AsyncList struct {
Expand Down Expand Up @@ -48,15 +51,16 @@ func (asl *AsyncList) AwaitIterator() (<-chan AsyncResult, error) {
return results, nil
}

func (asl *AsyncList) generateListener(i int, asyncResults chan<- AsyncResult, processed *counter, total uint32) func(as *Async) {
func (asl *AsyncList) generateListener(i int, results chan<- AsyncResult, processed *counter, total uint32) func(as *Async) {
return func(as *Async) {
asyncResults <- AsyncResult{
results <- AsyncResult{
Index: i,
Data: as.data,
Err: as.err,
}
if processed.increment() == total {
close(asyncResults)
close(results)
fmt.Println("CHANNEL CLOSED")
}
}
}

0 comments on commit 3d8389b

Please sign in to comment.