Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Heartbeat] Fix summarizer #36519

Merged
merged 41 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
cc98960
Cleanup summarizer code
andrewvc Sep 5, 2023
a609fe4
Separate concerns in summarizer
andrewvc Sep 6, 2023
3c5b677
Checkpoint
andrewvc Sep 6, 2023
04076e6
Fix failing tests
andrewvc Sep 6, 2023
ba9ef02
FMT
andrewvc Sep 7, 2023
0e1ca64
Tweaks
andrewvc Sep 7, 2023
f23be6e
Merge remote-tracking branch 'origin/main' into fix-summarizer
andrewvc Sep 7, 2023
7d5512c
Make linter happy
andrewvc Sep 7, 2023
2b38f1f
progress
andrewvc Sep 8, 2023
6776b3a
cleanup docs
andrewvc Sep 8, 2023
16f74bf
Bring back wrappers tests (partial)
andrewvc Sep 8, 2023
692e29a
Restore wrapper tests
andrewvc Sep 8, 2023
1ec207b
Fix failing tests
andrewvc Sep 8, 2023
6ed4e2f
Fix err handling
andrewvc Sep 8, 2023
9689e66
Re-init plugins on retry
andrewvc Sep 8, 2023
1c93f1f
Fix error field handling across retries
andrewvc Sep 8, 2023
cfe31c2
Incorporate PR feedback
andrewvc Sep 12, 2023
1bc7f6b
Type fix
andrewvc Sep 12, 2023
4c749de
Merge remote-tracking branch 'origin/main' into fix-summarizer
andrewvc Sep 12, 2023
9dcfafe
URLs now work, tests passing
andrewvc Sep 12, 2023
fafa379
Improved err handling
andrewvc Sep 12, 2023
969625c
Test fixes
andrewvc Sep 12, 2023
0294d1b
Cleanup naming
andrewvc Sep 13, 2023
089a72c
Fix handling of step counts / journey/end missing and also fix contin…
andrewvc Sep 13, 2023
e61563c
Fix failing tests around logging / logging behavior
andrewvc Sep 13, 2023
71df932
Rename OnRetry to BeforeRetry
andrewvc Sep 13, 2023
c9784ff
Move monitor.status calculation for browsers into summarizer
andrewvc Sep 13, 2023
6418d53
Cleanup status logic
andrewvc Sep 13, 2023
e51378c
More status consolidation
andrewvc Sep 13, 2023
98d9721
Fixed failing tests
andrewvc Sep 13, 2023
ebb07d8
Make monitor logger errors more understandable
andrewvc Sep 13, 2023
f56570a
Fix retry delay
andrewvc Sep 14, 2023
170464f
Fix retry delay
andrewvc Sep 14, 2023
99b1d66
Remove spurious 'wrapped:' in logs
andrewvc Sep 14, 2023
019e72f
Incorporate pr feedback
andrewvc Sep 14, 2023
7dec5a4
Fix dur
andrewvc Sep 14, 2023
acf0daf
Fix cmd status
andrewvc Sep 14, 2023
faa4164
Fix tests
andrewvc Sep 14, 2023
b533a1c
Fmt
andrewvc Sep 14, 2023
74d685a
Integrate PR feedback
andrewvc Sep 18, 2023
4c0a49f
Merge branch 'main' into fix-summarizer
vigneshshanmugam Sep 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions heartbeat/hbtest/hbtestutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/ecserr"
"github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/summarizertesthelper"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"

"github.com/elastic/beats/v7/heartbeat/hbtestllext"

Expand All @@ -49,7 +50,6 @@ import (
"github.com/elastic/go-lookslike/isdef"
"github.com/elastic/go-lookslike/validator"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/common/x509util"
)

Expand Down Expand Up @@ -172,6 +172,7 @@ func BaseChecks(ip string, status string, typ string) validator.Validator {
}

