Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Processor\Script] Add option to prevent creation of new sessions #40692

Merged
merged 5 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415]
- The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions.

*Auditbeat*

Expand Down
22 changes: 12 additions & 10 deletions libbeat/processors/script/javascript/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (

// Config defines the Javascript source files to use for the processor.
type Config struct {
Tag string `config:"tag"` // Processor ID for debug and metrics.
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens.
MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions.
Tag string `config:"tag"` // Processor ID for debug and metrics.
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens.
MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions.
OnlyCachedSessions bool `config:"only_cached_sessions"` // Only use cached VM sessions.
}

// Validate returns an error if one (and only one) option is not set.
Expand All @@ -57,7 +58,8 @@ func (c Config) Validate() error {

func defaultConfig() Config {
return Config{
TagOnException: "_js_exception",
MaxCachedSessions: 4,
TagOnException: "_js_exception",
MaxCachedSessions: 4,
OnlyCachedSessions: false,
}
}
31 changes: 23 additions & 8 deletions libbeat/processors/script/javascript/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func newSession(p *goja.Program, conf Config, test bool) (*session, error) {
// Measure load times
start := time.Now()
defer func() {
took := time.Now().Sub(start)
took := time.Since(start)
logger.Debugf("Load of javascript pipeline took %v", took)
}()
// Setup JS runtime.
Expand Down Expand Up @@ -217,9 +217,9 @@ func (s *session) runProcessFunc(b *beat.Event) (out *beat.Event, err error) {
}
err = fmt.Errorf("unexpected panic in javascript processor: %v", r)
if s.tagOnException != "" {
mapstr.AddTags(b.Fields, []string{s.tagOnException})
_ = mapstr.AddTags(b.Fields, []string{s.tagOnException})
}
appendString(b.Fields, "error.message", err.Error(), false)
_ = appendString(b.Fields, "error.message", err.Error(), false)
}
}()

Expand All @@ -238,9 +238,9 @@ func (s *session) runProcessFunc(b *beat.Event) (out *beat.Event, err error) {

if _, err = s.processFunc(goja.Undefined(), s.evt.JSObject()); err != nil {
if s.tagOnException != "" {
mapstr.AddTags(b.Fields, []string{s.tagOnException})
_ = mapstr.AddTags(b.Fields, []string{s.tagOnException})
}
appendString(b.Fields, "error.message", err.Error(), false)
_ = appendString(b.Fields, "error.message", err.Error(), false)
return b, fmt.Errorf("failed in process function: %w", err)
}

Expand Down Expand Up @@ -273,8 +273,9 @@ func init() {
}

type sessionPool struct {
New func() *session
C chan *session
New func() *session
C chan *session
NewSessionsAllowed bool
}

func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
Expand All @@ -288,14 +289,28 @@ func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
s, _ := newSession(p, c, false)
return s
},
C: make(chan *session, c.MaxCachedSessions),
C: make(chan *session, c.MaxCachedSessions),
NewSessionsAllowed: !c.OnlyCachedSessions,
strawgate marked this conversation as resolved.
Show resolved Hide resolved
}
pool.Put(s)

// If we are not allowed to create new sessions, pre-cache requested sessions
if !pool.NewSessionsAllowed {
for i := 0; i < c.MaxCachedSessions-1; i++ {
pool.Put(pool.New())
}
}

return &pool, nil
}

func (p *sessionPool) Get() *session {

if !p.NewSessionsAllowed {
return <-p.C
}
strawgate marked this conversation as resolved.
Show resolved Hide resolved

// Try to get a session from the pool, if none is available, create a new one
select {
case s := <-p.C:
return s
Expand Down
Loading