diff --git a/lib/events/export/date_exporter.go b/lib/events/export/date_exporter.go
new file mode 100644
index 0000000000000..cee3e3a36278c
--- /dev/null
+++ b/lib/events/export/date_exporter.go
@@ -0,0 +1,461 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package export
+
+import (
+ "context"
+ "log/slog"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/gravitational/trace"
+ "golang.org/x/time/rate"
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
+ "github.com/gravitational/teleport/api/internalutils/stream"
+ "github.com/gravitational/teleport/api/utils/retryutils"
+ "github.com/gravitational/teleport/lib/utils"
+ "github.com/gravitational/teleport/lib/utils/interval"
+)
+
+// Client is the subset of the audit event client that is used by the date exporter.
+type Client interface {
+ ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured]
+ GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk]
+}
+
+// DateExporterConfig configures the date exporter.
+type DateExporterConfig struct {
+ // Client is the audit event client used to fetch and export events.
+ Client Client
+ // Date is the target date to export events from.
+ Date time.Time
+ // Export is the callback used to export events. Must be safe for concurrent use if
+ // the Concurrency parameter is greater than 1.
+ Export func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error
+ // OnIdle is an optional callback that gets invoked periodically when the exporter is idle. Note that it is
+ // safe to close the exporter or inspect its state from within this callback, but waiting on the exporter's
+ // Done channel within this callback will deadlock.
+ OnIdle func(ctx context.Context)
+ // PreviousState is an optional parameter used to resume from a previous date export run.
+ PreviousState DateExporterState
+ // Concurrency sets the maximum number of event chunks that will be processed concurrently (defaults to 1).
+ Concurrency int
+ // MaxBackoff optionally overrides the default maximum backoff applied when errors are hit.
+ MaxBackoff time.Duration
+ // PollInterval optionally overrides the default poll interval used to fetch event chunks.
+ PollInterval time.Duration
+}
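+
+// Note that when Concurrency is greater than 1, the Export callback may be invoked from multiple
+// goroutines simultaneously. A minimal sketch of a concurrency-safe callback that collects events
+// into a shared slice (the mu and collected variables are illustrative, not part of this package):
+//
+//	var mu sync.Mutex
+//	var collected []*auditlogpb.ExportEventUnstructured
+//	export := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error {
+//		mu.Lock()
+//		defer mu.Unlock()
+//		collected = append(collected, event)
+//		return nil
+//	}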
+
+// CheckAndSetDefaults validates configuration and sets default values for optional parameters.
+func (cfg *DateExporterConfig) CheckAndSetDefaults() error {
+ if cfg.Client == nil {
+ return trace.BadParameter("missing required parameter Client in DateExporterConfig")
+ }
+ if cfg.Export == nil {
+ return trace.BadParameter("missing required parameter Export in DateExporterConfig")
+ }
+ if cfg.Date.IsZero() {
+ return trace.BadParameter("missing required parameter Date in DateExporterConfig")
+ }
+ if cfg.Concurrency == 0 {
+ cfg.Concurrency = 1
+ }
+ if cfg.MaxBackoff == 0 {
+ cfg.MaxBackoff = 90 * time.Second
+ }
+ if cfg.PollInterval == 0 {
+ cfg.PollInterval = 16 * time.Second
+ }
+ return nil
+}
+
+// chunkEntry represents the state of a single event chunk being processed by the date exporter. Unlike
+// the rest of the exporter which uses a basic mutex, the chunk entry uses atomic operations in order to
+// minimize the potential for reads to affect event processing perf.
+type chunkEntry struct {
+ cursor atomic.Pointer[string]
+ done atomic.Bool
+}
+
+func (e *chunkEntry) getCursor() string {
+ p := e.cursor.Load()
+ if p == nil {
+ return ""
+ }
+ return *p
+}
+
+func (e *chunkEntry) setCursor(cursor string) {
+ e.cursor.Store(&cursor)
+}
+
+// DateExporterState represents the current state of the date exporter. State can be used to resume
+// export from a previous run using the PreviousState parameter of the DateExporter config.
+type DateExporterState struct {
+ // Completed is an unordered list of the chunks for which all events have been consumed.
+ Completed []string
+ // Cursors is a map of chunk to cursor for partially completed chunks.
+ Cursors map[string]string
+}
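+
+// As a rough sketch of the intended round trip (persistence is left to the caller; loadState and
+// saveState below are hypothetical helpers, not part of this package):
+//
+//	prev, _ := loadState() // hypothetical: restore state saved by a previous run
+//	exporter, err := NewDateExporter(DateExporterConfig{
+//		Client:        clt,
+//		Date:          date,
+//		Export:        export,
+//		PreviousState: prev,
+//	})
+//	// ... later, after Close and waiting on Done ...
+//	saveState(exporter.GetState()) // hypothetical: persist state for the next run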
+
+// DateExporter is a utility for exporting events for a given date using the chunked event export APIs. Note that
+// it is specifically designed to prioritize performance and ensure that events aren't missed. It may not yield events
+// in time order, and does not provide a mechanism to decide when export for a given date should be considered complete,
+// since there is no 100% reliable way to determine when all events for a given date have been exported.
+type DateExporter struct {
+ cfg DateExporterConfig
+ log *slog.Logger
+ mainLogLimiter *rate.Limiter
+ chunkLogLimiter *rate.Limiter
+ retry retryutils.Retry
+ sem chan struct{}
+ mu sync.Mutex
+ chunks map[string]*chunkEntry
+ idle bool
+ cancel context.CancelFunc
+ done chan struct{}
+}
+
+// NewDateExporter creates a new date exporter and begins background processing of event chunks. Processing will continue
+// until DateExporter.Close is called, even if no new chunks are showing up. It is the caller's responsibility to decide
+// when export for a given date should be considered complete by examining the exporter's progress.
+func NewDateExporter(cfg DateExporterConfig) (*DateExporter, error) {
+ if err := cfg.CheckAndSetDefaults(); err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ retry, err := retryutils.NewRetryV2(retryutils.RetryV2Config{
+ First: utils.FullJitter(cfg.MaxBackoff / 16),
+ Driver: retryutils.NewExponentialDriver(cfg.MaxBackoff / 16),
+ Max: cfg.MaxBackoff,
+ Jitter: retryutils.NewHalfJitter(),
+ })
+ if err != nil {
+ return nil, trace.Wrap(err)
+ }
+
+ // date exporter should always present a correct chunk progress state. if state from
+ // a previous run was provided as part of the configuration we want to set it up before
+ // creating the exporter to ensure that any concurrent operations being used to
+ // monitor/store progress are always shown the correct state.
+ chunks := make(map[string]*chunkEntry)
+
+ // set up entries for previously completed chunks
+ for _, chunk := range cfg.PreviousState.Completed {
+ entry := new(chunkEntry)
+ entry.done.Store(true)
+ chunks[chunk] = entry
+ }
+
+ // set up entries for partially completed chunks
+ for chunk, cursor := range cfg.PreviousState.Cursors {
+ entry := new(chunkEntry)
+ entry.setCursor(cursor)
+ chunks[chunk] = entry
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ exporter := &DateExporter{
+ cfg: cfg,
+ log: slog.With("date", cfg.Date.Format(time.DateOnly)),
+ mainLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 1),
+ chunkLogLimiter: rate.NewLimiter(rate.Every(time.Minute), 3),
+ retry: retry,
+ sem: make(chan struct{}, cfg.Concurrency),
+ chunks: chunks,
+ cancel: cancel,
+ done: make(chan struct{}),
+ }
+
+ go exporter.run(ctx)
+
+ return exporter, nil
+}
+
+// GetState loads the current state of the date exporter. Note that there may be concurrent export operations
+// in progress, meaning that by the time state is observed it may already be outdated.
+func (e *DateExporter) GetState() DateExporterState {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+
+ var completed []string
+ cursors := make(map[string]string)
+
+ for chunk, entry := range e.chunks {
+ if entry.done.Load() {
+ completed = append(completed, chunk)
+ } else {
+ if cursor := entry.getCursor(); cursor != "" {
+ cursors[chunk] = cursor
+ }
+ }
+ }
+
+ return DateExporterState{
+ Completed: completed,
+ Cursors: cursors,
+ }
+}
+
+// IsIdle returns true if the date exporter has successfully discovered and processed all currently extant event chunks. Note that
+// this does not imply that all events for a given date have been processed, since there may be replication delays and/or ongoing
+// activity if the current date is being polled, but it is a strong indicator that all events have been discovered when the exporter
+// is processing dates that are significantly in the past.
+func (e *DateExporter) IsIdle() bool {
+ var idle bool
+ e.withLock(func() {
+ idle = e.idle
+ })
+ return idle
+}
+
+// Close terminates all event processing. Note that shutdown is asynchronous. Any operation that needs to wait for export to fully
+// terminate should wait on Done after calling Close.
+func (e *DateExporter) Close() error {
+ e.cancel()
+ return nil
+}
+
+// Done provides a channel that will be closed when the date exporter has completed processing all event chunks. When saving the
+// final state of the exporter for future resumption, this channel must be waited upon before state is loaded. Note that the date
+// exporter never terminates unless Close is called, so waiting on Done is only meaningful after Close has been called.
+func (e *DateExporter) Done() <-chan struct{} {
+ return e.done
+}
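+
+// A clean shutdown that captures the final state for later resumption follows the ordering described
+// above; a minimal sketch (using only the methods defined in this file):
+//
+//	exporter.Close()             // begin asynchronous shutdown
+//	<-exporter.Done()            // wait for all background processing to stop
+//	state := exporter.GetState() // now safe to treat as the final state and persist it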
+
+func (e *DateExporter) run(ctx context.Context) {
+ defer close(e.done)
+ retry := e.retry.Clone()
+
+ poll := interval.New(interval.Config{
+ Duration: e.cfg.PollInterval,
+ FirstDuration: utils.FullJitter(e.cfg.PollInterval / 2),
+ Jitter: retryutils.NewSeventhJitter(),
+ })
+ defer poll.Stop()
+
+ var firstFullCycleCompleted bool
+
+ // resume processing of any partially completed chunks prior to fetching new chunks
+ for chunk, cursor := range e.GetState().Cursors {
+ e.log.InfoContext(ctx, "resuming processing of partially completed chunk", "chunk", chunk, "cursor", cursor)
+ if ok := e.startProcessingChunk(ctx, chunk, cursor); !ok {
+ return
+ }
+ }
+
+ for {
+ n, err := e.fetchAndProcessChunks(ctx)
+ if err != nil {
+ if ctx.Err() != nil {
+ return
+ }
+
+ if e.mainLogLimiter.Allow() {
+ e.log.WarnContext(ctx, "fetch and process of event chunks failed", "error", err)
+ }
+
+ retry.Inc()
+ select {
+ case <-retry.After():
+ case <-ctx.Done():
+ return
+ }
+ continue
+ }
+
+ retry.Reset()
+
+ // if no new chunks were processed, the exporter is considered idle
+ idle := n == 0
+ e.withLock(func() {
+ e.idle = idle
+ })
+ if idle && e.cfg.OnIdle != nil {
+ e.cfg.OnIdle(ctx)
+ }
+
+ // log first success, and periodically log subsequent non-idle cycles
+ if (n > 0 && e.mainLogLimiter.Allow()) || !firstFullCycleCompleted {
+ e.log.InfoContext(ctx, "successful fetch and process of event chunks", "chunks", n, "idle", idle)
+ }
+
+ firstFullCycleCompleted = true
+
+ select {
+ case <-poll.Next():
+ case <-ctx.Done():
+ return
+ }
+ }
+}
+
+// waitForInflightChunks blocks until all inflight chunks have been processed by acquiring all
+// semaphore tokens and then releasing them. note that this operation does not accept a context,
+// which is necessary in order to ensure that Done actually waits for all background processing
+// to halt.
+func (e *DateExporter) waitForInflightChunks() {
+ // acquire all semaphore tokens to block until all inflight chunks have been processed
+ for i := 0; i < e.cfg.Concurrency; i++ {
+ e.sem <- struct{}{}
+ }
+
+ // release all semaphore tokens
+ for i := 0; i < e.cfg.Concurrency; i++ {
+ <-e.sem
+ }
+}
+
+// fetchAndProcessChunks fetches and processes all chunks for the current date. if the function returns
+// without error, all chunks have been successfully processed. note that all *currently known* chunks being
+// processed does not necessarily imply that all events for a given date have been processed since there may
+// be event replication delays, and/or ongoing activity if the current date is being polled.
+func (e *DateExporter) fetchAndProcessChunks(ctx context.Context) (int, error) {
+ // wait for inflight chunks before returning. in theory it would be fine (and possibly more performant)
+ // to return immediately, but doing so makes it difficult to reason about when the exporter has fully exited
+ // and/or how many complete export cycles have been performed.
+ defer e.waitForInflightChunks()
+
+ chunks := e.cfg.Client.GetEventExportChunks(ctx, &auditlogpb.GetEventExportChunksRequest{
+ Date: timestamppb.New(e.cfg.Date),
+ })
+
+ var newChunks int
+
+ for chunks.Next() {
+ // known chunks should be skipped
+ var skip bool
+ e.withLock(func() {
+ if _, ok := e.chunks[chunks.Item().Chunk]; ok {
+ skip = true
+ return
+ }
+
+ // so long as there is at least one undiscovered chunk, the exporter is not considered idle.
+ e.idle = false
+ })
+
+ if skip {
+ continue
+ }
+
+ if ok := e.startProcessingChunk(ctx, chunks.Item().Chunk, "" /* cursor */); !ok {
+ return newChunks, trace.Wrap(ctx.Err())
+ }
+
+ newChunks++
+ }
+
+ if err := chunks.Done(); err != nil {
+ return newChunks, trace.Wrap(err)
+ }
+
+ return newChunks, nil
+}
+
+// startProcessingChunk blocks until a semaphore token is acquired, then starts background processing of the
+// supplied chunk. returns false if the context is canceled before background processing can be started.
+func (e *DateExporter) startProcessingChunk(ctx context.Context, chunk, cursor string) (ok bool) {
+ // acquire semaphore to start concurrent chunk processing
+ select {
+ case e.sem <- struct{}{}:
+ case <-ctx.Done():
+ return false
+ }
+
+ // set up entry so chunk processing status can be tracked
+ entry := new(chunkEntry)
+ if cursor != "" {
+ entry.setCursor(cursor)
+ }
+
+ e.withLock(func() {
+ e.chunks[chunk] = entry
+ })
+
+ // process chunk concurrently
+ go func() {
+ defer func() {
+ <-e.sem
+ }()
+
+ e.processChunk(ctx, chunk, entry)
+ }()
+
+ return true
+}
+
+// processChunk attempts to export events from a given chunk. it will continuously retry until the context is canceled
+// or all events have been successfully exported.
+func (e *DateExporter) processChunk(ctx context.Context, chunk string, entry *chunkEntry) {
+ // note: this retry is never reset since we return on first successful stream consumption
+ retry := e.retry.Clone()
+ var failures int
+Outer:
+ for {
+ events := e.cfg.Client.ExportUnstructuredEvents(ctx, &auditlogpb.ExportUnstructuredEventsRequest{
+ Date: timestamppb.New(e.cfg.Date),
+ Chunk: chunk,
+ Cursor: entry.getCursor(),
+ })
+
+ if err := e.exportEvents(ctx, events, entry); err != nil {
+ failures++
+
+ if e.chunkLogLimiter.Allow() {
+ e.log.WarnContext(ctx, "event chunk export failed", "chunk", chunk, "failures", failures, "error", err)
+ }
+ retry.Inc()
+
+ select {
+ case <-retry.After():
+ case <-ctx.Done():
+ return
+ }
+ continue Outer
+ }
+
+ entry.done.Store(true)
+ return
+ }
+}
+
+// exportEvents exports all events from the provided stream, updating the supplied entry on each successful export.
+func (e *DateExporter) exportEvents(ctx context.Context, events stream.Stream[*auditlogpb.ExportEventUnstructured], entry *chunkEntry) error {
+ for events.Next() {
+ if err := e.cfg.Export(ctx, events.Item()); err != nil {
+ events.Done()
+ return trace.Wrap(err)
+ }
+
+ entry.setCursor(events.Item().Cursor)
+ }
+ return trace.Wrap(events.Done())
+}
+
+func (e *DateExporter) withLock(fn func()) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ fn()
+}
diff --git a/lib/events/export/date_exporter_test.go b/lib/events/export/date_exporter_test.go
new file mode 100644
index 0000000000000..a6907ade60015
--- /dev/null
+++ b/lib/events/export/date_exporter_test.go
@@ -0,0 +1,452 @@
+/*
+ * Teleport
+ * Copyright (C) 2024 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+package export
+
+import (
+ "context"
+ "fmt"
+ "math/rand"
+ "strconv"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "github.com/gravitational/trace"
+ "github.com/stretchr/testify/require"
+
+ auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
+ "github.com/gravitational/teleport/api/internalutils/stream"
+ apievents "github.com/gravitational/teleport/api/types/events"
+ "github.com/gravitational/teleport/lib/events"
+)
+
+// TestDateExporterBasics tests the basic functionality of the date exporter, with and
+// without random flake.
+func TestDateExporterBasics(t *testing.T) {
+ t.Parallel()
+ for _, randomFlake := range []bool{false, true} {
+ t.Run(fmt.Sprintf("randomFlake=%v", randomFlake), func(t *testing.T) {
+ t.Parallel()
+ testDateExporterBasics(t, randomFlake)
+ })
+ }
+}
+
+func testDateExporterBasics(t *testing.T, randomFlake bool) {
+ clt := newFakeClient()
+ clt.setRandomFlake(randomFlake)
+
+ now := time.Now()
+
+ var exportedMu sync.Mutex
+ var exported []*auditlogpb.ExportEventUnstructured
+
+ exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error {
+ exportedMu.Lock()
+ defer exportedMu.Unlock()
+ exported = append(exported, event)
+ return nil
+ }
+
+ getExported := func() []*auditlogpb.ExportEventUnstructured {
+ exportedMu.Lock()
+ defer exportedMu.Unlock()
+ return append([]*auditlogpb.ExportEventUnstructured(nil), exported...)
+ }
+
+ idleCh := make(chan struct{})
+
+ onIdleFn := func(ctx context.Context) {
+ select {
+ case idleCh <- struct{}{}:
+ default:
+ }
+ }
+
+ waitIdle := func(t *testing.T) {
+ // wait for two ticks of idleness (the first tick may correspond to a cycle that was already
+ // finishing as the new events were being added, while the second tick is guaranteed to have a
+ // happens-after relationship with this function being called).
+ timeout := time.After(time.Second * 30)
+ for i := 0; i < 2; i++ {
+ select {
+ case <-idleCh:
+ case <-timeout:
+ require.FailNow(t, "timeout waiting for exporter to become idle")
+ }
+ }
+ }
+
+ exporter, err := NewDateExporter(DateExporterConfig{
+ Client: clt,
+ Date: now,
+ Export: exportFn,
+ OnIdle: onIdleFn,
+ Concurrency: 3,
+ MaxBackoff: time.Millisecond * 600,
+ PollInterval: time.Millisecond * 200,
+ })
+ require.NoError(t, err)
+ defer exporter.Close()
+
+ // empty event set means the exporter should become idle almost
+ // immediately.
+ waitIdle(t)
+ require.Empty(t, getExported())
+
+ var allEvents []*auditlogpb.ExportEventUnstructured
+ var allChunks []string
+ // quickly add a bunch of chunks
+ for i := 0; i < 30; i++ {
+ chunk := makeEventChunk(t, now, 10)
+ allEvents = append(allEvents, chunk...)
+ chunkID := uuid.NewString()
+ allChunks = append(allChunks, chunkID)
+ clt.addChunk(now.Format(time.DateOnly), chunkID, chunk)
+ }
+
+ waitIdle(t)
+
+ require.ElementsMatch(t, allChunks, exporter.GetState().Completed)
+ require.ElementsMatch(t, allEvents, getExported())
+
+ // process a second round of chunks to cover the case of new chunks being added
+ // after non-trivial idleness.
+
+ // note that we process a large number of events here just to make absolutely certain
+ // that we're hitting a decent amount of random flake.
+ for i := 0; i < 30; i++ {
+ chunk := makeEventChunk(t, now, 10)
+ allEvents = append(allEvents, chunk...)
+ chunkID := uuid.NewString()
+ allChunks = append(allChunks, chunkID)
+ clt.addChunk(now.Format(time.DateOnly), chunkID, chunk)
+ }
+
+ waitIdle(t)
+
+ require.ElementsMatch(t, allChunks, exporter.GetState().Completed)
+ require.ElementsMatch(t, allEvents, getExported())
+
+ // close the exporter
+ exporter.Close()
+ timeout := time.After(time.Second * 30)
+ select {
+ case <-exporter.Done():
+ case <-timeout:
+ require.FailNow(t, "timeout waiting for exporter to close")
+ }
+
+ // get the final state of the exporter
+ state := exporter.GetState()
+
+ // recreate exporter with state from previous run
+ exporter, err = NewDateExporter(DateExporterConfig{
+ Client: clt,
+ Date: now,
+ Export: exportFn,
+ OnIdle: onIdleFn,
+ PreviousState: state,
+ Concurrency: 3,
+ MaxBackoff: time.Millisecond * 600,
+ PollInterval: time.Millisecond * 200,
+ })
+ require.NoError(t, err)
+ defer exporter.Close()
+
+ waitIdle(t)
+
+ // no additional events should have been exported
+ require.ElementsMatch(t, allChunks, exporter.GetState().Completed)
+ require.ElementsMatch(t, allEvents, getExported())
+
+ // new chunks should be consumed correctly
+ for i := 0; i < 30; i++ {
+ chunk := makeEventChunk(t, now, 10)
+ allEvents = append(allEvents, chunk...)
+ chunkID := uuid.NewString()
+ allChunks = append(allChunks, chunkID)
+ clt.addChunk(now.Format(time.DateOnly), chunkID, chunk)
+ }
+
+ waitIdle(t)
+
+ require.ElementsMatch(t, allChunks, exporter.GetState().Completed)
+ require.ElementsMatch(t, allEvents, getExported())
+}
+
+// TestDateExporterResume verifies non-trivial exporter resumption behavior, with and without
+// random flake.
+func TestDateExporterResume(t *testing.T) {
+ t.Parallel()
+ for _, randomFlake := range []bool{false, true} {
+ t.Run(fmt.Sprintf("randomFlake=%v", randomFlake), func(t *testing.T) {
+ t.Parallel()
+ testDateExporterResume(t, randomFlake)
+ })
+ }
+}
+
+func testDateExporterResume(t *testing.T, randomFlake bool) {
+ clt := newFakeClient()
+ clt.setRandomFlake(randomFlake)
+
+ now := time.Now()
+
+ // export via unbuffered channel so that we can easily block/unblock export from
+ // the main test routine.
+ exportCH := make(chan *auditlogpb.ExportEventUnstructured)
+
+ exportFn := func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error {
+ select {
+ case exportCH <- event:
+ case <-ctx.Done():
+ return trace.Wrap(ctx.Err())
+ }
+ return nil
+ }
+
+ idleCh := make(chan struct{})
+
+ onIdleFn := func(ctx context.Context) {
+ select {
+ case idleCh <- struct{}{}:
+ default:
+ }
+ }
+
+ waitIdle := func(t *testing.T) {
+ // wait for two ticks of idleness (the first tick may correspond to a cycle that was already
+ // finishing as the new events were being added, while the second tick is guaranteed to have a
+ // happens-after relationship with this function being called).
+ timeout := time.After(time.Second * 30)
+ for i := 0; i < 2; i++ {
+ select {
+ case <-idleCh:
+ case <-timeout:
+ require.FailNow(t, "timeout waiting for exporter to become idle")
+ }
+ }
+ }
+
+ exporter, err := NewDateExporter(DateExporterConfig{
+ Client: clt,
+ Date: now,
+ Export: exportFn,
+ OnIdle: onIdleFn,
+ Concurrency: 3, /* low concurrency to ensure that we have some in progress chunks */
+ MaxBackoff: time.Millisecond * 600,
+ PollInterval: time.Millisecond * 200,
+ })
+ require.NoError(t, err)
+ defer exporter.Close()
+
+ // empty event set means the exporter should become idle almost
+ // immediately.
+ waitIdle(t)
+
+ var allEvents, gotEvents []*auditlogpb.ExportEventUnstructured
+ // quickly add a bunch of chunks
+ for i := 0; i < 10; i++ {
+ chunk := makeEventChunk(t, now, 10)
+ allEvents = append(allEvents, chunk...)
+ chunkID := uuid.NewString()
+ clt.addChunk(now.Format(time.DateOnly), chunkID, chunk)
+ }
+
+ // consume a large subset of events s.t. we have some completed
+ // chunks, some in progress, and some not yet started (note that
+ // to guarantee some in progress chunks, the number consumed must not
+ // divide evenly by the chunk size).
+ timeout := time.After(time.Second * 30)
+ for i := 0; i < 47; i++ {
+ select {
+ case evt := <-exportCH:
+ gotEvents = append(gotEvents, evt)
+ case <-timeout:
+ require.FailNowf(t, "timeout waiting for event", "iteration=%d", i)
+ }
+ }
+
+ // close the exporter and wait for it to finish so that
+ // we can get the correct final state.
+ exporter.Close()
+ select {
+ case <-exporter.Done():
+ case <-time.After(time.Second * 30):
+ require.FailNow(t, "timeout waiting for exporter to close")
+ }
+
+ // get the final state of the exporter
+ state := exporter.GetState()
+
+ fmt.Printf("cursors=%+v\n", state.Cursors)
+
+ // recreate exporter with state from previous run
+ exporter, err = NewDateExporter(DateExporterConfig{
+ Client: clt,
+ Date: now,
+ Export: exportFn,
+ OnIdle: onIdleFn,
+ PreviousState: state,
+ Concurrency: 3,
+ MaxBackoff: time.Millisecond * 600,
+ PollInterval: time.Millisecond * 200,
+ })
+ require.NoError(t, err)
+ defer exporter.Close()
+
+ // consume remaining events
+ for i := 0; i < 53; i++ {
+ select {
+ case evt := <-exportCH:
+ gotEvents = append(gotEvents, evt)
+ case <-timeout:
+ require.FailNowf(t, "timeout waiting for event", "iteration=%d", i)
+ }
+ }
+ require.ElementsMatch(t, allEvents, gotEvents)
+
+ // ensure that exporter becomes idle
+ waitIdle(t)
+}
+
+func makeEventChunk(t *testing.T, ts time.Time, n int) []*auditlogpb.ExportEventUnstructured {
+ var chunk []*auditlogpb.ExportEventUnstructured
+ for i := 0; i < n; i++ {
+ baseEvent := apievents.UserLogin{
+ Method: events.LoginMethodSAML,
+ Status: apievents.Status{Success: true},
+ UserMetadata: apievents.UserMetadata{User: "alice@example.com"},
+ Metadata: apievents.Metadata{
+ ID: uuid.NewString(),
+ Type: events.UserLoginEvent,
+ Time: ts.Add(time.Duration(i)),
+ },
+ }
+
+ event, err := apievents.ToUnstructured(&baseEvent)
+ require.NoError(t, err)
+ chunk = append(chunk, &auditlogpb.ExportEventUnstructured{
+ Event: event,
+ Cursor: strconv.Itoa(i + 1),
+ })
+ }
+
+ return chunk
+}
+
+type fakeClient struct {
+ mu sync.Mutex
+ data map[string]map[string][]*auditlogpb.ExportEventUnstructured
+ randomFlake bool
+}
+
+func newFakeClient() *fakeClient {
+ return &fakeClient{
+ data: make(map[string]map[string][]*auditlogpb.ExportEventUnstructured),
+ }
+}
+
+func (c *fakeClient) setRandomFlake(flake bool) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ c.randomFlake = flake
+}
+
+func (c *fakeClient) addChunk(date string, chunk string, events []*auditlogpb.ExportEventUnstructured) {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if _, ok := c.data[date]; !ok {
+ c.data[date] = make(map[string][]*auditlogpb.ExportEventUnstructured)
+ }
+ c.data[date][chunk] = events
+}
+
+func (c *fakeClient) ExportUnstructuredEvents(ctx context.Context, req *auditlogpb.ExportUnstructuredEventsRequest) stream.Stream[*auditlogpb.ExportEventUnstructured] {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ chunks, ok := c.data[req.Date.AsTime().Format(time.DateOnly)]
+ if !ok {
+ return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotFound("date not found"))
+ }
+
+ chunk, ok := chunks[req.Chunk]
+ if !ok {
+ return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.NotFound("chunk not found"))
+ }
+
+ var cursor int
+ if req.Cursor != "" {
+ var err error
+ cursor, err = strconv.Atoi(req.Cursor)
+ if err != nil {
+ return stream.Fail[*auditlogpb.ExportEventUnstructured](trace.BadParameter("invalid cursor %q", req.Cursor))
+ }
+ }
+
+ chunk = chunk[cursor:]
+
+ // randomly truncate the chunk and append an error to simulate flake. we target a 33% failure rate
+ // since event export is more frequent than chunk listing.
+ var fail bool
+ if c.randomFlake && rand.Int()%3 == 0 {
+ chunk = chunk[:rand.Intn(len(chunk))]
+ fail = true
+ }
+
+ return stream.MapErr(stream.Slice(chunk), func(err error) error {
+ if fail {
+ return trace.NotFound("export failed as random test condition")
+ }
+ return err
+ })
+}
+
+func (c *fakeClient) GetEventExportChunks(ctx context.Context, req *auditlogpb.GetEventExportChunksRequest) stream.Stream[*auditlogpb.EventExportChunk] {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ chunks, ok := c.data[req.Date.AsTime().Format(time.DateOnly)]
+ if !ok {
+ return stream.Empty[*auditlogpb.EventExportChunk]()
+ }
+
+ var eec []*auditlogpb.EventExportChunk
+ for name := range chunks {
+ eec = append(eec, &auditlogpb.EventExportChunk{
+ Chunk: name,
+ })
+ }
+
+ // randomly truncate the chunk list and append an error to simulate flake. we target a 50% failure rate
+ // since chunk listing is less frequent than event export.
+ var fail bool
+ if c.randomFlake && rand.Int()%2 == 0 {
+ eec = eec[:rand.Intn(len(eec))]
+ fail = true
+ }
+
+ return stream.MapErr(stream.Slice(eec), func(err error) error {
+ if fail {
+ return trace.NotFound("chunks failed as random test condition")
+ }
+ return err
+ })
+}
diff --git a/lib/events/test/suite.go b/lib/events/test/suite.go
index f753170b9453b..11b4cb2c1d26b 100644
--- a/lib/events/test/suite.go
+++ b/lib/events/test/suite.go
@@ -24,6 +24,7 @@ import (
"io"
"os"
"slices"
+ "sync/atomic"
"testing"
"time"
@@ -37,6 +38,7 @@ import (
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
+ "github.com/gravitational/teleport/lib/events/export"
"github.com/gravitational/teleport/lib/fixtures"
"github.com/gravitational/teleport/lib/session"
)
@@ -208,6 +210,37 @@ func (s *EventsSuite) EventExport(t *testing.T) {
require.False(t, chunks.Next())
require.NoError(t, chunks.Done())
+
+ // as a sanity check, try pulling events using the exporter helper (should be
+ // equivalent to the above behavior)
+ var exportedEvents atomic.Uint64
+ var exporter *export.DateExporter
+ var err error
+ exporter, err = export.NewDateExporter(export.DateExporterConfig{
+ Client: s.Log,
+ Date: baseTime,
+ Export: func(ctx context.Context, event *auditlogpb.ExportEventUnstructured) error {
+ exportedEvents.Add(1)
+ return nil
+ },
+ OnIdle: func(ctx context.Context) {
+ // only exporting extant events, so we can close as soon as we're caught up.
+ exporter.Close()
+ },
+ Concurrency: 3,
+ MaxBackoff: time.Millisecond * 600,
+ PollInterval: time.Millisecond * 200,
+ })
+ require.NoError(t, err)
+ defer exporter.Close()
+
+ select {
+ case <-exporter.Done():
+ case <-time.After(30 * time.Second):
+ require.FailNow(t, "timeout waiting for exporter to finish")
+ }
+
+ require.Equal(t, uint64(8), exportedEvents.Load())
}
// EventPagination covers event search pagination.