Skip to content

Commit

Permalink
Basic server implementation for the file.d playground
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Jul 2, 2023
1 parent 92378d4 commit d09a42f
Show file tree
Hide file tree
Showing 15 changed files with 425 additions and 43 deletions.
9 changes: 6 additions & 3 deletions cmd/file.d/file.d_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ func TestThatPluginsAreImported(t *testing.T) {
"throttle",
}
for _, pluginName := range action {
pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindAction, pluginName)
pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindAction, pluginName)
require.NoError(t, err)
require.NotNil(t, pluginInfo)
}

Expand All @@ -196,7 +197,8 @@ func TestThatPluginsAreImported(t *testing.T) {
"kafka",
}
for _, pluginName := range input {
pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindInput, pluginName)
pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindInput, pluginName)
require.NoError(t, err)
require.NotNil(t, pluginInfo)
}

Expand All @@ -211,7 +213,8 @@ func TestThatPluginsAreImported(t *testing.T) {
"stdout",
}
for _, pluginName := range output {
pluginInfo := fd.DefaultPluginRegistry.Get(pipeline.PluginKindOutput, pluginName)
pluginInfo, err := fd.DefaultPluginRegistry.Get(pipeline.PluginKindOutput, pluginName)
require.NoError(t, err)
require.NotNil(t, pluginInfo)
}
}
Expand Down
42 changes: 42 additions & 0 deletions cmd/playground/playground.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"net/http"

"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/playground"

_ "github.com/ozontech/file.d/plugin/action/add_file_name"
_ "github.com/ozontech/file.d/plugin/action/add_host"
_ "github.com/ozontech/file.d/plugin/action/convert_date"
_ "github.com/ozontech/file.d/plugin/action/convert_log_level"
_ "github.com/ozontech/file.d/plugin/action/debug"
_ "github.com/ozontech/file.d/plugin/action/discard"
_ "github.com/ozontech/file.d/plugin/action/flatten"
_ "github.com/ozontech/file.d/plugin/action/join"
_ "github.com/ozontech/file.d/plugin/action/join_template"
_ "github.com/ozontech/file.d/plugin/action/json_decode"
_ "github.com/ozontech/file.d/plugin/action/json_encode"
_ "github.com/ozontech/file.d/plugin/action/keep_fields"
_ "github.com/ozontech/file.d/plugin/action/mask"
_ "github.com/ozontech/file.d/plugin/action/modify"
_ "github.com/ozontech/file.d/plugin/action/parse_es"
_ "github.com/ozontech/file.d/plugin/action/parse_re2"
_ "github.com/ozontech/file.d/plugin/action/remove_fields"
_ "github.com/ozontech/file.d/plugin/action/rename"
_ "github.com/ozontech/file.d/plugin/action/set_time"

_ "github.com/ozontech/file.d/plugin/input/fake"
_ "github.com/ozontech/file.d/plugin/output/devnull"
)

func main() {
lg := logger.Instance.Desugar().Named("playground")
handler := playground.NewDoActionsHandler(fd.DefaultPluginRegistry, lg)

err := http.ListenAndServe(":9090", handler)
if err != nil && err != http.ErrServerClosed {
panic(err)
}
}
46 changes: 30 additions & 16 deletions fd/file.d.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,16 @@ func (f *FileD) addPipeline(name string, config *cfg.PipelineConfig) {

logger.Infof("creating pipeline %q: capacity=%d, stream field=%s, decoder=%s", name, settings.Capacity, settings.StreamField, settings.Decoder)

p := pipeline.New(name, settings, f.registry)
p := pipeline.New(name, settings, f.registry, logger.Instance.Named(name))
err := f.setupInput(p, config, values)
if err != nil {
logger.Fatalf("can't create pipeline %q: %s", name, err.Error())
}

f.setupActions(p, config, values)
actions := config.Raw.Get("actions")
if err := SetupActions(p, f.plugins, actions, values); err != nil {
logger.Fatalf("can't create pipeline %q: %s", name, err.Error())
}

err = f.setupOutput(p, config, values)
if err != nil {
Expand All @@ -126,7 +129,10 @@ func (f *FileD) setupInput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCon
})

for _, actionType := range inputInfo.AdditionalActions {
actionInfo := f.plugins.GetActionByType(actionType)
actionInfo, err := f.plugins.GetActionByType(actionType)
if err != nil {
return err
}

infoCopy := *actionInfo
infoCopy.Config = inputInfo.Config
Expand All @@ -141,46 +147,50 @@ func (f *FileD) setupInput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCon
return nil
}

