diff --git a/event-handler/app.go b/event-handler/app.go index bdd195435..41541c628 100644 --- a/event-handler/app.go +++ b/event-handler/app.go @@ -179,7 +179,7 @@ func (a *App) init(ctx context.Context) error { return trace.Wrap(err) } - t, err := NewTeleportEventsWatcher(ctx, a.Config, *startTime, latestCursor, latestID) + t, err := NewTeleportEventsWatcher(ctx, a.Config, *startTime, latestCursor, latestID, s) if err != nil { return trace.Wrap(err) } diff --git a/event-handler/teleport_events_watcher.go b/event-handler/teleport_events_watcher.go index f715f3191..5e81a698d 100644 --- a/event-handler/teleport_events_watcher.go +++ b/event-handler/teleport_events_watcher.go @@ -71,6 +71,7 @@ type TeleportEventsWatcher struct { config *StartCmdConfig // startTime is event time frame start startTime time.Time + state *State } // NewTeleportEventsWatcher builds Teleport client instance @@ -80,6 +81,7 @@ func NewTeleportEventsWatcher( startTime time.Time, cursor string, id string, + state *State, ) (*TeleportEventsWatcher, error) { var creds []client.Credentials switch { @@ -207,16 +209,49 @@ func (t *TeleportEventsWatcher) fetch(ctx context.Context) error { // getEvents calls Teleport client and loads events func (t *TeleportEventsWatcher) getEvents(ctx context.Context) ([]*auditlogpb.EventUnstructured, string, error) { - return t.client.SearchUnstructuredEvents( - ctx, - t.startTime, - time.Now().UTC(), - "default", - t.config.Types, - t.config.BatchSize, - types.EventOrderAscending, - t.cursor, - ) + rangeSplitByDay := splitRangeByDay(t.startTime, time.Now().UTC()) + for i := 1; i < len(rangeSplitByDay); i++ { + startTime := rangeSplitByDay[i-1] + endTime := rangeSplitByDay[i] + log.Debugf("Fetching events from %v to %v", startTime, endTime) + evts, cursor, err := t.client.SearchUnstructuredEvents( + ctx, + startTime, + endTime, + "default", + t.config.Types, + t.config.BatchSize, + types.EventOrderAscending, + t.cursor, + ) + if err != nil { + return nil, "", trace.Wrap(err) + } + + // if no events are found, the cursor is out of the range [startTime, endTime] + // and it's the last complete day, update start time to the next day. + if len(evts) == 0 && i < len(rangeSplitByDay)-2 { + log.Infof("No events found for the range %v to %v", startTime, endTime) + // update start time to the next day + if err := t.state.SetStartTime(&endTime); err != nil { + return nil, "", trace.Wrap(err) + } + continue + } + // if any events are found, return them + return evts, cursor, nil + + } + return nil, t.cursor, nil +} + +func splitRangeByDay(from, to time.Time) []time.Time { + // splitRangeByDay splits the range into days + var days []time.Time + for d := from; d.Before(to); d = d.AddDate(0, 0, 1) { + days = append(days, d) + } + return append(days, to) // add the last date } // pause sleeps for timeout seconds diff --git a/event-handler/teleport_events_watcher_test.go b/event-handler/teleport_events_watcher_test.go index b3242ac7c..cf7083202 100644 --- a/event-handler/teleport_events_watcher_test.go +++ b/event-handler/teleport_events_watcher_test.go @@ -27,6 +27,7 @@ import ( "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/trace" + "github.com/peterbourgon/diskv/v3" "github.com/stretchr/testify/require" "golang.org/x/net/context" ) @@ -121,6 +122,12 @@ func (c *mockTeleportEventWatcher) Close() error { } func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClient) *TeleportEventsWatcher { + dv := diskv.New(diskv.Options{ + BasePath: t.TempDir(), + Transform: func(s string) []string { return []string{} }, + CacheSizeMax: cacheSizeMaxBytes, + }) + client := &TeleportEventsWatcher{ client: eventsClient, pos: -1, @@ -130,6 +137,7 @@ func newTeleportEventWatcher(t *testing.T, eventsClient TeleportSearchEventsClie ExitOnLastEvent: true, }, }, + state: &State{dv}, } return client