Skip to content
This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Commit

Permalink
Update test.
Browse files Browse the repository at this point in the history
  • Loading branch information
Joerger committed Nov 3, 2023
1 parent 9d616c2 commit 4319631
Showing 1 changed file with 21 additions and 35 deletions.
56 changes: 21 additions & 35 deletions event-handler/teleport_events_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package main

import (
"errors"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -133,74 +131,62 @@ func TestNext(t *testing.T) {
}
}

// errMockTeleportEventWatcher is Teleport client mock that returns an error after the first SearchUnstructuredEvents
// errMockTeleportEventWatcher is Teleport client mock that returns an error when no events are left.
type errMockTeleportEventWatcher struct {
mockTeleportEventWatcher
searchUnstructuredEventsCalled bool
err error
}

func (c *errMockTeleportEventWatcher) SearchUnstructuredEvents(ctx context.Context, fromUTC, toUTC time.Time, namespace string, eventTypes []string, limit int, order types.EventOrder, startKey string) ([]*auditlogpb.EventUnstructured, string, error) {
if c.searchUnstructuredEventsCalled {
return nil, "", errors.New("error")
if len(c.events) == 0 {
return nil, "", c.err
}
defer func() { c.searchUnstructuredEventsCalled = true }()

return c.mockTeleportEventWatcher.SearchUnstructuredEvents(ctx, fromUTC, toUTC, namespace, eventTypes, limit, order, startKey)
events, _, err := c.mockTeleportEventWatcher.SearchUnstructuredEvents(ctx, fromUTC, toUTC, namespace, eventTypes, limit, order, startKey)

// return empty cursor to force event client.Events to try to update the current page.
return events, "", err
}

func TestLastEvent(t *testing.T) {
func TestUpdatePage(t *testing.T) {
t.Run("should not leave hanging go-routines", func(t *testing.T) {
const mockEventID = "1"
mockErr := trace.Errorf("error")
e := []events.AuditEvent{
&events.UserCreate{
Metadata: events.Metadata{
ID: mockEventID,
ID: "",
},
},
}

mockEventWatcher := &errMockTeleportEventWatcher{mockTeleportEventWatcher: mockTeleportEventWatcher{e}}
mockEventWatcher := &errMockTeleportEventWatcher{err: mockErr, mockTeleportEventWatcher: mockTeleportEventWatcher{e}}
client := newTeleportEventWatcher(t, mockEventWatcher, true)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

chEvt, chErr := client.Events(ctx)

select {
case err := <-chErr:
t.Fatalf("received unexpected error from error channel: %v", err)
case e := <-chEvt:
require.NotNil(t, e.Event)
require.Equal(t, mockEventID, e.ID)
case <-time.After(time.Second):
t.Fatalf("No events received withing one second")
}

var wg sync.WaitGroup

const numGoroutines = 5
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
chEvt, _ := client.Events(ctx)
// consume events.
for range chEvt {
}
}()
select {
case err := <-chErr:
require.ErrorIs(t, mockErr, err)
case <-time.After(time.Second):
t.Fatalf("No events received withing one second")
}

goroutinesDone := make(chan struct{})
go func() {
wg.Wait()
close(goroutinesDone)
}()

select {
case <-goroutinesDone:
case <-ctx.Done():
require.Fail(t, "timeout reached, some goroutines were not closed")
case <-chEvt:
// channel should be closed after error to update page.
case <-time.After(time.Second):
t.Fatalf("No events received withing one second")
}
})
}
Expand Down

0 comments on commit 4319631

Please sign in to comment.