Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove longpanic package #424

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 2 additions & 16 deletions cfg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import (
const trueValue = "true"

type Config struct {
Vault VaultConfig
PanicTimeout time.Duration
Pipelines map[string]*PipelineConfig
Vault VaultConfig
Pipelines map[string]*PipelineConfig
}

type (
Expand Down Expand Up @@ -181,19 +180,6 @@ func parseConfig(object *simplejson.Json) *Config {
config.Pipelines[name] = &PipelineConfig{Raw: raw}
}

config.PanicTimeout = time.Minute
panicTimeoutRaw := object.Get("panic_timeout")
if panicTimeoutRaw.Interface() != nil {
panicTimeoutStr, err := panicTimeoutRaw.String()
if err != nil {
logger.Panicf("can't get panic_timeout: %s", err.Error())
}
config.PanicTimeout, err = time.ParseDuration(panicTimeoutStr)
if err != nil {
logger.Panicf("can't parse panic_timeout: %s", err.Error())
}
}

return config
}

Expand Down
4 changes: 1 addition & 3 deletions cmd/file.d/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/pipeline"
_ "github.com/ozontech/file.d/plugin/action/add_file_name"
_ "github.com/ozontech/file.d/plugin/action/add_host"
Expand Down Expand Up @@ -82,15 +81,14 @@ func main() {
_, _ = maxprocs.Set(maxprocs.Logger(logger.Debugf))

go listenSignals()
longpanic.Go(start)
go start()

<-exit
logger.Infof("see you soon...")
}

func start() {
appCfg := cfg.NewConfigFromFile(*config)
longpanic.SetTimeout(appCfg.PanicTimeout)

fileD = fd.New(appCfg, *http)
fileD.Start()
Expand Down
8 changes: 0 additions & 8 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,3 @@ Input plugin has the index of zero, output plugin has the last index.
Actions also have the standard endpoints `/info` and `/sample`.
If the action has `metric_name`, it will be collected and can be viewed via the `/info` endpoint.
The `/sample` handler stores and shows an event before and after processing, so you can debug the action better.

#### `longpanic` and `/reset`
Every goroutine can (and should) use `longpanic.Go` and `longpanic.WithRecover` functions.
`longpanic.Go` is a goroutine wrapper that panics only after a timeout that you can set in pipeline settings.
`longpanic.WithRecover` is essentially the same but it runs a function synchronously.
It helps to debug the app, because you can see the state of failed file.d via API.
Also you can restart the failed plugin via API, i.e. with the `/reset` endpoint of `file` input plugin.
In case of nobody call API, it will panic with the given error message.
10 changes: 2 additions & 8 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/ozontech/file.d/buildinfo"
"github.com/ozontech/file.d/cfg"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -35,8 +34,7 @@ type FileD struct {

// file_d metrics

longPanicMetric *prometheus.CounterVec
versionMetric *prometheus.CounterVec
versionMetric *prometheus.CounterVec
}

func New(config *cfg.Config, httpAddr string) *FileD {
Expand Down Expand Up @@ -64,12 +62,8 @@ func (f *FileD) Start() {

func (f *FileD) initMetrics() {
f.metricCtl = metric.New("file_d", f.registry)
f.longPanicMetric = f.metricCtl.RegisterCounter("long_panic", "Count of panics in the LongPanic")
f.versionMetric = f.metricCtl.RegisterCounter("version", "", "version")
f.versionMetric.WithLabelValues(buildinfo.Version).Inc()
longpanic.SetOnPanicHandler(func(_ error) {
f.longPanicMetric.WithLabelValues().Inc()
})
}

func (f *FileD) createRegistry() {
Expand Down Expand Up @@ -317,7 +311,7 @@ func (f *FileD) startHTTP() {
})

f.server = &http.Server{Addr: f.httpAddr, Handler: mux}
longpanic.Go(f.listenHTTP)
go f.listenHTTP()
}

func (f *FileD) listenHTTP() {
Expand Down
117 changes: 0 additions & 117 deletions longpanic/longpanic.go

This file was deleted.

7 changes: 2 additions & 5 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
Expand Down Expand Up @@ -149,12 +148,10 @@ func (b *Batcher) Start(_ context.Context) {
b.fullBatches = make(chan *Batch, b.opts.Workers)
for i := 0; i < b.opts.Workers; i++ {
b.freeBatches <- newBatch(b.opts.BatchSizeCount, b.opts.BatchSizeBytes, b.opts.FlushTimeout)
longpanic.Go(func() {
b.work()
})
go b.work()
}

longpanic.Go(b.heartbeat)
go b.heartbeat()
}

type WorkerData any
Expand Down
5 changes: 2 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/ozontech/file.d/decoder"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
)

Expand Down Expand Up @@ -294,9 +293,9 @@ func (p *Pipeline) Start() {

p.streamer.start()

longpanic.Go(p.maintenance)
go p.maintenance()
if !p.useSpread {
longpanic.Go(p.growProcs)
go p.growProcs()
}
p.started = true
}
Expand Down
3 changes: 1 addition & 2 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package pipeline

import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -103,7 +102,7 @@ func (p *processor) start(params PluginDefaultParams, log *zap.SugaredLogger) {
})
}

longpanic.Go(p.process)
go p.process()
}

