diff --git a/aggregator/aggregator.go b/aggregator/aggregator.go index ad51bb3..346f1bd 100644 --- a/aggregator/aggregator.go +++ b/aggregator/aggregator.go @@ -81,7 +81,9 @@ type Aggregator struct { finalProof chan finalProofMsg verifyingProof bool - witnessRetrievalChan chan *state.DBBatch + activeWitnessRetrievalWorkers int + witnessRetrievalChan chan *state.DBBatch + activeWitnessRetrievalWorkersMutex sync.Mutex srv *grpc.Server ctx context.Context @@ -165,21 +167,23 @@ func New( } a := &Aggregator{ - cfg: cfg, - state: stateInterface, - etherman: etherman, - ethTxManager: ethTxManager, - streamClient: streamClient, - l1Syncr: l1Syncr, - profitabilityChecker: profitabilityChecker, - stateDBMutex: &sync.Mutex{}, - timeSendFinalProofMutex: &sync.RWMutex{}, - timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval, - finalProof: make(chan finalProofMsg), - currentBatchStreamData: []byte{}, - aggLayerClient: aggLayerClient, - sequencerPrivateKey: sequencerPrivateKey, - witnessRetrievalChan: make(chan *state.DBBatch, cfg.MaxWitnessRetrievalWorkers), + cfg: cfg, + state: stateInterface, + etherman: etherman, + ethTxManager: ethTxManager, + streamClient: streamClient, + l1Syncr: l1Syncr, + profitabilityChecker: profitabilityChecker, + stateDBMutex: &sync.Mutex{}, + timeSendFinalProofMutex: &sync.RWMutex{}, + timeCleanupLockedProofs: cfg.CleanupLockedProofsInterval, + finalProof: make(chan finalProofMsg), + currentBatchStreamData: []byte{}, + aggLayerClient: aggLayerClient, + sequencerPrivateKey: sequencerPrivateKey, + witnessRetrievalChan: make(chan *state.DBBatch, cfg.MaxWitnessRetrievalWorkers), + activeWitnessRetrievalWorkers: 0, + activeWitnessRetrievalWorkersMutex: sync.Mutex{}, } log.Infof("MaxWitnessRetrievalWorkers set to %d", cfg.MaxWitnessRetrievalWorkers) @@ -197,7 +201,12 @@ func (a *Aggregator) retrieveWitnesses() { case <-a.ctx.Done(): return case dbBatch := <-a.witnessRetrievalChan: - go a.retrieveWitness(dbBatch) + a.activeWitnessRetrievalWorkersMutex.Lock() + if a.activeWitnessRetrievalWorkers < a.cfg.MaxWitnessRetrievalWorkers { + go a.retrieveWitness(dbBatch) + a.activeWitnessRetrievalWorkers++ + } + a.activeWitnessRetrievalWorkersMutex.Unlock() } } } @@ -226,6 +235,10 @@ func (a *Aggregator) retrieveWitness(dbBatch *state.DBBatch) { success = true } + + a.activeWitnessRetrievalWorkersMutex.Lock() + a.activeWitnessRetrievalWorkers-- + a.activeWitnessRetrievalWorkersMutex.Unlock() } func (a *Aggregator) handleReorg(reorgData synchronizer.ReorgExecutionResult) {