Skip to content

Commit

Permalink
Call configure in sequence (not in parallel)
Browse files Browse the repository at this point in the history
  • Loading branch information
iwahbe committed Dec 12, 2024
1 parent 7b3ace6 commit 0f94385
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 16 deletions.
27 changes: 11 additions & 16 deletions pkg/x/muxer/muxer.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,6 @@ func (m *muxer) DiffConfig(ctx context.Context, req *pulumirpc.DiffRequest) (*pu
func (m *muxer) Configure(ctx context.Context, req *pulumirpc.ConfigureRequest) (*pulumirpc.ConfigureResponse, error) {
// Configure determines what the values the provider understands. We take the
// `and` of configure values.
subs := make([]func() tuple[*pulumirpc.ConfigureResponse, error], len(m.servers))
for i, s := range m.servers {
i, s := i, s
subs[i] = func() tuple[*pulumirpc.ConfigureResponse, error] {
req := proto.Clone(req).(*pulumirpc.ConfigureRequest)
return newTuple(s.Configure(ctx, req))
}
}
response := &pulumirpc.ConfigureResponse{
AcceptSecrets: true,
SupportsPreview: true,
Expand All @@ -307,17 +299,20 @@ func (m *muxer) Configure(ctx context.Context, req *pulumirpc.ConfigureRequest)
SupportsAutonamingConfiguration: true,
}
errs := new(multierror.Error)
for _, r := range asyncJoin(subs) {
if r.B != nil {
errs.Errors = append(errs.Errors, r.B)

for _, s := range m.servers {
req := proto.Clone(req).(*pulumirpc.ConfigureRequest)
r, err := s.Configure(ctx, req)
if err != nil {
errs.Errors = append(errs.Errors, err)
continue
}
response.AcceptOutputs = response.AcceptOutputs && r.A.GetAcceptOutputs()
response.AcceptResources = response.AcceptResources && r.A.GetAcceptResources()
response.AcceptSecrets = response.AcceptSecrets && r.A.GetAcceptSecrets()
response.SupportsPreview = response.SupportsPreview && r.A.GetSupportsPreview()
response.AcceptOutputs = response.AcceptOutputs && r.GetAcceptOutputs()
response.AcceptResources = response.AcceptResources && r.GetAcceptResources()
response.AcceptSecrets = response.AcceptSecrets && r.GetAcceptSecrets()
response.SupportsPreview = response.SupportsPreview && r.GetSupportsPreview()
response.SupportsAutonamingConfiguration = response.SupportsAutonamingConfiguration &&
r.A.GetSupportsAutonamingConfiguration()
r.GetSupportsAutonamingConfiguration()
}
return response, m.muxedErrors(errs)
}
Expand Down
35 changes: 35 additions & 0 deletions pkg/x/muxer/muxer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package muxer
import (
"context"
"fmt"
"sync/atomic"
"testing"

"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
Expand Down Expand Up @@ -200,3 +201,37 @@ func (s diffConfigServer) DiffConfig(
}
return s.UnimplementedResourceProviderServer.DiffConfig(ctx, req)
}

func TestConfigureInSequence(t *testing.T) {
t.Parallel()
ctx := context.Background()

for i := 0; i < 1000; i++ {
var count atomic.Uint32
m := &muxer{
host: &host{},
servers: []server{
configure{t: t, expect: 0, counter: &count},
configure{t: t, expect: 1, counter: &count},
configure{t: t, expect: 2, counter: &count},
configure{t: t, expect: 3, counter: &count},
},
}
_, err := m.Configure(ctx, &pulumirpc.ConfigureRequest{})
require.NoError(t, err)

assert.Equal(t, uint32(4), count.Load())
}
}

type configure struct {
pulumirpc.UnimplementedResourceProviderServer
t *testing.T
expect uint32
counter *atomic.Uint32
}

func (c configure) Configure(context.Context, *pulumirpc.ConfigureRequest) (*pulumirpc.ConfigureResponse, error) {
assert.True(c.t, c.counter.CompareAndSwap(c.expect, c.expect+1), "")
return &pulumirpc.ConfigureResponse{}, nil
}

0 comments on commit 0f94385

Please sign in to comment.