Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add sample logger for parse_re2 and discard plugins #260

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 44 additions & 22 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,22 @@ package logger
import (
"os"
"strings"
"time"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// Instance is the process-wide sugared logger used by default.
var Instance *zap.SugaredLogger

// SampleInstance mirrors Instance but wraps a sampling core, so
// high-frequency messages are rate-limited instead of flooding output.
var SampleInstance *zap.SugaredLogger

// Level is the shared atomic log level; changing it affects both loggers.
var Level zap.AtomicLevel

const defaultLevel = zap.InfoLevel
const (
	defaultLevel = zap.InfoLevel

	// Sampling parameters for SampleInstance (see zapcore.NewSamplerWithOptions):
	// within each defaultTick window, the first defaultFirst entries with a
	// given message are emitted, then only every defaultThereAfter-th one.
	defaultTick       = time.Second
	defaultFirst      = 10
	defaultThereAfter = 100
)

func init() {
var level zapcore.Level
Expand All @@ -32,27 +39,42 @@ func init() {

Level = zap.NewAtomicLevelAt(level)

Instance = zap.New(
zapcore.NewCore(
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "Instance",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}),
zapcore.AddSync(os.Stdout),
Level,
),
).Sugar().Named("fd")

Instance.Infof("Logger initialized with level: %s", level)
core := zapcore.NewCore(
zapcore.NewConsoleEncoder(zapcore.EncoderConfig{
TimeKey: "ts",
LevelKey: "level",
NameKey: "Instance",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.LowercaseLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.SecondsDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}),
zapcore.AddSync(os.Stdout),
Level,
)

sampleCore := zapcore.NewSamplerWithOptions(
core,
defaultTick,
defaultFirst,
defaultThereAfter,
)

// logger initialization
Instance = NewLogger(core)
Instance.Infof("Logger initialized with level=%s", level)

// sample logger initialization
SampleInstance = NewLogger(sampleCore)
SampleInstance.Infof("SampleLogger initialized with level=%s", level)
}

// NewLogger builds a sugared zap logger named "fd" on top of the given core.
func NewLogger(core zapcore.Core) *zap.SugaredLogger {
	base := zap.New(core)
	return base.Sugar().Named("fd")
}

