From 309d772bc95b96cb55e1df11dd1c99afc734c3e2 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 17 Sep 2024 13:18:05 +0700 Subject: [PATCH] parallel process events in spawn --- pipeline/processor.go | 30 ++++++++++++++++++++++++------ plugin/action/split/split_test.go | 14 ++++++++++++-- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/pipeline/processor.go b/pipeline/processor.go index 0f04ac350..d2118ff54 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -1,6 +1,8 @@ package pipeline import ( + "sync" + "github.com/ozontech/file.d/logger" insaneJSON "github.com/vitkovskii/insane-json" "go.uber.org/atomic" @@ -397,6 +399,9 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { parent.SetChildParentKind() nextActionIdx := parent.action + 1 + wg := &sync.WaitGroup{} + results := make(chan *Event) + for _, node := range nodes { // we can't reuse parent event (using insaneJSON.Root{Node: child} // because of nil decoder @@ -409,12 +414,25 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { child.SetChildKind() child.action = nextActionIdx - ok, _ := p.doActions(child) - if ok { - child.stage = eventStageOutput - p.output.Out(child) - } - child.Root.ReleaseBufMem() + wg.Add(1) + go func(child *Event) { + defer wg.Done() + ok, _ := p.doActions(child) + if ok { + results <- child + } + }(child) + } + + go func() { + wg.Wait() + close(results) + }() + + for child := range results { + child.stage = eventStageOutput + p.output.Out(child) + child.Root.ReleaseMem() } if p.busyActionsTotal == 0 { diff --git a/plugin/action/split/split_test.go b/plugin/action/split/split_test.go index 90ff483a6..ff8157574 100644 --- a/plugin/action/split/split_test.go +++ b/plugin/action/split/split_test.go @@ -1,6 +1,7 @@ package split import ( + "sort" "strings" "sync" "testing" @@ -49,8 +50,12 @@ func TestPlugin_Do(t *testing.T) { wg.Wait() p.Stop() + sort.Strings(splitted) require.Equal(t, children, len(splitted)) - require.Equal(t, []string{"go", "rust", "c++", "python", "ruby", "js"}, splitted) + + expected := []string{"go", "rust", "c++", "python", "ruby", "js"} + sort.Strings(expected) + require.Equal(t, expected, splitted) } func TestPlugin_DoArray(t *testing.T) { @@ -88,6 +93,11 @@ func TestPlugin_DoArray(t *testing.T) { wg.Wait() p.Stop() + sort.Strings(splitted) require.Equal(t, children, len(splitted)) - require.Equal(t, []string{"go", "rust", "c++"}, splitted) + + expected := []string{"go", "rust", "c++"} + sort.Strings(expected) + + require.Equal(t, expected, splitted) }