Skip to content

Commit

Permalink
[Processor\Script] Add option to prevent creation of new sessions (#4…
Browse files Browse the repository at this point in the history
…0692) (#40793)

* Add option to prevent new session creation and pre-create cached sessions

(cherry picked from commit 3f3b988)

Co-authored-by: William Easton <[email protected]>
  • Loading branch information
mergify[bot] and strawgate authored Sep 13, 2024
1 parent ef04f1c commit 305f1e1
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843]
- Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572]
- The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669]
- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415]
- The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions.

*Auditbeat*

Expand Down
22 changes: 12 additions & 10 deletions libbeat/processors/script/javascript/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,15 @@ import (

// Config defines the Javascript source files to use for the processor.
type Config struct {
Tag string `config:"tag"` // Processor ID for debug and metrics.
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens.
MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions.
Tag string `config:"tag"` // Processor ID for debug and metrics.
Source string `config:"source"` // Inline script to execute.
File string `config:"file"` // Source file.
Files []string `config:"files"` // Multiple source files.
Params map[string]interface{} `config:"params"` // Parameters to pass to script.
Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout.
TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens.
MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions.
OnlyCachedSessions bool `config:"only_cached_sessions"` // Only use cached VM sessions.
}

// Validate returns an error if one (and only one) option is not set.
Expand All @@ -57,7 +58,8 @@ func (c Config) Validate() error {

func defaultConfig() Config {
return Config{
TagOnException: "_js_exception",
MaxCachedSessions: 4,
TagOnException: "_js_exception",
MaxCachedSessions: 4,
OnlyCachedSessions: false,
}
}
31 changes: 23 additions & 8 deletions libbeat/processors/script/javascript/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func newSession(p *goja.Program, conf Config, test bool) (*session, error) {
// Measure load times
start := time.Now()
defer func() {
took := time.Now().Sub(start)
took := time.Since(start)
logger.Debugf("Load of javascript pipeline took %v", took)
}()
// Setup JS runtime.
Expand Down Expand Up @@ -217,9 +217,9 @@ func (s *session) runProcessFunc(b *beat.Event) (out *beat.Event, err error) {
}
err = fmt.Errorf("unexpected panic in javascript processor: %v", r)
if s.tagOnException != "" {
mapstr.AddTags(b.Fields, []string{s.tagOnException})
_ = mapstr.AddTags(b.Fields, []string{s.tagOnException})
}
appendString(b.Fields, "error.message", err.Error(), false)
_ = appendString(b.Fields, "error.message", err.Error(), false)
}
}()

Expand All @@ -238,9 +238,9 @@ func (s *session) runProcessFunc(b *beat.Event) (out *beat.Event, err error) {

if _, err = s.processFunc(goja.Undefined(), s.evt.JSObject()); err != nil {
if s.tagOnException != "" {
mapstr.AddTags(b.Fields, []string{s.tagOnException})
_ = mapstr.AddTags(b.Fields, []string{s.tagOnException})
}
appendString(b.Fields, "error.message", err.Error(), false)
_ = appendString(b.Fields, "error.message", err.Error(), false)
return b, fmt.Errorf("failed in process function: %w", err)
}

Expand Down Expand Up @@ -273,8 +273,9 @@ func init() {
}

type sessionPool struct {
New func() *session
C chan *session
New func() *session
C chan *session
NewSessionsAllowed bool
}

func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
Expand All @@ -288,14 +289,28 @@ func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) {
s, _ := newSession(p, c, false)
return s
},
C: make(chan *session, c.MaxCachedSessions),
C: make(chan *session, c.MaxCachedSessions),
NewSessionsAllowed: !c.OnlyCachedSessions,
}
pool.Put(s)

// If we are not allowed to create new sessions, pre-cache requested sessions
if !pool.NewSessionsAllowed {
for i := 0; i < c.MaxCachedSessions-1; i++ {
pool.Put(pool.New())
}
}

return &pool, nil
}

func (p *sessionPool) Get() *session {

if !p.NewSessionsAllowed {
return <-p.C
}

// Try to get a session from the pool, if none is available, create a new one
select {
case s := <-p.C:
return s
Expand Down

0 comments on commit 305f1e1

Please sign in to comment.