diff --git a/config/config.go b/config/config.go index dff018eb..011a90b3 100644 --- a/config/config.go +++ b/config/config.go @@ -11,6 +11,7 @@ import ( "time" "github.com/tsaikd/KDGoLib/errutil" + "github.com/tsaikd/gogstash/config/ctxutil" "github.com/tsaikd/gogstash/config/goglog" "github.com/tsaikd/gogstash/config/logevent" "golang.org/x/sync/errgroup" @@ -23,6 +24,7 @@ var ( ErrorUnmarshalJSONConfig = errutil.NewFactory("Failed unmarshalling config in JSON format") ErrorUnmarshalYAMLConfig = errutil.NewFactory("Failed unmarshalling config in YAML format") ErrorTimeout1 = errutil.NewFactory("timeout: %v") + ErrorInvalidState = errutil.NewFactory("Invalid state for pause/resume") ) // Config contains all config @@ -47,6 +49,10 @@ type Config struct { chOutDebug MsgChan // channel from output to debug ctx context.Context eg *errgroup.Group + + state int32 + signalPause *ctxutil.Broadcaster + signalResume *ctxutil.Broadcaster } var defaultConfig = Config{ @@ -115,6 +121,9 @@ func initConfig(config *Config) { if config.DebugChannel { config.chOutDebug = make(MsgChan, config.ChannelSize) } + config.state = stateNormal + config.signalPause = ctxutil.NewBroadcaster() + config.signalResume = ctxutil.NewBroadcaster() } // Start config in goroutines diff --git a/config/control.go b/config/control.go new file mode 100644 index 00000000..d8f39d5a --- /dev/null +++ b/config/control.go @@ -0,0 +1,36 @@ +package config + +import ( + "context" + "sync/atomic" +) + +type Control interface { + RequestPause(ctx context.Context) error + RequestResume(ctx context.Context) error + PauseSignal() <-chan struct{} + ResumeSignal() <-chan struct{} +} + +func (t *Config) RequestPause(ctx context.Context) error { + if atomic.CompareAndSwapInt32(&t.state, stateNormal, statePause) { + return t.signalPause.Broadcast(ctx) + } else { + return ErrorInvalidState.New(nil) + } +} +func (t *Config) RequestResume(ctx context.Context) error { + if atomic.CompareAndSwapInt32(&t.state, statePause, stateNormal) { + return t.signalResume.Broadcast(ctx) + } else { + return ErrorInvalidState.New(nil) + } +} + +func (t *Config) PauseSignal() <-chan struct{} { return t.signalPause.Channel() } +func (t *Config) ResumeSignal() <-chan struct{} { return t.signalResume.Channel() } + +const ( + stateNormal = iota + statePause +) diff --git a/config/ctxutil/broadcaster.go b/config/ctxutil/broadcaster.go new file mode 100644 index 00000000..4b0dcfc0 --- /dev/null +++ b/config/ctxutil/broadcaster.go @@ -0,0 +1,69 @@ +package ctxutil + +import ( + "context" + + "github.com/subchen/go-trylock/v2" +) + +type Broadcaster struct { + mutex trylock.TryLocker + channel chan struct{} +} + +func NewBroadcaster() *Broadcaster { + return &Broadcaster{ + mutex: trylock.New(), + channel: make(chan struct{}), + } +} + +func (t *Broadcaster) Wait(ctx context.Context) error { + select { + case <-ctx.Done(): + return context.DeadlineExceeded + case <-t.Channel(): + return nil + } +} + +func (t *Broadcaster) Channel() <-chan struct{} { + t.mutex.RLock() + defer t.mutex.RUnlock() + + return t.channel +} + +// Signal wakes one goroutine waiting on broadcaster, if there is any. +func (t *Broadcaster) Signal(ctx context.Context) error { + if !t.mutex.RTryLock(ctx) { + return context.DeadlineExceeded + } + defer t.mutex.RUnlock() + + select { + case <-ctx.Done(): + return context.DeadlineExceeded + case t.channel <- struct{}{}: + default: + } + + return nil +} + +// Broadcast wakes all goroutines waiting on broadcaster, if there is any. +func (t *Broadcaster) Broadcast(ctx context.Context) error { + newChannel := make(chan struct{}) + + if !t.mutex.TryLock(ctx) { + return context.DeadlineExceeded + } + channel := t.channel + t.channel = newChannel + t.mutex.Unlock() + + // send broadcast signal + close(channel) + + return nil +} diff --git a/config/ctxutil/broadcaster_test.go b/config/ctxutil/broadcaster_test.go new file mode 100644 index 00000000..ed79c14e --- /dev/null +++ b/config/ctxutil/broadcaster_test.go @@ -0,0 +1,53 @@ +package ctxutil + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBroadcaster(t *testing.T) { + t.Parallel() + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + cg := NewCancelGroup(ctx) + b := NewBroadcaster() + + var broadcastCount int32 + listenReady := make(chan struct{}, 1) + for i := 0; i < 5; i++ { + cg.Go(func(ctx context.Context) error { + listenReady <- struct{}{} + select { + case <-ctx.Done(): + t.Fatal("wait for broadcast signal timeout") + case <-b.Channel(): + atomic.AddInt32(&broadcastCount, 1) + } + + return nil + }) + } + for i := 0; i < 5; i++ { + <-listenReady + } + + require.False(Sleep(ctx, 500*time.Millisecond)) + require.NoError(b.Signal(ctx)) + require.False(Sleep(ctx, 500*time.Millisecond)) + require.EqualValues(1, atomic.LoadInt32(&broadcastCount)) + require.NoError(b.Broadcast(ctx)) + + require.NoError(cg.Wait()) + require.EqualValues(5, atomic.LoadInt32(&broadcastCount)) +} diff --git a/config/ctxutil/cancelGroup.go b/config/ctxutil/cancelGroup.go new file mode 100644 index 00000000..763b8165 --- /dev/null +++ b/config/ctxutil/cancelGroup.go @@ -0,0 +1,169 @@ +package ctxutil + +import ( + "context" + "sync" + "time" +) + +func NewCancelGroup(parent context.Context) *CancelGroup { + ctx, cancel := context.WithCancel(parent) + + return &CancelGroup{ + ctx: ctx, + cancel: cancel, + } +} + +type CancelGroup struct { + ctx context.Context + cancel func() + + mutex sync.Mutex + done chan error + + wg sync.WaitGroup + + errOnce sync.Once + err error +} + +func (t *CancelGroup) Wait() error { + t.wg.Wait() + t.cancel() + + return t.err +} + +func (t *CancelGroup) Done() <-chan error { + t.mutex.Lock() + if t.done == nil { + t.done = make(chan error) + go func() { + t.wg.Wait() + t.cancel() + t.done <- t.err + }() + } + d := t.done + t.mutex.Unlock() + + return d +} + +func (t *CancelGroup) Go(f func(context.Context) error) { + t.wg.Add(1) + + go func() { + defer t.wg.Done() + + if err := f(t.ctx); err != nil { + t.CancelError(err) + } + }() +} + +// GoCancel go with cancel +func (t *CancelGroup) GoCancel(f func(context.Context) error) context.CancelFunc { + t.wg.Add(1) + + ctx, cancel := context.WithCancel(t.ctx) + + go func() { + defer t.wg.Done() + + if err := f(ctx); err != nil { + t.CancelError(err) + } + }() + + return cancel +} + +// GoTimeout go with timeout +func (t *CancelGroup) GoTimeout(timeout time.Duration, f func(context.Context) error) context.CancelFunc { + t.wg.Add(1) + + ctx, cancel := context.WithTimeout(t.ctx, timeout) + + go func() { + defer t.wg.Done() + + if err := f(ctx); err != nil { + t.CancelError(err) + } + }() + + return cancel +} + +// Fork goroutine will disconnect context propagation +func (t *CancelGroup) Fork(f func(context.Context) error) { + t.wg.Add(1) + + go func() { + defer t.wg.Done() + + ctx := DisconnectContext(t.ctx) + + if err := f(ctx); err != nil { + t.CancelError(err) + } + }() +} + +// ForkTimeout fork with cancel +func (t *CancelGroup) ForkCancel(f func(context.Context) error) context.CancelFunc { + t.wg.Add(1) + + ctx, cancel := DisconnectContextWithCancel(t.ctx) + + go func() { + defer t.wg.Done() + + if err := f(ctx); err != nil { + t.CancelError(err) + } + }() + + return cancel +} + +// ForkTimeout fork with timeout +func (t *CancelGroup) ForkTimeout(timeout time.Duration, f func(context.Context) error) context.CancelFunc { + t.wg.Add(1) + + ctx, cancel := DisconnectContextWithTimeout(t.ctx, timeout) + + go func() { + defer t.wg.Done() + + if err := f(ctx); err != nil { + t.CancelError(err) + } + }() + + return cancel +} + +func (t *CancelGroup) Context() context.Context { + return t.ctx +} + +func (t *CancelGroup) Cancel() { + t.cancel() +} + +func (t *CancelGroup) CancelError(err error) { + t.errOnce.Do(func() { + t.err = err + t.cancel() + }) +} + +func (t *CancelGroup) Close() (err error) { + t.cancel() + t.wg.Wait() + + return t.err +} diff --git a/config/ctxutil/cancelGroup_test.go b/config/ctxutil/cancelGroup_test.go new file mode 100644 index 00000000..fc1db525 --- /dev/null +++ b/config/ctxutil/cancelGroup_test.go @@ -0,0 +1,203 @@ +package ctxutil + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCancelGroup(t *testing.T) { + t.Parallel() + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + var count1 int64 + var count2 int64 + cg1 := NewCancelGroup(context.Background()) + defer func() { + require.NoError(cg1.Close()) + }() + cg2 := NewCancelGroup(context.Background()) + defer func() { + require.Error(cg2.Close()) + }() + + cg1.Go(func(ctx context.Context) error { + time.Sleep(400 * time.Millisecond) + atomic.AddInt64(&count1, 1) + cg1.Cancel() + + return nil + }) + cg1.Go(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return nil + case <-time.After(time.Second): + atomic.AddInt64(&count1, 1) + } + + return nil + }) + + cg2.Go(func(ctx context.Context) error { + time.Sleep(200 * time.Millisecond) + atomic.AddInt64(&count2, 1) + + return context.DeadlineExceeded + }) + cg2.Go(func(ctx context.Context) error { + select { + case <-ctx.Done(): + return nil + case <-time.After(time.Second): + atomic.AddInt64(&count2, 1) + } + + return nil + }) + + var firstClose string + select { + case <-cg1.Done(): + firstClose = "cg1" + case <-cg2.Done(): + firstClose = "cg2" + } + + assert.EqualValues(0, atomic.LoadInt64(&count1)) + assert.EqualValues(1, atomic.LoadInt64(&count2)) + assert.EqualValues("cg2", firstClose) +} + +func TestCancelGroupDone(t *testing.T) { + t.Parallel() + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + var count1 int64 + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + cg1 := NewCancelGroup(ctx) + defer func() { + require.NoError(cg1.Close()) + }() + + cg1.Go(func(ctx context.Context) error { + time.Sleep(100 * time.Millisecond) + atomic.AddInt64(&count1, 1) + + return nil + }) + + var err error + var path string + select { + case <-ctx.Done(): + path = "ctx" + case err = <-cg1.Done(): + path = "cg1" + } + + assert.EqualValues(1, atomic.LoadInt64(&count1)) + assert.EqualValues("cg1", path) + assert.NoError(err) + assert.NoError(cg1.Wait()) +} + +func TestCancelGroupFork(t *testing.T) { + t.Parallel() + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + const step = 300 * time.Millisecond + var timeGo1 time.Time + var timeGo2 time.Time + var timeGo3 time.Time + var timeGo4 time.Time + var timeFork1 time.Time + var timeFork2 time.Time + var timeFork3 time.Time + var timeFork4 time.Time + ctx, cancel := context.WithTimeout(context.Background(), 3*step) + defer cancel() + cg := NewCancelGroup(ctx) + defer func() { + require.NoError(cg.Close()) + }() + start := time.Now() + + cg.Go(func(ctx context.Context) error { + Sleep(ctx, 1*step) + timeGo1 = time.Now() + + return nil + }) + cg.Go(func(ctx context.Context) error { + Sleep(ctx, 5*step) + timeGo2 = time.Now() + + return nil + }) + cancelGo3 := cg.GoCancel(func(ctx context.Context) error { + Sleep(ctx, 2*step) + timeGo3 = time.Now() + + return nil + }) + cancelGo3() + cg.GoTimeout(1*step, func(ctx context.Context) error { + Sleep(ctx, 5*step) + timeGo4 = time.Now() + + return nil + }) + + cg.Fork(func(ctx context.Context) error { + Sleep(ctx, 7*step) + timeFork1 = time.Now() + + return nil + }) + cancelFork2 := cg.ForkCancel(func(ctx context.Context) error { + <-ctx.Done() + timeFork2 = time.Now() + + return nil + }) + time.AfterFunc(9*step, cancelFork2) + cg.ForkTimeout(1*step, func(ctx context.Context) error { + <-ctx.Done() + timeFork3 = time.Now() + + return nil + }) + cg.ForkTimeout(5*step, func(ctx context.Context) error { + <-ctx.Done() + timeFork4 = time.Now() + + return nil + }) + + assert.NoError(cg.Wait()) + const delta = step >> 1 + assert.InDelta(1*step, timeGo1.Sub(start), float64(delta)) + assert.InDelta(3*step, timeGo2.Sub(start), float64(delta)) + assert.InDelta(0*step, timeGo3.Sub(start), float64(delta)) + assert.InDelta(1*step, timeGo4.Sub(start), float64(delta)) + assert.InDelta(7*step, timeFork1.Sub(start), float64(delta)) + assert.InDelta(9*step, timeFork2.Sub(start), float64(delta)) + assert.InDelta(1*step, timeFork3.Sub(start), float64(delta)) + assert.InDelta(5*step, timeFork4.Sub(start), float64(delta)) + assert.InDelta(9*step, time.Since(start), float64(delta)) +} diff --git a/config/ctxutil/disconnectContext.go b/config/ctxutil/disconnectContext.go new file mode 100644 index 00000000..1aa5123d --- /dev/null +++ b/config/ctxutil/disconnectContext.go @@ -0,0 +1,56 @@ +// https://rodaine.com/2020/07/break-context-cancellation-chain/ + +package ctxutil + +import ( + "context" + "time" +) + +func DisconnectContext(parent context.Context) context.Context { + return disconnectedContext{parent: parent} +} + +func DisconnectContextWithCancel(parent context.Context) (context.Context, context.CancelFunc) { + ctx := disconnectedContext{parent: parent} + + return context.WithCancel(ctx) +} + +func DisconnectContextWithTimeout( + parent context.Context, + timeout time.Duration, +) (context.Context, context.CancelFunc) { + ctx := disconnectedContext{parent: parent} + + return context.WithTimeout(ctx, timeout) +} + +// disconnectedContext looks very similar to the nonexported context.emptyCtx +// implementation from the standard library, with the exception of the parent's +// Value method being the only feature propagated. +type disconnectedContext struct { + parent context.Context +} + +// Deadline will erase any actual deadline from the parent, returning ok==false +func (ctx disconnectedContext) Deadline() (deadline time.Time, ok bool) { + return +} + +// Done will stop propagation of the parent context's done channel. Receiving +// on a nil channel will block forever. +func (ctx disconnectedContext) Done() <-chan struct{} { + return nil +} + +// Err will always return nil since there is no longer any cancellation +func (ctx disconnectedContext) Err() error { + return nil +} + +// Value behaves as normal, continuing up the chain to find a matching +// key-value pair. +func (ctx disconnectedContext) Value(key interface{}) interface{} { + return ctx.parent.Value(key) +} diff --git a/config/ctxutil/disconnectContext_test.go b/config/ctxutil/disconnectContext_test.go new file mode 100644 index 00000000..73007354 --- /dev/null +++ b/config/ctxutil/disconnectContext_test.go @@ -0,0 +1,94 @@ +package ctxutil_test + +import ( + "bytes" + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tsaikd/gogstash/config/ctxutil" +) + +func TestDisconnectContext(t *testing.T) { + t.Parallel() + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + type key string + + ctx := context.Background() + ctx = context.WithValue(ctx, key("foo"), "bar") + ctx, cancel := context.WithTimeout(ctx, 500*time.Millisecond) + defer cancel() + ctx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + buffer := SyncBuffer{} + + ctx2, cancel2 := ctxutil.DisconnectContextWithTimeout(ctx1, 300*time.Millisecond) + defer cancel2() + require.EqualValues("bar", ctx2.Value(key("foo"))) + ctx3, cancel3 := ctxutil.DisconnectContextWithTimeout(ctx1, 700*time.Millisecond) + defer cancel3() + + buffer.MustWriteString("1") + wg := sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx.Done() + buffer.MustWriteString("3") + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx1.Done() + buffer.MustWriteString("3") + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx2.Done() + buffer.MustWriteString("2") + }() + + wg.Add(1) + go func() { + defer wg.Done() + <-ctx3.Done() + buffer.MustWriteString("4") + }() + + wg.Wait() + + require.EqualValues("12334", buffer.String()) +} + +type SyncBuffer struct { + mutex sync.Mutex + buffer bytes.Buffer +} + +func (t *SyncBuffer) String() string { + t.mutex.Lock() + defer t.mutex.Unlock() + + return t.buffer.String() +} + +func (t *SyncBuffer) MustWriteString(text string) { + t.mutex.Lock() + defer t.mutex.Unlock() + + _, err := t.buffer.WriteString(text) + if err != nil { + panic(err) + } +} diff --git a/config/ctxutil/sleep.go b/config/ctxutil/sleep.go new file mode 100644 index 00000000..80fff9b2 --- /dev/null +++ b/config/ctxutil/sleep.go @@ -0,0 +1,20 @@ +package ctxutil + +import ( + "context" + "time" +) + +func Sleep(ctx context.Context, duration time.Duration) (done bool) { + if duration < 1 { + return false + } + timer := time.NewTimer(duration) + defer timer.Stop() + select { + case <-ctx.Done(): + return true + case <-timer.C: + return false + } +} diff --git a/config/ctxutil/sleep_test.go b/config/ctxutil/sleep_test.go new file mode 100644 index 00000000..6b3608d0 --- /dev/null +++ b/config/ctxutil/sleep_test.go @@ -0,0 +1,31 @@ +package ctxutil + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSleep(t *testing.T) { + t.Parallel() + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + start := time.Now() + require.False(Sleep(ctx, 0)) + require.WithinDuration(start, time.Now(), 100*time.Millisecond) + + require.False(Sleep(ctx, 100*time.Millisecond)) + require.WithinDuration(start.Add(100*time.Millisecond), time.Now(), 100*time.Millisecond) + + require.True(Sleep(ctx, 2000*time.Millisecond)) + require.WithinDuration(start.Add(1000*time.Millisecond), time.Now(), 100*time.Millisecond) +} diff --git a/config/filter.go b/config/filter.go index 0f554d67..4de1b14c 100644 --- a/config/filter.go +++ b/config/filter.go @@ -22,7 +22,10 @@ type TypeFilterConfig interface { // IsConfigured returns whether common configuration has been setup func (f *FilterConfig) IsConfigured() bool { - return len(f.AddTags) != 0 || len(f.AddFields) != 0 || len(f.RemoveTags) != 0 || len(f.RemoveFields) != 0 + return len(f.AddTags) != 0 || + len(f.AddFields) != 0 || + len(f.RemoveTags) != 0 || + len(f.RemoveFields) != 0 } func (f *FilterConfig) CommonFilter( @@ -56,7 +59,7 @@ type FieldConfig struct { } // FilterHandler is a handler to regist filter module -type FilterHandler func(ctx context.Context, raw ConfigRaw) (TypeFilterConfig, error) +type FilterHandler func(ctx context.Context, raw ConfigRaw, control Control) (TypeFilterConfig, error) var ( mapFilterHandler = map[string]FilterHandler{} @@ -71,6 +74,7 @@ func RegistFilterHandler(name string, handler FilterHandler) { func GetFilters( ctx context.Context, filterRaw []ConfigRaw, + control Control, ) (filters []TypeFilterConfig, err error) { var filter TypeFilterConfig for _, raw := range filterRaw { @@ -86,7 +90,7 @@ func GetFilters( return filters, ErrorUnknownFilterType1.New(nil, raw["type"]) } - if filter, err = handler(ctx, raw); err != nil { + if filter, err = handler(ctx, raw, control); err != nil { return filters, ErrorInitFilterFailed1.New(err, raw) } @@ -97,7 +101,7 @@ func GetFilters( } func (t *Config) getFilters() (filters []TypeFilterConfig, err error) { - return GetFilters(t.ctx, t.FilterRaw) + return GetFilters(t.ctx, t.FilterRaw, t) } func (t *Config) startFilters() (err error) { diff --git a/config/filter_test.go b/config/filter_test.go index 2f3b85fa..c55bb83a 100644 --- a/config/filter_test.go +++ b/config/filter_test.go @@ -131,7 +131,7 @@ filter: assert.Equal(0, len(conf.OutputRaw)) // to have config updated - mapFilterHandler["whatever"] = func(ctx context.Context, raw ConfigRaw) (TypeFilterConfig, error) { + mapFilterHandler["whatever"] = func(ctx context.Context, raw ConfigRaw, control Control) (TypeFilterConfig, error) { conf := WhateverFilterConfig{} err := ReflectConfig(raw, &conf) if err != nil { diff --git a/config/input.go b/config/input.go index cebbf980..34e8167a 100644 --- a/config/input.go +++ b/config/input.go @@ -26,7 +26,7 @@ type InputConfig struct { } // InputHandler is a handler to regist input module -type InputHandler func(ctx context.Context, raw ConfigRaw) (TypeInputConfig, error) +type InputHandler func(ctx context.Context, raw ConfigRaw, control Control) (TypeInputConfig, error) var ( mapInputHandler = map[string]InputHandler{} @@ -52,7 +52,7 @@ func (t *Config) getInputs() (inputs []TypeInputConfig, err error) { return inputs, ErrorUnknownInputType1.New(nil, raw["type"]) } - if input, err = handler(t.ctx, raw); err != nil { + if input, err = handler(t.ctx, raw, t); err != nil { return inputs, ErrorInitInputFailed1.New(err, raw) } diff --git a/config/output.go b/config/output.go index 19beae52..efe7530e 100644 --- a/config/output.go +++ b/config/output.go @@ -28,7 +28,7 @@ type OutputConfig struct { } // OutputHandler is a handler to regist output module -type OutputHandler func(ctx context.Context, raw ConfigRaw) (TypeOutputConfig, error) +type OutputHandler func(ctx context.Context, raw ConfigRaw, control Control) (TypeOutputConfig, error) var ( mapOutputHandler = map[string]OutputHandler{} @@ -43,6 +43,7 @@ func RegistOutputHandler(name string, handler OutputHandler) { func GetOutputs( ctx context.Context, outputRaw []ConfigRaw, + control Control, ) (outputs []TypeOutputConfig, err error) { var output TypeOutputConfig for _, raw := range outputRaw { @@ -58,7 +59,7 @@ func GetOutputs( return outputs, ErrorUnknownOutputType1.New(nil, raw["type"]) } - if output, err = handler(ctx, raw); err != nil { + if output, err = handler(ctx, raw, control); err != nil { return outputs, ErrorInitOutputFailed1.New(err, raw) } @@ -69,7 +70,7 @@ func GetOutputs( } func (t *Config) getOutputs() (outputs []TypeOutputConfig, err error) { - return GetOutputs(t.ctx, t.OutputRaw) + return GetOutputs(t.ctx, t.OutputRaw, t) } func (t *Config) startOutputs() (err error) { diff --git a/filter/addfield/filteraddfield.go b/filter/addfield/filteraddfield.go index 34adabb1..d6a1f34d 100644 --- a/filter/addfield/filteraddfield.go +++ b/filter/addfield/filteraddfield.go @@ -30,7 +30,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err @@ -40,7 +44,10 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterCo } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event( + ctx context.Context, + event logevent.LogEvent, +) (logevent.LogEvent, bool) { if _, ok := event.Extra[f.Key]; ok && !f.Overwrite { return event, false } diff --git a/filter/cond/filtercond.go b/filter/cond/filtercond.go index 33909ef2..1d8ffbab 100644 --- a/filter/cond/filtercond.go +++ b/filter/cond/filtercond.go @@ -108,7 +108,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { @@ -118,7 +122,7 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterCo goglog.Logger.Warn("filter cond config condition empty, ignored") return &conf, nil } - conf.filters, err = config.GetFilters(ctx, conf.FilterRaw) + conf.filters, err = config.GetFilters(ctx, conf.FilterRaw, control) if err != nil { return nil, err } @@ -127,7 +131,7 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterCo return &conf, nil } if len(conf.ElseFilterRaw) > 0 { - conf.elseFilters, err = config.GetFilters(ctx, conf.ElseFilterRaw) + conf.elseFilters, err = config.GetFilters(ctx, conf.ElseFilterRaw, control) if err != nil { return nil, err } @@ -137,7 +141,10 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterCo } // Event the main filter event -func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) { +func (f *FilterConfig) Event( + ctx context.Context, + event logevent.LogEvent, +) (logevent.LogEvent, bool) { if f.expression != nil { ep := EventParameters{Event: &event} ret, err := f.expression.Eval(&ep) diff --git a/filter/cond/filtercond_test.go b/filter/cond/filtercond_test.go index fcb8625e..adbd546e 100644 --- a/filter/cond/filtercond_test.go +++ b/filter/cond/filtercond_test.go @@ -37,9 +37,9 @@ filter: key: foo value: bar `))) - require.Nil(err) - _, err = InitHandler(context.TODO(), conf.FilterRaw[0]) - require.NotNil(err) + require.NoError(err) + _, err = InitHandler(context.TODO(), conf.FilterRaw[0], nil) + require.Error(err) } func Test_filter_cond_module(t *testing.T) { @@ -123,7 +123,7 @@ filter: - type: mutate add_tag: ["added"] `))) - require.Nil(err) + require.NoError(err) require.NoError(conf.Start(context.Background())) conf.TestInputEvent(logevent.LogEvent{}) if output, err := conf.TestGetOutputEvent(300 * time.Millisecond); assert.NoError(err) { @@ -143,7 +143,7 @@ filter: - type: mutate add_tag: ["added"] `))) - require.Nil(err) + require.NoError(err) require.NoError(conf.Start(context.Background())) conf.TestInputEvent(logevent.LogEvent{}) if output, err := conf.TestGetOutputEvent(300 * time.Millisecond); assert.NoError(err) { diff --git a/filter/date/filterdate.go b/filter/date/filterdate.go index 534a8211..e51e8a3d 100644 --- a/filter/date/filterdate.go +++ b/filter/date/filterdate.go @@ -49,7 +49,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/filter/geoip2/filtergeoip2.go b/filter/geoip2/filtergeoip2.go index d7595fe0..08cde4cc 100644 --- a/filter/geoip2/filtergeoip2.go +++ b/filter/geoip2/filtergeoip2.go @@ -60,7 +60,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/gonx/filtergonx.go b/filter/gonx/filtergonx.go index a6be9450..227eab1c 100644 --- a/filter/gonx/filtergonx.go +++ b/filter/gonx/filtergonx.go @@ -48,7 +48,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/grok/filtergrok.go b/filter/grok/filtergrok.go index 87648f2c..ac2071e0 100644 --- a/filter/grok/filtergrok.go +++ b/filter/grok/filtergrok.go @@ -45,7 +45,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/hash/filterhash.go b/filter/hash/filterhash.go index 9d977f30..d287962b 100644 --- a/filter/hash/filterhash.go +++ b/filter/hash/filterhash.go @@ -83,7 +83,11 @@ var hashAlgos = []hashAlgo{ } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/filter/ip2location/filterip2location.go b/filter/ip2location/filterip2location.go index 23e28cba..d2f70ce9 100644 --- a/filter/ip2location/filterip2location.go +++ b/filter/ip2location/filterip2location.go @@ -112,7 +112,11 @@ func (fc *FilterConfig) initFsnotifyEventHandler() { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/ip2proxy/filterip2proxy.go b/filter/ip2proxy/filterip2proxy.go index 0a5c8913..74a369d4 100644 --- a/filter/ip2proxy/filterip2proxy.go +++ b/filter/ip2proxy/filterip2proxy.go @@ -59,7 +59,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/json/filterjson.go b/filter/json/filterjson.go index 5e72d6a3..8bd3251c 100644 --- a/filter/json/filterjson.go +++ b/filter/json/filterjson.go @@ -39,7 +39,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/filter/kv/filterkv.go b/filter/kv/filterkv.go index 907626c9..36a20301 100644 --- a/filter/kv/filterkv.go +++ b/filter/kv/filterkv.go @@ -32,7 +32,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/filter/lookuptable/lookup.go b/filter/lookuptable/lookup.go index 1c443d9e..99a3e675 100644 --- a/filter/lookuptable/lookup.go +++ b/filter/lookuptable/lookup.go @@ -57,7 +57,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/mutate/filtermutate.go b/filter/mutate/filtermutate.go index 91a50db5..adc59e35 100644 --- a/filter/mutate/filtermutate.go +++ b/filter/mutate/filtermutate.go @@ -44,7 +44,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/ratelimit/filterratelimit.go b/filter/ratelimit/filterratelimit.go index 0aca733b..9bf64234 100644 --- a/filter/ratelimit/filterratelimit.go +++ b/filter/ratelimit/filterratelimit.go @@ -35,7 +35,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/filter/removefield/filterremovefield.go b/filter/removefield/filterremovefield.go index ca756930..4d5a481b 100644 --- a/filter/removefield/filterremovefield.go +++ b/filter/removefield/filterremovefield.go @@ -35,7 +35,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/filter/typeconv/filtertypeconv.go b/filter/typeconv/filtertypeconv.go index 990955eb..b24aa4b2 100644 --- a/filter/typeconv/filtertypeconv.go +++ b/filter/typeconv/filtertypeconv.go @@ -47,7 +47,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/filter/urlparam/filterurlparam.go b/filter/urlparam/filterurlparam.go index d4971b38..70e6a31a 100644 --- a/filter/urlparam/filterurlparam.go +++ b/filter/urlparam/filterurlparam.go @@ -52,7 +52,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/filter/useragent/filteruseragent.go b/filter/useragent/filteruseragent.go index 14553960..8e4dacd9 100644 --- a/filter/useragent/filteruseragent.go +++ b/filter/useragent/filteruseragent.go @@ -82,7 +82,11 @@ func DefaultFilterConfig() FilterConfig { } // InitHandler initialize the filter plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeFilterConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeFilterConfig, error) { conf := DefaultFilterConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/go.mod b/go.mod index dd92f41c..55bf5ec1 100644 --- a/go.mod +++ b/go.mod @@ -46,6 +46,7 @@ require ( github.com/spf13/viper v1.6.3 // indirect github.com/streadway/amqp v0.0.0-20200108173154-1c71cc93ed71 github.com/stretchr/testify v1.7.0 + github.com/subchen/go-trylock/v2 v2.0.0 github.com/tengattack/jodatime v0.0.0-20180920000830-48b203d08145 github.com/tsaikd/KDGoLib v0.0.0-20191001134900-7f3cf518e07d github.com/ua-parser/uap-go v0.0.0-20200325213135-e1c09f13e2fe diff --git a/go.sum b/go.sum index e4c4d68f..38e4c06e 100644 --- a/go.sum +++ b/go.sum @@ -811,6 +811,8 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/subchen/go-trylock/v2 v2.0.0 h1:XAZYp/ZvkBFuvSPAeGM0TjbMby/mHoWnnLBAv2FidUw= +github.com/subchen/go-trylock/v2 v2.0.0/go.mod h1:jjSakPS+IvBCtFw5Fao9rQqdiCnF0ZrkzVkauvkZzLY= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= diff --git a/input/beats/inputbeats.go b/input/beats/inputbeats.go index 418ae16d..46042c50 100644 --- a/input/beats/inputbeats.go +++ b/input/beats/inputbeats.go @@ -54,7 +54,11 @@ func DefaultInputConfig() InputConfig { } // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/dockerlog/inputdockerlog.go b/input/dockerlog/inputdockerlog.go index 0daa67f1..2b3cec35 100644 --- a/input/dockerlog/inputdockerlog.go +++ b/input/dockerlog/inputdockerlog.go @@ -67,7 +67,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/dockerstats/inputdockerstats.go b/input/dockerstats/inputdockerstats.go index 0343926f..1058b094 100644 --- a/input/dockerstats/inputdockerstats.go +++ b/input/dockerstats/inputdockerstats.go @@ -66,7 +66,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/exec/inputexec.go b/input/exec/inputexec.go index ec1725e5..cfab6a69 100644 --- a/input/exec/inputexec.go +++ b/input/exec/inputexec.go @@ -53,7 +53,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/file/inputfile.go b/input/file/inputfile.go index 6b0dc403..1a362d99 100644 --- a/input/file/inputfile.go +++ b/input/file/inputfile.go @@ -58,7 +58,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/http/inputhttp.go b/input/http/inputhttp.go index 76377681..f0672e97 100644 --- a/input/http/inputhttp.go +++ b/input/http/inputhttp.go @@ -27,6 +27,7 @@ type InputConfig struct { URL string `json:"url"` Interval int `json:"interval,omitempty"` + control config.Control hostname string } @@ -44,13 +45,18 @@ func DefaultInputConfig() InputConfig { } // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { return nil, err } + conf.control = control if conf.hostname, err = os.Hostname(); err != nil { return nil, err } @@ -64,7 +70,10 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputCon } // Start wraps the actual function starting the plugin -func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEvent) (err error) { +func (t *InputConfig) Start( + ctx context.Context, + msgChan chan<- logevent.LogEvent, +) (err error) { startChan := make(chan bool, 1) // startup tick ticker := time.NewTicker(time.Duration(t.Interval) * time.Second) defer ticker.Stop() @@ -77,6 +86,10 @@ func (t *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEven return nil case <-startChan: t.Request(ctx, msgChan) + case <-t.control.PauseSignal(): + // handling request pause signal + case <-t.control.ResumeSignal(): + // handling request resume signal case <-ticker.C: t.Request(ctx, msgChan) } diff --git a/input/httplisten/httplisten.go b/input/httplisten/httplisten.go index 9e7adb68..277dfa21 100644 --- a/input/httplisten/httplisten.go +++ b/input/httplisten/httplisten.go @@ -48,7 +48,11 @@ func DefaultInputConfig() InputConfig { } // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { @@ -132,15 +136,9 @@ func (i *InputConfig) Start(ctx context.Context, msgChan chan<- logevent.LogEven logger.Fatal(err) return } - err = http.Serve(l, nil) - if err != nil { - logger.Error(err) - } + logger.Trace(http.Serve(l, nil)) } else { - err = http.ListenAndServe(i.Address, nil) - if err != nil { - logger.Error(err) - } + logger.Trace(http.ListenAndServe(i.Address, nil)) } }() return nil diff --git a/input/kafka/inputkafka.go b/input/kafka/inputkafka.go index 4d6a8757..05de329d 100644 --- a/input/kafka/inputkafka.go +++ b/input/kafka/inputkafka.go @@ -47,7 +47,11 @@ func DefaultInputConfig() InputConfig { } // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/lorem/inputlorem.go b/input/lorem/inputlorem.go index 4b5a7247..f36bbe26 100644 --- a/input/lorem/inputlorem.go +++ b/input/lorem/inputlorem.go @@ -96,7 +96,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/nats/inputnats.go b/input/nats/inputnats.go index d25e5b19..7a156ec8 100644 --- a/input/nats/inputnats.go +++ b/input/nats/inputnats.go @@ -49,7 +49,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/nsq/inputnsq.go b/input/nsq/inputnsq.go index 5b81fe3d..db49f4c6 100644 --- a/input/nsq/inputnsq.go +++ b/input/nsq/inputnsq.go @@ -37,7 +37,11 @@ func DefaultInputConfig() InputConfig { } // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/redis/inputredis.go b/input/redis/inputredis.go index 29f51cd5..bc1e617c 100644 --- a/input/redis/inputredis.go +++ b/input/redis/inputredis.go @@ -58,7 +58,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/input/socket/inputsocket.go b/input/socket/inputsocket.go index 32f8ff13..6ce75af7 100644 --- a/input/socket/inputsocket.go +++ b/input/socket/inputsocket.go @@ -55,7 +55,11 @@ var ( ) // InitHandler initialize the input plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeInputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeInputConfig, error) { conf := DefaultInputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/output/amqp/outputamqp.go b/output/amqp/outputamqp.go index 5e0aa7c8..361c1a29 100644 --- a/output/amqp/outputamqp.go +++ b/output/amqp/outputamqp.go @@ -71,7 +71,11 @@ type amqpClient struct { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/output/cond/outputcond.go b/output/cond/outputcond.go index c611d689..2d9503f0 100644 --- a/output/cond/outputcond.go +++ b/output/cond/outputcond.go @@ -38,7 +38,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { @@ -48,7 +52,7 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputCo goglog.Logger.Warn("output cond config condition empty, ignored") return &conf, nil } - conf.outputs, err = config.GetOutputs(ctx, conf.OutputRaw) + conf.outputs, err = config.GetOutputs(ctx, conf.OutputRaw, control) if err != nil { return nil, err } @@ -57,7 +61,7 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputCo return &conf, nil } if len(conf.ElseOutputRaw) > 0 { - conf.elseOutputs, err = config.GetOutputs(ctx, conf.ElseOutputRaw) + conf.elseOutputs, err = config.GetOutputs(ctx, conf.ElseOutputRaw, control) if err != nil { return nil, err } diff --git a/output/cond/outputcond_test.go b/output/cond/outputcond_test.go index 42adef26..22b9eab9 100644 --- a/output/cond/outputcond_test.go +++ b/output/cond/outputcond_test.go @@ -36,9 +36,9 @@ output: key: foo value: bar `))) - require.Nil(err) - _, err = InitHandler(context.TODO(), conf.OutputRaw[0]) - require.NotNil(err) + require.NoError(err) + _, err = InitHandler(context.TODO(), conf.OutputRaw[0], nil) + require.Error(err) } func Test_output_cond_module(t *testing.T) { diff --git a/output/elastic/outputelastic.go b/output/elastic/outputelastic.go index 121ed360..0af7e9ca 100644 --- a/output/elastic/outputelastic.go +++ b/output/elastic/outputelastic.go @@ -106,7 +106,11 @@ func (u *jsonDecoder) Decode(data []byte, v interface{}) error { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/output/elastic/outputelastic_test.go b/output/elastic/outputelastic_test.go index b97c3fb0..e74c6788 100644 --- a/output/elastic/outputelastic_test.go +++ b/output/elastic/outputelastic_test.go @@ -45,9 +45,9 @@ output: document_id: "%{fieldstring}" bulk_actions: 0 `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - _, err = InitHandler(ctx, conf.OutputRaw[0]) + _, err = InitHandler(ctx, conf.OutputRaw[0], nil) // expect error as certificate is not trusted by default require.Error(err) require.True(ErrorCreateClientFailed1.In(err), "%+v", err) @@ -63,9 +63,9 @@ output: bulk_actions: 0 ssl_certificate_validation: true `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - _, err = InitHandler(ctx, conf.OutputRaw[0]) + _, err = InitHandler(ctx, conf.OutputRaw[0], nil) // again expect error as certificate is not trusted and we requested ssl_certificate_validation require.Error(err) require.True(ErrorCreateClientFailed1.In(err), "%+v", err) @@ -81,9 +81,9 @@ output: bulk_actions: 0 ssl_certificate_validation: false `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - _, err = InitHandler(ctx, conf.OutputRaw[0]) + _, err = InitHandler(ctx, conf.OutputRaw[0], nil) // expect no error this time as ssl_certificate_validation is false require.NoError(err) } @@ -98,7 +98,7 @@ func TestResolveVars(t *testing.T) { defer ts.Close() err := os.Setenv("MYVAR", ts.URL) - require.Nil(err) + require.NoError(err) ctx := context.Background() conf, err := config.LoadFromYAML([]byte(strings.TrimSpace(` debugch: true @@ -109,10 +109,10 @@ output: document_id: "%{fieldstring}" bulk_actions: 0 `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - resolvedConf, err := InitHandler(ctx, conf.OutputRaw[0]) - require.Nil(err) + resolvedConf, err := InitHandler(ctx, conf.OutputRaw[0], nil) + require.NoError(err) outputConf := resolvedConf.(*OutputConfig) require.Equal(ts.URL, outputConf.resolvedURLs[0]) } diff --git a/output/elasticv5/outputelastic.go b/output/elasticv5/outputelastic.go index db6e9250..e27146da 100644 --- a/output/elasticv5/outputelastic.go +++ b/output/elasticv5/outputelastic.go @@ -106,7 +106,11 @@ func (u *jsonDecoder) Decode(data []byte, v interface{}) error { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/output/elasticv5/outputelastic_test.go b/output/elasticv5/outputelastic_test.go index 89a3b1fb..4fb33aa4 100644 --- a/output/elasticv5/outputelastic_test.go +++ b/output/elasticv5/outputelastic_test.go @@ -46,9 +46,9 @@ output: document_id: "%{fieldstring}" bulk_actions: 0 `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - _, err = InitHandler(ctx, conf.OutputRaw[0]) + _, err = InitHandler(ctx, conf.OutputRaw[0], nil) // expect error as certificate is not trusted by default require.Error(err) require.True(ErrorCreateClientFailed1.In(err), "%+v", err) @@ -65,9 +65,9 @@ output: bulk_actions: 0 ssl_certificate_validation: true `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - _, err = InitHandler(ctx, conf.OutputRaw[0]) + _, err = InitHandler(ctx, conf.OutputRaw[0], nil) // again expect error as certificate is not trusted and we requested ssl_certificate_validation require.Error(err) require.True(ErrorCreateClientFailed1.In(err), "%+v", err) @@ -84,9 +84,9 @@ output: bulk_actions: 0 ssl_certificate_validation: false `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - _, err = InitHandler(ctx, conf.OutputRaw[0]) + _, err = InitHandler(ctx, conf.OutputRaw[0], nil) // expect no error this time as ssl_certificate_validation is false require.NoError(err) } @@ -101,7 +101,7 @@ func TestResolveVars(t *testing.T) { defer ts.Close() err := os.Setenv("MYVAR", ts.URL) - require.Nil(err) + require.NoError(err) ctx := context.Background() conf, err := config.LoadFromYAML([]byte(strings.TrimSpace(` debugch: true @@ -113,10 +113,10 @@ output: document_id: "%{fieldstring}" bulk_actions: 0 `))) - require.Nil(err) + require.NoError(err) require.NotNil(conf) - resolvedConf, err := InitHandler(ctx, conf.OutputRaw[0]) - require.Nil(err) + resolvedConf, err := InitHandler(ctx, conf.OutputRaw[0], nil) + require.NoError(err) outputConf := resolvedConf.(*OutputConfig) require.Equal(ts.URL, outputConf.resolvedURLs[0]) } diff --git a/output/email/outputemail.go b/output/email/outputemail.go index 6d5f7e83..e4ae743f 100644 --- a/output/email/outputemail.go +++ b/output/email/outputemail.go @@ -46,7 +46,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/output/file/outputfile.go b/output/file/outputfile.go index ff52f1bc..52073812 100644 --- a/output/file/outputfile.go +++ b/output/file/outputfile.go @@ -111,7 +111,11 @@ func parseAsIntOrOctal(input string) (result int, err error) { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/output/file/outputfile_test.go b/output/file/outputfile_test.go index 83a35e40..0a82b6c5 100644 --- a/output/file/outputfile_test.go +++ b/output/file/outputfile_test.go @@ -29,7 +29,7 @@ output: - type: file `))) assert.Nil(err) - _, err = InitHandler(context.TODO(), conf.OutputRaw[0]) + _, err = InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.NotNil(err) // write_behavior is different from 'append' and 'overwrite' @@ -41,7 +41,7 @@ output: write_behavior: write_behavior `))) assert.Nil(err) - _, err = InitHandler(context.TODO(), conf.OutputRaw[0]) + _, err = InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.NotNil(err) // invalid file_mode @@ -53,7 +53,7 @@ output: file_mode: -1 `))) assert.Nil(err) - _, err = InitHandler(context.TODO(), conf.OutputRaw[0]) + _, err = InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.NotNil(err) // invalid file_mode @@ -65,7 +65,7 @@ output: file_mode: -1 `))) assert.Nil(err) - _, err = InitHandler(context.TODO(), conf.OutputRaw[0]) + _, err = InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.NotNil(err) // invalid dir_mode @@ -77,7 +77,7 @@ output: dir_mode: -1 `))) assert.Nil(err) - _, err = InitHandler(context.TODO(), conf.OutputRaw[0]) + _, err = InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.NotNil(err) // invalid flush_interval @@ -89,7 +89,7 @@ output: flush_interval: blah `))) assert.Nil(err) - _, err = InitHandler(context.TODO(), conf.OutputRaw[0]) + _, err = InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.NotNil(err) // test default values @@ -99,7 +99,7 @@ output: path: ` + testPath() + ` `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) assert.Equal(defaultCreateIfDeleted, config.CreateIfDeleted) @@ -123,7 +123,7 @@ output: flush_interval: 1000 `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(0640) @@ -170,7 +170,7 @@ output: file_mode: "` + strconv.Itoa(fileMode) + `" `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(fileMode) @@ -218,7 +218,7 @@ output: codec: "test" `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(fileMode) @@ -266,7 +266,7 @@ output: codec: "%{log}" `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(fileMode) @@ -314,7 +314,7 @@ output: file_mode: "` + strconv.Itoa(fileMode) + `" `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(fileMode) @@ -374,7 +374,7 @@ output: create_if_deleted: false `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(fileMode) @@ -430,7 +430,7 @@ output: dir_mode: "` + strconv.Itoa(dirMode) + `" `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) fPerm := os.FileMode(fileMode) @@ -482,7 +482,7 @@ output: dir_mode: "` + strconv.Itoa(dirMode) + `" `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) fPerm := os.FileMode(fileMode) @@ -528,7 +528,7 @@ output: write_behavior: append `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(0640) @@ -574,7 +574,7 @@ output: write_behavior: overwrite `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(0640) @@ -620,7 +620,7 @@ output: flush_interval: 0 `))) assert.Nil(err) - c, err := InitHandler(context.TODO(), conf.OutputRaw[0]) + c, err := InitHandler(context.TODO(), conf.OutputRaw[0], nil) assert.Nil(err) config := c.(*OutputConfig) perm := os.FileMode(0640) diff --git a/output/http/outputhttp.go b/output/http/outputhttp.go index 97981397..51af8821 100644 --- a/output/http/outputhttp.go +++ b/output/http/outputhttp.go @@ -33,6 +33,7 @@ type OutputConfig struct { IgnoreSSL bool `json:"ignore_ssl" yaml:"ignore_ssl"` httpClient *http.Client + control config.Control } // DefaultOutputConfig returns an OutputConfig struct with default values @@ -48,7 +49,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err @@ -69,6 +74,7 @@ func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputCo ExpectContinueTimeout: 1 * time.Second, TLSClientConfig: &tls.Config{InsecureSkipVerify: conf.IgnoreSSL}, }} + conf.control = control return &conf, nil } diff --git a/output/kafka/outputkafka.go b/output/kafka/outputkafka.go index 4b756d0e..0322391f 100644 --- a/output/kafka/outputkafka.go +++ b/output/kafka/outputkafka.go @@ -40,7 +40,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/output/nsq/outputnsq.go b/output/nsq/outputnsq.go index 2c9b09c3..32faf032 100644 --- a/output/nsq/outputnsq.go +++ b/output/nsq/outputnsq.go @@ -42,7 +42,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/output/prometheus/outputprometheus.go b/output/prometheus/outputprometheus.go index 06124b4d..fafb91e8 100644 --- a/output/prometheus/outputprometheus.go +++ b/output/prometheus/outputprometheus.go @@ -39,7 +39,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/output/redis/outputredis.go b/output/redis/outputredis.go index ef63af1e..dee58431 100644 --- a/output/redis/outputredis.go +++ b/output/redis/outputredis.go @@ -56,7 +56,11 @@ var ( ) // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/output/report/outputreport.go b/output/report/outputreport.go index 66f3e965..1a6010c9 100644 --- a/output/report/outputreport.go +++ b/output/report/outputreport.go @@ -37,7 +37,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/output/socket/outputsocket.go b/output/socket/outputsocket.go index d2ed06da..1a1ffdc9 100644 --- a/output/socket/outputsocket.go +++ b/output/socket/outputsocket.go @@ -31,7 +31,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() if err := config.ReflectConfig(raw, &conf); err != nil { return nil, err diff --git a/output/statsd/outputstatsd.go b/output/statsd/outputstatsd.go index 0a40f4e3..1e07e720 100644 --- a/output/statsd/outputstatsd.go +++ b/output/statsd/outputstatsd.go @@ -163,7 +163,11 @@ func errorHandler(err error) { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil { diff --git a/output/stdout/outputstdout.go b/output/stdout/outputstdout.go index 5210ba4e..992c5534 100644 --- a/output/stdout/outputstdout.go +++ b/output/stdout/outputstdout.go @@ -33,7 +33,11 @@ func DefaultOutputConfig() OutputConfig { } // InitHandler initialize the output plugin -func InitHandler(ctx context.Context, raw config.ConfigRaw) (config.TypeOutputConfig, error) { +func InitHandler( + ctx context.Context, + raw config.ConfigRaw, + control config.Control, +) (config.TypeOutputConfig, error) { conf := DefaultOutputConfig() err := config.ReflectConfig(raw, &conf) if err != nil {