Skip to content

Commit

Permalink
Use a mutex instead of atomics.
Browse files Browse the repository at this point in the history
  • Loading branch information
winder committed Aug 17, 2023
1 parent a7c4d6b commit cd408aa
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions conduit/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"path"
"runtime/pprof"
"sync"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -64,6 +63,9 @@ type pipelineImpl struct {
completeCallback []conduit.OnCompleteFunc

pipelineMetadata state

statusMu sync.Mutex
status Status
}

func (p *pipelineImpl) Error() error {
Expand Down Expand Up @@ -375,6 +377,10 @@ func (p *pipelineImpl) Init() error {
go p.startMetricsServer()
}

p.statusMu.Lock()
defer p.statusMu.Unlock()
p.status.Round = p.pipelineMetadata.NextRound

return err
}

Expand Down Expand Up @@ -506,6 +512,9 @@ func (p *pipelineImpl) Start() {

// Increment Round, update metadata
p.pipelineMetadata.NextRound++
p.statusMu.Lock()
p.status.Round = p.pipelineMetadata.NextRound
p.statusMu.Unlock()
err = p.pipelineMetadata.encodeToFile(p.cfg.ConduitArgs.ConduitDataDir)
if err != nil {
p.logger.Errorf("%v", err)
Expand Down Expand Up @@ -540,10 +549,10 @@ func (p *pipelineImpl) Wait() {
}

func (p *pipelineImpl) Status() (Status, error) {
rnd := atomic.LoadUint64(&p.pipelineMetadata.NextRound)
return Status{
Round: rnd,
}, nil
p.statusMu.Lock()
ret := p.status
p.statusMu.Unlock()
return ret, nil

Check warning on line 555 in conduit/pipeline/pipeline.go

View check run for this annotation

Codecov / codecov/patch

conduit/pipeline/pipeline.go#L551-L555

Added lines #L551 - L555 were not covered by tests
}

// start a http server serving /metrics
Expand Down

0 comments on commit cd408aa

Please sign in to comment.