return lookslike.Compose(
hbtestllext.MaybeHasEventType,
lookslike.MustCompile(map[string]interface{}{
"monitor": map[string]interface{}{
"ip": ipCheck,
Expand Down Expand Up @@ -223,8 +224,10 @@ func SimpleURLChecks(t *testing.T, scheme string, host string, port uint16) vali

// URLChecks returns a validator for the given URL's fields
func URLChecks(t *testing.T, u *url.URL) validator.Validator {
t.Helper()
require.NotNil(t, u)
return lookslike.MustCompile(map[string]interface{}{
"url": wrappers.URLFields(u),
"url": wraputil.URLFields(u),
})
}

Expand Down
12 changes: 12 additions & 0 deletions heartbeat/hbtestllext/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package hbtestllext

import (
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
)

// MonitorTimespanValidator is tests for the `next_run` and `next_run_in.us` keys.
Expand All @@ -30,3 +31,14 @@ var MonitorTimespanValidator = lookslike.MustCompile(map[string]interface{}{
},
},
})

var MaybeHasEventType = lookslike.MustCompile(map[string]interface{}{
"event": map[string]interface{}{
"type": isdef.Optional(isdef.IsNonEmptyString),
},
"synthetics.type": isdef.Optional(isdef.IsNonEmptyString),
})

var MaybeHasDuration = lookslike.MustCompile(map[string]interface{}{
"monitor.duration.us": IsInt64,
})
15 changes: 10 additions & 5 deletions heartbeat/look/look.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@ import (
// RTT formats a round-trip-time given as time.Duration into an
// event field. The duration is stored in `{"us": rtt}`.
func RTT(rtt time.Duration) mapstr.M {
if rtt < 0 {
rtt = 0
}

return mapstr.M{
// cast to int64 since a go duration is a nano, but we want micros
// This makes the types less confusing because other wise the duration
// we get back has the wrong unit
"us": rtt.Microseconds(),
"us": RTTMS(rtt),
}
}

// RTTMS returns the given time.Duration as an int64 in microseconds, with a value of 0
// if input is negative.
func RTTMS(rtt time.Duration) int64 {
if rtt < 0 {
return 0
}
return rtt.Microseconds()
}

// Reason formats an error into an error event field.
Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/libbeat/version"
conf "github.com/elastic/elastic-agent-libs/config"

"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/elastic-agent-libs/transport/tlscommon"
"github.com/elastic/elastic-agent-libs/useragent"
Expand Down Expand Up @@ -116,7 +116,7 @@ func create(

// Assign any execution errors to the error field and
// assign the url field
js[i] = wrappers.WithURLField(u, job)
js[i] = wraputil.WithURLField(u, job)
}

return plugin.Plugin{Jobs: js, Endpoints: len(config.Hosts)}, nil
Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/stdfields"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/heartbeat/scheduler/schedule"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/file"
Expand Down Expand Up @@ -110,7 +111,7 @@ func checkServer(t *testing.T, handlerFunc http.HandlerFunc, useUrls bool) (*htt
func urlChecks(urlStr string) validator.Validator {
u, _ := url.Parse(urlStr)
return lookslike.MustCompile(map[string]interface{}{
"url": wrappers.URLFields(u),
"url": wraputil.URLFields(u),
})
}

Expand Down
4 changes: 2 additions & 2 deletions heartbeat/monitors/active/icmp/icmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
"net/url"

"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"

"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -107,7 +107,7 @@ func (jf *jobFactory) makePlugin() (plugin2 plugin.Plugin, err error) {
return plugin.Plugin{}, err
}

j = append(j, wrappers.WithURLField(u, job))
j = append(j, wraputil.WithURLField(u, job))
}

return plugin.Plugin{Jobs: j, Endpoints: len(jf.config.Hosts)}, nil
Expand Down
6 changes: 3 additions & 3 deletions heartbeat/monitors/active/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/monitors/active/dialchain/tlsmeta"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/plugin"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/heartbeat/reason"
"github.com/elastic/beats/v7/libbeat/beat"
conf "github.com/elastic/elastic-agent-libs/config"
Expand Down Expand Up @@ -130,7 +130,7 @@ func (jf *jobFactory) makeJobs() ([]jobs.Job, error) {
if err != nil {
return nil, err
}
jobs = append(jobs, wrappers.WithURLField(url, endpointJob))
jobs = append(jobs, wraputil.WithURLField(url, endpointJob))
}

}
Expand Down Expand Up @@ -174,7 +174,7 @@ func (jf *jobFactory) makeDirectEndpointJob(endpointURL *url.URL) (jobs.Job, err

// makeSocksLookupEndpointJob makes jobs that use a Socks5 proxy to perform DNS lookups
func (jf *jobFactory) makeSocksLookupEndpointJob(endpointURL *url.URL) jobs.Job {
return wrappers.WithURLField(endpointURL,
return wraputil.WithURLField(endpointURL,
jobs.MakeSimpleJob(func(event *beat.Event) error {
hostPort := net.JoinHostPort(endpointURL.Hostname(), endpointURL.Port())
return jf.dial(event, hostPort, endpointURL)
Expand Down
22 changes: 18 additions & 4 deletions heartbeat/monitors/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand All @@ -44,6 +45,7 @@ type MonitorRunInfo struct {
Duration int64 `json:"-"`
Steps *int `json:"steps,omitempty"`
Status string `json:"status"`
Attempt int `json:"attempt"`
}

func (m *MonitorRunInfo) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -78,22 +80,33 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
errors := []error{}
monitorID, err := event.GetValue("monitor.id")
if err != nil {
errors = append(errors, err)
errors = append(errors, fmt.Errorf("could not extract monitor.id: %w", err))
}

durationUs, err := event.GetValue("monitor.duration.us")
if err != nil {
errors = append(errors, err)
errors = append(errors, fmt.Errorf("could not extract monitor.duration.us: %w", err))
}

monType, err := event.GetValue("monitor.type")
if err != nil {
errors = append(errors, err)
errors = append(errors, fmt.Errorf("could not extract monitor.type: %w", err))
}

status, err := event.GetValue("monitor.status")
if err != nil {
errors = append(errors, err)
errors = append(errors, fmt.Errorf("could not extract monitor.status: %w", err))
}

jsIface, err := event.GetValue("summary")
var attempt int
if err != nil {
errors = append(errors, fmt.Errorf("could not extract summary to add attempt info: %w", err))
} else {
js, ok := jsIface.(*jobsummary.JobSummary)
if ok && js != nil {
attempt = int(js.Attempt)
}
}

if len(errors) > 0 {
Expand All @@ -105,6 +118,7 @@ func extractRunInfo(event *beat.Event) (*MonitorRunInfo, error) {
Type: monType.(string),
Duration: durationUs.(int64),
Status: status.(string),
Attempt: attempt,
}

sc, _ := event.Meta.GetValue(META_STEP_COUNT)
Expand Down
3 changes: 3 additions & 0 deletions heartbeat/monitors/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap/zaptest/observer"

"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/summarizer/jobsummary"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand All @@ -47,6 +48,7 @@ func TestLogRun(t *testing.T) {
"monitor.duration.us": durationUs,
"monitor.type": "browser",
"monitor.status": "down",
"summary": jobsummary.NewJobSummary(1, 1, "abc"),
}

event := beat.Event{Fields: fields}
Expand All @@ -64,6 +66,7 @@ func TestLogRun(t *testing.T) {
Duration: durationUs,
Status: "down",
Steps: &steps,
Attempt: 1,
}

assert.ElementsMatch(t, []zap.Field{
Expand Down
1 change: 1 addition & 0 deletions heartbeat/monitors/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func baseMockEventMonitorValidator(id string, name string, status string) valida

func mockEventMonitorValidator(id string, name string) validator.Validator {
return lookslike.Strict(lookslike.Compose(
hbtestllext.MaybeHasEventType,
baseMockEventMonitorValidator(id, name, "up"),
hbtestllext.MonitorTimespanValidator,
hbtest.SummaryStateChecks(1, 0),
Expand Down
8 changes: 4 additions & 4 deletions heartbeat/monitors/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/eventext"
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/heartbeat/monitors/jobs"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers"
"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/wraputil"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -114,7 +114,7 @@ func MakeByIPJob(
"monitor": mapstr.M{"ip": addr.String()},
}

return wrappers.WithFields(fields, pingFactory(addr)), nil
return wraputil.WithFields(fields, pingFactory(addr)), nil
}

// MakeByHostJob creates a new Job including host lookup. The pingFactory will be used to
Expand Down Expand Up @@ -165,7 +165,7 @@ func makeByHostAnyIPJob(
resolveRTT := resolveEnd.Sub(resolveStart)

ipFields := resolveIPEvent(ip.String(), resolveRTT)
return wrappers.WithFields(ipFields, pingFactory(ip))(event)
return wraputil.WithFields(ipFields, pingFactory(ip))(event)
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ func makeByHostAllIPJob(
for i, ip := range ips {
addr := &net.IPAddr{IP: ip}
ipFields := resolveIPEvent(ip.String(), resolveRTT)
cont[i] = wrappers.WithFields(ipFields, pingFactory(addr))
cont[i] = wraputil.WithFields(ipFields, pingFactory(addr))
}
// Ideally we would test this invocation. This function however is really hard to to test given all the extra context it takes in
// In a future refactor we could perhaps test that this in correctly invoked.
Expand Down
57 changes: 57 additions & 0 deletions heartbeat/monitors/wrappers/summarizer/jobsummary/jobsummary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package jobsummary

import (
"fmt"

"github.com/elastic/beats/v7/heartbeat/monitors/wrappers/monitorstate"
)

// JobSummary is the struct that is serialized in the `summary` field in the emitted event.
type JobSummary struct {
Attempt uint16 `json:"attempt"`
MaxAttempts uint16 `json:"max_attempts"`
FinalAttempt bool `json:"final_attempt"`
Up uint16 `json:"up"`
Down uint16 `json:"down"`
Status monitorstate.StateStatus `json:"status"`
RetryGroup string `json:"retry_group"`
}

func NewJobSummary(attempt uint16, maxAttempts uint16, retryGroup string) *JobSummary {
if maxAttempts < 1 {
maxAttempts = 1
}

return &JobSummary{
MaxAttempts: maxAttempts,
Attempt: attempt,
RetryGroup: retryGroup,
}
}

// BumpAttempt swaps the JobSummary object's pointer for a new job summary
// that is a clone of the current one but with the Attempt field incremented.
func (js *JobSummary) BumpAttempt() {
*js = *NewJobSummary(js.Attempt+1, js.MaxAttempts, js.RetryGroup)
}

func (js *JobSummary) String() string {
return fmt.Sprintf("<JobSummary status=%s attempt=%d/%d, final=%t, up=%d/%d retryGroup=%s>", js.Status, js.Attempt, js.MaxAttempts, js.FinalAttempt, js.Up, js.Down, js.RetryGroup)
}
Loading
Loading