Skip to content

Commit

Permalink
modify the variable type and function name
Browse files Browse the repository at this point in the history
  • Loading branch information
claymega committed Oct 23, 2024
1 parent dd4a918 commit 877789c
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 28 deletions.
25 changes: 11 additions & 14 deletions op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type DerivationPipeline struct {

// Index of the stage that is currently being reset.
// >= len(stages) if no additional resetting is required
resetting int32
resetting atomic.Int32
stages []ResettableStage

// Special stages to keep track of
Expand All @@ -72,7 +72,7 @@ type DerivationPipeline struct {
resetL2Safe eth.L2BlockRef
resetSysConfig eth.SystemConfig
// Its value is only 1 or 0
engineIsReset int32
engineIsReset atomic.Bool

metrics Metrics
}
Expand Down Expand Up @@ -102,7 +102,6 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
rollupCfg: rollupCfg,
l1Fetcher: l1Fetcher,
altDA: altDA,
resetting: 0,
stages: stages,
metrics: metrics,
traversal: l1Traversal,
Expand All @@ -114,16 +113,14 @@ func NewDerivationPipeline(log log.Logger, rollupCfg *rollup.Config, l1Fetcher L
// DerivationReady returns true if the derivation pipeline is ready to be used.
// When it's being reset its state is inconsistent, and should not be used externally.
func (dp *DerivationPipeline) DerivationReady() bool {
engineIsReset := atomic.LoadInt32(&dp.engineIsReset)
resetting := atomic.LoadInt32(&dp.resetting)
return engineIsReset != 0 && resetting > 0
return dp.engineIsReset.Load() && dp.resetting.Load() > 0
}

func (dp *DerivationPipeline) Reset() {
atomic.StoreInt32(&dp.resetting, 0)
dp.resetting.Store(0)
dp.resetSysConfig = eth.SystemConfig{}
dp.resetL2Safe = eth.L2BlockRef{}
atomic.StoreInt32(&dp.engineIsReset, 0)
dp.engineIsReset.Store(false)
}

// Origin is the L1 block of the inner-most stage of the derivation pipeline,
Expand All @@ -149,8 +146,8 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
}()

// if any stages need to be reset, do that first.
if atomic.LoadInt32(&dp.resetting) < int32(len(dp.stages)) {
if atomic.LoadInt32(&dp.engineIsReset) == 0 {
if dp.resetting.Load() < int32(len(dp.stages)) {
if !dp.engineIsReset.Load() {
return nil, NewResetError(errors.New("cannot continue derivation until Engine has been reset"))
}

Expand All @@ -163,13 +160,13 @@ func (dp *DerivationPipeline) Step(ctx context.Context, pendingSafeHead eth.L2Bl
}
}

resetting := atomic.LoadInt32(&dp.resetting)
resetting := dp.resetting.Load()
if err := dp.stages[resetting].Reset(ctx, dp.origin, dp.resetSysConfig); err == io.EOF {
dp.log.Debug("reset of stage completed", "stage", resetting, "origin", dp.origin)
atomic.AddInt32(&dp.resetting, 1)
dp.resetting.Add(1)
return nil, nil
} else if err != nil {
return nil, fmt.Errorf("stage %d failed resetting: %w", atomic.LoadInt32(&dp.resetting), err)
return nil, fmt.Errorf("stage %d failed resetting: %w", dp.resetting.Load(), err)
} else {
return nil, nil
}
Expand Down Expand Up @@ -244,5 +241,5 @@ func (dp *DerivationPipeline) initialReset(ctx context.Context, resetL2Safe eth.
}

func (dp *DerivationPipeline) ConfirmEngineReset() {
atomic.StoreInt32(&dp.engineIsReset, 1)
dp.engineIsReset.Store(true)
}
2 changes: 1 addition & 1 deletion op-node/rollup/driver/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *Driver) eventLoop() {
select {
case <-sequencerCh:
s.Emitter.Emit(sequencing.SequencerActionEvent{})
case <-s.sequencer.NextActionStep():
case <-s.sequencer.CheckNextAction():
case <-altSyncTicker.C:
// Check if there is a gap in the current unsafe payload queue.
ctx, cancel := context.WithTimeout(s.driverCtx, time.Second*2)
Expand Down
3 changes: 0 additions & 3 deletions op-node/rollup/event/executor_parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package event

import (
"context"
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
)

type ParallelExec struct {
Expand All @@ -23,7 +21,6 @@ func NewParallelExec() *ParallelExec {
func (p *ParallelExec) Add(d Executable, opts *ExecutorOpts) (leaveExecutor func()) {
p.mu.Lock()
defer p.mu.Unlock()
fmt.Println(time.Now().Format("2006-01-02 15:04:05.00000"), ", add handler parallel, handler name:,", opts.Capacity)
w := newWorker(p, d, opts)
p.workers = append(p.workers, w)
return w.leave
Expand Down
14 changes: 7 additions & 7 deletions op-node/rollup/event/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ type systemActor struct {
leaveExecutor func()

// 0 if event does not originate from Deriver-handling of another event
currentEvent uint64
currentEvent atomic.Uint64
}

// Emit is called by the end-user
func (r *systemActor) Emit(ev Event) {
if r.ctx.Err() != nil {
return
}
r.sys.emit(r.name, atomic.LoadUint64(&r.currentEvent), ev)
r.sys.emit(r.name, r.currentEvent.Load(), ev)
}

// RunEvent is called by the events executor.
Expand All @@ -74,13 +74,13 @@ func (r *systemActor) RunEvent(ev AnnotatedEvent) {
return
}

prev := atomic.LoadUint64(&r.currentEvent)
prev := r.currentEvent.Load()
start := time.Now()
atomic.StoreUint64(&r.currentEvent, r.sys.recordDerivStart(r.name, ev, start))
r.currentEvent.Store(r.sys.recordDerivStart(r.name, ev, start))
effect := r.deriv.OnEvent(ev.Event)
elapsed := time.Since(start)
r.sys.recordDerivEnd(r.name, ev, atomic.LoadUint64(&r.currentEvent), start, elapsed, effect)
atomic.StoreUint64(&r.currentEvent, prev)
r.sys.recordDerivEnd(r.name, ev, r.currentEvent.Load(), start, elapsed, effect)
r.currentEvent.Store(prev)
}

// Sys is the canonical implementation of System.
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *Sys) Register(name string, deriver Deriver, opts *RegisterOpts) Emitter
if opts.Emitter.Limiting {
limitedCallback := opts.Emitter.OnLimited
em = NewLimiter(ctx, r, opts.Emitter.Rate, opts.Emitter.Burst, func() {
r.sys.recordRateLimited(name, atomic.LoadUint64(&r.currentEvent))
r.sys.recordRateLimited(name, r.currentEvent.Load())
if limitedCallback != nil {
limitedCallback()
}
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/sequencing/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (ds DisabledSequencer) NextAction() (t time.Time, ok bool) {
return time.Time{}, false
}

func (ds DisabledSequencer) NextActionStep() <-chan struct{} { return nil }
func (ds DisabledSequencer) CheckNextAction() <-chan struct{} { return nil }

func (ds DisabledSequencer) Active() bool {
return false
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/sequencing/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ type SequencerIface interface {
event.Deriver
// NextAction returns when the sequencer needs to do the next change, and iff it should do so.
NextAction() (t time.Time, ok bool)
NextActionStep() <-chan struct{}
// NewNextAction is channel to await changes in the value of `nextActionOK`
CheckNextAction() <-chan struct{}
Active() bool
Init(ctx context.Context, active bool) error
Start(ctx context.Context, head common.Hash) error
Expand Down
2 changes: 1 addition & 1 deletion op-node/rollup/sequencing/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (d *Sequencer) onBuildStarted(x engine.BuildStartedEvent) {
}
}

func (d *Sequencer) NextActionStep() <-chan struct{} {
func (d *Sequencer) CheckNextAction() <-chan struct{} {
return d.nextActionCh
}

Expand Down

0 comments on commit 877789c

Please sign in to comment.