diff --git a/cfg/config.go b/cfg/config.go index ee91d6f7a..49e8fcb1b 100644 --- a/cfg/config.go +++ b/cfg/config.go @@ -19,9 +19,8 @@ import ( const trueValue = "true" type Config struct { - Vault VaultConfig - PanicTimeout time.Duration - Pipelines map[string]*PipelineConfig + Vault VaultConfig + Pipelines map[string]*PipelineConfig } type ( @@ -181,19 +180,6 @@ func parseConfig(object *simplejson.Json) *Config { config.Pipelines[name] = &PipelineConfig{Raw: raw} } - config.PanicTimeout = time.Minute - panicTimeoutRaw := object.Get("panic_timeout") - if panicTimeoutRaw.Interface() != nil { - panicTimeoutStr, err := panicTimeoutRaw.String() - if err != nil { - logger.Panicf("can't get panic_timeout: %s", err.Error()) - } - config.PanicTimeout, err = time.ParseDuration(panicTimeoutStr) - if err != nil { - logger.Panicf("can't parse panic_timeout: %s", err.Error()) - } - } - return config } diff --git a/cmd/file.d/file.d.go b/cmd/file.d/file.d.go index 92ea116a1..6a46fefa1 100644 --- a/cmd/file.d/file.d.go +++ b/cmd/file.d/file.d.go @@ -13,7 +13,6 @@ import ( "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/pipeline" _ "github.com/ozontech/file.d/plugin/action/add_file_name" _ "github.com/ozontech/file.d/plugin/action/add_host" @@ -82,7 +81,7 @@ func main() { _, _ = maxprocs.Set(maxprocs.Logger(logger.Debugf)) go listenSignals() - longpanic.Go(start) + go start() <-exit logger.Infof("see you soon...") @@ -90,7 +89,6 @@ func main() { func start() { appCfg := cfg.NewConfigFromFile(*config) - longpanic.SetTimeout(appCfg.PanicTimeout) fileD = fd.New(appCfg, *http) fileD.Start() diff --git a/docs/architecture.md b/docs/architecture.md index 4f8e81f6f..e78ac56f7 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -26,11 +26,3 @@ Input plugin has the index of zero, output plugin has the last index. Actions also have the standard endpoints `/info` and `/sample`. If the action has `metric_name`, it will be collected and can be viewed via the `/info` endpoint. The `/sample` handler stores and shows an event before and after processing, so you can debug the action better. - -#### `longpanic` and `/reset` -Every goroutine can (and should) use `longpanic.Go` and `longpanic.WithRecover` functions. -`longpanic.Go` is a goroutine wrapper that panics only after a timeout that you can set in pipeline settings. -`longpanic.WithRecover` is essentially the same but it runs a function synchronously. -It helps to debug the app, because you can see the state of failed file.d via API. -Also you can restart the failed plugin via API, i.e. with the `/reset` endpoint of `file` input plugin. -In case of nobody call API, it will panic with the given error message. diff --git a/fd/file.d.go b/fd/file.d.go index 154d74eb2..f0cf9bafb 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -15,7 +15,6 @@ import ( "github.com/ozontech/file.d/buildinfo" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" "github.com/prometheus/client_golang/prometheus" @@ -35,8 +34,7 @@ type FileD struct { // file_d metrics - longPanicMetric *prometheus.CounterVec - versionMetric *prometheus.CounterVec + versionMetric *prometheus.CounterVec } func New(config *cfg.Config, httpAddr string) *FileD { @@ -64,12 +62,8 @@ func (f *FileD) Start() { func (f *FileD) initMetrics() { f.metricCtl = metric.New("file_d", f.registry) - f.longPanicMetric = f.metricCtl.RegisterCounter("long_panic", "Count of panics in the LongPanic") f.versionMetric = f.metricCtl.RegisterCounter("version", "", "version") f.versionMetric.WithLabelValues(buildinfo.Version).Inc() - longpanic.SetOnPanicHandler(func(_ error) { - f.longPanicMetric.WithLabelValues().Inc() - }) } func (f *FileD) createRegistry() { @@ -317,7 +311,7 @@ func (f *FileD) startHTTP() { }) f.server = &http.Server{Addr: f.httpAddr, Handler: mux} - longpanic.Go(f.listenHTTP) + go f.listenHTTP() } func (f *FileD) listenHTTP() { diff --git a/longpanic/longpanic.go b/longpanic/longpanic.go deleted file mode 100644 index 8a625d472..000000000 --- a/longpanic/longpanic.go +++ /dev/null @@ -1,117 +0,0 @@ -// package longpanic defines `Go` func that creates goroutine with defer -// that waits for somebody to call `RecoverFromPanic` or panics after timeout. -package longpanic - -import ( - "time" - - "go.uber.org/atomic" - "go.uber.org/zap" - - "github.com/ozontech/file.d/logger" -) - -// instance is a singleton with timeout that every `go func` call should use. -var instance *LongPanic = NewLongPanic(time.Minute) - -// SetTimeout set the timeout after the program panics. -func SetTimeout(timeout time.Duration) { - instance.timeout = timeout -} - -// Go runs fn in a different goroutine with defer statement that: -// 1. Recovers from panic -// 2. Waits for somebody to call `RecoverFromPanic` or timeout -// 3. Panics if nobody calls `RecoverFromPanic` -func Go(fn func()) { - instance.Go(fn) -} - -// WithRecover runs fn with defer statement that: -// 1. Recovers from panic -// 2. Waits for somebody to call `RecoverFromPanic` or timeout -// 3. Panics if nobody calls `RecoverFromPanic` -func WithRecover(fn func()) { - instance.WithRecover(fn) -} - -// RecoverFromPanic is a signal to not wait for the panic and tries to continue the execution. -func RecoverFromPanic() { - instance.RecoverFromPanic() -} - -func SetOnPanicHandler(cb func(err error)) { - instance.SetOnPanicHandler(cb) -} - -// LongPanic is a struct that holds an atomic and a timeout after a defer fn will panic. -type LongPanic struct { - shouldPanic *atomic.Bool - timeout time.Duration - - panicHandler func(err error) -} - -// NewLongPanic creates LongPanic. -func NewLongPanic(timeout time.Duration) *LongPanic { - return &LongPanic{ - shouldPanic: atomic.NewBool(false), - timeout: timeout, - } -} - -// Go runs fn in a different goroutine with defer statement that: -// 1. Recovers from panic -// 2. Waits for somebody to call `RecoverFromPanic` or timeout -// 3. Panics if nobody calls `RecoverFromPanic` -func (l *LongPanic) Go(fn func()) { - go func() { - defer l.recoverUntilTimeout() - fn() - }() -} - -// WithRecover runs fn with defer statement that: -// 1. Recovers from panic -// 2. Waits for somebody to call `RecoverFromPanic` or timeout -// 3. Panics if nobody calls `RecoverFromPanic` -func (l *LongPanic) WithRecover(fn func()) { - defer l.recoverUntilTimeout() - fn() -} - -// recover waits for somebody to reset the error plugin or panics after a timeout. -func (l *LongPanic) recoverUntilTimeout() { - if err, ok := recover().(error); ok { - if l.panicHandler != nil { - l.panicHandler(err) - } - - logger.Error(err.Error(), zap.Stack("stacktrace")) - logger.Error("wait for somebody to restart plugins via endpoint") - - l.shouldPanic.Store(true) - t := time.Now() - for { - time.Sleep(10 * time.Millisecond) - if !l.shouldPanic.Load() { - logger.Error("panic recovered! Trying to continue execution...") - - return - } - if time.Since(t) > l.timeout { - logger.Panic(err.Error()) - } - } - } -} - -// RecoverFromPanic is a signal to not wait for the panic and tries to continue the execution. -func (l *LongPanic) RecoverFromPanic() { - l.shouldPanic.Store(false) -} - -// SetOnPanicHandler setups callback when panic happened. -func (l *LongPanic) SetOnPanicHandler(cb func(err error)) { - l.panicHandler = cb -} diff --git a/pipeline/batch.go b/pipeline/batch.go index 33d36a307..f613c6a8f 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -149,12 +148,10 @@ func (b *Batcher) Start(_ context.Context) { b.fullBatches = make(chan *Batch, b.opts.Workers) for i := 0; i < b.opts.Workers; i++ { b.freeBatches <- newBatch(b.opts.BatchSizeCount, b.opts.BatchSizeBytes, b.opts.FlushTimeout) - longpanic.Go(func() { - b.work() - }) + go b.work() } - longpanic.Go(b.heartbeat) + go b.heartbeat() } type WorkerData any diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 4163c59d8..e258934c4 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -20,7 +20,6 @@ import ( "github.com/ozontech/file.d/decoder" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" ) @@ -294,9 +293,9 @@ func (p *Pipeline) Start() { p.streamer.start() - longpanic.Go(p.maintenance) + go p.maintenance() if !p.useSpread { - longpanic.Go(p.growProcs) + go p.growProcs() } p.started = true } diff --git a/pipeline/processor.go b/pipeline/processor.go index 347ce6fb6..7bef30000 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -2,7 +2,6 @@ package pipeline import ( "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "go.uber.org/atomic" "go.uber.org/zap" ) @@ -103,7 +102,7 @@ func (p *processor) start(params PluginDefaultParams, log *zap.SugaredLogger) { }) } - longpanic.Go(p.process) + go p.process() } func (p *processor) process() { diff --git a/pipeline/streamer.go b/pipeline/streamer.go index 2b0afe169..22c3c4a28 100644 --- a/pipeline/streamer.go +++ b/pipeline/streamer.go @@ -6,7 +6,6 @@ import ( "time" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "go.uber.org/atomic" ) @@ -45,7 +44,7 @@ func newStreamer(eventTimeout time.Duration) *streamer { } func (s *streamer) start() { - longpanic.Go(s.heartbeat) + go s.heartbeat() } func (s *streamer) stop() { diff --git a/plugin/input/dmesg/dmesg.go b/plugin/input/dmesg/dmesg.go index bba3957f9..cfcdecbb2 100644 --- a/plugin/input/dmesg/dmesg.go +++ b/plugin/input/dmesg/dmesg.go @@ -8,7 +8,6 @@ import ( "github.com/euank/go-kmsg-parser/kmsgparser" "github.com/ozontech/file.d/fd" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/offset" "github.com/ozontech/file.d/pipeline" @@ -77,7 +76,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.parser = parser - longpanic.Go(p.read) + go p.read() } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go index 1ad54f3e3..c9f29f8c2 100644 --- a/plugin/input/file/provider.go +++ b/plugin/input/file/provider.go @@ -10,7 +10,6 @@ import ( "time" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/pipeline" "github.com/prometheus/client_golang/prometheus" "github.com/rjeczalik/notify" @@ -139,25 +138,23 @@ func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.C func (jp *jobProvider) start() { jp.logger.Infof("starting job provider persistence mode=%s", jp.config.PersistenceMode) if jp.config.OffsetsOp_ == offsetsOpContinue { - longpanic.WithRecover(func() { - offsets, err := jp.offsetDB.load() - if err != nil { - logger.Panicf("can't load offsets: %s", err.Error()) - } - jp.loadedOffsets = offsets - }) + offsets, err := jp.offsetDB.load() + if err != nil { + logger.Panicf("can't load offsets: %s", err.Error()) + } + jp.loadedOffsets = offsets } jp.watcher.start() if jp.config.PersistenceMode_ == persistenceModeAsync { - longpanic.Go(func() { jp.saveOffsetsCyclic(jp.config.AsyncInterval_) }) + go jp.saveOffsetsCyclic(jp.config.AsyncInterval_) } jp.isStarted.Store(true) - longpanic.Go(jp.reportStats) - longpanic.Go(jp.maintenance) + go jp.reportStats() + go jp.maintenance() } func (jp *jobProvider) stop() { diff --git a/plugin/input/file/resetter.go b/plugin/input/file/resetter.go index 977ed8b1e..48aa8a7f8 100644 --- a/plugin/input/file/resetter.go +++ b/plugin/input/file/resetter.go @@ -8,7 +8,6 @@ import ( "sync" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "gopkg.in/yaml.v3" ) @@ -96,8 +95,6 @@ func (r *resetter) reset(request *http.Request) { logger.Panicf("can't load offsets: %s", err.Error()) } jp.loadedOffsets = offsets - - longpanic.RecoverFromPanic() } func (r *resetter) truncateJobs(truncateAll bool, inode, sourceID uint64) { diff --git a/plugin/input/file/watcher.go b/plugin/input/file/watcher.go index 6c8a39802..f2b9c3cec 100644 --- a/plugin/input/file/watcher.go +++ b/plugin/input/file/watcher.go @@ -4,7 +4,6 @@ import ( "os" "path/filepath" - "github.com/ozontech/file.d/longpanic" "github.com/rjeczalik/notify" "go.uber.org/zap" ) @@ -67,7 +66,7 @@ func (w *watcher) start() { return } - longpanic.Go(w.watch) + go w.watch() w.tryAddPath(w.path) } diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index aa7aecb01..3866677a0 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -5,7 +5,6 @@ import ( "io" "os" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/pipeline" "go.uber.org/zap" ) @@ -22,7 +21,7 @@ type inputer interface { } func (w *worker) start(inputController inputer, jobProvider *jobProvider, readBufferSize int, logger *zap.SugaredLogger) { - longpanic.Go(func() { w.work(inputController, jobProvider, readBufferSize, logger) }) + go w.work(inputController, jobProvider, readBufferSize, logger) } func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSize int, logger *zap.SugaredLogger) { diff --git a/plugin/input/http/http.go b/plugin/input/http/http.go index 759997574..40da85a61 100644 --- a/plugin/input/http/http.go +++ b/plugin/input/http/http.go @@ -7,7 +7,6 @@ import ( "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/tls" @@ -154,7 +153,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.server = &http.Server{Addr: p.config.Address, Handler: mux} if p.config.Address != "off" { - longpanic.Go(p.listenHTTP) + go p.listenHTTP() } } diff --git a/plugin/input/journalctl/reader.go b/plugin/input/journalctl/reader.go index 31fe1476b..c28920d24 100644 --- a/plugin/input/journalctl/reader.go +++ b/plugin/input/journalctl/reader.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -98,7 +97,7 @@ func (r *journalReader) start() error { return err } - longpanic.Go(func() { r.readLines(out, r.config) }) + go r.readLines(out, r.config) return nil } diff --git a/plugin/input/k8s/gatherer.go b/plugin/input/k8s/gatherer.go index 300aa47a4..6899771fb 100644 --- a/plugin/input/k8s/gatherer.go +++ b/plugin/input/k8s/gatherer.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "github.com/ozontech/file.d/longpanic" "go.uber.org/atomic" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -88,10 +87,10 @@ func enableGatherer(l *zap.SugaredLogger) { if !DisableMetaUpdates { initGatherer() - longpanic.Go(func() { controller.Run(informerStop) }) + go controller.Run(informerStop) } - longpanic.Go(maintenance) + go maintenance() } func disableGatherer() { diff --git a/plugin/input/kafka/kafka.go b/plugin/input/kafka/kafka.go index a19c55551..143ac13b9 100644 --- a/plugin/input/kafka/kafka.go +++ b/plugin/input/kafka/kafka.go @@ -6,7 +6,6 @@ import ( "github.com/Shopify/sarama" "github.com/ozontech/file.d/fd" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" "github.com/prometheus/client_golang/prometheus" @@ -114,9 +113,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa p.controller.UseSpread() p.controller.DisableStreams() - longpanic.Go(func() { - p.consume(ctx) - }) + go p.consume(ctx) } func (p *Plugin) registerMetrics(ctl *metric.Ctl) { diff --git a/plugin/output/file/file.go b/plugin/output/file/file.go index e26b68a0e..819086859 100644 --- a/plugin/output/file/file.go +++ b/plugin/output/file/file.go @@ -12,7 +12,6 @@ import ( "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/logger" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/pipeline" "go.uber.org/zap" "golang.org/x/net/context" @@ -167,9 +166,8 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Panic("next seal up time is nil!") } - longpanic.Go(func() { - p.fileSealUpTicker(ctx) - }) + go p.fileSealUpTicker(ctx) + p.batcher.Start(ctx) } @@ -281,7 +279,7 @@ func (p *Plugin) sealUp() { } logger.Infof("sealing file, newFileName=%s", newFileName) if p.SealUpCallback != nil { - longpanic.Go(func() { p.SealUpCallback(newFileName) }) + go p.SealUpCallback(newFileName) } } diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go index 9147eb736..d132a223b 100644 --- a/plugin/output/s3/s3.go +++ b/plugin/output/s3/s3.go @@ -16,7 +16,6 @@ import ( "github.com/minio/minio-go" "github.com/ozontech/file.d/cfg" "github.com/ozontech/file.d/fd" - "github.com/ozontech/file.d/longpanic" "github.com/ozontech/file.d/metric" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/plugin/output/file" @@ -313,8 +312,8 @@ func (p *Plugin) StartWithMinio(config pipeline.AnyConfig, params *pipeline.Outp p.compressCh = make(chan fileDTO, p.config.FileConfig.WorkersCount_) for i := 0; i < p.config.FileConfig.WorkersCount_; i++ { - longpanic.Go(p.uploadWork) - longpanic.Go(p.compressWork) + go p.uploadWork() + go p.compressWork() } err = p.startPlugins(params, outPlugCount, targetDirs, fileNames) if err != nil {