
Fix Flaky Semaphore Test #391

Open: wants to merge 2 commits into base: main

@rubvs (Contributor) commented Nov 4, 2024

Closes #361

Context

Running the apm-data tests with `-race` can deadlock `TestConsumeLogsSemaphore`, because the semaphore is not properly released.

`TestConsumeLogsSemaphore` consists of three parts:

  1. Spin up a goroutine that runs a consumer in parallel with the main routine. The purpose of this child routine is to hold the semaphore until `close(doneCh)`.
  2. The main routine runs its own consumer, which keeps trying to acquire the semaphore until its context times out and then returns the expected error.
  3. After the expected error, main closes `doneCh` so that the goroutine from (1) finishes and releases the semaphore. Main then runs a final consumer to check that it can acquire the released semaphore.

Solution

Removing the following lines from `semAcquire` seems to fix the issue:

```go
sp, ctx := apm.StartSpan(ctx, "Semaphore.Acquire", "Reporter")
defer sp.End()
```

Initially, I thought the problem was an inappropriate context being propagated, but passing a background context and removing the context shadowing did not solve the problem.

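The most plausible explanation, therefore, is that the span introduces enough lag that goroutine 1 has not yet acquired the semaphore when main starts competing for it. If main wins, its consumer runs the shared processor, which waits on `doneCh`, while `doneCh` is only closed after main's call returns, so the test deadlocks.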

The deadlock is solved by "upgrading" `startCh`: instead of signalling main when the parallel goroutine starts, the goroutine now signals once it has actually acquired the semaphore. This is the purpose of `semAcquiredCh`.

However, since the same processor runs for every consumer call in this test, only the first acquisition should be signalled. Hence the use of `sync.Once`.
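Closing an already-closed channel panics, so a processor shared by several consumer calls may only close its signal channel on the first call. A minimal sketch of that guard, assuming the signal is a channel close; `newFirstCallSignal` is a hypothetical helper, not code from the PR.

```go
package main

import (
	"fmt"
	"sync"
)

// newFirstCallSignal returns a function that closes ch on its first call only.
// Later calls are no-ops, so the processor can call it on every run.
func newFirstCallSignal(ch chan struct{}) func() {
	var once sync.Once
	return func() { once.Do(func() { close(ch) }) }
}

func main() {
	semAcquiredCh := make(chan struct{})
	signal := newFirstCallSignal(semAcquiredCh)

	// e.g. three consumer runs sharing one processor: no "close of closed channel" panic
	for i := 0; i < 3; i++ {
		signal()
	}

	_, open := <-semAcquiredCh
	fmt.Println(open) // false: the channel was closed exactly once
}
```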

Test

```shell
go clean -testcache
go test -race -v -failfast ./input/otlp
```

@rubvs requested a review from a team as a code owner November 4, 2024 19:16
elastic-observability-automation bot added the safe-to-test label (Changes are safe to run in the CI) Nov 4, 2024

@1pkg (Member) commented on `var once sync.Once`, Nov 5, 2024:

I think we don't really need this combination of concurrency primitives for signalling here, and the test can be simplified. The same test can be written with a single channel (made buffered only for the final call) and no additional primitives; see the example below.

```go
func TestConsumeLogsSemaphore(t *testing.T) {
	logs := plog.NewLogs()
	var batches []*modelpb.Batch

	doneCh := make(chan struct{})
	recorder := modelpb.ProcessBatchFunc(func(ctx context.Context, batch *modelpb.Batch) error {
		// First send: tells main this processor is running, i.e. the semaphore is held.
		doneCh <- struct{}{}
		// Second send: blocks until main allows the semaphore to be released.
		doneCh <- struct{}{}
		batchCopy := batch.Clone()
		batches = append(batches, &batchCopy)
		return nil
	})
	consumer := otlp.NewConsumer(otlp.ConsumerConfig{
		Processor: recorder,
		Semaphore: semaphore.NewWeighted(1),
	})

	bgDone := make(chan struct{})
	go func() {
		defer close(bgDone)
		_, err := consumer.ConsumeLogsWithResult(context.Background(), logs)
		assert.NoError(t, err)
	}()

	<-doneCh // the background consumer now holds the semaphore
	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancel()
	_, err := consumer.ConsumeLogsWithResult(ctx, logs)
	assert.EqualError(t, err, "context deadline exceeded")
	<-doneCh // let the background processor return and release the semaphore
	<-bgDone // don't touch doneCh or t until the background consumer is done

	// Turn the channel into a sink: the final call's two sends must not block.
	doneCh = make(chan struct{}, 2)
	_, err = consumer.ConsumeLogsWithResult(context.Background(), logs)
	assert.NoError(t, err)
}
```

Labels
safe-to-test Changes are safe to run in the CI
Development

Successfully merging this pull request may close these issues.

Flaky semaphore test
2 participants