diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0a1538c9ad5..5761e0a9340 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -138,6 +138,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - elasticsearch output now supports `idle_connection_timeout`. {issue}35616[35615] {pull}36843[36843] - Enable early event encoding in the Elasticsearch output, improving cpu and memory use {pull}38572[38572] - The environment variable `BEATS_ADD_CLOUD_METADATA_PROVIDERS` overrides configured/default `add_cloud_metadata` providers {pull}38669[38669] +- When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415] +- The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions. *Auditbeat* diff --git a/libbeat/processors/script/javascript/config.go b/libbeat/processors/script/javascript/config.go index 03415a96d1b..4074ab3f974 100644 --- a/libbeat/processors/script/javascript/config.go +++ b/libbeat/processors/script/javascript/config.go @@ -24,14 +24,15 @@ import ( // Config defines the Javascript source files to use for the processor. type Config struct { - Tag string `config:"tag"` // Processor ID for debug and metrics. - Source string `config:"source"` // Inline script to execute. - File string `config:"file"` // Source file. - Files []string `config:"files"` // Multiple source files. - Params map[string]interface{} `config:"params"` // Parameters to pass to script. - Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout. - TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens. - MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions. + Tag string `config:"tag"` // Processor ID for debug and metrics. + Source string `config:"source"` // Inline script to execute. + File string `config:"file"` // Source file. + Files []string `config:"files"` // Multiple source files. + Params map[string]interface{} `config:"params"` // Parameters to pass to script. + Timeout time.Duration `config:"timeout" validate:"min=0"` // Execution timeout. + TagOnException string `config:"tag_on_exception"` // Tag to add to events when an exception happens. + MaxCachedSessions int `config:"max_cached_sessions" validate:"min=0"` // Max. number of cached VM sessions. + OnlyCachedSessions bool `config:"only_cached_sessions"` // Only use cached VM sessions. } // Validate returns an error if one (and only one) option is not set. @@ -57,7 +58,8 @@ func (c Config) Validate() error { func defaultConfig() Config { return Config{ - TagOnException: "_js_exception", - MaxCachedSessions: 4, + TagOnException: "_js_exception", + MaxCachedSessions: 4, + OnlyCachedSessions: false, } } diff --git a/libbeat/processors/script/javascript/session.go b/libbeat/processors/script/javascript/session.go index 5b08e7d6052..94e344d4e22 100644 --- a/libbeat/processors/script/javascript/session.go +++ b/libbeat/processors/script/javascript/session.go @@ -91,7 +91,7 @@ func newSession(p *goja.Program, conf Config, test bool) (*session, error) { // Measure load times start := time.Now() defer func() { - took := time.Now().Sub(start) + took := time.Since(start) logger.Debugf("Load of javascript pipeline took %v", took) }() // Setup JS runtime. @@ -217,9 +217,9 @@ func (s *session) runProcessFunc(b *beat.Event) (out *beat.Event, err error) { } err = fmt.Errorf("unexpected panic in javascript processor: %v", r) if s.tagOnException != "" { - mapstr.AddTags(b.Fields, []string{s.tagOnException}) + _ = mapstr.AddTags(b.Fields, []string{s.tagOnException}) } - appendString(b.Fields, "error.message", err.Error(), false) + _ = appendString(b.Fields, "error.message", err.Error(), false) } }() @@ -238,9 +238,9 @@ func (s *session) runProcessFunc(b *beat.Event) (out *beat.Event, err error) { if _, err = s.processFunc(goja.Undefined(), s.evt.JSObject()); err != nil { if s.tagOnException != "" { - mapstr.AddTags(b.Fields, []string{s.tagOnException}) + _ = mapstr.AddTags(b.Fields, []string{s.tagOnException}) } - appendString(b.Fields, "error.message", err.Error(), false) + _ = appendString(b.Fields, "error.message", err.Error(), false) return b, fmt.Errorf("failed in process function: %w", err) } @@ -273,8 +273,9 @@ func init() { } type sessionPool struct { - New func() *session - C chan *session + New func() *session + C chan *session + NewSessionsAllowed bool } func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) { @@ -288,14 +289,28 @@ func newSessionPool(p *goja.Program, c Config) (*sessionPool, error) { s, _ := newSession(p, c, false) return s }, - C: make(chan *session, c.MaxCachedSessions), + C: make(chan *session, c.MaxCachedSessions), + NewSessionsAllowed: !c.OnlyCachedSessions, } pool.Put(s) + // If we are not allowed to create new sessions, pre-cache requested sessions + if !pool.NewSessionsAllowed { + for i := 0; i < c.MaxCachedSessions-1; i++ { + pool.Put(pool.New()) + } + } + return &pool, nil } func (p *sessionPool) Get() *session { + + if !p.NewSessionsAllowed { + return <-p.C + } + + // Try to get a session from the pool, if none is available, create a new one select { case s := <-p.C: return s