From cd275942c1205562b3a05a754a9853becd4fcc85 Mon Sep 17 00:00:00 2001 From: Dmitry Smolonogov Date: Tue, 10 Jan 2023 15:15:18 +0300 Subject: [PATCH] add sample logger for parse_re2, discard, throttle plugins --- logger/logger.go | 66 +++++++++++++++-------- pipeline/pipeline.go | 6 ++- pipeline/plugin.go | 5 +- pipeline/processor.go | 5 +- plugin/action/discard/README.idoc.md | 3 +- plugin/action/discard/README.md | 8 ++- plugin/action/discard/discard.go | 21 ++++++-- plugin/action/discard/discard_test.go | 8 +-- plugin/action/mask/mask_test.go | 3 +- plugin/action/parse_re2/README.md | 6 +++ plugin/action/parse_re2/parse_re2.go | 18 +++++-- plugin/action/throttle/throttle.go | 27 +++++----- plugin/input/k8s/multiline_action_test.go | 22 +++++--- 13 files changed, 137 insertions(+), 61 deletions(-) diff --git a/logger/logger.go b/logger/logger.go index 1fab0428f..102a5b526 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -3,15 +3,22 @@ package logger import ( "os" "strings" + "time" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) var Instance *zap.SugaredLogger +var SampleInstance *zap.SugaredLogger var Level zap.AtomicLevel -const defaultLevel = zap.InfoLevel +const ( + defaultLevel = zap.InfoLevel + defaultTick = time.Second + defaultFirst = 10 + defaultThereAfter = 100 +) func init() { var level zapcore.Level @@ -32,27 +39,42 @@ func init() { Level = zap.NewAtomicLevelAt(level) - Instance = zap.New( - zapcore.NewCore( - zapcore.NewConsoleEncoder(zapcore.EncoderConfig{ - TimeKey: "ts", - LevelKey: "level", - NameKey: "Instance", - CallerKey: "caller", - MessageKey: "message", - StacktraceKey: "stacktrace", - LineEnding: zapcore.DefaultLineEnding, - EncodeLevel: zapcore.LowercaseLevelEncoder, - EncodeTime: zapcore.ISO8601TimeEncoder, - EncodeDuration: zapcore.SecondsDurationEncoder, - EncodeCaller: zapcore.ShortCallerEncoder, - }), - zapcore.AddSync(os.Stdout), - Level, - ), - ).Sugar().Named("fd") - - Instance.Infof("Logger initialized with level: %s", level) + core := zapcore.NewCore( + zapcore.NewConsoleEncoder(zapcore.EncoderConfig{ + TimeKey: "ts", + LevelKey: "level", + NameKey: "Instance", + CallerKey: "caller", + MessageKey: "message", + StacktraceKey: "stacktrace", + LineEnding: zapcore.DefaultLineEnding, + EncodeLevel: zapcore.LowercaseLevelEncoder, + EncodeTime: zapcore.ISO8601TimeEncoder, + EncodeDuration: zapcore.SecondsDurationEncoder, + EncodeCaller: zapcore.ShortCallerEncoder, + }), + zapcore.AddSync(os.Stdout), + Level, + ) + + sampleCore := zapcore.NewSamplerWithOptions( + core, + defaultTick, + defaultFirst, + defaultThereAfter, + ) + + // logger initialization + Instance = NewLogger(core) + Instance.Infof("Logger initialized with level=%s", level) + + // sample logger initialization + SampleInstance = NewLogger(sampleCore) + SampleInstance.Infof("SampleLogger initialized with level=%s", level) +} + +func NewLogger(core zapcore.Core) *zap.SugaredLogger { + return zap.New(core).Sugar().Named("fd") } func Debug(args ...any) { diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 52e71871e..9d6935e14 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -99,6 +99,7 @@ type Pipeline struct { // some debugging stuff logger *zap.SugaredLogger + sampleLogger *zap.SugaredLogger eventLogEnabled bool eventLog []string eventLogMu *sync.Mutex @@ -143,6 +144,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli pipeline := &Pipeline{ Name: name, logger: logger.Instance.Named(name), + sampleLogger: logger.SampleInstance.Named(name), settings: settings, useSpread: false, disableStreams: false, @@ -266,7 +268,7 @@ func (p *Pipeline) Start() { p.logger.Infof("stating processors, count=%d", len(p.Procs)) for _, processor := range p.Procs { processor.registerMetrics(p.metricsCtl) - processor.start(p.actionParams, p.logger) + processor.start(p.actionParams, p.logger, p.sampleLogger) } p.logger.Infof("starting input plugin %q", p.inputInfo.Type) @@ -571,7 +573,7 @@ func (p *Pipeline) expandProcs() { proc := p.newProc() p.Procs = append(p.Procs, proc) proc.registerMetrics(p.metricsCtl) - proc.start(p.actionParams, p.logger) + proc.start(p.actionParams, p.logger, p.sampleLogger) } p.procCount.Swap(to) diff --git a/pipeline/plugin.go b/pipeline/plugin.go index 36fd88bdd..2afaaa0de 100644 --- a/pipeline/plugin.go +++ b/pipeline/plugin.go @@ -52,8 +52,9 @@ type PluginDefaultParams struct { type ActionPluginParams struct { *PluginDefaultParams - Controller ActionPluginController - Logger *zap.SugaredLogger + Controller ActionPluginController + Logger *zap.SugaredLogger + SampleLogger *zap.SugaredLogger } type OutputPluginParams struct { diff --git a/pipeline/processor.go b/pipeline/processor.go index ca666350f..2cb9cd947 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -94,13 +94,14 @@ func NewProcessor( return processor } -func (p *processor) start(params *PluginDefaultParams, logger *zap.SugaredLogger) { +func (p *processor) start(params *PluginDefaultParams, defaultLogger *zap.SugaredLogger, sampleLogger *zap.SugaredLogger) { for i, action := range p.actions { actionInfo := p.actionInfos[i] action.Start(actionInfo.PluginStaticInfo.Config, &ActionPluginParams{ PluginDefaultParams: params, Controller: p, - Logger: logger.Named("action").Named(actionInfo.Type), + Logger: defaultLogger.Named("action").Named(actionInfo.Type), + SampleLogger: sampleLogger.Named("action").Named(actionInfo.Type), }) } diff --git a/plugin/action/discard/README.idoc.md b/plugin/action/discard/README.idoc.md index a41776b0c..f56a628de 100644 --- a/plugin/action/discard/README.idoc.md +++ b/plugin/action/discard/README.idoc.md @@ -1,4 +1,5 @@ # Discard plugin @introduction -> No config params +### Config params +@config-params|description diff --git a/plugin/action/discard/README.md b/plugin/action/discard/README.md index d856baba0..acfc92333 100755 --- a/plugin/action/discard/README.md +++ b/plugin/action/discard/README.md @@ -13,6 +13,12 @@ pipelines: ... ``` -> No config params +### Config params +**`is_logging`** *`bool`* *`default=false`* + +Field that includes logging (with sampling). + +
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/discard/discard.go b/plugin/action/discard/discard.go index cbc156069..0589031d4 100644 --- a/plugin/action/discard/discard.go +++ b/plugin/action/discard/discard.go @@ -4,6 +4,7 @@ import ( "github.com/ozontech/file.d/fd" "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/plugin" + "go.uber.org/zap" ) /*{ introduction @@ -23,10 +24,19 @@ pipelines: }*/ type Plugin struct { + config *Config + sampleLogger *zap.SugaredLogger plugin.NoMetricsPlugin } -type Config struct{} +// ! config-params +// ^ config-params +type Config struct { + // > @3@4@5@6 + // > + // > Field that includes logging (with sampling). + IsLogging bool `json:"is_logging" default:"false"` // * +} func init() { fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{ @@ -39,12 +49,17 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { return &Plugin{}, &Config{} } -func (p *Plugin) Start(_ pipeline.AnyConfig, _ *pipeline.ActionPluginParams) { +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { + p.config = config.(*Config) + p.sampleLogger = params.SampleLogger } func (p *Plugin) Stop() { } -func (p *Plugin) Do(_ *pipeline.Event) pipeline.ActionResult { +func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { + if p.config.IsLogging { + p.sampleLogger.Info("discarded event: ", zap.Stringer("json", event)) + } return pipeline.ActionDiscard } diff --git a/plugin/action/discard/discard_test.go b/plugin/action/discard/discard_test.go index 5e424c2c4..4390b5752 100644 --- a/plugin/action/discard/discard_test.go +++ b/plugin/action/discard/discard_test.go @@ -22,7 +22,7 @@ func TestDiscardAnd(t *testing.T) { }, } - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, false)) + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, false)) wg := &sync.WaitGroup{} wg.Add(10) @@ -66,7 +66,7 @@ func TestDiscardOr(t *testing.T) { }, } - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false)) + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false)) wg := &sync.WaitGroup{} wg.Add(8) @@ -110,7 +110,7 @@ func TestDiscardRegex(t *testing.T) { }, } - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false)) + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false)) wg := &sync.WaitGroup{} wg.Add(11) @@ -151,7 +151,7 @@ func TestDiscardMatchInvert(t *testing.T) { }, } - p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, true)) + p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, true)) wg := &sync.WaitGroup{} wg.Add(9) diff --git a/plugin/action/mask/mask_test.go b/plugin/action/mask/mask_test.go index b0572b180..0072d6c4d 100644 --- a/plugin/action/mask/mask_test.go +++ b/plugin/action/mask/mask_test.go @@ -179,7 +179,8 @@ func TestMaskAddExtraField(t *testing.T) { PipelineName: "test_pipeline", PipelineSettings: &pipeline.Settings{}, }, - Logger: zap.L().Sugar(), + Logger: zap.L().Sugar(), + SampleLogger: zap.L().Sugar(), }) plugin.config.Masks[0].Re_ = regexp.MustCompile(plugin.config.Masks[0].Re) diff --git a/plugin/action/parse_re2/README.md b/plugin/action/parse_re2/README.md index c4ab67c7a..25c919fc9 100755 --- a/plugin/action/parse_re2/README.md +++ b/plugin/action/parse_re2/README.md @@ -14,5 +14,11 @@ A prefix to add to decoded object keys.
+**`is_logging`** *`bool`* *`default=false`* + +Field that includes logging (with sampling). + +
+
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/action/parse_re2/parse_re2.go b/plugin/action/parse_re2/parse_re2.go index 7d58e8b6b..1149180bb 100644 --- a/plugin/action/parse_re2/parse_re2.go +++ b/plugin/action/parse_re2/parse_re2.go @@ -8,6 +8,7 @@ import ( "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/plugin" insaneJSON "github.com/vitkovskii/insane-json" + "go.uber.org/zap" ) /*{ introduction @@ -15,9 +16,9 @@ It parses string from the event field using re2 expression with named subgroups }*/ type Plugin struct { - config *Config - - re *regexp.Regexp + config *Config + sampleLogger *zap.SugaredLogger + re *regexp.Regexp plugin.NoMetricsPlugin } @@ -39,6 +40,11 @@ type Config struct { // > // > A prefix to add to decoded object keys. Prefix string `json:"prefix" default:""` // * + + // > @3@4@5@6 + // > + // > Field that includes logging (with sampling). + IsLogging bool `json:"is_logging" default:"false"` // * } func init() { @@ -52,8 +58,9 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { return &Plugin{}, &Config{} } -func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) { +func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { p.config = config.(*Config) + p.sampleLogger = params.SampleLogger p.re = regexp.MustCompile(p.config.Re2) } @@ -70,6 +77,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult { sm := p.re.FindSubmatch(jsonNode.AsBytes()) if len(sm) == 0 { + if p.config.IsLogging { + p.sampleLogger.Info("event is not parsed: ", zap.Stringer("json", event)) + } return pipeline.ActionPass } diff --git a/plugin/action/throttle/throttle.go b/plugin/action/throttle/throttle.go index 511dd2ae5..6c46f9b9f 100644 --- a/plugin/action/throttle/throttle.go +++ b/plugin/action/throttle/throttle.go @@ -45,13 +45,14 @@ It discards the events if pipeline throughput gets higher than a configured thre }*/ type Plugin struct { - ctx context.Context - cancel context.CancelFunc - logger *zap.SugaredLogger - config *Config - pipeline string - format string - redisClient redisClient + ctx context.Context + cancel context.CancelFunc + logger *zap.SugaredLogger + sampleLogger *zap.SugaredLogger + config *Config + pipeline string + format string + redisClient redisClient limiterBuf []byte rules []*rule @@ -185,6 +186,7 @@ func (p *Plugin) syncWorker(ctx context.Context, jobCh <-chan limiter, wg *sync. func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) { p.config = config.(*Config) p.logger = params.Logger + p.sampleLogger = params.SampleLogger p.pipeline = params.PipelineName p.limiterBuf = make([]byte, 0) ctx, cancel := context.WithCancel(context.Background()) @@ -303,11 +305,12 @@ func (p *Plugin) isAllowed(event *pipeline.Event) bool { if len(p.config.TimeField_) != 0 { tsValue := event.Root.Dig(p.config.TimeField_...).AsString() - t, err := time.Parse(p.format, tsValue) - if err != nil || ts.IsZero() { - p.logger.Warnf("can't parse field %q using format %s: %s", p.config.TimeField, p.config.TimeFieldFormat, tsValue) - } else { - ts = t + if tsValue != "" { + if t, err := time.Parse(p.format, tsValue); err != nil || ts.IsZero() { + p.sampleLogger.Warnf("can't parse field=%q using format=%s time=%s", p.config.TimeField, p.config.TimeFieldFormat, tsValue) + } else { + ts = t + } } } diff --git a/plugin/input/k8s/multiline_action_test.go b/plugin/input/k8s/multiline_action_test.go index 40ce06f40..65e1f9ccb 100644 --- a/plugin/input/k8s/multiline_action_test.go +++ b/plugin/input/k8s/multiline_action_test.go @@ -17,11 +17,15 @@ func TestMultilineAction_Do(t *testing.T) { config := &Config{ SplitEventSize: predictionLookahead * 4, } - plugin.Start(config, &pipeline.ActionPluginParams{Logger: zap.S(), PluginDefaultParams: &pipeline.PluginDefaultParams{ - PipelineSettings: &pipeline.Settings{ - MaxEventSize: 20, + plugin.Start(config, &pipeline.ActionPluginParams{ + Logger: zap.S(), + SampleLogger: zap.S(), + PluginDefaultParams: &pipeline.PluginDefaultParams{ + PipelineSettings: &pipeline.Settings{ + MaxEventSize: 20, + }, }, - }}) + }) item := &metaItem{ nodeName: "node_1", @@ -93,9 +97,13 @@ func TestMultilineAction_Do_shouldSplit(t *testing.T) { config := &Config{ SplitEventSize: predictionLookahead * 4, } - plugin.Start(config, &pipeline.ActionPluginParams{Logger: zap.S(), PluginDefaultParams: &pipeline.PluginDefaultParams{ - PipelineSettings: &pipeline.Settings{}, - }}) + plugin.Start(config, &pipeline.ActionPluginParams{ + Logger: zap.S(), + SampleLogger: zap.S(), + PluginDefaultParams: &pipeline.PluginDefaultParams{ + PipelineSettings: &pipeline.Settings{}, + }, + }) item := &metaItem{ nodeName: "node_1",