func (f *FileD) setupActions(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineConfig, values map[string]int) {
actions := pipelineConfig.Raw.Get("actions")
func SetupActions(p *pipeline.Pipeline, plugins *PluginRegistry, actions *simplejson.Json, values map[string]int) error {
for index := range actions.MustArray() {
actionJSON := actions.GetIndex(index)
if actionJSON.MustMap() == nil {
logger.Fatalf("empty action #%d for pipeline %q", index, p.Name)
return fmt.Errorf("empty action #%d for pipeline %q", index, p.Name)
}

t := actionJSON.Get("type").MustString()
if t == "" {
logger.Fatalf("action #%d doesn't provide type %q", index, p.Name)
return fmt.Errorf("action #%d doesn't provide type %q", index, p.Name)
}
if err := setupAction(p, plugins, index, t, actionJSON, values); err != nil {
return err
}
f.setupAction(p, index, t, actionJSON, values)
}
return nil
}

func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSON *simplejson.Json, values map[string]int) {
logger.Infof("creating action with type %q for pipeline %q", t, p.Name)
info := f.plugins.GetActionByType(t)
func setupAction(p *pipeline.Pipeline, plugins *PluginRegistry, index int, t string, actionJSON *simplejson.Json, values map[string]int) error {
info, err := plugins.GetActionByType(t)
if err != nil {
return err
}

matchMode := extractMatchMode(actionJSON)
if matchMode == pipeline.MatchModeUnknown {
logger.Fatalf("unknown match_mode value for action %d/%s in pipeline %q", index, t, p.Name)
return fmt.Errorf("unknown match_mode value for action %d/%s", index, t)
}
matchInvert := extractMatchInvert(actionJSON)
conditions, err := extractConditions(actionJSON.Get("match_fields"))
if err != nil {
logger.Fatalf("can't extract conditions for action %d/%s in pipeline %q: %s", index, t, p.Name, err.Error())
return fmt.Errorf("can't extract conditions for action %d/%s: %s", index, t, err.Error())
}
metricName, metricLabels, skipStatus := extractMetrics(actionJSON)
configJSON := makeActionJSON(actionJSON)

_, config := info.Factory()
if err := DecodeConfig(config, configJSON); err != nil {
logger.Fatalf("can't unmarshal config for %s action in pipeline %q: %s", info.Type, p.Name, err.Error())
return fmt.Errorf("can't unmarshal config for %s action: %s", info.Type, err.Error())
}

err = cfg.Parse(config, values)
if err != nil {
logger.Fatalf("wrong config for %q action in pipeline %q: %s", info.Type, p.Name, err.Error())
return fmt.Errorf("wrong config for %q action: %s", info.Type, err.Error())
}

infoCopy := *info
Expand All @@ -196,6 +206,7 @@ func (f *FileD) setupAction(p *pipeline.Pipeline, index int, t string, actionJSO
MetricSkipStatus: skipStatus,
MatchInvert: matchInvert,
})
return nil
}

