Skip to content

Commit

Permalink
parallel process events in spawn
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Sep 18, 2024
1 parent 9f57de9 commit 309d772
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
30 changes: 24 additions & 6 deletions pipeline/processor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package pipeline

import (
"sync"

"github.com/ozontech/file.d/logger"
insaneJSON "github.com/vitkovskii/insane-json"
"go.uber.org/atomic"
Expand Down Expand Up @@ -397,6 +399,9 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
parent.SetChildParentKind()
nextActionIdx := parent.action + 1

wg := &sync.WaitGroup{}
results := make(chan *Event)

for _, node := range nodes {
// we can't reuse parent event (using insaneJSON.Root{Node: child}
// because of nil decoder
Expand All @@ -409,12 +414,25 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) {
child.SetChildKind()
child.action = nextActionIdx

ok, _ := p.doActions(child)
if ok {
child.stage = eventStageOutput
p.output.Out(child)
}
child.Root.ReleaseBufMem()
wg.Add(1)
go func(child *Event) {
defer wg.Done()
ok, _ := p.doActions(child)
if ok {
results <- child
}
}(child)
}

go func() {
wg.Wait()
close(results)
}()

for child := range results {
child.stage = eventStageOutput
p.output.Out(child)
child.Root.ReleaseMem()
}

if p.busyActionsTotal == 0 {
Expand Down
14 changes: 12 additions & 2 deletions plugin/action/split/split_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package split

import (
"sort"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -49,8 +50,12 @@ func TestPlugin_Do(t *testing.T) {
wg.Wait()
p.Stop()

sort.Strings(splitted)
require.Equal(t, children, len(splitted))
require.Equal(t, []string{"go", "rust", "c++", "python", "ruby", "js"}, splitted)

expected := []string{"go", "rust", "c++", "python", "ruby", "js"}
sort.Strings(expected)
require.Equal(t, expected, splitted)
}

func TestPlugin_DoArray(t *testing.T) {
Expand Down Expand Up @@ -88,6 +93,11 @@ func TestPlugin_DoArray(t *testing.T) {
wg.Wait()
p.Stop()

sort.Strings(splitted)
require.Equal(t, children, len(splitted))
require.Equal(t, []string{"go", "rust", "c++"}, splitted)

expected := []string{"go", "rust", "c++"}
sort.Strings(expected)

require.Equal(t, expected, splitted)
}

0 comments on commit 309d772

Please sign in to comment.