From 57995c94f1a293b21cccf481ec97e5165ebb702f Mon Sep 17 00:00:00 2001 From: sivchari Date: Thu, 18 Jul 2024 18:46:00 +0900 Subject: [PATCH 1/6] fix: race code Signed-off-by: sivchari --- .../inmemory/pkg/runtime/cache/sync.go | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go index 62cea6355ed7..e731cff1b7e4 100644 --- a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go +++ b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go @@ -50,10 +50,10 @@ func (c *cache) startSyncer(ctx context.Context) error { c.syncQueue.ShutDown() }() - syncLoopStarted := false + syncLoopStarted := make(chan struct{}) go func() { log.Info("Starting sync loop") - syncLoopStarted = true + syncLoopStarted <- struct{}{} for { select { case <-time.After(c.syncPeriod / 4): @@ -81,14 +81,7 @@ func (c *cache) startSyncer(ctx context.Context) error { wg.Wait() }() - if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { - if !syncLoopStarted { - return false, nil - } - return true, nil - }); err != nil { - return fmt.Errorf("failed to start sync loop: %v", err) - } + <-syncLoopStarted if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { if atomic.LoadInt64(&workers) < int64(c.syncConcurrency) { From 1b505e3f251181274e7adcfe1b754bbc0d4c538d Mon Sep 17 00:00:00 2001 From: sivchari Date: Tue, 20 Aug 2024 09:07:14 +0900 Subject: [PATCH 2/6] remove sync variable Signed-off-by: sivchari --- test/infrastructure/inmemory/pkg/runtime/cache/sync.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go index e731cff1b7e4..c05ff75b5dfe 100644 --- a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go +++ b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go @@ -50,10 +50,8 @@ func (c *cache) startSyncer(ctx context.Context) error { c.syncQueue.ShutDown() }() - syncLoopStarted := make(chan struct{}) go func() { log.Info("Starting sync loop") - syncLoopStarted <- struct{}{} for { select { case <-time.After(c.syncPeriod / 4): @@ -81,8 +79,6 @@ func (c *cache) startSyncer(ctx context.Context) error { wg.Wait() }() - <-syncLoopStarted - if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { if atomic.LoadInt64(&workers) < int64(c.syncConcurrency) { return false, nil From 7cc8b286e5245895c52b7312c091a65a9b4c867b Mon Sep 17 00:00:00 2001 From: sivchari Date: Tue, 24 Sep 2024 20:58:42 +0900 Subject: [PATCH 3/6] use sync chan Signed-off-by: sivchari --- test/infrastructure/inmemory/pkg/runtime/cache/sync.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go index c05ff75b5dfe..e731cff1b7e4 100644 --- a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go +++ b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go @@ -50,8 +50,10 @@ func (c *cache) startSyncer(ctx context.Context) error { c.syncQueue.ShutDown() }() + syncLoopStarted := make(chan struct{}) go func() { log.Info("Starting sync loop") + syncLoopStarted <- struct{}{} for { select { case <-time.After(c.syncPeriod / 4): @@ -79,6 +81,8 @@ func (c *cache) startSyncer(ctx context.Context) error { wg.Wait() }() + <-syncLoopStarted + if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { if atomic.LoadInt64(&workers) < int64(c.syncConcurrency) { return false, nil From 02c8ae0e8d7cc5e530c8275418d3839b29761714 Mon Sep 17 00:00:00 2001 From: sivchari Date: Tue, 24 Sep 2024 20:59:54 +0900 Subject: [PATCH 4/6] drop chan Signed-off-by: sivchari --- test/infrastructure/inmemory/pkg/runtime/cache/sync.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go index e731cff1b7e4..c05ff75b5dfe 100644 --- a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go +++ b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go @@ -50,10 +50,8 @@ func (c *cache) startSyncer(ctx context.Context) error { c.syncQueue.ShutDown() }() - syncLoopStarted := make(chan struct{}) go func() { log.Info("Starting sync loop") - syncLoopStarted <- struct{}{} for { select { case <-time.After(c.syncPeriod / 4): @@ -81,8 +79,6 @@ func (c *cache) startSyncer(ctx context.Context) error { wg.Wait() }() - <-syncLoopStarted - if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { if atomic.LoadInt64(&workers) < int64(c.syncConcurrency) { return false, nil From 4d91c48876a0b63096767e18aec983f1ce642160 Mon Sep 17 00:00:00 2001 From: sivchari Date: Mon, 14 Oct 2024 00:31:00 +0900 Subject: [PATCH 5/6] add -race option again Signed-off-by: sivchari --- Makefile | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Makefile b/Makefile index 5d32f6d2c100..ec9ad71d45cc 100644 --- a/Makefile +++ b/Makefile @@ -941,7 +941,7 @@ test-cover: ## Run unit and integration tests and generate a coverage report .PHONY: test-docker-infrastructure test-docker-infrastructure: $(SETUP_ENVTEST) ## Run unit and integration tests for docker infrastructure provider - cd $(CAPD_DIR); KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test ./... $(TEST_ARGS) + cd $(CAPD_DIR); KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race ./... $(TEST_ARGS) .PHONY: test-docker-infrastructure-verbose test-docker-infrastructure-verbose: ## Run unit and integration tests for docker infrastructure provider with verbose flag @@ -949,13 +949,13 @@ test-docker-infrastructure-verbose: ## Run unit and integration tests for docker .PHONY: test-docker-infrastructure-junit test-docker-infrastructure-junit: $(SETUP_ENVTEST) $(GOTESTSUM) ## Run unit and integration tests and generate a junit report for docker infrastructure provider - cd $(CAPD_DIR); set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit.infra_docker.exitcode) | tee $(ARTIFACTS)/junit.infra_docker.stdout + cd $(CAPD_DIR); set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit.infra_docker.exitcode) | tee $(ARTIFACTS)/junit.infra_docker.stdout $(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.infra_docker.xml --raw-command cat $(ARTIFACTS)/junit.infra_docker.stdout exit $$(cat $(ARTIFACTS)/junit.infra_docker.exitcode) .PHONY: test-in-memory-infrastructure test-in-memory-infrastructure: $(SETUP_ENVTEST) ## Run unit and integration tests for in-memory infrastructure provider - cd $(CAPIM_DIR); KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test ./... $(TEST_ARGS) + cd $(CAPIM_DIR); KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race ./... $(TEST_ARGS) .PHONY: test-in-memory-infrastructure-verbose test-in-memory-infrastructure-verbose: ## Run unit and integration tests for in-memory infrastructure provider with verbose flag @@ -963,13 +963,13 @@ test-in-memory-infrastructure-verbose: ## Run unit and integration tests for in- .PHONY: test-in-memory-infrastructure-junit test-in-memory-infrastructure-junit: $(SETUP_ENVTEST) $(GOTESTSUM) ## Run unit and integration tests and generate a junit report for in-memory infrastructure provider - cd $(CAPIM_DIR); set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit.infra_inmemory.exitcode) | tee $(ARTIFACTS)/junit.infra_inmemory.stdout + cd $(CAPIM_DIR); set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit.infra_inmemory.exitcode) | tee $(ARTIFACTS)/junit.infra_inmemory.stdout $(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.infra_inmemory.xml --raw-command cat $(ARTIFACTS)/junit.infra_inmemory.stdout exit $$(cat $(ARTIFACTS)/junit.infra_inmemory.exitcode) .PHONY: test-test-extension test-test-extension: $(SETUP_ENVTEST) ## Run unit and integration tests for the test extension - cd $(TEST_EXTENSION_DIR); KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test ./... $(TEST_ARGS) + cd $(TEST_EXTENSION_DIR); KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race ./... $(TEST_ARGS) .PHONY: test-test-extension-verbose test-test-extension-verbose: ## Run unit and integration tests with verbose flag @@ -977,7 +977,7 @@ test-test-extension-verbose: ## Run unit and integration tests with verbose flag .PHONY: test-test-extension-junit test-test-extension-junit: $(SETUP_ENVTEST) $(GOTESTSUM) ## Run unit and integration tests and generate a junit report for the test extension - cd $(TEST_EXTENSION_DIR); set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit.test_extension.exitcode) | tee $(ARTIFACTS)/junit.test_extension.stdout + cd $(TEST_EXTENSION_DIR); set +o errexit; (KUBEBUILDER_ASSETS="$(KUBEBUILDER_ASSETS)" go test -race -json ./... $(TEST_ARGS); echo $$? > $(ARTIFACTS)/junit.test_extension.exitcode) | tee $(ARTIFACTS)/junit.test_extension.stdout $(GOTESTSUM) --junitfile $(ARTIFACTS)/junit.test_extension.xml --raw-command cat $(ARTIFACTS)/junit.test_extension.stdout exit $$(cat $(ARTIFACTS)/junit.test_extension.exitcode) @@ -1197,7 +1197,7 @@ release-notes: release-notes-tool .PHONY: test-release-notes-tool test-release-notes-tool: - go test -C hack/tools -v -tags tools,integration sigs.k8s.io/cluster-api/hack/tools/release/notes + go test -race -C hack/tools -v -tags tools,integration sigs.k8s.io/cluster-api/hack/tools/release/notes .PHONY: release-provider-issues-tool release-provider-issues-tool: # Creates GitHub issues in a pre-defined list of CAPI provider repositories From 6ce1b6621afd7c164624c2a3e128a33e30c84505 Mon Sep 17 00:00:00 2001 From: sivchari Date: Wed, 23 Oct 2024 09:58:00 +0900 Subject: [PATCH 6/6] use atomic package Signed-off-by: sivchari --- .../inmemory/pkg/runtime/cache/sync.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go index c05ff75b5dfe..3d7163d3a064 100644 --- a/test/infrastructure/inmemory/pkg/runtime/cache/sync.go +++ b/test/infrastructure/inmemory/pkg/runtime/cache/sync.go @@ -50,8 +50,10 @@ func (c *cache) startSyncer(ctx context.Context) error { c.syncQueue.ShutDown() }() + var syncLoopStarted atomic.Bool go func() { log.Info("Starting sync loop") + syncLoopStarted.Store(true) for { select { case <-time.After(c.syncPeriod / 4): @@ -61,15 +63,14 @@ func (c *cache) startSyncer(ctx context.Context) error { } } }() - - var workers int64 + var workers atomic.Int64 go func() { log.Info("Starting sync workers", "count", c.syncConcurrency) wg := &sync.WaitGroup{} wg.Add(c.syncConcurrency) for range c.syncConcurrency { go func() { - atomic.AddInt64(&workers, 1) + workers.Add(1) defer wg.Done() for c.processSyncWorkItem(ctx) { } @@ -80,7 +81,16 @@ func (c *cache) startSyncer(ctx context.Context) error { }() if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { - if atomic.LoadInt64(&workers) < int64(c.syncConcurrency) { + if !syncLoopStarted.Load() { + return false, nil + } + return true, nil + }); err != nil { + return fmt.Errorf("failed to start sync loop: %v", err) + } + + if err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 5*time.Second, false, func(context.Context) (done bool, err error) { + if workers.Load() < int64(c.syncConcurrency) { return false, nil } return true, nil