forked from Layr-Labs/optimism-archived
-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
op-node: parallel events executor (#4)
* run op-node in parallel-event mode * add metrics and fix data race * delete debug log * delete metrics * recover executor_global_test.go * modify the variable type and function name * modify comment
- Loading branch information
Showing
10 changed files
with
212 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
package event | ||
|
||
import ( | ||
"context" | ||
"slices" | ||
"sync" | ||
"sync/atomic" | ||
) | ||
|
||
type ParallelExec struct { | ||
workers []*worker | ||
mu sync.RWMutex | ||
} | ||
|
||
var _ Executor = (*ParallelExec)(nil) | ||
|
||
func NewParallelExec() *ParallelExec { | ||
return &ParallelExec{} | ||
} | ||
|
||
func (p *ParallelExec) Add(d Executable, opts *ExecutorOpts) (leaveExecutor func()) { | ||
p.mu.Lock() | ||
defer p.mu.Unlock() | ||
w := newWorker(p, d, opts) | ||
p.workers = append(p.workers, w) | ||
return w.leave | ||
} | ||
|
||
func (p *ParallelExec) remove(w *worker) { | ||
p.mu.Lock() | ||
defer p.mu.Unlock() | ||
// Linear search to delete is fine, | ||
// since we delete much less frequently than we process events with these. | ||
for i, v := range p.workers { | ||
if v == w { | ||
p.workers = slices.Delete(p.workers, i, i+1) | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (p *ParallelExec) Enqueue(ev AnnotatedEvent) error { | ||
p.mu.RLock() | ||
defer p.mu.RUnlock() | ||
for _, w := range p.workers { | ||
w.enqueue(ev) // this will block if its capacity is full, providing back-pressure to the Enqueue caller | ||
} | ||
return nil | ||
} | ||
|
||
type worker struct { | ||
// ctx signals when the worker is exiting. | ||
// No additional events will be accepted after cancellation. | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
|
||
// closed as channel is closed upon exit of the run loop | ||
closed chan struct{} | ||
|
||
// ingress is the buffered channel of events to process | ||
ingress chan AnnotatedEvent | ||
|
||
// d is the underlying executable to process events on | ||
d Executable | ||
|
||
// p is a reference to the ParallelExec that owns this worker. | ||
// The worker removes itself from this upon leaving. | ||
p atomic.Pointer[ParallelExec] | ||
} | ||
|
||
func newWorker(p *ParallelExec, d Executable, opts *ExecutorOpts) *worker { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
w := &worker{ | ||
ctx: ctx, | ||
cancel: cancel, | ||
closed: make(chan struct{}), | ||
ingress: make(chan AnnotatedEvent, opts.Capacity), | ||
d: d, | ||
} | ||
w.p.Store(p) | ||
go w.run() | ||
return w | ||
} | ||
|
||
func (w *worker) enqueue(ev AnnotatedEvent) { | ||
select { | ||
case <-w.ctx.Done(): | ||
case w.ingress <- ev: | ||
} | ||
|
||
} | ||
|
||
func (w *worker) leave() { | ||
w.cancel() | ||
if old := w.p.Swap(nil); old != nil { | ||
// remove from worker pool | ||
old.remove(w) | ||
} | ||
// wait for run loop to exit | ||
<-w.closed | ||
} | ||
|
||
func (w *worker) run() { | ||
for { | ||
select { | ||
case <-w.ctx.Done(): | ||
close(w.closed) | ||
return | ||
case ev := <-w.ingress: | ||
w.d.RunEvent(ev) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.