func (p *processor) process() {
Expand Down
3 changes: 1 addition & 2 deletions pipeline/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -45,7 +44,7 @@ func newStreamer(eventTimeout time.Duration) *streamer {
}

func (s *streamer) start() {
longpanic.Go(s.heartbeat)
go s.heartbeat()
}

func (s *streamer) stop() {
Expand Down
3 changes: 1 addition & 2 deletions plugin/input/dmesg/dmesg.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

"github.com/euank/go-kmsg-parser/kmsgparser"
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/offset"
"github.com/ozontech/file.d/pipeline"
Expand Down Expand Up @@ -77,7 +76,7 @@

p.parser = parser

longpanic.Go(p.read)
go p.read()

Check warning on line 79 in plugin/input/dmesg/dmesg.go

View check run for this annotation

Codecov / codecov/patch

plugin/input/dmesg/dmesg.go#L79

Added line #L79 was not covered by tests
}

func (p *Plugin) registerMetrics(ctl *metric.Ctl) {
Expand Down
19 changes: 8 additions & 11 deletions plugin/input/file/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"time"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"github.com/ozontech/file.d/pipeline"
"github.com/prometheus/client_golang/prometheus"
"github.com/rjeczalik/notify"
Expand Down Expand Up @@ -139,25 +138,23 @@
func (jp *jobProvider) start() {
jp.logger.Infof("starting job provider persistence mode=%s", jp.config.PersistenceMode)
if jp.config.OffsetsOp_ == offsetsOpContinue {
longpanic.WithRecover(func() {
offsets, err := jp.offsetDB.load()
if err != nil {
logger.Panicf("can't load offsets: %s", err.Error())
}
jp.loadedOffsets = offsets
})
offsets, err := jp.offsetDB.load()
if err != nil {
logger.Panicf("can't load offsets: %s", err.Error())
}

Check warning on line 144 in plugin/input/file/provider.go

View check run for this annotation

Codecov / codecov/patch

plugin/input/file/provider.go#L143-L144

Added lines #L143 - L144 were not covered by tests
jp.loadedOffsets = offsets
}

jp.watcher.start()

if jp.config.PersistenceMode_ == persistenceModeAsync {
longpanic.Go(func() { jp.saveOffsetsCyclic(jp.config.AsyncInterval_) })
go jp.saveOffsetsCyclic(jp.config.AsyncInterval_)
}

jp.isStarted.Store(true)

longpanic.Go(jp.reportStats)
longpanic.Go(jp.maintenance)
go jp.reportStats()
go jp.maintenance()
}

func (jp *jobProvider) stop() {
Expand Down
3 changes: 0 additions & 3 deletions plugin/input/file/resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"sync"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/longpanic"
"gopkg.in/yaml.v3"
)

Expand Down Expand Up @@ -96,8 +95,6 @@ func (r *resetter) reset(request *http.Request) {
logger.Panicf("can't load offsets: %s", err.Error())
}
jp.loadedOffsets = offsets

longpanic.RecoverFromPanic()
}

func (r *resetter) truncateJobs(truncateAll bool, inode, sourceID uint64) {
Expand Down
3 changes: 1 addition & 2 deletions plugin/input/file/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"os"
"path/filepath"

"github.com/ozontech/file.d/longpanic"
"github.com/rjeczalik/notify"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -67,7 +66,7 @@ func (w *watcher) start() {
return
}

longpanic.Go(w.watch)
go w.watch()

w.tryAddPath(w.path)
}
Expand Down
Loading