From 0f94385db92f16349c5113036bdb09d2d20a8bd8 Mon Sep 17 00:00:00 2001 From: Ian Wahbe Date: Thu, 12 Dec 2024 14:10:13 +0100 Subject: [PATCH] Call configure in sequence (not in parallel) --- pkg/x/muxer/muxer.go | 27 +++++++++++---------------- pkg/x/muxer/muxer_test.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/pkg/x/muxer/muxer.go b/pkg/x/muxer/muxer.go index d8db6efbe..2fa8b722a 100644 --- a/pkg/x/muxer/muxer.go +++ b/pkg/x/muxer/muxer.go @@ -291,14 +291,6 @@ func (m *muxer) DiffConfig(ctx context.Context, req *pulumirpc.DiffRequest) (*pu func (m *muxer) Configure(ctx context.Context, req *pulumirpc.ConfigureRequest) (*pulumirpc.ConfigureResponse, error) { // Configure determines what the values the provider understands. We take the // `and` of configure values. - subs := make([]func() tuple[*pulumirpc.ConfigureResponse, error], len(m.servers)) - for i, s := range m.servers { - i, s := i, s - subs[i] = func() tuple[*pulumirpc.ConfigureResponse, error] { - req := proto.Clone(req).(*pulumirpc.ConfigureRequest) - return newTuple(s.Configure(ctx, req)) - } - } response := &pulumirpc.ConfigureResponse{ AcceptSecrets: true, SupportsPreview: true, @@ -307,17 +299,20 @@ func (m *muxer) Configure(ctx context.Context, req *pulumirpc.ConfigureRequest) SupportsAutonamingConfiguration: true, } errs := new(multierror.Error) - for _, r := range asyncJoin(subs) { - if r.B != nil { - errs.Errors = append(errs.Errors, r.B) + + for _, s := range m.servers { + req := proto.Clone(req).(*pulumirpc.ConfigureRequest) + r, err := s.Configure(ctx, req) + if err != nil { + errs.Errors = append(errs.Errors, err) continue } - response.AcceptOutputs = response.AcceptOutputs && r.A.GetAcceptOutputs() - response.AcceptResources = response.AcceptResources && r.A.GetAcceptResources() - response.AcceptSecrets = response.AcceptSecrets && r.A.GetAcceptSecrets() - response.SupportsPreview = response.SupportsPreview && r.A.GetSupportsPreview() + response.AcceptOutputs = response.AcceptOutputs && r.GetAcceptOutputs() + response.AcceptResources = response.AcceptResources && r.GetAcceptResources() + response.AcceptSecrets = response.AcceptSecrets && r.GetAcceptSecrets() + response.SupportsPreview = response.SupportsPreview && r.GetSupportsPreview() response.SupportsAutonamingConfiguration = response.SupportsAutonamingConfiguration && - r.A.GetSupportsAutonamingConfiguration() + r.GetSupportsAutonamingConfiguration() } return response, m.muxedErrors(errs) } diff --git a/pkg/x/muxer/muxer_test.go b/pkg/x/muxer/muxer_test.go index e6c2bb60f..b706b0414 100644 --- a/pkg/x/muxer/muxer_test.go +++ b/pkg/x/muxer/muxer_test.go @@ -17,6 +17,7 @@ package muxer import ( "context" "fmt" + "sync/atomic" "testing" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" @@ -200,3 +201,37 @@ func (s diffConfigServer) DiffConfig( } return s.UnimplementedResourceProviderServer.DiffConfig(ctx, req) } + +func TestConfigureInSequence(t *testing.T) { + t.Parallel() + ctx := context.Background() + + for i := 0; i < 1000; i++ { + var count atomic.Uint32 + m := &muxer{ + host: &host{}, + servers: []server{ + configure{t: t, expect: 0, counter: &count}, + configure{t: t, expect: 1, counter: &count}, + configure{t: t, expect: 2, counter: &count}, + configure{t: t, expect: 3, counter: &count}, + }, + } + _, err := m.Configure(ctx, &pulumirpc.ConfigureRequest{}) + require.NoError(t, err) + + assert.Equal(t, uint32(4), count.Load()) + } +} + +type configure struct { + pulumirpc.UnimplementedResourceProviderServer + t *testing.T + expect uint32 + counter *atomic.Uint32 +} + +func (c configure) Configure(context.Context, *pulumirpc.ConfigureRequest) (*pulumirpc.ConfigureResponse, error) { + assert.True(c.t, c.counter.CompareAndSwap(c.expect, c.expect+1), "") + return &pulumirpc.ConfigureResponse{}, nil +}