Skip to content
This repository has been archived by the owner on Jun 4, 2024. It is now read-only.

Commit

Permalink
[Buddy] fix leaking go-routines in event-handler watcher (#966)
Browse files Browse the repository at this point in the history
* fix leaking go-routines in event-handler watcher

Signed-off-by: joerger <[email protected]>

* Edits.

* Update test.

* Fix context import.

* Fix lint and race condition in test.

---------

Signed-off-by: joerger <[email protected]>
Co-authored-by: miguelvalerio <[email protected]>
  • Loading branch information
Joerger and miguelvalerio authored Nov 28, 2023
1 parent d0e372b commit 2b8d5ee
Show file tree
Hide file tree
Showing 3 changed files with 290 additions and 54 deletions.
52 changes: 43 additions & 9 deletions event-handler/teleport_event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
"github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/trace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/structpb"
Expand All @@ -39,7 +40,10 @@ func TestNew(t *testing.T) {
},
}

event, err := NewTeleportEvent(eventToJSON(t, events.AuditEvent(e)), "cursor")
protoEvent, err := eventToProto(events.AuditEvent(e))
require.NoError(t, err)

event, err := NewTeleportEvent(protoEvent, "cursor")
require.NoError(t, err)
assert.Equal(t, "test", event.ID)
assert.Equal(t, "mock", event.Type)
Expand All @@ -49,7 +53,10 @@ func TestNew(t *testing.T) {
func TestGenID(t *testing.T) {
e := &events.SessionPrint{}

event, err := NewTeleportEvent(eventToJSON(t, events.AuditEvent(e)), "cursor")
protoEvent, err := eventToProto(events.AuditEvent(e))
require.NoError(t, err)

event, err := NewTeleportEvent(protoEvent, "cursor")
require.NoError(t, err)
assert.NotEmpty(t, event.ID)
}
Expand All @@ -64,7 +71,10 @@ func TestSessionEnd(t *testing.T) {
},
}

event, err := NewTeleportEvent(eventToJSON(t, events.AuditEvent(e)), "cursor")
protoEvent, err := eventToProto(events.AuditEvent(e))
require.NoError(t, err)

event, err := NewTeleportEvent(protoEvent, "cursor")
require.NoError(t, err)
assert.NotEmpty(t, event.ID)
assert.NotEmpty(t, event.SessionID)
Expand All @@ -81,7 +91,10 @@ func TestFailedLogin(t *testing.T) {
},
}

event, err := NewTeleportEvent(eventToJSON(t, events.AuditEvent(e)), "cursor")
protoEvent, err := eventToProto(events.AuditEvent(e))
require.NoError(t, err)

event, err := NewTeleportEvent(protoEvent, "cursor")
require.NoError(t, err)
assert.NotEmpty(t, event.ID)
assert.True(t, event.IsFailedLogin)
Expand All @@ -97,28 +110,49 @@ func TestSuccessLogin(t *testing.T) {
},
}

event, err := NewTeleportEvent(eventToJSON(t, events.AuditEvent(e)), "cursor")
protoEvent, err := eventToProto(events.AuditEvent(e))
require.NoError(t, err)

event, err := NewTeleportEvent(protoEvent, "cursor")
require.NoError(t, err)
assert.NotEmpty(t, event.ID)
assert.False(t, event.IsFailedLogin)
}

func eventToJSON(t *testing.T, e events.AuditEvent) *auditlogpb.EventUnstructured {
func eventToProto(e events.AuditEvent) (*auditlogpb.EventUnstructured, error) {
data, err := lib.FastMarshal(e)
require.NoError(t, err)
if err != nil {
return nil, trace.Wrap(err)
}

str := &structpb.Struct{}
err = str.UnmarshalJSON(data)
require.NoError(t, err)
if err = str.UnmarshalJSON(data); err != nil {
return nil, trace.Wrap(err)
}

id := e.GetID()
if id == "" {
hash := sha256.Sum256(data)
id = hex.EncodeToString(hash[:])
}

return &auditlogpb.EventUnstructured{
Type: e.GetType(),
Unstructured: str,
Id: id,
Index: e.GetIndex(),
Time: timestamppb.New(e.GetTime()),
}, nil
}

func eventsToProto(events []events.AuditEvent) ([]*auditlogpb.EventUnstructured, error) {
protoEvents := make([]*auditlogpb.EventUnstructured, len(events))
for i, event := range events {
protoEvent, err := eventToProto(event)
if err != nil {
return nil, trace.Wrap(err)
}
protoEvents[i] = protoEvent
}
return protoEvents, nil
}
4 changes: 2 additions & 2 deletions event-handler/teleport_events_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package main

import (
"context"
"fmt"
"time"

Expand All @@ -30,7 +31,6 @@ import (
"github.com/gravitational/teleport/integrations/lib/logger"
"github.com/gravitational/trace"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

const (
Expand Down Expand Up @@ -293,7 +293,7 @@ func (t *TeleportEventsWatcher) Events(ctx context.Context) (chan *TeleportEvent
err := t.fetch(ctx)
if err != nil {
e <- trace.Wrap(err)
continue
break
}

// If there is still nothing new on current page, sleep
Expand Down
Loading

0 comments on commit 2b8d5ee

Please sign in to comment.