func Debug(args ...any) {
Expand Down
6 changes: 4 additions & 2 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ type Pipeline struct {

// some debugging stuff
logger *zap.SugaredLogger
sampleLogger *zap.SugaredLogger
eventLogEnabled bool
eventLog []string
eventLogMu *sync.Mutex
Expand Down Expand Up @@ -143,6 +144,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeli
pipeline := &Pipeline{
Name: name,
logger: logger.Instance.Named(name),
sampleLogger: logger.SampleInstance.Named(name),
settings: settings,
useSpread: false,
disableStreams: false,
Expand Down Expand Up @@ -266,7 +268,7 @@ func (p *Pipeline) Start() {
p.logger.Infof("stating processors, count=%d", len(p.Procs))
for _, processor := range p.Procs {
processor.registerMetrics(p.metricsCtl)
processor.start(p.actionParams, p.logger)
processor.start(p.actionParams, p.logger, p.sampleLogger)
}

p.logger.Infof("starting input plugin %q", p.inputInfo.Type)
Expand Down Expand Up @@ -571,7 +573,7 @@ func (p *Pipeline) expandProcs() {
proc := p.newProc()
p.Procs = append(p.Procs, proc)
proc.registerMetrics(p.metricsCtl)
proc.start(p.actionParams, p.logger)
proc.start(p.actionParams, p.logger, p.sampleLogger)
}

p.procCount.Swap(to)
Expand Down
5 changes: 3 additions & 2 deletions pipeline/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ type PluginDefaultParams struct {

type ActionPluginParams struct {
*PluginDefaultParams
Controller ActionPluginController
Logger *zap.SugaredLogger
Controller ActionPluginController
Logger *zap.SugaredLogger
SampleLogger *zap.SugaredLogger
}

type OutputPluginParams struct {
Expand Down
5 changes: 3 additions & 2 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,14 @@ func NewProcessor(
return processor
}

func (p *processor) start(params *PluginDefaultParams, logger *zap.SugaredLogger) {
func (p *processor) start(params *PluginDefaultParams, defaultLogger *zap.SugaredLogger, sampleLogger *zap.SugaredLogger) {
for i, action := range p.actions {
actionInfo := p.actionInfos[i]
action.Start(actionInfo.PluginStaticInfo.Config, &ActionPluginParams{
PluginDefaultParams: params,
Controller: p,
Logger: logger.Named("action").Named(actionInfo.Type),
Logger: defaultLogger.Named("action").Named(actionInfo.Type),
SampleLogger: sampleLogger.Named("action").Named(actionInfo.Type),
})
}

Expand Down
3 changes: 2 additions & 1 deletion plugin/action/discard/README.idoc.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Discard plugin
@introduction

> No config params
### Config params
@config-params|description
8 changes: 7 additions & 1 deletion plugin/action/discard/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ pipelines:
...
```

> No config params
### Config params
**`is_logging`** *`bool`* *`default=false`*

Enables logging of discarded events (with sampling).

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
21 changes: 18 additions & 3 deletions plugin/action/discard/discard.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin"
"go.uber.org/zap"
)

/*{ introduction
Expand All @@ -23,10 +24,19 @@ pipelines:
}*/

// Plugin discards every event it processes; when configured, it reports
// each discarded event through a sampled logger.
type Plugin struct {
	config       *Config            // parsed plugin configuration, set in Start
	sampleLogger *zap.SugaredLogger // sampled logger supplied by the pipeline
	plugin.NoMetricsPlugin
}

type Config struct{}
// ! config-params
// ^ config-params
type Config struct {
	// > @3@4@5@6
	// >
	// > Enables logging of discarded events (with sampling).
	IsLogging bool `json:"is_logging" default:"false"` // *
}

func init() {
fd.DefaultPluginRegistry.RegisterAction(&pipeline.PluginStaticInfo{
Expand All @@ -39,12 +49,17 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) Start(_ pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
// Start saves the typed plugin config and the sampled logger provided by
// the pipeline for later use in Do.
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
	cfg := config.(*Config)
	p.sampleLogger = params.SampleLogger
	p.config = cfg
}

// Stop is a no-op; the plugin holds no resources that need releasing.
func (p *Plugin) Stop() {
}

func (p *Plugin) Do(_ *pipeline.Event) pipeline.ActionResult {
func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
if p.config.IsLogging {
p.sampleLogger.Info("discarded event: ", zap.Stringer("json", event))
}
return pipeline.ActionDiscard
}
8 changes: 4 additions & 4 deletions plugin/action/discard/discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestDiscardAnd(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, false))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, false))

wg := &sync.WaitGroup{}
wg.Add(10)
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestDiscardOr(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))

wg := &sync.WaitGroup{}
wg.Add(8)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestDiscardRegex(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeOr, conds, false))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeOr, conds, false))

wg := &sync.WaitGroup{}
wg.Add(11)
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestDiscardMatchInvert(t *testing.T) {
},
}

p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, nil, pipeline.MatchModeAnd, conds, true))
p, input, output := test.NewPipelineMock(test.NewActionPluginStaticInfo(factory, &Config{IsLogging: false}, pipeline.MatchModeAnd, conds, true))

wg := &sync.WaitGroup{}
wg.Add(9)
Expand Down
3 changes: 2 additions & 1 deletion plugin/action/mask/mask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func TestMaskAddExtraField(t *testing.T) {
PipelineName: "test_pipeline",
PipelineSettings: &pipeline.Settings{},
},
Logger: zap.L().Sugar(),
Logger: zap.L().Sugar(),
SampleLogger: zap.L().Sugar(),
})
plugin.config.Masks[0].Re_ = regexp.MustCompile(plugin.config.Masks[0].Re)

Expand Down
6 changes: 6 additions & 0 deletions plugin/action/parse_re2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,11 @@ A prefix to add to decoded object keys.

<br>

**`is_logging`** *`bool`* *`default=false`*

Enables logging of events that failed to parse (with sampling).

<br>


<br>*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)*
18 changes: 14 additions & 4 deletions plugin/action/parse_re2/parse_re2.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/zap"
)

/*{ introduction
It parses string from the event field using re2 expression with named subgroups and merges the result with the event root.
}*/

type Plugin struct {
config *Config

re *regexp.Regexp
config *Config
sampleLogger *zap.SugaredLogger
re *regexp.Regexp
plugin.NoMetricsPlugin
}

Expand All @@ -39,6 +40,11 @@ type Config struct {
// >
// > A prefix to add to decoded object keys.
Prefix string `json:"prefix" default:""` // *

// > @3@4@5@6
// >
// > Field that includes logging (with sampling).
IsLogging bool `json:"is_logging" default:"false"` // *
}

func init() {
Expand All @@ -52,8 +58,9 @@ func factory() (pipeline.AnyPlugin, pipeline.AnyConfig) {
return &Plugin{}, &Config{}
}

func (p *Plugin) Start(config pipeline.AnyConfig, _ *pipeline.ActionPluginParams) {
// Start caches the typed config and the sampled logger, then compiles the
// configured re2 expression once for reuse in Do.
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
	p.config = config.(*Config)
	p.sampleLogger = params.SampleLogger

	// MustCompile panics on an invalid pattern, failing startup early
	// with a clear error rather than at event-processing time.
	p.re = regexp.MustCompile(p.config.Re2)
}
Expand All @@ -70,6 +77,9 @@ func (p *Plugin) Do(event *pipeline.Event) pipeline.ActionResult {
sm := p.re.FindSubmatch(jsonNode.AsBytes())

if len(sm) == 0 {
if p.config.IsLogging {
p.sampleLogger.Info("event is not parsed: ", zap.Stringer("json", event))
}
return pipeline.ActionPass
}

Expand Down
27 changes: 15 additions & 12 deletions plugin/action/throttle/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ It discards the events if pipeline throughput gets higher than a configured thre
}*/

type Plugin struct {
ctx context.Context
cancel context.CancelFunc
logger *zap.SugaredLogger
config *Config
pipeline string
format string
redisClient redisClient
ctx context.Context
cancel context.CancelFunc
logger *zap.SugaredLogger
sampleLogger *zap.SugaredLogger
config *Config
pipeline string
format string
redisClient redisClient

limiterBuf []byte
rules []*rule
Expand Down Expand Up @@ -185,6 +186,7 @@ func (p *Plugin) syncWorker(ctx context.Context, jobCh <-chan limiter, wg *sync.
func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.ActionPluginParams) {
p.config = config.(*Config)
p.logger = params.Logger
p.sampleLogger = params.SampleLogger
p.pipeline = params.PipelineName
p.limiterBuf = make([]byte, 0)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -303,11 +305,12 @@ func (p *Plugin) isAllowed(event *pipeline.Event) bool {

if len(p.config.TimeField_) != 0 {
tsValue := event.Root.Dig(p.config.TimeField_...).AsString()
t, err := time.Parse(p.format, tsValue)
if err != nil || ts.IsZero() {
p.logger.Warnf("can't parse field %q using format %s: %s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
} else {
ts = t
if tsValue != "" {
if t, err := time.Parse(p.format, tsValue); err != nil || ts.IsZero() {
p.sampleLogger.Warnf("can't parse field=%q using format=%s time=%s", p.config.TimeField, p.config.TimeFieldFormat, tsValue)
} else {
ts = t
}
}
}

Expand Down
Loading