Skip to content

Commit

Permalink
PBM-886: sync channel set/unset/close ops
Browse files Browse the repository at this point in the history
  • Loading branch information
defbin committed Sep 23, 2024
1 parent 827d30d commit 6721557
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
10 changes: 10 additions & 0 deletions pbm/oplog/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"go.mongodb.org/mongo-driver/bson"
Expand Down Expand Up @@ -31,6 +32,7 @@ func (t Timeline) String() string {
// OplogBackup is used for reading the Mongodb oplog
type OplogBackup struct {
cl *mongo.Client
mu sync.Mutex
stopC chan struct{}
start primitive.Timestamp
end primitive.Timestamp
Expand Down Expand Up @@ -68,7 +70,10 @@ func (ot *OplogBackup) WriteTo(w io.Writer) (int64, error) {
return 0, errors.Errorf("oplog TailingSpan should be set, have start: %v, end: %v", ot.start, ot.end)
}

ot.mu.Lock()
ot.stopC = make(chan struct{})
ot.mu.Unlock()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -79,7 +84,9 @@ func (ot *OplogBackup) WriteTo(w io.Writer) (int64, error) {
cancel()
}

ot.mu.Lock()
ot.stopC = nil
ot.mu.Unlock()
}()

cur, err := ot.cl.Database("local").Collection("oplog.rs").Find(ctx,
Expand Down Expand Up @@ -145,6 +152,9 @@ func (ot *OplogBackup) WriteTo(w io.Writer) (int64, error) {
}

func (ot *OplogBackup) Cancel() {
ot.mu.Lock()
defer ot.mu.Unlock()

if c := ot.stopC; c != nil {
select {
case _, ok := <-c:
Expand Down
2 changes: 1 addition & 1 deletion pbm/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func Upload(

err := r.Close()
if err != nil {
return 0, errors.Wrap(err, "cancel backup: close reader")
return 0, errors.Wrap(err, "cancel upload: close reader")
}
return 0, ErrCancelled
case <-saveDone:
Expand Down

0 comments on commit 6721557

Please sign in to comment.