Skip to content

Commit

Permalink
fix concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
ToniRamirezM committed Aug 13, 2024
1 parent 4245e6a commit 1b32510
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,9 @@ type Aggregator struct {
finalProof chan finalProofMsg
verifyingProof bool

witnessRetrievalChan chan *state.DBBatch
activeWitnessRetrievalWorkers int
witnessRetrievalChan chan *state.DBBatch
activeWitnessRetrievalWorkersMutex sync.Mutex

srv *grpc.Server
ctx context.Context
Expand Down Expand Up @@ -165,21 +167,23 @@ func New(
}

a := &Aggregator{
cfg: cfg,
state: stateInterface,
etherman: etherman,
ethTxManager: ethTxManager,
streamClient: streamClient,
l1Syncr: l1Syncr,
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
timeSendFinalProofMutex: &sync.RWMutex{},
timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,
finalProof: make(chan finalProofMsg),
currentBatchStreamData: []byte{},
aggLayerClient: aggLayerClient,
sequencerPrivateKey: sequencerPrivateKey,
witnessRetrievalChan: make(chan *state.DBBatch, cfg.MaxWitnessRetrievalWorkers),
cfg: cfg,
state: stateInterface,
etherman: etherman,
ethTxManager: ethTxManager,
streamClient: streamClient,
l1Syncr: l1Syncr,
profitabilityChecker: profitabilityChecker,
stateDBMutex: &sync.Mutex{},
timeSendFinalProofMutex: &sync.RWMutex{},
timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval,
finalProof: make(chan finalProofMsg),
currentBatchStreamData: []byte{},
aggLayerClient: aggLayerClient,
sequencerPrivateKey: sequencerPrivateKey,
witnessRetrievalChan: make(chan *state.DBBatch, cfg.MaxWitnessRetrievalWorkers),
activeWitnessRetrievalWorkers: 0,
activeWitnessRetrievalWorkersMutex: sync.Mutex{},
}

log.Infof("MaxWitnessRetrievalWorkers set to %d", cfg.MaxWitnessRetrievalWorkers)
Expand All @@ -197,7 +201,12 @@ func (a *Aggregator) retrieveWitnesses() {
case <-a.ctx.Done():
return
case dbBatch := <-a.witnessRetrievalChan:
go a.retrieveWitness(dbBatch)
a.activeWitnessRetrievalWorkersMutex.Lock()
if a.activeWitnessRetrievalWorkers < a.cfg.MaxWitnessRetrievalWorkers {
go a.retrieveWitness(dbBatch)
a.activeWitnessRetrievalWorkers++
}
a.activeWitnessRetrievalWorkersMutex.Unlock()
}
}
}
Expand Down Expand Up @@ -226,6 +235,10 @@ func (a *Aggregator) retrieveWitness(dbBatch *state.DBBatch) {

success = true
}

a.activeWitnessRetrievalWorkersMutex.Lock()
a.activeWitnessRetrievalWorkers--
a.activeWitnessRetrievalWorkersMutex.Unlock()
}

func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {
Expand Down

0 comments on commit 1b32510

Please sign in to comment.