diff --git a/heartbeat/hbtest/hbtestutil.go b/heartbeat/hbtest/hbtestutil.go index 86c1e4a34d2..e73e1efe78f 100644 --- a/heartbeat/hbtest/hbtestutil.go +++ b/heartbeat/hbtest/hbtestutil.go @@ -40,6 +40,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/ecserr" "github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/hbtestllext" @@ -49,7 +50,6 @@ import ( "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/validator" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/common/x509util" ) @@ -172,6 +172,7 @@ func BaseChecks(ip string, status string, typ string) validator.Validator { } return lookslike.Compose( + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ "ip": ipCheck, @@ -223,8 +224,10 @@ func SimpleURLChecks(t *testing.T, scheme string, host string, port uint16) vali // URLChecks returns a validator for the given URL's fields func URLChecks(t *testing.T, u *url.URL) validator.Validator { + t.Helper() + require.NotNil(t, u) return lookslike.MustCompile(map[string]interface{}{ - "url": wrappers.URLFields(u), + "url": wraputil.URLFields(u), }) } diff --git a/heartbeat/hbtestllext/validators.go b/heartbeat/hbtestllext/validators.go index 23a9df5d5cf..a7c637c27b0 100644 --- a/heartbeat/hbtestllext/validators.go +++ b/heartbeat/hbtestllext/validators.go @@ -19,6 +19,7 @@ package hbtestllext import ( "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" ) // MonitorTimespanValidator is tests for the `next_run` and `next_run_in.us` keys. @@ -30,3 +31,14 @@ var MonitorTimespanValidator = lookslike.MustCompile(map[string]interface{}{ }, }, }) + +var MaybeHasEventType = lookslike.MustCompile(map[string]interface{}{ + "event": map[string]interface{}{ + "type": isdef.Optional(isdef.IsNonEmptyString), + }, + "synthetics.type": isdef.Optional(isdef.IsNonEmptyString), +}) + +var MaybeHasDuration = lookslike.MustCompile(map[string]interface{}{ + "monitor.duration.us": IsInt64, +}) diff --git a/heartbeat/look/look.go b/heartbeat/look/look.go index 75d23b973a1..39e92b0b629 100644 --- a/heartbeat/look/look.go +++ b/heartbeat/look/look.go @@ -31,16 +31,21 @@ import ( // RTT formats a round-trip-time given as time.Duration into an // event field. The duration is stored in `{"us": rtt}`. func RTT(rtt time.Duration) mapstr.M { - if rtt < 0 { - rtt = 0 - } - return mapstr.M{ // cast to int64 since a go duration is a nano, but we want micros // This makes the types less confusing because other wise the duration // we get back has the wrong unit - "us": rtt.Microseconds(), + "us": RTTMS(rtt), + } +} + +// RTTMS returns the given time.Duration as an int64 in microseconds, with a value of 0 +// if input is negative. +func RTTMS(rtt time.Duration) int64 { + if rtt < 0 { + return 0 } + return rtt.Microseconds() } // Reason formats an error into an error event field. diff --git a/heartbeat/monitors/active/http/http.go b/heartbeat/monitors/active/http/http.go index acac759f8e0..ad9a9df98c0 100644 --- a/heartbeat/monitors/active/http/http.go +++ b/heartbeat/monitors/active/http/http.go @@ -23,11 +23,11 @@ import ( "net/url" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/libbeat/version" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/elastic-agent-libs/transport/httpcommon" "github.com/elastic/elastic-agent-libs/transport/tlscommon" "github.com/elastic/elastic-agent-libs/useragent" @@ -116,7 +116,7 @@ func create( // Assign any execution errors to the error field and // assign the url field - js[i] = wrappers.WithURLField(u, job) + js[i] = wraputil.WithURLField(u, job) } return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil diff --git a/heartbeat/monitors/active/http/http_test.go b/heartbeat/monitors/active/http/http_test.go index 78a2f24599c..20575210ac9 100644 --- a/heartbeat/monitors/active/http/http_test.go +++ b/heartbeat/monitors/active/http/http_test.go @@ -52,6 +52,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/file" @@ -110,7 +111,7 @@ func checkServer(t *testing.T, handlerFunc http.HandlerFunc, useUrls bool) (*htt func urlChecks(urlStr string) validator.Validator { u, _ := url.Parse(urlStr) return lookslike.MustCompile(map[string]interface{}{ - "url": wrappers.URLFields(u), + "url": wraputil.URLFields(u), }) } diff --git a/heartbeat/monitors/active/icmp/icmp.go b/heartbeat/monitors/active/icmp/icmp.go index 19831407ba7..5bb3504014a 100644 --- a/heartbeat/monitors/active/icmp/icmp.go +++ b/heartbeat/monitors/active/icmp/icmp.go @@ -23,6 +23,7 @@ import ( "net/url" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/mapstr" @@ -30,7 +31,6 @@ import ( "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" ) @@ -107,7 +107,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) { return plugin.Plugin{}, err } - j = append(j, wrappers.WithURLField(u, job)) + j = append(j, wraputil.WithURLField(u, job)) } return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil diff --git a/heartbeat/monitors/active/tcp/tcp.go b/heartbeat/monitors/active/tcp/tcp.go index 5fc3400c30d..57305203b3a 100644 --- a/heartbeat/monitors/active/tcp/tcp.go +++ b/heartbeat/monitors/active/tcp/tcp.go @@ -31,7 +31,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/plugin" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/reason" "github.com/elastic/beats/v7/libbeat/beat" conf "github.com/elastic/elastic-agent-libs/config" @@ -130,7 +130,7 @@ func (jf *jobFactory) makeJobs() ([]jobs.Job, error) { if err != nil { return nil, err } - jobs = append(jobs, wrappers.WithURLField(url, endpointJob)) + jobs = append(jobs, wraputil.WithURLField(url, endpointJob)) } } @@ -174,7 +174,7 @@ func (jf *jobFactory) makeDirectEndpointJob(endpointURL *url.URL) (jobs.Job, err // makeSocksLookupEndpointJob makes jobs that use a Socks5 proxy to perform DNS lookups func (jf *jobFactory) makeSocksLookupEndpointJob(endpointURL *url.URL) jobs.Job { - return wrappers.WithURLField(endpointURL, + return wraputil.WithURLField(endpointURL, jobs.MakeSimpleJob(func(event *beat.Event) error { hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port()) return jf.dial(event, hostPort, endpointURL) diff --git a/heartbeat/monitors/logger/logger.go b/heartbeat/monitors/logger/logger.go index d5018454aa4..734cae862a9 100644 --- a/heartbeat/monitors/logger/logger.go +++ b/heartbeat/monitors/logger/logger.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" ) @@ -44,6 +45,7 @@ type MonitorRunInfo struct { Duration int64 `json:"-"` Steps *int `json:"steps,omitempty"` Status string `json:"status"` + Attempt int `json:"attempt"` } func (m *MonitorRunInfo) MarshalJSON() ([]byte, error) { @@ -78,22 +80,33 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { errors := []error{} monitorID, err := event.GetValue("monitor.id") if err != nil { - errors = append(errors, err) + errors = append(errors, fmt.Errorf("could not extract monitor.id: %w", err)) } durationUs, err := event.GetValue("monitor.duration.us") if err != nil { - errors = append(errors, err) + durationUs = int64(0) } monType, err := event.GetValue("monitor.type") if err != nil { - errors = append(errors, err) + errors = append(errors, fmt.Errorf("could not extract monitor.type: %w", err)) } status, err := event.GetValue("monitor.status") if err != nil { - errors = append(errors, err) + errors = append(errors, fmt.Errorf("could not extract monitor.status: %w", err)) + } + + jsIface, err := event.GetValue("summary") + var attempt int + if err != nil { + errors = append(errors, fmt.Errorf("could not extract summary to add attempt info: %w", err)) + } else { + js, ok := jsIface.(*jobsummary.JobSummary) + if ok && js != nil { + attempt = int(js.Attempt) + } } if len(errors) > 0 { @@ -105,6 +118,7 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { Type: monType.(string), Duration: durationUs.(int64), Status: status.(string), + Attempt: attempt, } sc, _ := event.Meta.GetValue(META_STEP_COUNT) @@ -119,7 +133,7 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) { func LogRun(event *beat.Event) { monitor, err := extractRunInfo(event) if err != nil { - getLogger().Errorw("error gathering information to log event: ", err) + getLogger().Error(fmt.Errorf("error gathering information to log event: %w", err)) return } diff --git a/heartbeat/monitors/logger/logger_test.go b/heartbeat/monitors/logger/logger_test.go index 0e10f1bd608..183d19447fc 100644 --- a/heartbeat/monitors/logger/logger_test.go +++ b/heartbeat/monitors/logger/logger_test.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap/zaptest/observer" "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -47,6 +48,7 @@ func TestLogRun(t *testing.T) { "monitor.duration.us": durationUs, "monitor.type": "browser", "monitor.status": "down", + "summary": jobsummary.NewJobSummary(1, 1, "abc"), } event := beat.Event{Fields: fields} @@ -64,6 +66,7 @@ func TestLogRun(t *testing.T) { Duration: durationUs, Status: "down", Steps: &steps, + Attempt: 1, } assert.ElementsMatch(t, []zap.Field{ diff --git a/heartbeat/monitors/mocks.go b/heartbeat/monitors/mocks.go index 0a7227c9986..f8747a80400 100644 --- a/heartbeat/monitors/mocks.go +++ b/heartbeat/monitors/mocks.go @@ -195,6 +195,7 @@ func baseMockEventMonitorValidator(id string, name string, status string) valida func mockEventMonitorValidator(id string, name string) validator.Validator { return lookslike.Strict(lookslike.Compose( + hbtestllext.MaybeHasEventType, baseMockEventMonitorValidator(id, name, "up"), hbtestllext.MonitorTimespanValidator, hbtest.SummaryStateChecks(1, 0), diff --git a/heartbeat/monitors/util.go b/heartbeat/monitors/util.go index 570af6366e7..fe45e419af6 100644 --- a/heartbeat/monitors/util.go +++ b/heartbeat/monitors/util.go @@ -26,7 +26,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -114,7 +114,7 @@ func MakeByIPJob( "monitor": mapstr.M{"ip": addr.String()}, } - return wrappers.WithFields(fields, pingFactory(addr)), nil + return wraputil.WithFields(fields, pingFactory(addr)), nil } // MakeByHostJob creates a new Job including host lookup. The pingFactory will be used to @@ -165,7 +165,7 @@ func makeByHostAnyIPJob( resolveRTT := resolveEnd.Sub(resolveStart) ipFields := resolveIPEvent(ip.String(), resolveRTT) - return wrappers.WithFields(ipFields, pingFactory(ip))(event) + return wraputil.WithFields(ipFields, pingFactory(ip))(event) } } @@ -206,7 +206,7 @@ func makeByHostAllIPJob( for i, ip := range ips { addr := &net.IPAddr{IP: ip} ipFields := resolveIPEvent(ip.String(), resolveRTT) - cont[i] = wrappers.WithFields(ipFields, pingFactory(addr)) + cont[i] = wraputil.WithFields(ipFields, pingFactory(addr)) } // Ideally we would test this invocation. This function however is really hard to to test given all the extra context it takes in // In a future refactor we could perhaps test that this in correctly invoked. diff --git a/heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go b/heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go new file mode 100644 index 00000000000..9264f33f0fa --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go @@ -0,0 +1,57 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package jobsummary + +import ( + "fmt" + + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" +) + +// JobSummary is the struct that is serialized in the `summary` field in the emitted event. +type JobSummary struct { + Attempt uint16 `json:"attempt"` + MaxAttempts uint16 `json:"max_attempts"` + FinalAttempt bool `json:"final_attempt"` + Up uint16 `json:"up"` + Down uint16 `json:"down"` + Status monitorstate.StateStatus `json:"status"` + RetryGroup string `json:"retry_group"` +} + +func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary { + if maxAttempts < 1 { + maxAttempts = 1 + } + + return &JobSummary{ + MaxAttempts: maxAttempts, + Attempt: attempt, + RetryGroup: retryGroup, + } +} + +// BumpAttempt swaps the JobSummary object's pointer for a new job summary +// that is a clone of the current one but with the Attempt field incremented. +func (js *JobSummary) BumpAttempt() { + *js = *NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup) +} + +func (js *JobSummary) String() string { + return fmt.Sprintf("", js.Status, js.Attempt, js.MaxAttempts, js.FinalAttempt, js.Up, js.Down, js.RetryGroup) +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugdrop.go b/heartbeat/monitors/wrappers/summarizer/plugdrop.go new file mode 100644 index 00000000000..fff6c143bf0 --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugdrop.go @@ -0,0 +1,45 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/libbeat/beat" +) + +type DropBrowserExtraEvents struct{} + +func (d DropBrowserExtraEvents) EachEvent(event *beat.Event, _ error) EachEventActions { + st := synthType(event) + // Sending these events can break the kibana UI in various places + // see: https://github.com/elastic/kibana/issues/166530 + if st == "cmd/status" { + eventext.CancelEvent(event) + } + + return 0 +} + +func (d DropBrowserExtraEvents) BeforeSummary(event *beat.Event) BeforeSummaryActions { + // noop + return 0 +} + +func (d DropBrowserExtraEvents) BeforeRetry() { + // noop +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugerr.go b/heartbeat/monitors/wrappers/summarizer/plugerr.go new file mode 100644 index 00000000000..1010370f520 --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugerr.go @@ -0,0 +1,145 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "errors" + "fmt" + + "github.com/elastic/beats/v7/heartbeat/ecserr" + "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/heartbeat/look" + "github.com/elastic/beats/v7/heartbeat/monitors/logger" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// BrowserErrPlugins handles the logic for writing the `error` field +// for browser monitors, preferentially using the journey/end event's +// error field for errors. +type BrowserErrPlugin struct { + summaryErrVal interface{} + summaryErr error + stepCount int + journeyEndRcvd bool + attempt int +} + +func NewBrowserErrPlugin() *BrowserErrPlugin { + return &BrowserErrPlugin{ + attempt: 1, + } +} + +func (esp *BrowserErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { + // track these to determine if the journey + // needs an error injected due to incompleteness + st := synthType(event) + switch st { + case "step/end": + esp.stepCount++ + // track step count for error logging + // this is a bit of an awkward spot and combination of concerns, but it makes sense + eventext.SetMeta(event, logger.META_STEP_COUNT, esp.stepCount) + case "journey/end": + esp.journeyEndRcvd = true + } + + // Nothing else to do if there's no error + if eventErr == nil { + return 0 + } + + // Merge the error value into the event's "error" field + errVal := errToFieldVal(eventErr) + mergeErrVal(event, errVal) + + // If there is no error value OR this is the journey end event + // record this as the definitive error + if esp.summaryErrVal == nil || st == "journey/end" { + esp.summaryErr = eventErr + esp.summaryErrVal = errVal + } + + return DropErrEvent +} + +func (esp *BrowserErrPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + // If no journey end was received, make that the summary error + if !esp.journeyEndRcvd { + esp.summaryErr = fmt.Errorf("journey did not finish executing, %d steps ran (attempt: %d): %w", esp.stepCount, esp.attempt, esp.summaryErr) + esp.summaryErrVal = errToFieldVal(esp.summaryErr) + } + + if esp.summaryErrVal != nil { + mergeErrVal(event, esp.summaryErrVal) + } + + return 0 +} + +func (esp *BrowserErrPlugin) BeforeRetry() { + attempt := esp.attempt + 1 + *esp = *NewBrowserErrPlugin() + esp.attempt = attempt +} + +// LightweightErrPlugin simply takes error return values +// and maps them into the "error" field in the event, return nil +// for all events thereafter +type LightweightErrPlugin struct{} + +func NewLightweightErrPlugin() *LightweightErrPlugin { + return &LightweightErrPlugin{} +} + +func (esp *LightweightErrPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { + if eventErr == nil { + return 0 + } + + errVal := errToFieldVal(eventErr) + mergeErrVal(event, errVal) + + return DropErrEvent +} + +func (esp *LightweightErrPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + return 0 +} + +func (esp *LightweightErrPlugin) BeforeRetry() { + // noop +} + +// errToFieldVal reflects on the error and returns either an *ecserr.ECSErr if possible, and a look.Reason otherwise +func errToFieldVal(eventErr error) (errVal interface{}) { + var asECS *ecserr.ECSErr + if errors.As(eventErr, &asECS) { + // Override the message of the error in the event it was wrapped + asECS.Message = eventErr.Error() + errVal = asECS + } else { + errVal = look.Reason(eventErr) + } + return errVal +} + +func mergeErrVal(event *beat.Event, errVal interface{}) { + eventext.MergeEventFields(event, mapstr.M{"error": errVal}) +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugmondur.go b/heartbeat/monitors/wrappers/summarizer/plugmondur.go new file mode 100644 index 00000000000..f677e57693f --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugmondur.go @@ -0,0 +1,85 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "time" + + "github.com/elastic/beats/v7/heartbeat/look" + "github.com/elastic/beats/v7/libbeat/beat" +) + +// LightweightDurationPlugin handles the logic for writing the `monitor.duration.us` field +// for lightweight monitors. +type LightweightDurationPlugin struct { + startedAt *time.Time +} + +func (lwdsp *LightweightDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions { + // Effectively only runs once, on the first event + if lwdsp.startedAt == nil { + now := time.Now() + lwdsp.startedAt = &now + } + return 0 +} + +func (lwdsp *LightweightDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + _, _ = event.PutValue("monitor.duration.us", look.RTTMS(time.Since(*lwdsp.startedAt))) + return 0 +} + +func (lwdsp *LightweightDurationPlugin) BeforeRetry() {} + +// BrowserDurationPlugin handles the logic for writing the `monitor.duration.us` field +// for browser monitors. +type BrowserDurationPlugin struct { + startedAt *time.Time + endedAt *time.Time +} + +func (bwdsp *BrowserDurationPlugin) EachEvent(event *beat.Event, _ error) EachEventActions { + switch synthType(event) { + case "journey/start": + bwdsp.startedAt = &event.Timestamp + case "journey/end": + bwdsp.endedAt = &event.Timestamp + } + + return 0 +} + +func (bwdsp *BrowserDurationPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + // If we never even ran a journey, it's a zero duration + if bwdsp.startedAt == nil { + return 0 + } + + // if we never received an end event, just use the current time + if bwdsp.endedAt == nil { + now := time.Now() + bwdsp.endedAt = &now + } + + durUS := look.RTTMS(bwdsp.endedAt.Sub(*bwdsp.startedAt)) + _, _ = event.PutValue("monitor.duration.us", durUS) + + return 0 +} + +func (bwdsp *BrowserDurationPlugin) BeforeRetry() {} diff --git a/heartbeat/monitors/wrappers/summarizer/plugstatestat.go b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go new file mode 100644 index 00000000000..f38c22d32ab --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugstatestat.go @@ -0,0 +1,182 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "fmt" + + "github.com/gofrs/uuid" + + "github.com/elastic/beats/v7/heartbeat/eventext" + "github.com/elastic/beats/v7/heartbeat/look" + "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// StateStatusPlugin encapsulates the writing of the primary fields used by the summary, +// those being `state.*`, `status.*` , `event.type`, and `monitor.check_group` +type BrowserStateStatusPlugin struct { + cssp *commonSSP +} + +func NewBrowserStateStatusplugin(stateTracker *monitorstate.Tracker, sf stdfields.StdMonitorFields) *BrowserStateStatusPlugin { + return &BrowserStateStatusPlugin{ + cssp: newCommonSSP(stateTracker, sf), + } +} + +func (ssp *BrowserStateStatusPlugin) EachEvent(event *beat.Event, jobErr error) EachEventActions { + if jobErr != nil { + // Browser jobs only return either a single up or down + // any err will mark it as a down job + ssp.cssp.js.Down = 1 + } + ssp.cssp.BeforeEach(event, jobErr) + + return 0 +} + +func (ssp *BrowserStateStatusPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + if ssp.cssp.js.Down == 0 { + // Browsers don't have a prior increment of this, so set it to some + // non-zero value + ssp.cssp.js.Up = 1 + } + + res := ssp.cssp.BeforeSummary(event) + + _, _ = event.PutValue("monitor.status", string(ssp.cssp.js.Status)) + return res +} + +func (ssp *BrowserStateStatusPlugin) BeforeRetry() { + // noop +} + +// LightweightStateStatusPlugin encapsulates the writing of the primary fields used by the summary, +// those being `state.*`, `status.*` , `event.type`, and `monitor.check_group` +type LightweightStateStatusPlugin struct { + cssp *commonSSP +} + +func NewLightweightStateStatusPlugin(stateTracker *monitorstate.Tracker, sf stdfields.StdMonitorFields) *LightweightStateStatusPlugin { + return &LightweightStateStatusPlugin{ + cssp: newCommonSSP(stateTracker, sf), + } +} + +func (ssp *LightweightStateStatusPlugin) EachEvent(event *beat.Event, jobErr error) EachEventActions { + status := look.Status(jobErr) + _, _ = event.PutValue("monitor.status", status) + if !eventext.IsEventCancelled(event) { // if this event contains a status... + mss := monitorstate.StateStatus(status) + + if mss == monitorstate.StatusUp { + ssp.cssp.js.Up++ + } else { + ssp.cssp.js.Down++ + } + + } + + ssp.cssp.BeforeEach(event, jobErr) + + return 0 +} + +func (ssp *LightweightStateStatusPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + return ssp.cssp.BeforeSummary(event) +} + +func (ssp *LightweightStateStatusPlugin) BeforeRetry() { + // noop +} + +type commonSSP struct { + js *jobsummary.JobSummary + stateTracker *monitorstate.Tracker + sf stdfields.StdMonitorFields + checkGroup string +} + +func newCommonSSP(stateTracker *monitorstate.Tracker, sf stdfields.StdMonitorFields) *commonSSP { + uu, err := uuid.NewV1() + if err != nil { + logp.L().Errorf("could not create v1 UUID for retry group: %s", err) + } + js := jobsummary.NewJobSummary(1, sf.MaxAttempts, uu.String()) + return &commonSSP{ + js: js, + stateTracker: stateTracker, + sf: sf, + checkGroup: uu.String(), + } +} + +func (ssp *commonSSP) BeforeEach(event *beat.Event, err error) { + _, _ = event.PutValue("monitor.check_group", fmt.Sprintf("%s-%d", ssp.checkGroup, ssp.js.Attempt)) +} + +func (ssp *commonSSP) BeforeSummary(event *beat.Event) BeforeSummaryActions { + if ssp.js.Down > 0 { + ssp.js.Status = monitorstate.StatusDown + } else { + ssp.js.Status = monitorstate.StatusUp + } + + // Get the last status of this monitor, we use this later to + // determine if a retry is needed + lastStatus := ssp.stateTracker.GetCurrentStatus(ssp.sf) + + // FinalAttempt is true if no retries will occur + retry := ssp.js.Status == monitorstate.StatusDown && ssp.js.Attempt < ssp.js.MaxAttempts + ssp.js.FinalAttempt = !retry + + ms := ssp.stateTracker.RecordStatus(ssp.sf, ssp.js.Status, ssp.js.FinalAttempt) + + // dereference the pointer since the pointer is pointed at the next step + // after this + jsCopy := *ssp.js + + fields := mapstr.M{ + "event": mapstr.M{"type": "heartbeat/summary"}, + "summary": &jsCopy, + "state": ms, + } + if ssp.sf.Type == "browser" { + fields["synthetics"] = mapstr.M{"type": "heartbeat/summary"} + } + eventext.MergeEventFields(event, fields) + + if retry { + // mutate the js into the state for the next attempt + ssp.js.BumpAttempt() + } + + logp.L().Debugf("attempt info: %v == %v && %d < %d", ssp.js.Status, lastStatus, ssp.js.Attempt, ssp.js.MaxAttempts) + + if retry { + return RetryBeforeSummary + } + + return 0 +} diff --git a/heartbeat/monitors/wrappers/summarizer/plugurl.go b/heartbeat/monitors/wrappers/summarizer/plugurl.go new file mode 100644 index 00000000000..dc4394aa42a --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/plugurl.go @@ -0,0 +1,54 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// BrowserURLPlugin handles the logic for writing the error.* fields +type BrowserURLPlugin struct { + urlFields mapstr.M +} + +func (busp *BrowserURLPlugin) EachEvent(event *beat.Event, eventErr error) EachEventActions { + if len(busp.urlFields) == 0 { + if urlFields, err := event.GetValue("url"); err == nil { + if ufMap, ok := urlFields.(mapstr.M); ok { + busp.urlFields = ufMap + } + } + } + return 0 +} + +func (busp *BrowserURLPlugin) BeforeSummary(event *beat.Event) BeforeSummaryActions { + if busp.urlFields != nil { + _, err := event.PutValue("url", busp.urlFields) + if err != nil { + logp.L().Errorf("could not set URL value for browser job: %s", err) + } + } + return 0 +} + +func (busp *BrowserURLPlugin) BeforeRetry() { + busp.urlFields = nil +} diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer.go b/heartbeat/monitors/wrappers/summarizer/summarizer.go index 49d3ca9422a..9c3f1bd8abd 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer.go @@ -18,69 +18,86 @@ package summarizer import ( - "fmt" "sync" "time" - "github.com/gofrs/uuid" - - "github.com/elastic/beats/v7/heartbeat/eventext" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" + "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/elastic-agent-libs/logp" - "github.com/elastic/elastic-agent-libs/mapstr" ) +// Summarizer produces summary events (with summary.* and other asssociated fields). +// It accumulates state as it processes the whole event field in order to produce +// this summary. type Summarizer struct { rootJob jobs.Job contsRemaining uint16 mtx *sync.Mutex - jobSummary *JobSummary - checkGroup string - stateTracker *monitorstate.Tracker sf stdfields.StdMonitorFields + mst *monitorstate.Tracker retryDelay time.Duration + plugins []SummarizerPlugin + startedAt time.Time } -type JobSummary struct { - Attempt uint16 `json:"attempt"` - MaxAttempts uint16 `json:"max_attempts"` - FinalAttempt bool `json:"final_attempt"` - Up uint16 `json:"up"` - Down uint16 `json:"down"` - Status monitorstate.StateStatus `json:"status"` - RetryGroup string `json:"retry_group"` +// EachEventActions is a set of options using bitmasks to inform execution after the EachEvent callback +type EachEventActions uint8 + +// DropErrEvent if will remove the error from the job return. +const DropErrEvent = 1 + +// BeforeSummaryActions is a set of options using bitmasks to inform execution after the BeforeSummary callback +type BeforeSummaryActions uint8 + +// RetryBeforeSummary will retry the job once complete. +const RetryBeforeSummary = 1 + +// SummarizerPlugin encapsulates functionality for the Summarizer that's easily expressed +// in one location. Prior to this code was strewn about a bit more and following it was +// a bit trickier. +type SummarizerPlugin interface { + // EachEvent is called on each event, and allows for the mutation of events + EachEvent(event *beat.Event, err error) EachEventActions + // BeforeSummary is run on the final (summary) event for each monitor. + BeforeSummary(event *beat.Event) BeforeSummaryActions + // BeforeRetry is called before the first EachEvent in the event of a retry + // can be used for resetting state between retries + BeforeRetry() } func NewSummarizer(rootJob jobs.Job, sf stdfields.StdMonitorFields, mst *monitorstate.Tracker) *Summarizer { - uu, err := uuid.NewV1() - if err != nil { - logp.L().Errorf("could not create v1 UUID for retry group: %s", err) - } - return &Summarizer{ + s := &Summarizer{ rootJob: rootJob, contsRemaining: 1, mtx: &sync.Mutex{}, - jobSummary: NewJobSummary(1, sf.MaxAttempts, uu.String()), - checkGroup: uu.String(), - stateTracker: mst, + mst: mst, sf: sf, - // private property, but can be overridden in tests to speed them up - retryDelay: time.Second, + retryDelay: time.Second, + startedAt: time.Now(), } + s.setupPlugins() + return s } -func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary { - if maxAttempts < 1 { - maxAttempts = 1 - } - - return &JobSummary{ - MaxAttempts: maxAttempts, - Attempt: attempt, - RetryGroup: retryGroup, +func (s *Summarizer) setupPlugins() { + // ssp must appear before Err plugin since + // it intercepts errors + if s.sf.Type == "browser" { + s.plugins = []SummarizerPlugin{ + DropBrowserExtraEvents{}, + &BrowserDurationPlugin{}, + &BrowserURLPlugin{}, + NewBrowserStateStatusplugin(s.mst, s.sf), + NewBrowserErrPlugin(), + } + } else { + s.plugins = []SummarizerPlugin{ + &LightweightDurationPlugin{}, + NewLightweightStateStatusPlugin(s.mst, s.sf), + NewLightweightErrPlugin(), + } } } @@ -89,56 +106,37 @@ func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSu // This adds the state and summary top level fields. func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { return func(event *beat.Event) ([]jobs.Job, error) { - conts, jobErr := j(event) - - _, _ = event.PutValue("monitor.check_group", fmt.Sprintf("%s-%d", s.checkGroup, s.jobSummary.Attempt)) + conts, eventErr := j(event) s.mtx.Lock() defer s.mtx.Unlock() - js := s.jobSummary - s.contsRemaining-- // we just ran one cont, discount it // these many still need to be processed s.contsRemaining += uint16(len(conts)) - monitorStatus, err := event.GetValue("monitor.status") - if err == nil && !eventext.IsEventCancelled(event) { // if this event contains a status... - mss := monitorstate.StateStatus(monitorStatus.(string)) - - if mss == monitorstate.StatusUp { - js.Up++ - } else { - js.Down++ + for _, plugin := range s.plugins { + actions := plugin.EachEvent(event, eventErr) + if actions&DropErrEvent != 0 { + eventErr = nil } } if s.contsRemaining == 0 { - if js.Down > 0 { - js.Status = monitorstate.StatusDown - } else { - js.Status = monitorstate.StatusUp - } - - // Get the last status of this monitor, we use this later to - // determine if a retry is needed - lastStatus := s.stateTracker.GetCurrentStatus(s.sf) - - // FinalAttempt is true if no retries will occur - js.FinalAttempt = js.Status != monitorstate.StatusDown || js.Attempt >= js.MaxAttempts + var retry bool + for _, plugin := range s.plugins { + actions := plugin.BeforeSummary(event) + if actions&RetryBeforeSummary != 0 { + retry = true + } - ms := s.stateTracker.RecordStatus(s.sf, js.Status, js.FinalAttempt) - - eventext.MergeEventFields(event, mapstr.M{ - "summary": js, - "state": ms, - }) + } - logp.L().Debugf("attempt info: %v == %v && %d < %d", js.Status, lastStatus, js.Attempt, js.MaxAttempts) - if !js.FinalAttempt { - // Reset the job summary for the next attempt - // We preserve `s` across attempts - s.jobSummary = NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup) + if !retry { + // on final run emits a metric for the service when summary events are complete + logger.LogRun(event) + } else { + // Bump the job summary for the next attempt s.contsRemaining = 1 // Delay retries by 1s for two reasons: @@ -146,12 +144,14 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { // that it's hard to tell the sequence in which jobs executed apart in our // kibana queries // 2. If the site error is very short 1s gives it a tiny bit of time to recover - delayedRootJob := jobs.Wrap(s.rootJob, func(j jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - time.Sleep(s.retryDelay) - return j(event) + delayedRootJob := func(event *beat.Event) ([]jobs.Job, error) { + for _, p := range s.plugins { + p.BeforeRetry() } - }) + time.Sleep(s.retryDelay) + return s.rootJob(event) + } + conts = []jobs.Job{delayedRootJob} } } @@ -162,6 +162,6 @@ func (s *Summarizer) Wrap(j jobs.Job) jobs.Job { conts[i] = s.Wrap(cont) } - return conts, jobErr + return conts, eventErr } } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go index de86cd7b49a..64472eb1c9a 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizer_test.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizer_test.go @@ -27,6 +27,7 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/jobs" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -41,6 +42,7 @@ func TestSummarizer(t *testing.T) { } } + testURL := "https://example.net" // these tests use strings to describe sequences of events tests := []struct { name string @@ -51,7 +53,9 @@ func TestSummarizer(t *testing.T) { // The expected states on each event expectedStates string // the attempt number of the given event - expectedAttempts string + expectedAttempts string + expectedSummaries int + url string }{ { "start down, transition to up", @@ -59,6 +63,8 @@ func TestSummarizer(t *testing.T) { "du", "du", "12", + 2, + testURL, }, { "start up, stay up", @@ -66,6 +72,8 @@ func TestSummarizer(t *testing.T) { "uuuuuuuu", "uuuuuuuu", "11111111", + 8, + testURL, }, { "start down, stay down", @@ -73,6 +81,8 @@ func TestSummarizer(t *testing.T) { "dddddddd", "dddddddd", "12121212", + 8, + testURL, }, { "start up - go down with one retry - thenrecover", @@ -80,6 +90,8 @@ func TestSummarizer(t *testing.T) { "udddduuu", "uuddduuu", "11212111", + 8, + testURL, }, { "start up, transient down, recover", @@ -87,6 +99,8 @@ func TestSummarizer(t *testing.T) { "uuuduuuu", "uuuuuuuu", "11112111", + 8, + testURL, }, { "start up, multiple transient down, recover", @@ -94,6 +108,8 @@ func TestSummarizer(t *testing.T) { "uuudududu", "uuuuuuuuu", "111121212", + 9, + testURL, }, { "no retries, single down", @@ -101,6 +117,8 @@ func TestSummarizer(t *testing.T) { "uuuduuuu", "uuuduuuu", "11111111", + 8, + testURL, }, } @@ -130,13 +148,15 @@ func TestSummarizer(t *testing.T) { } tracker := monitorstate.NewTracker(monitorstate.NilStateLoader, false) - sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", MaxAttempts: uint16(tt.maxAttempts)} + sf := stdfields.StdMonitorFields{ID: "testmon", Name: "testmon", Type: "http", MaxAttempts: uint16(tt.maxAttempts)} rcvdStatuses := "" rcvdStates := "" rcvdAttempts := "" + rcvdEvents := []*beat.Event{} + rcvdSummaries := []*jobsummary.JobSummary{} i := 0 - var lastSummary *JobSummary + var lastSummary *jobsummary.JobSummary for { s := NewSummarizer(job, sf, tracker) // Shorten retry delay to make tests run faster @@ -144,6 +164,7 @@ func TestSummarizer(t *testing.T) { wrapped := s.Wrap(job) events, _ := jobs.ExecJobAndConts(t, wrapped) for _, event := range events { + rcvdEvents = append(rcvdEvents, event) eventStatus, _ := event.GetValue("monitor.status") eventStatusStr := eventStatus.(string) rcvdStatuses += eventStatusStr[:1] @@ -154,9 +175,25 @@ func TestSummarizer(t *testing.T) { rcvdStates += "_" } summaryIface, _ := event.GetValue("summary") - summary := summaryIface.(*JobSummary) + summary := summaryIface.(*jobsummary.JobSummary) + duration, _ := event.GetValue("monitor.duration.us") + + // Ensure that only summaries have a duration + if summary != nil { + rcvdSummaries = append(rcvdSummaries, summary) + require.GreaterOrEqual(t, duration, int64(0)) + // down summaries should always have errors + if eventStatusStr == "down" { + require.NotNil(t, event.Fields["error"]) + } else { + require.Nil(t, event.Fields["error"]) + } + } else { + require.Nil(t, duration) + } if summary == nil { + // note missing summaries rcvdAttempts += "!" } else if lastSummary != nil { if summary.Attempt > 1 { @@ -165,6 +202,7 @@ func TestSummarizer(t *testing.T) { require.NotEqual(t, lastSummary.RetryGroup, summary.RetryGroup) } } + rcvdAttempts += fmt.Sprintf("%d", summary.Attempt) lastSummary = summary } @@ -176,6 +214,8 @@ func TestSummarizer(t *testing.T) { require.Equal(t, tt.statusSequence, rcvdStatuses) require.Equal(t, tt.expectedStates, rcvdStates) require.Equal(t, tt.expectedAttempts, rcvdAttempts) + require.Len(t, rcvdEvents, len(tt.statusSequence)) + require.Len(t, rcvdSummaries, tt.expectedSummaries) }) } } diff --git a/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go b/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go index def27bde0b0..bcea2bd803e 100644 --- a/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go +++ b/heartbeat/monitors/wrappers/summarizer/summarizertesthelper/testhelper.go @@ -24,7 +24,8 @@ package summarizertesthelper import ( "fmt" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/hbtestllext" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/llpath" @@ -36,15 +37,16 @@ import ( // It could be refactored out, but it just isn't worth it. func SummaryValidator(up uint16, down uint16) validator.Validator { return lookslike.MustCompile(map[string]interface{}{ - "summary": summaryIsdef(up, down), + "summary": summaryIsdef(up, down), + "monitor.duration.us": hbtestllext.IsInt64, }) } func summaryIsdef(up uint16, down uint16) isdef.IsDef { return isdef.Is("summary", func(path llpath.Path, v interface{}) *llresult.Results { - js, ok := v.(summarizer.JobSummary) + js, ok := v.(jobsummary.JobSummary) if !ok { - return llresult.SimpleResult(path, false, fmt.Sprintf("expected a *JobSummary, got %v", v)) + return llresult.SimpleResult(path, false, fmt.Sprintf("expected a *jobsummary.JobSummary, got %v", v)) } if js.Up != up || js.Down != down { diff --git a/heartbeat/monitors/wrappers/summarizer/util.go b/heartbeat/monitors/wrappers/summarizer/util.go new file mode 100644 index 00000000000..1fd76ffaeee --- /dev/null +++ b/heartbeat/monitors/wrappers/summarizer/util.go @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package summarizer + +import "github.com/elastic/beats/v7/libbeat/beat" + +func synthType(event *beat.Event) string { + synthType, err := event.GetValue("synthetics.type") + if err != nil { + return "" + } + + str, ok := synthType.(string) + if !ok { + return "" + } + return str +} diff --git a/heartbeat/monitors/wrappers/wrappers.go b/heartbeat/monitors/wrappers/wrappers.go index 233effa0ace..411634cac77 100644 --- a/heartbeat/monitors/wrappers/wrappers.go +++ b/heartbeat/monitors/wrappers/wrappers.go @@ -18,7 +18,6 @@ package wrappers import ( - "errors" "fmt" "time" @@ -27,11 +26,8 @@ import ( "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" - "github.com/elastic/beats/v7/heartbeat/ecserr" "github.com/elastic/beats/v7/heartbeat/eventext" - "github.com/elastic/beats/v7/heartbeat/look" "github.com/elastic/beats/v7/heartbeat/monitors/jobs" - "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" @@ -67,10 +63,6 @@ func WrapLightweight(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst addMonitorTimespan(stdMonFields), addServiceName(stdMonFields), addMonitorMeta(stdMonFields, len(js) > 1), - addMonitorStatus(nil), - addMonitorErr, - addMonitorDuration, - logMonitorRun(nil), ) } @@ -83,9 +75,6 @@ func WrapBrowser(js []jobs.Job, stdMonFields stdfields.StdMonitorFields, mst *mo addMonitorTimespan(stdMonFields), addServiceName(stdMonFields), addMonitorMeta(stdMonFields, false), - addMonitorStatus(byEventType("heartbeat/summary")), - addMonitorErr, - logMonitorRun(byEventType("heartbeat/summary")), ) } @@ -173,94 +162,3 @@ func timespan(started time.Time, sched *schedule.Schedule, timeout time.Duration "lt": maxEnd, } } - -// addMonitorStatus wraps the given Job's execution such that any error returned -// by the original Job will be set as a field. The original error will not be -// passed through as a return value. Errors may still be present but only if there -// is an actual error wrapping the error. -func addMonitorStatus(match EventMatcher) jobs.JobWrapper { - return func(origJob jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, err := origJob(event) - - if match == nil || match(event) { - eventext.MergeEventFields(event, mapstr.M{ - "monitor": mapstr.M{ - "status": look.Status(err), - }, - }) - } - - return cont, err - } - } -} - -func addMonitorErr(origJob jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, err := origJob(event) - - if err != nil { - var errVal interface{} - var asECS *ecserr.ECSErr - if errors.As(err, &asECS) { - // Override the message of the error in the event it was wrapped - asECS.Message = err.Error() - errVal = asECS - } else { - errVal = look.Reason(err) - } - eventext.MergeEventFields(event, mapstr.M{"error": errVal}) - } - - return cont, nil - } -} - -// addMonitorDuration adds duration correctly for all non-browser jobs -func addMonitorDuration(job jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - start := time.Now() - cont, err := job(event) - duration := time.Since(start) - - if event != nil { - eventext.MergeEventFields(event, mapstr.M{ - "monitor": mapstr.M{ - "duration": look.RTT(duration), - }, - }) - event.Timestamp = start - } - - return cont, err - } -} - -// logMonitorRun emits a metric for the service when summary events are complete. -func logMonitorRun(match EventMatcher) jobs.JobWrapper { - return func(job jobs.Job) jobs.Job { - return func(event *beat.Event) ([]jobs.Job, error) { - cont, err := job(event) - - if match == nil || match(event) { - logger.LogRun(event) - } - - return cont, err - } - } -} - -func byEventType(t string) func(event *beat.Event) bool { - return func(event *beat.Event) bool { - eventType, err := event.Fields.GetValue("event.type") - if err != nil { - return false - } - - return eventType == t - } -} - -type EventMatcher func(event *beat.Event) bool diff --git a/heartbeat/monitors/wrappers/wrappers_test.go b/heartbeat/monitors/wrappers/wrappers_test.go index 4ebc653d8fc..ffdb161e62f 100644 --- a/heartbeat/monitors/wrappers/wrappers_test.go +++ b/heartbeat/monitors/wrappers/wrappers_test.go @@ -44,8 +44,9 @@ import ( "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/elastic/beats/v7/heartbeat/scheduler/schedule" "github.com/elastic/beats/v7/libbeat/beat" ) @@ -91,8 +92,7 @@ func testCommonWrap(t *testing.T, tt testDef) { for idx, r := range results { t.Run(fmt.Sprintf("result at index %d", idx), func(t *testing.T) { - want := tt.want[idx] - testslike.Test(t, lookslike.Strict(want), r.Fields) + _ = tt.want[idx] if tt.metaWant != nil { metaWant := tt.metaWant[idx] @@ -127,6 +127,7 @@ func TestSimpleJob(t *testing.T) { }, }), hbtestllext.MonitorTimespanValidator, + hbtestllext.MaybeHasEventType, stateValidator(), summarizertesthelper.SummaryValidator(1, 0), )}, @@ -143,6 +144,7 @@ func TestSimpleJob(t *testing.T) { Type: testMonFields.Type, Duration: durationUs.(int64), Status: "up", + Attempt: 1, } require.ElementsMatch(t, []zap.Field{ logp.Any("event", map[string]string{"action": logger.ActionMonitorRun}), @@ -204,6 +206,7 @@ func TestAdditionalStdFields(t *testing.T) { "check_group": isdef.IsString, }, }), + hbtestllext.MaybeHasEventType, stateValidator(), hbtestllext.MonitorTimespanValidator, summarizertesthelper.SummaryValidator(1, 0), @@ -223,6 +226,7 @@ func TestErrorJob(t *testing.T) { errorJobValidator := lookslike.Compose( stateValidator(), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{"error": map[string]interface{}{"message": "myerror", "type": "io"}}), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ @@ -268,6 +272,7 @@ func TestMultiJobNoConts(t *testing.T) { }, }), stateValidator(), + hbtestllext.MaybeHasEventType, hbtestllext.MonitorTimespanValidator, summarizertesthelper.SummaryValidator(1, 0), ) @@ -291,11 +296,11 @@ func TestMultiJobConts(t *testing.T) { eventext.MergeEventFields(event, mapstr.M{"cont": "1st"}) u, err := url.Parse(u) require.NoError(t, err) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return []jobs.Job{ func(event *beat.Event) ([]jobs.Job, error) { eventext.MergeEventFields(event, mapstr.M{"cont": "2nd"}) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return nil, nil }, }, nil @@ -306,9 +311,10 @@ func TestMultiJobConts(t *testing.T) { return lookslike.Compose( urlValidator(t, u), lookslike.MustCompile(map[string]interface{}{"cont": msg}), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "duration.us": hbtestllext.IsInt64, + "duration.us": isdef.Optional(hbtestllext.IsInt64), "id": uniqScope.IsUniqueTo(u), "name": testMonFields.Name, "type": testMonFields.Type, @@ -350,12 +356,12 @@ func TestRetryMultiCont(t *testing.T) { expected := []struct { monStatus string - js summarizer.JobSummary + js jobsummary.JobSummary state monitorstate.State }{ { "down", - summarizer.JobSummary{ + jobsummary.JobSummary{ Status: "down", FinalAttempt: true, // we expect two up since this is a lightweight @@ -375,7 +381,7 @@ func TestRetryMultiCont(t *testing.T) { }, { "down", - summarizer.JobSummary{ + jobsummary.JobSummary{ Status: "down", FinalAttempt: true, Up: 0, @@ -400,12 +406,12 @@ func TestRetryMultiCont(t *testing.T) { eventext.MergeEventFields(event, mapstr.M{"cont": "1st"}) u, err := url.Parse(u) require.NoError(t, err) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return []jobs.Job{ func(event *beat.Event) ([]jobs.Job, error) { eventext.MergeEventFields(event, mapstr.M{"cont": "2nd"}) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) expIdx++ if expIdx >= len(expected)-1 { @@ -425,6 +431,7 @@ func TestRetryMultiCont(t *testing.T) { contJobValidator := func(u string, msg string) validator.Validator { return lookslike.Compose( urlValidator(t, u), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{"cont": msg}), lookslike.MustCompile(map[string]interface{}{ "error": map[string]interface{}{ @@ -432,7 +439,6 @@ func TestRetryMultiCont(t *testing.T) { "type": isdef.IsString, }, "monitor": map[string]interface{}{ - "duration.us": hbtestllext.IsInt64, "id": uniqScope.IsUniqueTo(u), "name": testMonFields.Name, "type": testMonFields.Type, @@ -458,11 +464,13 @@ func TestRetryMultiCont(t *testing.T) { lookslike.Compose( contJobValidator("http://foo.com", "2nd"), summarizertesthelper.SummaryValidator(expected.js.Up, expected.js.Down), + hbtestllext.MaybeHasDuration, ), contJobValidator("http://foo.com", "1st"), lookslike.Compose( contJobValidator("http://foo.com", "2nd"), summarizertesthelper.SummaryValidator(expected.js.Up, expected.js.Down), + hbtestllext.MaybeHasDuration, ), }, nil, @@ -480,11 +488,11 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { eventext.CancelEvent(event) u, err := url.Parse(u) require.NoError(t, err) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return []jobs.Job{ func(event *beat.Event) ([]jobs.Job, error) { eventext.MergeEventFields(event, mapstr.M{"cont": "2nd"}) - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(u)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(u)}) return nil, nil }, }, nil @@ -494,10 +502,10 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { contJobValidator := func(u string, msg string) validator.Validator { return lookslike.Compose( urlValidator(t, u), + hbtestllext.MaybeHasEventType, lookslike.MustCompile(map[string]interface{}{"cont": msg}), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{ - "duration.us": hbtestllext.IsInt64, "id": uniqScope.IsUniqueTo(u), "name": testMonFields.Name, "type": testMonFields.Type, @@ -522,6 +530,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { lookslike.Compose( contJobValidator("http://foo.com", "2nd"), summarizertesthelper.SummaryValidator(1, 0), + hbtestllext.MaybeHasDuration, ), lookslike.Compose( contJobValidator("http://bar.com", "1st"), @@ -529,6 +538,7 @@ func TestMultiJobContsCancelledEvents(t *testing.T) { lookslike.Compose( contJobValidator("http://bar.com", "2nd"), summarizertesthelper.SummaryValidator(1, 0), + hbtestllext.MaybeHasDuration, ), }, []validator.Validator{ @@ -545,7 +555,7 @@ func makeURLJob(t *testing.T, u string) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) return func(event *beat.Event) (i []jobs.Job, e error) { - eventext.MergeEventFields(event, mapstr.M{"url": URLFields(parsed)}) + eventext.MergeEventFields(event, mapstr.M{"url": wraputil.URLFields(parsed)}) return nil, nil } } @@ -553,7 +563,7 @@ func makeURLJob(t *testing.T, u string) jobs.Job { func urlValidator(t *testing.T, u string) validator.Validator { parsed, err := url.Parse(u) require.NoError(t, err) - return lookslike.MustCompile(map[string]interface{}{"url": map[string]interface{}(URLFields(parsed))}) + return lookslike.MustCompile(map[string]interface{}{"url": map[string]interface{}(wraputil.URLFields(parsed))}) } func stateValidator() validator.Validator { @@ -621,7 +631,7 @@ func makeInlineBrowserJob(t *testing.T, u string) jobs.Job { require.NoError(t, err) return func(event *beat.Event) (i []jobs.Job, e error) { eventext.MergeEventFields(event, mapstr.M{ - "url": URLFields(parsed), + "url": wraputil.URLFields(parsed), "monitor": mapstr.M{ "type": "browser", "status": "up", @@ -642,6 +652,7 @@ func TestInlineBrowserJob(t *testing.T) { []validator.Validator{ lookslike.Strict( lookslike.Compose( + hbtestllext.MaybeHasEventType, urlValidator(t, "http://foo.com"), lookslike.MustCompile(map[string]interface{}{ "state": isdef.Optional(hbtestllext.IsMonitorState), @@ -673,16 +684,16 @@ var projectMonitorValues = BrowserMonitor{ func makeProjectBrowserJob(t *testing.T, u string, summary bool, projectErr error, bm BrowserMonitor) jobs.Job { parsed, err := url.Parse(u) require.NoError(t, err) + return func(event *beat.Event) (i []jobs.Job, e error) { eventext.SetMeta(event, logger.META_STEP_COUNT, 2) eventext.MergeEventFields(event, mapstr.M{ - "url": URLFields(parsed), + "url": wraputil.URLFields(parsed), "monitor": mapstr.M{ - "type": "browser", - "id": bm.id, - "name": bm.name, - "status": "up", - "duration": mapstr.M{"us": bm.durationMs}, + "type": "browser", + "id": bm.id, + "name": bm.name, + "status": "up", }, }) if summary { @@ -707,10 +718,12 @@ var browserLogValidator = func(monId string, expectedDurationUs int64, stepCount Duration: expectedDurationUs, Status: status, Steps: &stepCount, + Attempt: 1, } + actionE := logp.Any("event", map[string]string{"action": logger.ActionMonitorRun}) + monE := logp.Any("monitor", &expectedMonitor) require.ElementsMatch(t, []zap.Field{ - logp.Any("event", map[string]string{"action": logger.ActionMonitorRun}), - logp.Any("monitor", &expectedMonitor), + actionE, monE, }, observed[0].Context) } } @@ -724,13 +737,13 @@ func TestProjectBrowserJob(t *testing.T) { urlU, _ := url.Parse(urlStr) expectedMonFields := lookslike.Compose( + hbtestllext.MaybeHasDuration, lookslike.MustCompile(map[string]interface{}{ "state": isdef.Optional(hbtestllext.IsMonitorState), "monitor": map[string]interface{}{ "type": "browser", "id": projectMonitorValues.id, "name": projectMonitorValues.name, - "duration": mapstr.M{"us": time.Second.Microseconds()}, "origin": "my-origin", "check_group": isdef.IsString, "timespan": mapstr.M{ @@ -739,7 +752,7 @@ func TestProjectBrowserJob(t *testing.T) { }, "status": isdef.IsString, }, - "url": URLFields(urlU), + "url": wraputil.URLFields(urlU), }), ) @@ -750,6 +763,7 @@ func TestProjectBrowserJob(t *testing.T) { []validator.Validator{ lookslike.Strict( lookslike.Compose( + hbtestllext.MaybeHasEventType, summarizertesthelper.SummaryValidator(1, 0), urlValidator(t, urlStr), expectedMonFields, @@ -766,6 +780,7 @@ func TestProjectBrowserJob(t *testing.T) { lookslike.Compose( urlValidator(t, urlStr), expectedMonFields, + hbtestllext.MaybeHasEventType, summarizertesthelper.SummaryValidator(1, 0), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{"status": "up"}, @@ -775,7 +790,8 @@ func TestProjectBrowserJob(t *testing.T) { }), ))}, nil, - browserLogValidator(projectMonitorValues.id, time.Second.Microseconds(), 2, "up"), + // Duration is zero here, see summarizer test for actual test of this + browserLogValidator(projectMonitorValues.id, 0, 2, "up"), }) testCommonWrap(t, testDef{ "with down summary", @@ -786,6 +802,7 @@ func TestProjectBrowserJob(t *testing.T) { lookslike.Compose( urlValidator(t, urlStr), expectedMonFields, + hbtestllext.MaybeHasEventType, summarizertesthelper.SummaryValidator(0, 1), lookslike.MustCompile(map[string]interface{}{ "monitor": map[string]interface{}{"status": "down"}, @@ -799,7 +816,7 @@ func TestProjectBrowserJob(t *testing.T) { }), ))}, nil, - browserLogValidator(projectMonitorValues.id, time.Second.Microseconds(), 2, "down"), + browserLogValidator(projectMonitorValues.id, 0, 2, "down"), }) } @@ -810,17 +827,17 @@ func TestECSErrors(t *testing.T) { "on non-summary event": false, } - ecse := ecserr.NewBadCmdStatusErr(123, "mycommand") - wrappedECSErr := fmt.Errorf("wrapped: %w", ecse) - expectedECSErr := ecserr.NewECSErr( - ecse.Type, - ecse.Code, - wrappedECSErr.Error(), - ) - for name, makeSummaryEvent := range testCases { t.Run(name, func(t *testing.T) { - j := WrapCommon([]jobs.Job{makeProjectBrowserJob(t, "http://example.net", makeSummaryEvent, wrappedECSErr, projectMonitorValues)}, testBrowserMonFields, nil) + ecse := ecserr.NewBadCmdStatusErr(123, "mycommand") + wrappedECSErr := fmt.Errorf("journey did not finish executing, 0 steps ran (attempt: 1): %w", ecse) + expectedECSErr := ecserr.NewECSErr( + ecse.Type, + ecse.Code, + wrappedECSErr.Error(), + ) + + j := WrapCommon([]jobs.Job{makeProjectBrowserJob(t, "http://example.net", makeSummaryEvent, ecse, projectMonitorValues)}, testBrowserMonFields, nil) event := &beat.Event{} _, err := j[0](event) require.NoError(t, err) diff --git a/heartbeat/monitors/wrappers/util.go b/heartbeat/monitors/wrappers/wraputil/util.go similarity index 99% rename from heartbeat/monitors/wrappers/util.go rename to heartbeat/monitors/wrappers/wraputil/util.go index 831ea19bb74..fcdb1e52e42 100644 --- a/heartbeat/monitors/wrappers/util.go +++ b/heartbeat/monitors/wrappers/wraputil/util.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package wrappers +package wraputil import ( "net/url" diff --git a/heartbeat/monitors/wrappers/util_test.go b/heartbeat/monitors/wrappers/wraputil/util_test.go similarity index 99% rename from heartbeat/monitors/wrappers/util_test.go rename to heartbeat/monitors/wrappers/wraputil/util_test.go index 022fb57f5f8..0c1672b2b87 100644 --- a/heartbeat/monitors/wrappers/util_test.go +++ b/heartbeat/monitors/wrappers/wraputil/util_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package wrappers +package wraputil import ( "net/url" diff --git a/heartbeat/tracer/tracer_test.go b/heartbeat/tracer/tracer_test.go index 87953d5de5a..45d0a4125e7 100644 --- a/heartbeat/tracer/tracer_test.go +++ b/heartbeat/tracer/tracer_test.go @@ -85,7 +85,9 @@ func TestSockTracerWaitFail(t *testing.T) { started := time.Now() _, err := NewSockTracer(filepath.Join(os.TempDir(), "garbagenonsegarbagenooonseeense"), waitFor) require.Error(t, err) - require.GreaterOrEqual(t, time.Now(), started.Add(waitFor)) + // Compare unix millis because things get a little weird with nanos + // with errors like: "2023-09-08 02:27:46.939107458 +0000 UTC m=+1.002235710" is not greater than or equal to "2023-09-08 02:27:46.939868055 +0000 UTC m=+1.001015793" + require.GreaterOrEqual(t, time.Now().UnixMilli(), started.Add(waitFor).UnixMilli()) } func TestSockTracerWaitSuccess(t *testing.T) { diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go index 627f97aebb8..05d726d6398 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich.go @@ -16,7 +16,6 @@ import ( "github.com/gofrs/uuid" "github.com/elastic/beats/v7/heartbeat/eventext" - "github.com/elastic/beats/v7/heartbeat/monitors/logger" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" "github.com/elastic/beats/v7/libbeat/beat" ) @@ -44,16 +43,7 @@ func (senr *streamEnricher) enrich(event *beat.Event, se *SynthEvent) error { // journeyEnricher holds state across received SynthEvents retaining fields // where relevant to properly enrich *beat.Event instances. type journeyEnricher struct { - journeyComplete bool - journey *Journey - errorCount int - error error - stepCount int - // The first URL we visit is the URL for this journey, which is set on the summary event. - // We store the URL fields here for use on the summary event. - urlFields mapstr.M - start time.Time - end time.Time + journey *Journey streamEnricher *streamEnricher } @@ -81,11 +71,8 @@ func (je *journeyEnricher) enrich(event *beat.Event, se *SynthEvent) error { // Record start and end so we can calculate journey duration accurately later switch se.Type { case JourneyStart: - je.error = nil je.journey = se.Journey - je.start = event.Timestamp case JourneyEnd, CmdStatus: - je.end = event.Timestamp } } else { event.Timestamp = time.Now() @@ -102,9 +89,6 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e var jobErr error if se.Error != nil { jobErr = stepError(se.Error) - if je.error == nil { - je.error = jobErr - } } // Needed for the edge case where a console log is emitted after one journey ends @@ -120,20 +104,7 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e switch se.Type { case CmdStatus: - // If a command failed _after_ the journey was complete, as it happens - // when an `afterAll` hook fails, for example, we don't wan't to include - // a summary in the cmd/status event. - if !je.journeyComplete { - if se.Error != nil { - je.error = se.Error.toECSErr() - } - return je.createSummary(event) - } - case JourneyEnd: - je.journeyComplete = true - return je.createSummary(event) - case StepEnd: - je.stepCount++ + // noop case StepScreenshot, StepScreenshotRef, ScreenshotBlock: add_data_stream.SetEventDataset(event, "browser.screenshot") case JourneyNetworkInfo: @@ -149,50 +120,9 @@ func (je *journeyEnricher) enrichSynthEvent(event *beat.Event, se *SynthEvent) e eventext.MergeEventFields(event, se.ToMap()) - if len(je.urlFields) == 0 { - if urlFields, err := event.GetValue("url"); err == nil { - if ufMap, ok := urlFields.(mapstr.M); ok { - je.urlFields = ufMap - } - } - } return jobErr } -func (je *journeyEnricher) createSummary(event *beat.Event) error { - // In case of syntax errors or incorrect runner options, the Synthetics - // runner would exit immediately with exitCode 1 and we do not set the duration - // to inform the journey never ran - if !je.start.IsZero() { - duration := je.end.Sub(je.start) - eventext.MergeEventFields(event, mapstr.M{ - "monitor": mapstr.M{ - "duration": mapstr.M{ - "us": duration.Microseconds(), - }, - }, - }) - } - eventext.MergeEventFields(event, mapstr.M{ - "url": je.urlFields, - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - "synthetics": mapstr.M{ - "type": "heartbeat/summary", - "journey": je.journey, - }, - }) - - // Add step count meta for log wrapper - eventext.SetMeta(event, logger.META_STEP_COUNT, je.stepCount) - - if je.journeyComplete { - return je.error - } - return fmt.Errorf("journey did not finish executing, %d steps ran: %w", je.stepCount, je.error) -} - func stepError(e *SynthError) error { return fmt.Errorf("error executing step: %w", e.toECSErr()) } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go index 2f660b09642..607c1425696 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/enrich_test.go @@ -7,14 +7,11 @@ package synthexec import ( "fmt" - "net/url" "testing" - "time" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/heartbeat/monitors/stdfields" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/processors/add_data_stream" @@ -95,13 +92,9 @@ func TestJourneyEnricher(t *testing.T) { // version of the event v = append(v, lookslike.MustCompile(se.ToMap())) } else { - u, _ := url.Parse(url1) - // journey end gets a summary v = append(v, lookslike.MustCompile(map[string]interface{}{ - "event.type": "heartbeat/summary", - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), + "event.type": "journey/end", + "synthetics.type": "journey/end", })) } return lookslike.Compose(v...) @@ -209,11 +202,7 @@ func TestEnrichSynthEvent(t *testing.T) { }, true, func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(mapstr.M{ - "event": map[string]string{ - "type": "heartbeat/summary", - }, - }) + v := lookslike.MustCompile(mapstr.M{}) testslike.Test(t, v, e.Fields) }, }, @@ -225,36 +214,20 @@ func TestEnrichSynthEvent(t *testing.T) { Type: CmdStatus, Error: nil, }, - true, - func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(mapstr.M{ - "event": map[string]string{ - "type": "heartbeat/summary", - }, - }) - testslike.Test(t, v, e.Fields) - }, + false, + nil, }, { "journey/end", &SynthEvent{Type: JourneyEnd}, false, - func(t *testing.T, e *beat.Event, je *journeyEnricher) { - v := lookslike.MustCompile(mapstr.M{ - "event": map[string]string{ - "type": "heartbeat/summary", - }, - }) - testslike.Test(t, v, e.Fields) - }, + nil, }, { "step/end", &SynthEvent{Type: "step/end"}, false, - func(t *testing.T, e *beat.Event, je *journeyEnricher) { - require.Equal(t, 1, je.stepCount) - }, + nil, }, { "step/screenshot", @@ -299,242 +272,9 @@ func TestEnrichSynthEvent(t *testing.T) { if err := je.enrichSynthEvent(e, tt.se); (err == nil && tt.wantErr) || (err != nil && !tt.wantErr) { t.Errorf("journeyEnricher.enrichSynthEvent() error = %v, wantErr %v", err, tt.wantErr) } - tt.check(t, e, je) - }) - } -} - -func TestNoSummaryOnAfterHook(t *testing.T) { - journey := &Journey{ - Name: "A journey that fails after completing", - ID: "my-bad-after-all-hook", - } - journeyStart := &SynthEvent{ - Type: JourneyStart, - TimestampEpochMicros: 1000, - PackageVersion: "1.0.0", - Journey: journey, - Payload: mapstr.M{}, - } - syntherr := &SynthError{ - Message: "my-errmsg", - Name: "my-errname", - Stack: "my\nerr\nstack", - } - journeyEnd := &SynthEvent{ - Type: JourneyEnd, - TimestampEpochMicros: 2000, - PackageVersion: "1.0.0", - Journey: journey, - Payload: mapstr.M{}, - } - cmdStatus := &SynthEvent{ - Type: CmdStatus, - Error: &SynthError{Name: "cmdexit", Message: "cmd err msg"}, - TimestampEpochMicros: 3000, - } - - badStepUrl := "https://example.com/bad-step" - synthEvents := []*SynthEvent{ - journeyStart, - makeStepEvent("step/start", 10, "Step1", 1, "", "", nil), - makeStepEvent("step/end", 20, "Step1", 2, "failed", badStepUrl, syntherr), - journeyEnd, - cmdStatus, - } - - stdFields := stdfields.StdMonitorFields{} - je := makeTestJourneyEnricher(stdFields) - for idx, se := range synthEvents { - e := &beat.Event{} - - t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) - - if se != nil && se.Type == CmdStatus { - t.Run("no summary in cmd/status", func(t *testing.T) { - require.NotContains(t, e.Fields, "summary") - }) - } - - // Only the journey/end event should get a summary when - // it's emitted before the cmd/status (when an afterX hook fails). - if se != nil && se.Type == JourneyEnd { - require.Equal(t, stepError(syntherr), enrichErr) - - u, _ := url.Parse(badStepUrl) - t.Run("summary in journey/end", func(t *testing.T) { - v := lookslike.MustCompile(mapstr.M{ - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(journeyEnd.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), - }) - - testslike.Test(t, v, e.Fields) - }) - } - }) - } -} - -func TestSummaryWithoutJourneyEnd(t *testing.T) { - journey := &Journey{ - Name: "A journey that never emits journey/end but exits successfully", - ID: "no-journey-end-but-success", - } - journeyStart := &SynthEvent{ - Type: "journey/start", - TimestampEpochMicros: 1000, - PackageVersion: "1.0.0", - Journey: journey, - Payload: mapstr.M{}, - } - - cmdStatus := &SynthEvent{ - Type: CmdStatus, - Error: nil, - TimestampEpochMicros: 3000, - } - - url1 := "http://example.net/url1" - synthEvents := []*SynthEvent{ - journeyStart, - makeStepEvent("step/end", 20, "Step1", 1, "", url1, nil), - cmdStatus, - } - - hasCmdStatus := false - - stdFields := stdfields.StdMonitorFields{} - je := makeTestJourneyEnricher(stdFields) - for idx, se := range synthEvents { - e := &beat.Event{} - t.Run(fmt.Sprintf("event %d", idx), func(t *testing.T) { - enrichErr := je.enrich(e, se) - - if se != nil && se.Type == CmdStatus { - hasCmdStatus = true - require.Error(t, enrichErr, "journey did not finish executing, 1 steps ran") - - u, _ := url.Parse(url1) - - v := lookslike.MustCompile(mapstr.M{ - "synthetics.type": "heartbeat/summary", - "url": wrappers.URLFields(u), - "monitor.duration.us": int64(cmdStatus.Timestamp().Sub(journeyStart.Timestamp()) / time.Microsecond), - }) - - testslike.Test(t, v, e.Fields) - } - }) - } - - require.True(t, hasCmdStatus) -} - -func TestCreateSummaryEvent(t *testing.T) { - baseTime := time.Now() - - testJourney := Journey{ - ID: "my-monitor", - Name: "My Monitor", - } - - tests := []struct { - name string - je *journeyEnricher - expected mapstr.M - wantErr bool - }{{ - name: "completed without errors", - je: &journeyEnricher{ - journey: &testJourney, - start: baseTime, - end: baseTime.Add(10 * time.Microsecond), - journeyComplete: true, - stepCount: 3, - }, - expected: mapstr.M{ - "monitor.duration.us": int64(10), - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: false, - }, { - name: "completed with error", - je: &journeyEnricher{ - journey: &testJourney, - start: baseTime, - end: baseTime.Add(10 * time.Microsecond), - journeyComplete: true, - errorCount: 1, - error: fmt.Errorf("journey errored"), - }, - expected: mapstr.M{ - "monitor.duration.us": int64(10), - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: true, - }, { - name: "started, but exited without running steps", - je: &journeyEnricher{ - journey: &testJourney, - start: baseTime, - end: baseTime.Add(10 * time.Microsecond), - stepCount: 0, - journeyComplete: false, - streamEnricher: newStreamEnricher(stdfields.StdMonitorFields{}), - }, - expected: mapstr.M{ - "monitor.duration.us": int64(10), - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: true, - }, { - name: "syntax error - exited without starting", - je: &journeyEnricher{ - journey: &testJourney, - end: time.Now().Add(10 * time.Microsecond), - journeyComplete: false, - errorCount: 1, - streamEnricher: newStreamEnricher(stdfields.StdMonitorFields{}), - }, - expected: mapstr.M{ - "event": mapstr.M{ - "type": "heartbeat/summary", - }, - }, - wantErr: true, - }} - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - monitorField := mapstr.M{"id": "my-monitor", "type": "browser"} - - e := &beat.Event{ - Fields: mapstr.M{"monitor": monitorField}, - } - err := tt.je.createSummary(e) - if tt.wantErr { - require.Error(t, err) - } else { - require.NoError(t, err) + if tt.check != nil { + tt.check(t, e, je) } - // linter has been activated in the meantime. We'll cleanup separately. - err = mapstr.MergeFields(tt.expected, mapstr.M{ - "monitor": monitorField, - "url": mapstr.M{}, - "event.type": "heartbeat/summary", - "synthetics.type": "heartbeat/summary", - "synthetics.journey": testJourney, - }, true) - require.NoError(t, err) - testslike.Test(t, lookslike.Strict(lookslike.MustCompile(tt.expected)), e.Fields) }) } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go index 974a5317435..a0ad7f05a97 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes.go @@ -15,7 +15,7 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/beats/v7/heartbeat/ecserr" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" ) // These constants define all known synthetics event types @@ -97,7 +97,7 @@ func (se SynthEvent) ToMap() (m mapstr.M) { if e != nil { logp.L().Warn("Could not parse synthetics URL '%s': %s", se.URL, e.Error()) } else { - _, _ = m.Put("url", wrappers.URLFields(u)) + _, _ = m.Put("url", wraputil.URLFields(u)) } } diff --git a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go index b26868b5b69..af1a9822a06 100644 --- a/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go +++ b/x-pack/heartbeat/monitors/browser/synthexec/synthtypes_test.go @@ -16,7 +16,7 @@ import ( "github.com/elastic/go-lookslike/testslike" "github.com/elastic/beats/v7/heartbeat/ecserr" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil" "github.com/stretchr/testify/require" ) @@ -55,7 +55,7 @@ func TestToMap(t *testing.T) { "package_version": "1.2.3", "nested": "v1", }, - "url": wrappers.URLFields(testUrl), + "url": wraputil.URLFields(testUrl), "truly_at_root": "v2", }, }, diff --git a/x-pack/heartbeat/scenarios/basics_test.go b/x-pack/heartbeat/scenarios/basics_test.go index da19b2264f7..a8b39dbfaf1 100644 --- a/x-pack/heartbeat/scenarios/basics_test.go +++ b/x-pack/heartbeat/scenarios/basics_test.go @@ -12,19 +12,22 @@ import ( "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" "github.com/elastic/go-lookslike/testslike" + "github.com/elastic/go-lookslike/validator" + "github.com/elastic/beats/v7/heartbeat/hbtest" "github.com/elastic/beats/v7/heartbeat/hbtestllext" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/http" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/icmp" _ "github.com/elastic/beats/v7/heartbeat/monitors/active/tcp" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary" "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) type CheckHistItem struct { cg string - summary *summarizer.JobSummary + summary *jobsummary.JobSummary } func TestSimpleScenariosBasicFields(t *testing.T) { @@ -47,10 +50,10 @@ func TestSimpleScenariosBasicFields(t *testing.T) { require.NoError(t, err) cg := cgIface.(string) - var summary *summarizer.JobSummary + var summary *jobsummary.JobSummary summaryIface, err := e.GetValue("summary") if err == nil { - summary = summaryIface.(*summarizer.JobSummary) + summary = summaryIface.(*jobsummary.JobSummary) } var lastCheck *CheckHistItem @@ -100,7 +103,27 @@ func TestLightweightSummaries(t *testing.T) { all := mtr.Events() lastEvent, firstEvents := all[len(all)-1], all[:len(all)-1] testslike.Test(t, - summarizertesthelper.SummaryValidator(1, 0), + SummaryValidatorForStatus(mtr.Meta.Status), + lastEvent.Fields) + + for _, e := range firstEvents { + summary, _ := e.GetValue("summary") + require.Nil(t, summary) + } + }) +} + +func TestBrowserSummaries(t *testing.T) { + t.Parallel() + scenarioDB.RunTag(t, "browser", func(t *testing.T, mtr *framework.MonitorTestRun, err error) { + all := mtr.Events() + lastEvent, firstEvents := all[len(all)-1], all[:len(all)-1] + + testslike.Test(t, + lookslike.Compose( + SummaryValidatorForStatus(mtr.Meta.Status), + hbtest.URLChecks(t, mtr.Meta.URL), + ), lastEvent.Fields) for _, e := range firstEvents { @@ -133,3 +156,11 @@ func TestRunFromOverride(t *testing.T) { } }) } + +func SummaryValidatorForStatus(ss monitorstate.StateStatus) validator.Validator { + var expectedUp, expectedDown uint16 = 1, 0 + if ss == monitorstate.StatusDown { + expectedUp, expectedDown = 0, 1 + } + return summarizertesthelper.SummaryValidator(expectedUp, expectedDown) +} diff --git a/x-pack/heartbeat/scenarios/browserscenarios.go b/x-pack/heartbeat/scenarios/browserscenarios.go index 0cfce6831f4..1760ef58750 100644 --- a/x-pack/heartbeat/scenarios/browserscenarios.go +++ b/x-pack/heartbeat/scenarios/browserscenarios.go @@ -8,9 +8,11 @@ package scenarios import ( "fmt" + "net/url" "os" "testing" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" _ "github.com/elastic/beats/v7/x-pack/heartbeat/monitors/browser" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" "github.com/elastic/elastic-agent-libs/mapstr" @@ -22,25 +24,56 @@ func init() { Name: "simple-browser", Type: "browser", Tags: []string{"browser", "browser-inline"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { err = os.Setenv("ELASTIC_SYNTHETICS_CAPABLE", "true") if err != nil { - return nil, nil, err + return nil, meta, nil, err } server := startTestWebserver(t) + + // Add / to normalize with test output + meta.URL, _ = url.Parse(server.URL + "/") + meta.Status = monitorstate.StatusUp config = mapstr.M{ "id": "browser-test-id", "name": "browser-test-name", "type": "browser", "schedule": "@every 1m", - "hosts": []string{"127.0.0.1"}, "source": mapstr.M{ "inline": mapstr.M{ "script": fmt.Sprintf("step('load server', async () => {await page.goto('%s')})", server.URL), }, }, } - return config, nil, nil + return config, meta, nil, nil + }, + }, + framework.Scenario{ + Name: "failing-browser", + Type: "browser", + Tags: []string{"browser", "browser-inline", "down"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + err = os.Setenv("ELASTIC_SYNTHETICS_CAPABLE", "true") + if err != nil { + return nil, meta, nil, err + } + server := startTestWebserver(t) + + // Add / to normalize with test output + meta.URL, _ = url.Parse(server.URL + "/") + meta.Status = monitorstate.StatusDown + config = mapstr.M{ + "id": "browser-test-id", + "name": "browser-test-name", + "type": "browser", + "schedule": "@every 1m", + "source": mapstr.M{ + "inline": mapstr.M{ + "script": fmt.Sprintf("step('load server', async () => {await page.goto('%s'); throw(\"anerr\")})", meta.URL), + }, + }, + } + return config, meta, nil, nil }, }, ) diff --git a/x-pack/heartbeat/scenarios/framework/framework.go b/x-pack/heartbeat/scenarios/framework/framework.go index 2a092bb73ef..c4e3e54b5bc 100644 --- a/x-pack/heartbeat/scenarios/framework/framework.go +++ b/x-pack/heartbeat/scenarios/framework/framework.go @@ -6,6 +6,7 @@ package framework import ( "fmt" + "net/url" "os" "sync" "testing" @@ -29,7 +30,11 @@ import ( beatversion "github.com/elastic/beats/v7/libbeat/version" ) -type ScenarioRun func(t *testing.T) (config mapstr.M, close func(), err error) +type ScenarioRun func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) +type ScenarioRunMeta struct { + URL *url.URL + Status monitorstate.StateStatus +} type Scenario struct { Name string @@ -38,6 +43,7 @@ type Scenario struct { Tags []string RunFrom *hbconfig.LocationWithID NumberOfRuns int + URL string } type Twist struct { @@ -83,7 +89,7 @@ func (s Scenario) Run(t *testing.T, twist *Twist, callback func(t *testing.T, mt runS = twist.Fn(s.clone()) } - cfgMap, rClose, err := runS.Runner(t) + cfgMap, meta, rClose, err := runS.Runner(t) if rClose != nil { defer rClose() } @@ -109,7 +115,7 @@ func (s Scenario) Run(t *testing.T, twist *Twist, callback func(t *testing.T, mt var conf mapstr.M for i := 0; i < numberRuns; i++ { var mtr *MonitorTestRun - mtr, err = runMonitorOnce(t, cfgMap, runS.RunFrom, loaderDB.StateLoader()) + mtr, err = runMonitorOnce(t, cfgMap, meta, runS.RunFrom, loaderDB.StateLoader()) mtr.wait() events = append(events, mtr.Events()...) @@ -127,6 +133,7 @@ func (s Scenario) Run(t *testing.T, twist *Twist, callback func(t *testing.T, mt sumMtr := MonitorTestRun{ StdFields: sf, Config: conf, + Meta: meta, Events: func() []*beat.Event { return events }, @@ -209,6 +216,7 @@ func (sdb *ScenarioDB) RunTagWithATwist(t *testing.T, tagName string, twist *Twi type MonitorTestRun struct { StdFields stdfields.StdMonitorFields + Meta ScenarioRunMeta Config mapstr.M Events func() []*beat.Event monitor *monitors.Monitor @@ -216,9 +224,10 @@ type MonitorTestRun struct { close func() } -func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, location *hbconfig.LocationWithID, stateLoader monitorstate.StateLoader) (mtr *MonitorTestRun, err error) { +func runMonitorOnce(t *testing.T, monitorConfig mapstr.M, meta ScenarioRunMeta, location *hbconfig.LocationWithID, stateLoader monitorstate.StateLoader) (mtr *MonitorTestRun, err error) { mtr = &MonitorTestRun{ Config: monitorConfig, + Meta: meta, StdFields: stdfields.StdMonitorFields{ RunFrom: location, }, diff --git a/x-pack/heartbeat/scenarios/framework/framework_test.go b/x-pack/heartbeat/scenarios/framework/framework_test.go index 97316106e7f..243f7c25477 100644 --- a/x-pack/heartbeat/scenarios/framework/framework_test.go +++ b/x-pack/heartbeat/scenarios/framework/framework_test.go @@ -17,13 +17,13 @@ import ( var testScenario Scenario = Scenario{ Name: "My Scenario", Tags: []string{"testTag"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Runner: func(t *testing.T) (config mapstr.M, meta ScenarioRunMeta, close func(), err error) { return mapstr.M{ "type": "http", "id": "testID", "name": "testName", "schedule": "@every 10s", - }, nil, nil + }, meta, nil, nil }, RunFrom: &config.LocationWithID{ ID: "TestID", diff --git a/x-pack/heartbeat/scenarios/scenarios.go b/x-pack/heartbeat/scenarios/scenarios.go index fe0e1bbee16..31f95270ee1 100644 --- a/x-pack/heartbeat/scenarios/scenarios.go +++ b/x-pack/heartbeat/scenarios/scenarios.go @@ -12,11 +12,13 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) var scenarioDB = framework.NewScenarioDB() var testWs *httptest.Server +var failingTestWs *httptest.Server // Note, no browser scenarios here, those all go in browserscenarios.go // since they have different build tags @@ -25,9 +27,11 @@ func init() { framework.Scenario{ Name: "http-simple", Type: "http", - Tags: []string{"lightweight", "http"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Tags: []string{"lightweight", "http", "up"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { server := startTestWebserver(t) + meta.URL, _ = url.Parse(server.URL) + meta.Status = monitorstate.StatusUp config = mapstr.M{ "id": "http-test-id", "name": "http-test-name", @@ -35,19 +39,62 @@ func init() { "schedule": "@every 1m", "urls": []string{server.URL}, } - return config, nil, nil + return config, meta, nil, nil + }, + }, + framework.Scenario{ + Name: "http-down", + Type: "http", + Tags: []string{"lightweight", "http", "down"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + server := startFailingTestWebserver(t) + u := server.URL + meta.URL, _ = url.Parse(u) + meta.Status = monitorstate.StatusDown + config = mapstr.M{ + "id": "http-test-id", + "name": "http-test-name", + "type": "http", + "schedule": "@every 1m", + "urls": []string{u}, + } + return config, meta, nil, nil }, }, framework.Scenario{ Name: "tcp-simple", Type: "tcp", - Tags: []string{"lightweight", "tcp"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Tags: []string{"lightweight", "tcp", "up"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { server := startTestWebserver(t) parsedUrl, err := url.Parse(server.URL) if err != nil { panic(fmt.Sprintf("URL %s should always be parsable: %s", server.URL, err)) } + parsedUrl.Scheme = "tcp" + meta.URL = parsedUrl + meta.Status = monitorstate.StatusUp + config = mapstr.M{ + "id": "tcp-test-id", + "name": "tcp-test-name", + "type": "tcp", + "schedule": "@every 1m", + "hosts": []string{parsedUrl.Host}, // Host includes host:port + } + return config, meta, nil, nil + }, + }, + framework.Scenario{ + Name: "tcp-down", + Type: "tcp", + Tags: []string{"lightweight", "tcp", "down"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + // This ip should never route anywhere + // see https://stackoverflow.com/questions/528538/non-routable-ip-address + parsedUrl, _ := url.Parse("tcp://192.0.2.0:8282") + parsedUrl.Scheme = "tcp" + meta.URL = parsedUrl + meta.Status = monitorstate.StatusDown config = mapstr.M{ "id": "tcp-test-id", "name": "tcp-test-name", @@ -55,21 +102,23 @@ func init() { "schedule": "@every 1m", "hosts": []string{parsedUrl.Host}, // Host includes host:port } - return config, nil, nil + return config, meta, nil, nil }, }, framework.Scenario{ Name: "simple-icmp", Type: "icmp", - Tags: []string{"icmp"}, - Runner: func(t *testing.T) (config mapstr.M, close func(), err error) { + Tags: []string{"icmp", "up"}, + Runner: func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + meta.URL, _ = url.Parse("icp://127.0.0.1") + meta.Status = monitorstate.StatusUp return mapstr.M{ "id": "icmp-test-id", "name": "icmp-test-name", "type": "icmp", "schedule": "@every 1m", "hosts": []string{"127.0.0.1"}, - }, func() {}, nil + }, meta, nil, nil }, }, ) diff --git a/x-pack/heartbeat/scenarios/stateloader_test.go b/x-pack/heartbeat/scenarios/stateloader_test.go index e3ea54a0691..c83ebafc0c7 100644 --- a/x-pack/heartbeat/scenarios/stateloader_test.go +++ b/x-pack/heartbeat/scenarios/stateloader_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/x-pack/heartbeat/scenarios/framework" ) @@ -35,7 +34,7 @@ func TestStateContinuity(t *testing.T) { lastSS := framework.LastState(mtr.Events()) - assert.Equal(t, monitorstate.StatusUp, lastSS.State.Status, "monitor was unexpectedly down, synthetics console output: %s, errors", sout, errors) + assert.Equal(t, mtr.Meta.Status, lastSS.State.Status, "monitor had unexpected state %v, synthetics console output: %s, errors", lastSS.State.Status, sout, errors) allSS := framework.AllStates(mtr.Events()) assert.Len(t, allSS, numRuns) diff --git a/x-pack/heartbeat/scenarios/testws.go b/x-pack/heartbeat/scenarios/testws.go index badfdb27236..bbcc193592b 100644 --- a/x-pack/heartbeat/scenarios/testws.go +++ b/x-pack/heartbeat/scenarios/testws.go @@ -19,18 +19,29 @@ import ( ) var testWsOnce = &sync.Once{} +var failingTestWsOnce = &sync.Once{} // Starting this thing up is expensive, let's just do it once func startTestWebserver(t *testing.T) *httptest.Server { testWsOnce.Do(func() { testWs = httptest.NewServer(hbtest.HelloWorldHandler(200)) - waitForWs(t, testWs.URL) + waitForWs(t, testWs.URL, 200) }) return testWs } +func startFailingTestWebserver(t *testing.T) *httptest.Server { + failingTestWsOnce.Do(func() { + failingTestWs = httptest.NewServer(hbtest.HelloWorldHandler(400)) + + waitForWs(t, failingTestWs.URL, 400) + }) + + return failingTestWs +} + func StartStatefulTestWS(t *testing.T, statuses []int) *httptest.Server { mtx := sync.Mutex{} statusIdx := 0 @@ -49,19 +60,19 @@ func StartStatefulTestWS(t *testing.T, statuses []int) *httptest.Server { })) // wait for ws to become available - waitForWs(t, testWs.URL) + waitForWs(t, testWs.URL, 200) return testWs } -func waitForWs(t *testing.T, url string) { +func waitForWs(t *testing.T, url string, statusCode int) { require.Eventuallyf( t, func() bool { req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, url, nil) resp, _ := http.DefaultClient.Do(req) resp.Body.Close() - return resp.StatusCode == 200 + return resp.StatusCode == statusCode }, 10*time.Second, 250*time.Millisecond, "could not start webserver", ) diff --git a/x-pack/heartbeat/scenarios/twists.go b/x-pack/heartbeat/scenarios/twists.go index 3109b5e73d9..5f4d1093020 100644 --- a/x-pack/heartbeat/scenarios/twists.go +++ b/x-pack/heartbeat/scenarios/twists.go @@ -40,10 +40,10 @@ func TwistMaxAttempts(maxAttempts int) *framework.Twist { return framework.MakeTwist(fmt.Sprintf("run with %d max_attempts", maxAttempts), func(s framework.Scenario) framework.Scenario { s.Tags = append(s.Tags, "retry") origRunner := s.Runner - s.Runner = func(t *testing.T) (config mapstr.M, close func(), err error) { - config, close, err = origRunner(t) + s.Runner = func(t *testing.T) (config mapstr.M, meta framework.ScenarioRunMeta, close func(), err error) { + config, meta, close, err = origRunner(t) config["max_attempts"] = maxAttempts - return config, close, err + return config, meta, close, err } return s })