Skip to content

Commit

Permalink
Remove longpanic package
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Jul 20, 2023
1 parent 8f5cbfe commit 0956cb7
Show file tree
Hide file tree
Showing 20 changed files with 32 additions and 203 deletions.
18 changes: 2 additions & 16 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import (
const trueValue = "true"

type Config struct {
Vault VaultConfig
PanicTimeout time.Duration
Pipelines map[string]*PipelineConfig
Vault VaultConfig
Pipelines map[string]*PipelineConfig
}

type (
Expand Down Expand Up @@ -181,19 +180,6 @@ func parseConfig(object *simplejson.Json) *Config {
config.Pipelines[name] = &PipelineConfig{Raw: raw}
}

config.PanicTimeout = time.Minute
panicTimeoutRaw := object.Get("panic_timeout")
if panicTimeoutRaw.Interface() != nil {
panicTimeoutStr, err := panicTimeoutRaw.String()
if err != nil {
logger.Panicf("can't get panic_timeout: %s", err.Error())
}
config.PanicTimeout, err = time.ParseDuration(panicTimeoutStr)
if err != nil {
logger.Panicf("can't parse panic_timeout: %s", err.Error())
}
}

return config
}

Expand Down
4 changes: 1 addition & 3 deletions cmd/file.d/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/pipeline"
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
_ "github.com/ozontech/file.d/plugin/action/add_host"
Expand Down Expand Up @@ -82,15 +81,14 @@ func main() {
_, _ = maxprocs.Set(maxprocs.Logger(logger.Debugf))

go listenSignals()
longpanic.Go(start)
go start()

<-exit
logger.Infof("see you soon...")
}

func start() {
appCfg := cfg.NewConfigFromFile(*config)
longpanic.SetTimeout(appCfg.PanicTimeout)

fileD = fd.New(appCfg, *http)
fileD.Start()
Expand Down
8 changes: 0 additions & 8 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,3 @@ Input plugin has the index of zero, output plugin has the last index.
Actions also have the standard endpoints `/info` and `/sample`.
If the action has `metric_name`, it will be collected and can be viewed via the `/info` endpoint.
The `/sample` handler stores and shows an event before and after processing, so you can debug the action better.

#### `longpanic` and `/reset`
Every goroutine can (and should) use `longpanic.Go` and `longpanic.WithRecover` functions.
`longpanic.Go` is a goroutine wrapper that panics only after a timeout that you can set in pipeline settings.
`longpanic.WithRecover` is essentially the same but it runs a function synchronously.
It helps to debug the app, because you can see the state of failed file.d via API.
Also you can restart the failed plugin via API, i.e. with the `/reset` endpoint of `file` input plugin.
In case of nobody call API, it will panic with the given error message.
10 changes: 2 additions & 8 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ozontech/file.d/buildinfo"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -33,8 +32,7 @@ type FileD struct {

// file_d metrics

longPanicMetric *prometheus.CounterVec
versionMetric *prometheus.CounterVec
versionMetric *prometheus.CounterVec
}

func New(config *cfg.Config, httpAddr string) *FileD {
Expand Down Expand Up @@ -62,12 +60,8 @@ func (f *FileD) Start() {

func (f *FileD) initMetrics() {
f.metricCtl = metric.New("file_d", f.registry)
f.longPanicMetric = f.metricCtl.RegisterCounter("long_panic", "Count of panics in the LongPanic")
f.versionMetric = f.metricCtl.RegisterCounter("version", "", "version")
f.versionMetric.WithLabelValues(buildinfo.Version).Inc()
longpanic.SetOnPanicHandler(func(_ error) {
f.longPanicMetric.WithLabelValues().Inc()
})
}

func (f *FileD) createRegistry() {
Expand Down Expand Up @@ -295,7 +289,7 @@ func (f *FileD) startHTTP() {
mux.Handle("/log/level", logger.Level)

f.server = &http.Server{Addr: f.httpAddr, Handler: mux}
longpanic.Go(f.listenHTTP)
go f.listenHTTP()
}

func (f *FileD) listenHTTP() {
Expand Down
117 changes: 0 additions & 117 deletions longpanic/longpanic.go

This file was deleted.

7 changes: 2 additions & 5 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
Expand Down Expand Up @@ -149,12 +148,10 @@ func (b *Batcher) Start(_ context.Context) {
b.fullBatches = make(chan *Batch, b.opts.Workers)
for i := 0; i < b.opts.Workers; i++ {
b.freeBatches <- newBatch(b.opts.BatchSizeCount, b.opts.BatchSizeBytes, b.opts.FlushTimeout)
longpanic.Go(func() {
b.work()
})
go b.work()
}

longpanic.Go(b.heartbeat)
go b.heartbeat()
}

type WorkerData any
Expand Down
5 changes: 2 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/ozontech/file.d/decoder"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
)

Expand Down Expand Up @@ -294,9 +293,9 @@ func (p *Pipeline) Start() {

p.streamer.start()

longpanic.Go(p.maintenance)
go p.maintenance()
if !p.useSpread {
longpanic.Go(p.growProcs)
go p.growProcs()
}
p.started = true
}
Expand Down
3 changes: 1 addition & 2 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pipeline

import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -103,7 +102,7 @@ func (p *processor) start(params PluginDefaultParams, log *zap.SugaredLogger) {
})
}

longpanic.Go(p.process)
go p.process()
}

func (p *processor) process() {
Expand Down
3 changes: 1 addition & 2 deletions pipeline/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -45,7 +44,7 @@ func newStreamer(eventTimeout time.Duration) *streamer {
}

func (s *streamer) start() {
longpanic.Go(s.heartbeat)
go s.heartbeat()
}

func (s *streamer) stop() {
Expand Down
3 changes: 1 addition & 2 deletions plugin/input/dmesg/dmesg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/euank/go-kmsg-parser/kmsgparser"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/offset"
"github.com/ozontech/file.d/pipeline"
Expand Down Expand Up @@ -77,7 +76,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.InputPluginPa

p.parser = parser

longpanic.Go(p.read)
go p.read()
}

func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
Expand Down
19 changes: 8 additions & 11 deletions plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
"github.com/rjeczalik/notify"
Expand Down Expand Up @@ -139,25 +138,23 @@ func NewJobProvider(config *Config, possibleOffsetCorruptionMetric *prometheus.C
func (jp *jobProvider) start() {
jp.logger.Infof("starting job provider persistence mode=%s", jp.config.PersistenceMode)
if jp.config.OffsetsOp_ == offsetsOpContinue {
longpanic.WithRecover(func() {
offsets, err := jp.offsetDB.load()
if err != nil {
logger.Panicf("can't load offsets: %s", err.Error())
}
jp.loadedOffsets = offsets
})
offsets, err := jp.offsetDB.load()
if err != nil {
logger.Panicf("can't load offsets: %s", err.Error())
}
jp.loadedOffsets = offsets
}

jp.watcher.start()

if jp.config.PersistenceMode_ == persistenceModeAsync {
longpanic.Go(func() { jp.saveOffsetsCyclic(jp.config.AsyncInterval_) })
go jp.saveOffsetsCyclic(jp.config.AsyncInterval_)
}

jp.isStarted.Store(true)

longpanic.Go(jp.reportStats)
longpanic.Go(jp.maintenance)
go jp.reportStats()
go jp.maintenance()
}

func (jp *jobProvider) stop() {
Expand Down
3 changes: 0 additions & 3 deletions plugin/input/file/resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -96,8 +95,6 @@ func (r *resetter) reset(request *http.Request) {
logger.Panicf("can't load offsets: %s", err.Error())
}
jp.loadedOffsets = offsets

longpanic.RecoverFromPanic()
}

func (r *resetter) truncateJobs(truncateAll bool, inode, sourceID uint64) {
Expand Down
3 changes: 1 addition & 2 deletions plugin/input/file/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"os"
"path/filepath"

"github.com/ozontech/file.d/longpanic"
"github.com/rjeczalik/notify"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -67,7 +66,7 @@ func (w *watcher) start() {
return
}

longpanic.Go(w.watch)
go w.watch()

w.tryAddPath(w.path)
}
Expand Down
Loading

0 comments on commit 0956cb7

Please sign in to comment.