Skip to content

Commit

Permalink
fix: flaky semaphore test for logs, metric, and traces
Browse files Browse the repository at this point in the history
  • Loading branch information
rubvs committed Nov 1, 2024
1 parent 46f8e38 commit 5be0588
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 9 deletions.
19 changes: 16 additions & 3 deletions input/otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -227,8 +228,15 @@ func TestConsumeLogsSemaphore(t *testing.T) {
logs := plog.NewLogs()
var batches []*modelpb.Batch

var once sync.Once
semAcquiredCh := make(chan struct{})
doneCh := make(chan struct{})

recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
// Ensure channel is only closed the first time
once.Do(func() {
close(semAcquiredCh)
})
<-doneCh
batchCopy := batch.Clone()
batches = append(batches, &batchCopy)
Expand All @@ -239,20 +247,25 @@ func TestConsumeLogsSemaphore(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})

startCh := make(chan struct{})
go func() {
close(startCh)
// 1. Acquires the sem lock
_, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
}()

<-startCh
// Wait until (1) has properly started.
<-semAcquiredCh

// 2. Cannot acquire the lock held by (1). Returns expected error.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeLogsWithResult(ctx, logs)
assert.Equal(t, err.Error(), "context deadline exceeded")

// 3. Release the sem from (1) by finishing ProcessBatchFunc.
close(doneCh)

// 4. Acquires the lock to ensure is was properly released.
_, err = consumer.ConsumeLogsWithResult(context.Background(), logs)
assert.NoError(t, err)
}
Expand Down
19 changes: 16 additions & 3 deletions input/otlp/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"math"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -229,8 +230,15 @@ func TestConsumeMetricsSemaphore(t *testing.T) {
metrics := pmetric.NewMetrics()
var batches []*modelpb.Batch

var once sync.Once
semAcquiredCh := make(chan struct{})
doneCh := make(chan struct{})

recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
// Ensure channel is only closed the first time
once.Do(func() {
close(semAcquiredCh)
})
<-doneCh
batchCopy := batch.Clone()
batches = append(batches, &batchCopy)
Expand All @@ -241,20 +249,25 @@ func TestConsumeMetricsSemaphore(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})

startCh := make(chan struct{})
go func() {
close(startCh)
// 1. Acquires the sem lock
_, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics)
assert.NoError(t, err)
}()

<-startCh
// Wait until (1) has properly started.
<-semAcquiredCh

// 2. Cannot acquire the lock held by (1). Returns expected error.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeMetricsWithResult(ctx, metrics)
assert.Equal(t, err.Error(), "context deadline exceeded")

// 3. Release the sem from (1) by finishing ProcessBatchFunc.
close(doneCh)

// 4. Acquires the lock to ensure is was properly released.
_, err = consumer.ConsumeMetricsWithResult(context.Background(), metrics)
assert.NoError(t, err)
}
Expand Down
19 changes: 16 additions & 3 deletions input/otlp/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1168,8 +1169,15 @@ func TestConsumeTracesSemaphore(t *testing.T) {
traces := ptrace.NewTraces()
var batches []*modelpb.Batch

var once sync.Once
semAcquiredCh := make(chan struct{})
doneCh := make(chan struct{})

recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
// Ensure channel is only closed the first time
once.Do(func() {
close(semAcquiredCh)
})
<-doneCh
batchCopy := batch.Clone()
batches = append(batches, &batchCopy)
Expand All @@ -1180,20 +1188,25 @@ func TestConsumeTracesSemaphore(t *testing.T) {
Semaphore: semaphore.NewWeighted(1),
})

startCh := make(chan struct{})
go func() {
close(startCh)
// 1. Acquires the sem lock
_, err := consumer.ConsumeTracesWithResult(context.Background(), traces)
assert.NoError(t, err)
}()

<-startCh
// Wait until (1) has properly started.
<-semAcquiredCh

// 2. Cannot acquire the lock held by (1). Returns expected error.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := consumer.ConsumeTracesWithResult(ctx, traces)
assert.Equal(t, err.Error(), "context deadline exceeded")

// 3. Release the sem from (1) by finishing ProcessBatchFunc.
close(doneCh)

// 4. Acquires the lock to ensure is was properly released.
_, err = consumer.ConsumeTracesWithResult(context.Background(), traces)
assert.NoError(t, err)
}
Expand Down

0 comments on commit 5be0588

Please sign in to comment.