diff --git a/input/otlp/logs_test.go b/input/otlp/logs_test.go index 8c9fed67..16fb40b7 100644 --- a/input/otlp/logs_test.go +++ b/input/otlp/logs_test.go @@ -38,6 +38,7 @@ import ( "context" "fmt" "strings" + "sync" "testing" "time" @@ -227,8 +228,15 @@ func TestConsumeLogsSemaphore(t *testing.T) { logs := plog.NewLogs() var batches []*modelpb.Batch + var once sync.Once + semAcquiredCh := make(chan struct{}) doneCh := make(chan struct{}) + recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error { + // Ensure channel is only closed the first time + once.Do(func() { + close(semAcquiredCh) + }) <-doneCh batchCopy := batch.Clone() batches = append(batches, &batchCopy) @@ -239,20 +247,25 @@ func TestConsumeLogsSemaphore(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) - startCh := make(chan struct{}) go func() { - close(startCh) + // 1. Acquires the sem lock _, err := consumer.ConsumeLogsWithResult(context.Background(), logs) assert.NoError(t, err) }() - <-startCh + // Wait until (1) has properly started. + <-semAcquiredCh + + // 2. Cannot acquire the lock held by (1). Returns expected error. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() _, err := consumer.ConsumeLogsWithResult(ctx, logs) assert.Equal(t, err.Error(), "context deadline exceeded") + + // 3. Release the sem from (1) by finishing ProcessBatchFunc. close(doneCh) + // 4. Acquires the lock to ensure is was properly released. _, err = consumer.ConsumeLogsWithResult(context.Background(), logs) assert.NoError(t, err) } diff --git a/input/otlp/metrics_test.go b/input/otlp/metrics_test.go index e6b39959..8c5002bf 100644 --- a/input/otlp/metrics_test.go +++ b/input/otlp/metrics_test.go @@ -40,6 +40,7 @@ import ( "math" "sort" "strings" + "sync" "testing" "time" @@ -229,8 +230,15 @@ func TestConsumeMetricsSemaphore(t *testing.T) { metrics := pmetric.NewMetrics() var batches []*modelpb.Batch + var once sync.Once + semAcquiredCh := make(chan struct{}) doneCh := make(chan struct{}) + recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error { + // Ensure channel is only closed the first time + once.Do(func() { + close(semAcquiredCh) + }) <-doneCh batchCopy := batch.Clone() batches = append(batches, &batchCopy) @@ -241,20 +249,25 @@ func TestConsumeMetricsSemaphore(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) - startCh := make(chan struct{}) go func() { - close(startCh) + // 1. Acquires the sem lock _, err := consumer.ConsumeMetricsWithResult(context.Background(), metrics) assert.NoError(t, err) }() - <-startCh + // Wait until (1) has properly started. + <-semAcquiredCh + + // 2. Cannot acquire the lock held by (1). Returns expected error. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() _, err := consumer.ConsumeMetricsWithResult(ctx, metrics) assert.Equal(t, err.Error(), "context deadline exceeded") + + // 3. Release the sem from (1) by finishing ProcessBatchFunc. close(doneCh) + // 4. Acquires the lock to ensure is was properly released. _, err = consumer.ConsumeMetricsWithResult(context.Background(), metrics) assert.NoError(t, err) } diff --git a/input/otlp/traces_test.go b/input/otlp/traces_test.go index 3623672e..75164f21 100644 --- a/input/otlp/traces_test.go +++ b/input/otlp/traces_test.go @@ -41,6 +41,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -1168,8 +1169,15 @@ func TestConsumeTracesSemaphore(t *testing.T) { traces := ptrace.NewTraces() var batches []*modelpb.Batch + var once sync.Once + semAcquiredCh := make(chan struct{}) doneCh := make(chan struct{}) + recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error { + // Ensure channel is only closed the first time + once.Do(func() { + close(semAcquiredCh) + }) <-doneCh batchCopy := batch.Clone() batches = append(batches, &batchCopy) @@ -1180,20 +1188,25 @@ func TestConsumeTracesSemaphore(t *testing.T) { Semaphore: semaphore.NewWeighted(1), }) - startCh := make(chan struct{}) go func() { - close(startCh) + // 1. Acquires the sem lock _, err := consumer.ConsumeTracesWithResult(context.Background(), traces) assert.NoError(t, err) }() - <-startCh + // Wait until (1) has properly started. + <-semAcquiredCh + + // 2. Cannot acquire the lock held by (1). Returns expected error. ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() _, err := consumer.ConsumeTracesWithResult(ctx, traces) assert.Equal(t, err.Error(), "context deadline exceeded") + + // 3. Release the sem from (1) by finishing ProcessBatchFunc. close(doneCh) + // 4. Acquires the lock to ensure is was properly released. _, err = consumer.ConsumeTracesWithResult(context.Background(), traces) assert.NoError(t, err) }