func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineConfig, values map[string]int) error {
Expand Down Expand Up @@ -232,7 +243,10 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip
return nil, fmt.Errorf("%s doesn't have type", pluginKind)
}
logger.Infof("creating %s with type %q", pluginKind, t)
info := f.plugins.Get(pluginKind, t)
info, err := f.plugins.Get(pluginKind, t)
if err != nil {
return nil, err
}
configJson, err := configJSON.Encode()
if err != nil {
logger.Panicf("can't create config json for %s", t)
Expand Down
17 changes: 8 additions & 9 deletions fd/plugin_registry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package fd

import (
"fmt"

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
)
Expand All @@ -13,28 +15,25 @@ type PluginRegistry struct {
plugins map[string]*pipeline.PluginStaticInfo
}

func (r *PluginRegistry) Get(kind pipeline.PluginKind, t string) *pipeline.PluginStaticInfo {
func (r *PluginRegistry) Get(kind pipeline.PluginKind, t string) (*pipeline.PluginStaticInfo, error) {
id := r.MakeID(kind, t)

info := r.plugins[id]
if info == nil {
logger.Fatalf("can't find plugin kind=%s type=%s", kind, t)
return nil
return nil, fmt.Errorf("can't find plugin kind=%s type=%s", kind, t)
}

return info
return info, nil
}

func (r *PluginRegistry) GetActionByType(t string) *pipeline.PluginStaticInfo {
func (r *PluginRegistry) GetActionByType(t string) (*pipeline.PluginStaticInfo, error) {
id := r.MakeID(pipeline.PluginKindAction, t)

info := r.plugins[id]
if info == nil {
logger.Fatalf("can't find action plugin with type %q", t)
return nil
return nil, fmt.Errorf("can't find action plugin with type %q", t)
}

return info
return info, nil
}

func (r *PluginRegistry) RegisterInput(info *pipeline.PluginStaticInfo) {
Expand Down
4 changes: 4 additions & 0 deletions pipeline/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func (e *Event) String() string {
return fmt.Sprintf("kind=%s, action=%d, source=%d/%s, stream=%s, stage=%s, json=%s", e.kindStr(), e.action.Load(), e.SourceID, e.SourceName, e.streamName, e.stageStr(), e.Root.EncodeToString())
}

func (e *Event) Action() int {
return int(e.action.Load())
}

// channels are slower than this implementation by ~20%
type eventPool struct {
capacity int
Expand Down
5 changes: 2 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type InputPluginController interface {
type ActionPluginController interface {
Commit(event *Event) // commit offset of held event and skip further processing
Propagate(event *Event) // throw held event back to pipeline
ActionTypeByIndex(idx int) (string, bool)
}

type OutputPluginController interface {
Expand Down Expand Up @@ -139,11 +140,9 @@ type Settings struct {
}

// New creates new pipeline. Consider using `SetupHTTPHandlers` next.
func New(name string, settings *Settings, registry *prometheus.Registry) *Pipeline {
func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap.SugaredLogger) *Pipeline {
metricCtl := metric.New("pipeline_"+name, registry)

lg := logger.Instance.Named(name)

pipeline := &Pipeline{
Name: name,
logger: lg,
Expand Down
5 changes: 3 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/ozontech/file.d/plugin/input/fake"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func getFakeInputInfo() *pipeline.InputPluginInfo {
Expand Down Expand Up @@ -36,7 +37,7 @@ func TestInUnparsableMessages(t *testing.T) {
sourceID := pipeline.SourceID(3<<16 + int(10))

t.Run(name, func(t *testing.T) {
pipe := pipeline.New("test_pipeline", pipelineSettings, prometheus.NewRegistry())
pipe := pipeline.New("test_pipeline", pipelineSettings, prometheus.NewRegistry(), zap.NewNop().Sugar())

pipe.SetInput(getFakeInputInfo())

Expand Down Expand Up @@ -112,7 +113,7 @@ func TestInInvalidMessages(t *testing.T) {

for _, tCase := range cases {
t.Run(tCase.name, func(t *testing.T) {
pipe := pipeline.New("test_pipeline", tCase.pipelineSettings, prometheus.NewRegistry())
pipe := pipeline.New("test_pipeline", tCase.pipelineSettings, prometheus.NewRegistry(), zap.NewNop().Sugar())

pipe.SetInput(getFakeInputInfo())

Expand Down
3 changes: 2 additions & 1 deletion pipeline/pipeline_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"go.uber.org/zap"
)

func TestPipeline_streamEvent(t *testing.T) {
settings := &Settings{
Capacity: 5,
Decoder: "json",
}
p := New("test", settings, prometheus.NewRegistry())
p := New("test", settings, prometheus.NewRegistry(), zap.NewNop().Sugar())

streamID := StreamID(123123)
procs := int32(7)
Expand Down
7 changes: 7 additions & 0 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,3 +349,10 @@ func (p *processor) Propagate(event *Event) {
func (p *processor) RecoverFromPanic() {
p.recoverFromPanic()
}

func (p *processor) ActionTypeByIndex(idx int) (string, bool) {
if idx <= 0 || idx >= len(p.actionInfos) {
return "", false
}
return p.actionInfos[idx].Type, true
}
26 changes: 26 additions & 0 deletions playground/fakeclock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package playground

import (
"time"

"go.uber.org/zap/zapcore"
)

type ZeroClock struct {
start time.Time
}

func NewZeroClock(now time.Time) *ZeroClock {
return &ZeroClock{start: now}
}

var _ zapcore.Clock = ZeroClock{}

func (z ZeroClock) Now() time.Time {
diff := time.Since(z.start)
return time.Time{}.Add(diff)
}

func (z ZeroClock) NewTicker(_ time.Duration) *time.Ticker {
return new(time.Ticker)
}
Loading

0 comments on commit d09a42f

Please sign in to comment.