Skip to content

Commit

Permalink
Double up the athena tests that hit AWS
Browse files Browse the repository at this point in the history
  • Loading branch information
espadolini authored and github-actions committed Jun 4, 2024
1 parent 20d5cca commit 92849d3
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 12 deletions.
55 changes: 51 additions & 4 deletions lib/events/athena/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,20 @@ import (
)

func TestIntegrationAthenaSearchSessionEventsBySessionID(t *testing.T) {
t.Run("sns", func(t *testing.T) {
const bypassSNSFalse = false
testIntegrationAthenaSearchSessionEventsBySessionID(t, bypassSNSFalse)
})
t.Run("sqs", func(t *testing.T) {
const bypassSNSTrue = true
testIntegrationAthenaSearchSessionEventsBySessionID(t, bypassSNSTrue)
})
}

func testIntegrationAthenaSearchSessionEventsBySessionID(t *testing.T, bypassSNS bool) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
ac := SetupAthenaContext(t, ctx, AthenaContextConfig{})
ac := SetupAthenaContext(t, ctx, AthenaContextConfig{BypassSNS: bypassSNS})
auditLogger := &EventuallyConsistentAuditLogger{
Inner: ac.log,
// Additional 5s is used to compensate for uploading parquet on s3.
Expand All @@ -55,9 +66,20 @@ func TestIntegrationAthenaSearchSessionEventsBySessionID(t *testing.T) {
}

func TestIntegrationAthenaSessionEventsCRUD(t *testing.T) {
t.Run("sns", func(t *testing.T) {
const bypassSNSFalse = false
testIntegrationAthenaSessionEventsCRUD(t, bypassSNSFalse)
})
t.Run("sqs", func(t *testing.T) {
const bypassSNSTrue = true
testIntegrationAthenaSessionEventsCRUD(t, bypassSNSTrue)
})
}

func testIntegrationAthenaSessionEventsCRUD(t *testing.T, bypassSNS bool) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
ac := SetupAthenaContext(t, ctx, AthenaContextConfig{})
ac := SetupAthenaContext(t, ctx, AthenaContextConfig{BypassSNS: bypassSNS})
auditLogger := &EventuallyConsistentAuditLogger{
Inner: ac.log,
// Additional 5s is used to compensate for uploading parquet on s3.
Expand All @@ -72,9 +94,20 @@ func TestIntegrationAthenaSessionEventsCRUD(t *testing.T) {
}

func TestIntegrationAthenaEventPagination(t *testing.T) {
t.Run("sns", func(t *testing.T) {
const bypassSNSFalse = false
testIntegrationAthenaEventPagination(t, bypassSNSFalse)
})
t.Run("sqs", func(t *testing.T) {
const bypassSNSTrue = true
testIntegrationAthenaEventPagination(t, bypassSNSTrue)
})
}

func testIntegrationAthenaEventPagination(t *testing.T, bypassSNS bool) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
ac := SetupAthenaContext(t, ctx, AthenaContextConfig{})
ac := SetupAthenaContext(t, ctx, AthenaContextConfig{BypassSNS: bypassSNS})
auditLogger := &EventuallyConsistentAuditLogger{
Inner: ac.log,
// Additional 5s is used to compensate for uploading parquet on s3.
Expand All @@ -89,10 +122,24 @@ func TestIntegrationAthenaEventPagination(t *testing.T) {
}

func TestIntegrationAthenaLargeEvents(t *testing.T) {
t.Run("sns", func(t *testing.T) {
const bypassSNSFalse = false
testIntegrationAthenaLargeEvents(t, bypassSNSFalse)
})
t.Run("sqs", func(t *testing.T) {
const bypassSNSTrue = true
testIntegrationAthenaLargeEvents(t, bypassSNSTrue)
})
}

func testIntegrationAthenaLargeEvents(t *testing.T, bypassSNS bool) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

ac := SetupAthenaContext(t, ctx, AthenaContextConfig{MaxBatchSize: 1})
ac := SetupAthenaContext(t, ctx, AthenaContextConfig{
MaxBatchSize: 1,
BypassSNS: bypassSNS,
})
in := &apievents.SessionStart{
Metadata: apievents.Metadata{
Index: 2,
Expand Down
10 changes: 2 additions & 8 deletions lib/events/athena/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func (a *AthenaContext) GetLog() *Log {
// AthenaContextConfig is optional config to override defaults in athena context.
type AthenaContextConfig struct {
MaxBatchSize int
BypassSNS bool
}

type InfraOutputs struct {
Expand Down Expand Up @@ -169,13 +170,6 @@ func SetupAthenaContext(t *testing.T, ctx context.Context, cfg AthenaContextConf
if ok, _ := strconv.ParseBool(testEnabled); !ok {
t.Skip("Skipping AWS-dependent test suite.")
}
var bypassSNS bool
if s := os.Getenv(teleport.AWSRunTests + "_ATHENA_BYPASS_SNS"); s != "" {
if ok, _ := strconv.ParseBool(s); ok {
t.Log("bypassing SNS for Athena audit log events")
bypassSNS = true
}
}

testID := fmt.Sprintf("auditlogs-integrationtests-%v", uuid.New().String())

Expand Down Expand Up @@ -208,7 +202,7 @@ func SetupAthenaContext(t *testing.T, ctx context.Context, cfg AthenaContextConf
}

topicARN := infraOut.TopicARN
if bypassSNS {
if cfg.BypassSNS {
topicARN = topicARNBypass
}
log, err := New(ctx, Config{
Expand Down

0 comments on commit 92849d3

Please sign in to comment.