Skip to content

Commit

Permalink
add OrderedStart Strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
ajatprabha committed Aug 4, 2023
1 parent 10f68fb commit 1e4fa71
Show file tree
Hide file tree
Showing 5 changed files with 357 additions and 154 deletions.
55 changes: 55 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package xrun

import (
"context"
"sync/atomic"
)

// SignalStarted signals that the component has started.
func SignalStarted(ctx context.Context) {
if n, ok := ctx.Value(ctxKeyNotify).(*notifyCtx); ok {
n.start()
}
}

type ctxKey int

const (
ctxKeyNotify ctxKey = iota
)

type notifyCtx struct {
parent context.Context
_started atomic.Bool
ch chan struct{}
}

func newNotifyCtx(parent context.Context) context.Context {
return context.WithValue(parent, ctxKeyNotify, &notifyCtx{
parent: parent,
ch: make(chan struct{}, 1),
})
}

func (n *notifyCtx) started() <-chan struct{} { return n.ch }

func (n *notifyCtx) start() {
if n._started.CompareAndSwap(false, true) {
close(n.ch)
}
}

func started(ctx context.Context) <-chan struct{} {
if n, ok := ctx.Value(ctxKeyNotify).(*notifyCtx); ok {
return n.started()
}

return closedCh()
}

func closedCh() <-chan struct{} {
ch := make(chan struct{}, 1)
defer close(ch)

return ch
}
10 changes: 10 additions & 0 deletions context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package xrun

import (
"context"
"testing"
)

func Test_started_with_no_notifyCtx(t *testing.T) {
<-started(context.TODO())
}
51 changes: 41 additions & 10 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ import (

// NewManager creates a Manager and applies provided Option
func NewManager(opts ...Option) *Manager {
m := &Manager{shutdownTimeout: NoTimeout}
m := &Manager{
shutdownTimeout: NoTimeout,
maxStartWait: defaultMaxStartWait,
}

for _, o := range opts {
o.apply(m)
Expand All @@ -24,13 +27,16 @@ func NewManager(opts ...Option) *Manager {
// Manager helps to run multiple components
// and waits for them to complete
type Manager struct {
mu sync.Mutex
strategy Strategy
maxStartWait time.Duration
mu sync.Mutex

internalCtx context.Context
internalCancel context.CancelFunc

components []Component
wg sync.WaitGroup
components []Component
componentCancels []context.CancelFunc
wg sync.WaitGroup

started bool
stopping bool
Expand Down Expand Up @@ -87,20 +93,38 @@ func (m *Manager) start() {
defer m.mu.Unlock()
m.started = true

for _, c := range m.components {
if c != nil {
m.startComponent(c)
switch m.strategy {
case OrderedStart:
for _, c := range m.components {
if c != nil {
notify := newNotifyCtx(m.internalCtx)
nCtx, cancel := context.WithCancel(notify)
m.startComponent(c, nCtx)
m.componentCancels = append([]context.CancelFunc{cancel}, m.componentCancels...)

// Block until the component has started or the timeout has elapsed.
select {
case <-started(notify):
case <-time.After(m.maxStartWait):
}
}
}
case DefaultStartStop:
for _, c := range m.components {
if c != nil {
m.startComponent(c, m.internalCtx)
}
}
}
}

func (m *Manager) startComponent(c Component) {
func (m *Manager) startComponent(c Component, ctx context.Context) {
m.wg.Add(1)

go func() {
defer m.wg.Done()

if err := c.Run(m.internalCtx); err != nil && !errors.Is(err, context.Canceled) {
if err := c.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
m.errChan <- err
}
}()
Expand All @@ -110,7 +134,14 @@ func (m *Manager) engageStopProcedure() error {
shutdownCancel := m.cancelFunc()
defer shutdownCancel()

m.internalCancel()
switch m.strategy {
case OrderedStart:
for _, cancel := range m.componentCancels {
cancel()
}
case DefaultStartStop:
m.internalCancel()
}

m.mu.Lock()
defer m.mu.Unlock()
Expand Down
Loading

0 comments on commit 1e4fa71

Please sign in to comment.