From 02ab24647d37155e785bb5ade2b008e9730bfca3 Mon Sep 17 00:00:00 2001 From: Tiago Silva Date: Mon, 13 May 2024 15:31:34 +0100 Subject: [PATCH] Event-handler: call `SearchUnstructuredEvents` with smaller windows This PR aims to reduce the windows sent from `SearchUnstructuredEvents`. StartTime is never update so it's kept with its initial value which causes problems to the event handler sending windows that include more than 1Gb of data when using the Athena backend which causes failures. Signed-off-by: Tiago Silva --- event-handler/app.go | 2 +- event-handler/teleport_events_watcher.go | 55 +++++++++++++++---- event-handler/teleport_events_watcher_test.go | 8 +++ 3 files changed, 54 insertions(+), 11 deletions(-) diff --git a/event-handler/app.go b/event-handler/app.go index bdd195435..41541c628 100644 --- a/event-handler/app.go +++ b/event-handler/app.go @@ -179,7 +179,7 @@ func (a *App) init(ctx context.Context) error { return trace.Wrap(err) } - t, err := NewTeleportEventsWatcher(ctx, a.Config, *startTime, latestCursor, latestID) + t, err := NewTeleportEventsWatcher(ctx, a.Config, *startTime, latestCursor, latestID, s) if err != nil { return trace.Wrap(err) } diff --git a/event-handler/teleport_events_watcher.go b/event-handler/teleport_events_watcher.go index f715f3191..5e81a698d 100644 --- a/event-handler/teleport_events_watcher.go +++ b/event-handler/teleport_events_watcher.go @@ -71,6 +71,7 @@ type TeleportEventsWatcher struct { config *StartCmdConfig // startTime is event time frame start startTime time.Time + state *State } // NewTeleportEventsWatcher builds Teleport client instance @@ -80,6 +81,7 @@ func NewTeleportEventsWatcher( startTime time.Time, cursor string, id string, + state *State, ) (*TeleportEventsWatcher, error) { var creds []client.Credentials switch { @@ -207,16 +209,49 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { // getEvents calls Teleport client and loads events func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) { - return t.client.SearchUnstructuredEvents( - ctx, - t.startTime, - time.Now().UTC(), - "default", - t.config.Types, - t.config.BatchSize, - types.EventOrderAscending, - t.cursor, - ) + rangeSplitByDay := splitRangeByDay(t.startTime, time.Now().UTC()) + for i := 1; i < len(rangeSplitByDay); i++ { + startTime := rangeSplitByDay[i-1] + endTime := rangeSplitByDay[i] + log.Debugf("Fetching events from %v to %v", startTime, endTime) + evts, cursor, err := t.client.SearchUnstructuredEvents( + ctx, + startTime, + endTime, + "default", + t.config.Types, + t.config.BatchSize, + types.EventOrderAscending, + t.cursor, + ) + if err != nil { + return nil, "", trace.Wrap(err) + } + + // if no events are found, the cursor is out of the range [startTime, endTime] + // and it's the last complete day, update start time to the next day. + if len(evts) == 0 && i < len(rangeSplitByDay)-2 { + log.Infof("No events found for the range %v to %v", startTime, endTime) + // update start time to the next day + if err := t.state.SetStartTime(&endTime); err != nil { + return nil, "", trace.Wrap(err) + } + continue + } + // if any events are found, return them + return evts, cursor, nil + + } + return nil, t.cursor, nil +} + +func splitRangeByDay(from, to time.Time) []time.Time { + // splitRangeByDay splits the range into days + var days []time.Time + for d := from; d.Before(to); d = d.AddDate(0, 0, 1) { + days = append(days, d) + } + return append(days, to) // add the last date } // pause sleeps for timeout seconds diff --git a/event-handler/teleport_events_watcher_test.go b/event-handler/teleport_events_watcher_test.go index b3242ac7c..cf7083202 100644 --- a/event-handler/teleport_events_watcher_test.go +++ b/event-handler/teleport_events_watcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/trace" + "github.com/peterbourgon/diskv/v3" "github.com/stretchr/testify/require" "golang.org/x/net/context" ) @@ -121,6 +122,12 @@ func (c *mockTeleportEventWatcher) Close() error { } func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient) *TeleportEventsWatcher { + dv := diskv.New(diskv.Options{ + BasePath: t.TempDir(), + Transform: func(s string) []string { return []string{} }, + CacheSizeMax: cacheSizeMaxBytes, + }) + client := &TeleportEventsWatcher{ client: eventsClient, pos: -1, @@ -130,6 +137,7 @@ func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClie ExitOnLastEvent: true, }, }, + state: &State{dv}, } return client