From aa6ee1e5ee05f26dcae6751384875d13a424ad5a Mon Sep 17 00:00:00 2001 From: Matthieu MOREL Date: Wed, 1 Nov 2023 13:28:18 +0100 Subject: [PATCH] ci: enable more linters Co-authored-by: James Peach Signed-off-by: Matthieu MOREL --- .github/workflows/golangci-lint.yaml | 2 +- .golangci.yml | 18 +++-- Makefile | 2 +- pkg/cache/v3/cache.go | 33 +++++--- pkg/cache/v3/cache_test.go | 49 ++++++------ pkg/cache/v3/delta_test.go | 7 +- pkg/cache/v3/fixtures_test.go | 1 - pkg/cache/v3/linear.go | 20 ++--- pkg/cache/v3/linear_test.go | 100 ++++++++++++------------ pkg/cache/v3/resource.go | 16 ++-- pkg/cache/v3/resource_test.go | 28 ++++--- pkg/cache/v3/simple.go | 74 +++++++++--------- pkg/cache/v3/simple_test.go | 28 ++++--- pkg/cache/v3/snapshot_test.go | 12 +-- pkg/cache/v3/status.go | 6 +- pkg/cache/v3/status_test.go | 2 +- pkg/client/sotw/v3/client_test.go | 39 ++++----- pkg/conversion/struct_test.go | 4 +- pkg/integration/ttl_integration_test.go | 39 ++++----- pkg/log/log_test.go | 8 +- pkg/server/delta/v3/server.go | 12 +-- pkg/server/sotw/v3/ads.go | 14 ++-- pkg/server/sotw/v3/server.go | 8 +- pkg/server/sotw/v3/xds.go | 14 ++-- pkg/server/v3/delta_test.go | 66 ++++++++-------- pkg/server/v3/server_test.go | 95 +++++++++++----------- pkg/test/main/main.go | 3 +- pkg/test/resource/v3/resource.go | 10 +-- pkg/test/server.go | 1 - pkg/test/v3/accesslog.go | 24 +++--- 30 files changed, 379 insertions(+), 356 deletions(-) diff --git a/.github/workflows/golangci-lint.yaml b/.github/workflows/golangci-lint.yaml index 012518efc9..551d22c1c6 100644 --- a/.github/workflows/golangci-lint.yaml +++ b/.github/workflows/golangci-lint.yaml @@ -15,4 +15,4 @@ jobs: only-new-issues: true # Optional: golangci-lint command line arguments. - args: --timeout=10m0s + args: --verbose diff --git a/.golangci.yml b/.golangci.yml index 5ff911e4b4..b7d67bf0a1 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,5 +1,12 @@ -run: - timeout: 10m +issues: + fix: true + exclude-rules: + - linters: + - gosec + text: 'G101' + path: 'pkg/test/resource/v3/secret.go' + max-issues-per-linter: 0 + max-same-issues: 0 linters: presets: @@ -7,7 +14,8 @@ linters: enable: - bodyclose - - gofmt + - errorlint + - gofumpt - goimports - gosec - misspell @@ -30,5 +38,5 @@ linters-settings: exhaustive: default-signifies-exhaustive: true -issues: - fix: true +run: + timeout: 10m diff --git a/Makefile b/Makefile index 8b5dd1f9e2..a65680296f 100644 --- a/Makefile +++ b/Makefile @@ -44,7 +44,7 @@ lint: --rm \ --volume $$(pwd):/src \ --workdir /src \ - golangci/golangci-lint:v1.52.2 \ + golangci/golangci-lint:latest \ golangci-lint -v run #----------------- diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index b92d50c91d..5ad8e24140 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -167,8 +167,10 @@ type RawDeltaResponse struct { marshaledResponse atomic.Value } -var _ Response = &RawResponse{} -var _ DeltaResponse = &RawDeltaResponse{} +var ( + _ Response = &RawResponse{} + _ DeltaResponse = &RawDeltaResponse{} +) // PassthroughResponse is a pre constructed xDS response that need not go through marshaling transformations. type PassthroughResponse struct { @@ -195,8 +197,10 @@ type DeltaPassthroughResponse struct { ctx context.Context } -var _ Response = &PassthroughResponse{} -var _ DeltaResponse = &DeltaPassthroughResponse{} +var ( + _ Response = &PassthroughResponse{} + _ DeltaResponse = &DeltaPassthroughResponse{} +) // GetDiscoveryResponse performs the marshaling the first time its called and uses the cached response subsequently. // This is necessary because the marshaled response does not change across the calls. @@ -225,7 +229,7 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro marshaledResponse = &discovery.DiscoveryResponse{ VersionInfo: r.Version, Resources: marshaledResources, - TypeUrl: r.Request.TypeUrl, + TypeUrl: r.GetRequest().GetTypeUrl(), } r.marshaledResponse.Store(marshaledResponse) @@ -256,7 +260,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover marshaledResources[i] = &discovery.Resource{ Name: name, Resource: &anypb.Any{ - TypeUrl: r.DeltaRequest.TypeUrl, + TypeUrl: r.GetDeltaRequest().GetTypeUrl(), Value: marshaledResource, }, Version: version, @@ -266,7 +270,7 @@ func (r *RawDeltaResponse) GetDeltaDiscoveryResponse() (*discovery.DeltaDiscover marshaledResponse = &discovery.DeltaDiscoveryResponse{ Resources: marshaledResources, RemovedResources: r.RemovedResources, - TypeUrl: r.DeltaRequest.TypeUrl, + TypeUrl: r.GetDeltaRequest().GetTypeUrl(), SystemVersionInfo: r.SystemVersionInfo, } r.marshaledResponse.Store(marshaledResponse) @@ -322,14 +326,14 @@ func (r *RawResponse) maybeCreateTTLResource(resource types.ResourceWithTTL) (ty if err != nil { return nil, "", err } - rsrc.TypeUrl = r.Request.TypeUrl + rsrc.TypeUrl = r.GetRequest().GetTypeUrl() wrappedResource.Resource = rsrc } return wrappedResource, deltaResourceTypeURL, nil } - return resource.Resource, r.Request.TypeUrl, nil + return resource.Resource, r.GetRequest().GetTypeUrl(), nil } // GetDiscoveryResponse returns the final passthrough Discovery Response. @@ -354,19 +358,22 @@ func (r *DeltaPassthroughResponse) GetDeltaRequest() *discovery.DeltaDiscoveryRe // GetVersion returns the response version. func (r *PassthroughResponse) GetVersion() (string, error) { - if r.DiscoveryResponse != nil { - return r.DiscoveryResponse.VersionInfo, nil + discoveryResponse, _ := r.GetDiscoveryResponse() + if discoveryResponse != nil { + return discoveryResponse.GetVersionInfo(), nil } return "", fmt.Errorf("DiscoveryResponse is nil") } + func (r *PassthroughResponse) GetContext() context.Context { return r.ctx } // GetSystemVersion returns the response version. func (r *DeltaPassthroughResponse) GetSystemVersion() (string, error) { - if r.DeltaDiscoveryResponse != nil { - return r.DeltaDiscoveryResponse.SystemVersionInfo, nil + deltaDiscoveryResponse, _ := r.GetDeltaDiscoveryResponse() + if deltaDiscoveryResponse != nil { + return deltaDiscoveryResponse.GetSystemVersionInfo(), nil } return "", fmt.Errorf("DeltaDiscoveryResponse is nil") } diff --git a/pkg/cache/v3/cache_test.go b/pkg/cache/v3/cache_test.go index 2b76723312..c476a63dde 100644 --- a/pkg/cache/v3/cache_test.go +++ b/pkg/cache/v3/cache_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" @@ -28,24 +29,24 @@ func TestResponseGetDiscoveryResponse(t *testing.T) { } discoveryResponse, err := resp.GetDiscoveryResponse() - assert.Nil(t, err) - assert.Equal(t, discoveryResponse.VersionInfo, resp.Version) - assert.Equal(t, len(discoveryResponse.Resources), 1) + require.NoError(t, err) + assert.Equal(t, discoveryResponse.GetVersionInfo(), resp.Version) + assert.Len(t, discoveryResponse.GetResources(), 1) cachedResponse, err := resp.GetDiscoveryResponse() - assert.Nil(t, err) + require.NoError(t, err) assert.Same(t, discoveryResponse, cachedResponse) r := &route.RouteConfiguration{} - err = anypb.UnmarshalTo(discoveryResponse.Resources[0], r, proto.UnmarshalOptions{}) - assert.Nil(t, err) - assert.Equal(t, r.Name, resourceName) + err = anypb.UnmarshalTo(discoveryResponse.GetResources()[0], r, proto.UnmarshalOptions{}) + require.NoError(t, err) + assert.Equal(t, resourceName, r.GetName()) } func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) { routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}} rsrc, err := anypb.New(routes[0]) - assert.Nil(t, err) + require.NoError(t, err) dr := &discovery.DiscoveryResponse{ TypeUrl: resource.RouteType, Resources: []*anypb.Any{rsrc}, @@ -57,14 +58,14 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) { } discoveryResponse, err := resp.GetDiscoveryResponse() - assert.Nil(t, err) - assert.Equal(t, discoveryResponse.VersionInfo, resp.DiscoveryResponse.VersionInfo) - assert.Equal(t, len(discoveryResponse.Resources), 1) + require.NoError(t, err) + assert.Equal(t, "v", discoveryResponse.GetVersionInfo()) + assert.Len(t, discoveryResponse.GetResources(), 1) r := &route.RouteConfiguration{} - err = anypb.UnmarshalTo(discoveryResponse.Resources[0], r, proto.UnmarshalOptions{}) - assert.Nil(t, err) - assert.Equal(t, r.Name, resourceName) + err = anypb.UnmarshalTo(discoveryResponse.GetResources()[0], r, proto.UnmarshalOptions{}) + require.NoError(t, err) + assert.Equal(t, resourceName, r.GetName()) assert.Equal(t, discoveryResponse, dr) } @@ -78,27 +79,27 @@ func TestHeartbeatResponseGetDiscoveryResponse(t *testing.T) { } discoveryResponse, err := resp.GetDiscoveryResponse() - assert.Nil(t, err) - assert.Equal(t, discoveryResponse.VersionInfo, resp.Version) - assert.Equal(t, len(discoveryResponse.Resources), 1) - assert.False(t, isTTLResource(discoveryResponse.Resources[0])) + require.NoError(t, err) + assert.Equal(t, discoveryResponse.GetVersionInfo(), resp.Version) + require.Len(t, discoveryResponse.GetResources(), 1) + assert.False(t, isTTLResource(discoveryResponse.GetResources()[0])) cachedResponse, err := resp.GetDiscoveryResponse() - assert.Nil(t, err) + require.NoError(t, err) assert.Same(t, discoveryResponse, cachedResponse) r := &route.RouteConfiguration{} - err = anypb.UnmarshalTo(discoveryResponse.Resources[0], r, proto.UnmarshalOptions{}) - assert.Nil(t, err) - assert.Equal(t, r.Name, resourceName) + err = anypb.UnmarshalTo(discoveryResponse.GetResources()[0], r, proto.UnmarshalOptions{}) + require.NoError(t, err) + assert.Equal(t, resourceName, r.GetName()) } func isTTLResource(resource *anypb.Any) bool { wrappedResource := &discovery.Resource{} - err := protojson.Unmarshal(resource.Value, wrappedResource) + err := protojson.Unmarshal(resource.GetValue(), wrappedResource) if err != nil { return false } - return wrappedResource.Resource == nil + return wrappedResource.GetResource() == nil } diff --git a/pkg/cache/v3/delta_test.go b/pkg/cache/v3/delta_test.go index 4999cad603..fc4ee91327 100644 --- a/pkg/cache/v3/delta_test.go +++ b/pkg/cache/v3/delta_test.go @@ -9,6 +9,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -240,7 +241,7 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { defer cancel() err := c.SetSnapshot(ctx, key, fixture.snapshot()) - assert.EqualError(t, err, context.Canceled.Error()) + require.EqualError(t, err, context.Canceled.Error()) // Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails, // we can retry by setting the same snapshot. In other words, we keep the watch open even if we failed @@ -253,13 +254,13 @@ func TestSnapshotDeltaCacheWatchTimeout(t *testing.T) { }() err = c.SetSnapshot(context.WithValue(context.Background(), testKey{}, "bar"), key, fixture.snapshot()) - assert.NoError(t, err) + require.NoError(t, err) // The channel should get closed due to the watch trigger. select { case response := <-watchTriggeredCh: // Verify that we pass the context through. - assert.Equal(t, response.GetContext().Value(testKey{}), "bar") + assert.Equal(t, "bar", response.GetContext().Value(testKey{})) case <-time.After(time.Second): t.Fatalf("timed out") } diff --git a/pkg/cache/v3/fixtures_test.go b/pkg/cache/v3/fixtures_test.go index 907c8e7ad5..c242bef75e 100644 --- a/pkg/cache/v3/fixtures_test.go +++ b/pkg/cache/v3/fixtures_test.go @@ -31,7 +31,6 @@ func (f *fixtureGenerator) snapshot() *cache.Snapshot { rsrc.ExtensionConfigType: {testExtensionConfig}, }, ) - if err != nil { panic(err.Error()) } diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index cf5ab7e268..7c8a36c51e 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -187,7 +187,7 @@ func (cache *LinearCache) respondDelta(request *DeltaRequest, value chan DeltaRe if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 { if cache.log != nil { cache.log.Debugf("[linear cache] node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.TypeUrl, GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) } value <- resp return resp @@ -299,7 +299,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource { } func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, value chan Response) func() { - if request.TypeUrl != cache.typeURL { + if request.GetTypeUrl() != cache.typeURL { value <- nil return nil } @@ -312,8 +312,8 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va // strip version prefix if it is present var lastVersion uint64 var err error - if strings.HasPrefix(request.VersionInfo, cache.versionPrefix) { - lastVersion, err = strconv.ParseUint(request.VersionInfo[len(cache.versionPrefix):], 0, 64) + if strings.HasPrefix(request.GetVersionInfo(), cache.versionPrefix) { + lastVersion, err = strconv.ParseUint(request.GetVersionInfo()[len(cache.versionPrefix):], 0, 64) } else { err = errors.New("mis-matched version prefix") } @@ -323,11 +323,11 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va if err != nil { stale = true - staleResources = request.ResourceNames - } else if len(request.ResourceNames) == 0 { + staleResources = request.GetResourceNames() + } else if len(request.GetResourceNames()) == 0 { stale = lastVersion != cache.version } else { - for _, name := range request.ResourceNames { + for _, name := range request.GetResourceNames() { // When a resource is removed, its version defaults 0 and it is not considered stale. if lastVersion < cache.versionVector[name] { stale = true @@ -340,7 +340,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va return nil } // Create open watches since versions are up to date. - if len(request.ResourceNames) == 0 { + if len(request.GetResourceNames()) == 0 { cache.watchAll[value] = struct{}{} return func() { cache.mu.Lock() @@ -348,7 +348,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va delete(cache.watchAll, value) } } - for _, name := range request.ResourceNames { + for _, name := range request.GetResourceNames() { set, exists := cache.watches[name] if !exists { set = make(watches) @@ -359,7 +359,7 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va return func() { cache.mu.Lock() defer cache.mu.Unlock() - for _, name := range request.ResourceNames { + for _, name := range request.GetResourceNames() { set, exists := cache.watches[name] if exists { delete(set, value) diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 617d90366e..82478ca3aa 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -41,8 +41,8 @@ func testResource(s string) types.Resource { func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) { t.Helper() r := <-ch - if r.GetRequest().TypeUrl != testType { - t.Errorf("unexpected empty request type URL: %q", r.GetRequest().TypeUrl) + if r.GetRequest().GetTypeUrl() != testType { + t.Errorf("unexpected empty request type URL: %q", r.GetRequest().GetTypeUrl()) } if r.GetContext() == nil { t.Errorf("unexpected empty response context") @@ -51,17 +51,17 @@ func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) { if err != nil { t.Fatal(err) } - if out.VersionInfo == "" { + if out.GetVersionInfo() == "" { t.Error("unexpected response empty version") } - if n := len(out.Resources); n != num { + if n := len(out.GetResources()); n != num { t.Errorf("unexpected number of responses: got %d, want %d", n, num) } - if version != "" && out.VersionInfo != version { - t.Errorf("unexpected version: got %q, want %q", out.VersionInfo, version) + if version != "" && out.GetVersionInfo() != version { + t.Errorf("unexpected version: got %q, want %q", out.GetVersionInfo(), version) } - if out.TypeUrl != testType { - t.Errorf("unexpected type URL: %q", out.TypeUrl) + if out.GetTypeUrl() != testType { + t.Errorf("unexpected type URL: %q", out.GetTypeUrl()) } } @@ -73,24 +73,24 @@ type resourceInfo struct { func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourceInfo, deleted []string) { t.Helper() - if resp.GetDeltaRequest().TypeUrl != testType { - t.Errorf("unexpected empty request type URL: %q", resp.GetDeltaRequest().TypeUrl) + if resp.GetDeltaRequest().GetTypeUrl() != testType { + t.Errorf("unexpected empty request type URL: %q", resp.GetDeltaRequest().GetTypeUrl()) } out, err := resp.GetDeltaDiscoveryResponse() if err != nil { t.Fatal(err) } - if len(out.Resources) != len(resources) { - t.Errorf("unexpected number of responses: got %d, want %d", len(out.Resources), len(resources)) + if len(out.GetResources()) != len(resources) { + t.Errorf("unexpected number of responses: got %d, want %d", len(out.GetResources()), len(resources)) } for _, r := range resources { found := false - for _, r1 := range out.Resources { - if r1.Name == r.name && r1.Version == r.version { + for _, r1 := range out.GetResources() { + if r1.GetName() == r.name && r1.GetVersion() == r.version { found = true break - } else if r1.Name == r.name { - t.Errorf("unexpected version for resource %q: got %q, want %q", r.name, r1.Version, r.version) + } else if r1.GetName() == r.name { + t.Errorf("unexpected version for resource %q: got %q, want %q", r.name, r1.GetVersion(), r.version) found = true break } @@ -99,15 +99,15 @@ func validateDeltaResponse(t *testing.T, resp DeltaResponse, resources []resourc t.Errorf("resource with name %q not found in response", r.name) } } - if out.TypeUrl != testType { - t.Errorf("unexpected type URL: %q", out.TypeUrl) + if out.GetTypeUrl() != testType { + t.Errorf("unexpected type URL: %q", out.GetTypeUrl()) } - if len(out.RemovedResources) != len(deleted) { - t.Errorf("unexpected number of removed resurces: got %d, want %d", len(out.RemovedResources), len(deleted)) + if len(out.GetRemovedResources()) != len(deleted) { + t.Errorf("unexpected number of removed resurces: got %d, want %d", len(out.GetRemovedResources()), len(deleted)) } for _, r := range deleted { found := false - for _, rr := range out.RemovedResources { + for _, rr := range out.GetRemovedResources() { if r == rr { found = true break @@ -473,7 +473,7 @@ func TestLinearDeltaWildcard(t *testing.T) { a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hash := hashResource(t, a) err := c.UpdateResource("a", a) - assert.NoError(t, err) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w1, []resourceInfo{{"a", hash}}, nil) verifyDeltaResponse(t, w2, []resourceInfo{{"a", hash}}, nil) @@ -484,11 +484,11 @@ func TestLinearDeltaExistingResources(t *testing.T) { a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) - assert.NoError(t, err) + require.NoError(t, err) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) err = c.UpdateResource("b", b) - assert.NoError(t, err) + require.NoError(t, err) state := stream.NewStreamState(false, nil) state.SetSubscribedResourceNames(map[string]struct{}{"b": {}, "c": {}}) // watching b and c - not interested in a @@ -510,11 +510,11 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) - assert.NoError(t, err) + require.NoError(t, err) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) err = c.UpdateResource("b", b) - assert.NoError(t, err) + require.NoError(t, err) state := stream.NewStreamState(false, map[string]string{"b": hashB}) state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) @@ -532,7 +532,7 @@ func TestLinearDeltaInitialResourcesVersionSet(t *testing.T) { b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{{Priority: 10}}} // new version of b hashB = hashResource(t, b) err = c.UpdateResource("b", b) - assert.NoError(t, err) + require.NoError(t, err) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}}, nil) } @@ -542,11 +542,11 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) - assert.NoError(t, err) + require.NoError(t, err) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) err = c.UpdateResource("b", b) - assert.NoError(t, err) + require.NoError(t, err) // There is currently no delta watch checkVersionMapNotSet(t, c) @@ -565,12 +565,12 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) - a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, }} hashA = hashResource(t, a) err = c.UpdateResource("a", a) - assert.NoError(t, err) + require.NoError(t, err) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) checkVersionMapSet(t, c) } @@ -580,11 +580,11 @@ func TestLinearDeltaResourceDelete(t *testing.T) { a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) err := c.UpdateResource("a", a) - assert.NoError(t, err) + require.NoError(t, err) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) err = c.UpdateResource("b", b) - assert.NoError(t, err) + require.NoError(t, err) state := stream.NewStreamState(false, nil) state.SetSubscribedResourceNames(map[string]struct{}{"a": {}, "b": {}}) @@ -600,7 +600,7 @@ func TestLinearDeltaResourceDelete(t *testing.T) { mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) - a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, }} hashA = hashResource(t, a) @@ -628,7 +628,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) err := c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil) - assert.NoError(t, err) + require.NoError(t, err) resp := <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) @@ -639,16 +639,16 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) - a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 10}, }} - b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} hashA = hashResource(t, a) hashB = hashResource(t, b) err = c.UpdateResources(map[string]types.Resource{"a": a, "b": b}, nil) - assert.NoError(t, err) + require.NoError(t, err) resp = <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) checkVersionMapSet(t, c) @@ -659,13 +659,13 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) - a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} d := &endpoint.ClusterLoadAssignment{ClusterName: "d", Endpoints: []*endpoint.LocalityLbEndpoints{}} // resource created, but not watched hashA = hashResource(t, a) err = c.UpdateResources(map[string]types.Resource{"a": a, "d": d}, []string{"b"}) - assert.NoError(t, err) + require.NoError(t, err) assert.Contains(t, c.resources, "d", "resource with name d not found in cache") assert.NotContains(t, c.resources, "b", "resource with name b was found in cache") resp = <-w @@ -681,7 +681,7 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{}} // recreate watched resource hashB = hashResource(t, b) err = c.UpdateResources(map[string]types.Resource{"b": b}, []string{"d"}) - assert.NoError(t, err) + require.NoError(t, err) assert.Contains(t, c.resources, "b", "resource with name b not found in cache") assert.NotContains(t, c.resources, "d", "resource with name d was found in cache") resp = <-w @@ -694,14 +694,14 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { createWildcardDeltaWatch(c, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) - b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 15}, }} d = &endpoint.ClusterLoadAssignment{ClusterName: "d", Endpoints: []*endpoint.LocalityLbEndpoints{}} // resource create hashB = hashResource(t, b) hashD := hashResource(t, d) err = c.UpdateResources(map[string]types.Resource{"b": b, "d": d}, nil) - assert.NoError(t, err) + require.NoError(t, err) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil) checkVersionMapSet(t, c) assert.Equal(t, 3, c.NumResources()) @@ -710,12 +710,12 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { createWildcardDeltaWatch(c, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) - a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 25}, }} hashA = hashResource(t, a) err = c.UpdateResources(map[string]types.Resource{"a": a}, []string{"d"}) - assert.NoError(t, err) + require.NoError(t, err) assert.NotContains(t, c.resources, "d", "resource with name d was found in cache") verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"d"}) @@ -729,11 +729,11 @@ func TestLinearMixedWatches(t *testing.T) { c := NewLinearCache(testType) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} err := c.UpdateResource("a", a) - assert.NoError(t, err) + require.NoError(t, err) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} hashB := hashResource(t, b) err = c.UpdateResource("b", b) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 2, c.NumResources()) sotwState := stream.NewStreamState(false, nil) @@ -742,12 +742,12 @@ func TestLinearMixedWatches(t *testing.T) { mustBlock(t, w) checkVersionMapNotSet(t, c) - a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update {Priority: 25}, }} hashA := hashResource(t, a) err = c.UpdateResources(map[string]types.Resource{"a": a}, nil) - assert.NoError(t, err) + require.NoError(t, err) // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation verifyResponse(t, w, c.getVersion(), 1) checkVersionMapNotSet(t, c) @@ -767,7 +767,7 @@ func TestLinearMixedWatches(t *testing.T) { checkVersionMapSet(t, c) err = c.UpdateResources(nil, []string{"b"}) - assert.NoError(t, err) + require.NoError(t, err) checkVersionMapSet(t, c) verifyResponse(t, w, c.getVersion(), 0) diff --git a/pkg/cache/v3/resource.go b/pkg/cache/v3/resource.go index a21266b905..d4c25f2a11 100644 --- a/pkg/cache/v3/resource.go +++ b/pkg/cache/v3/resource.go @@ -176,13 +176,13 @@ func mapMerge(dst map[string]bool, src map[string]bool) { func getClusterReferences(src *cluster.Cluster, out map[resource.Type]map[string]bool) { endpoints := map[string]bool{} - switch typ := src.ClusterDiscoveryType.(type) { + switch typ := src.GetClusterDiscoveryType().(type) { case *cluster.Cluster_Type: if typ.Type == cluster.Cluster_EDS { - if src.EdsClusterConfig != nil && src.EdsClusterConfig.ServiceName != "" { - endpoints[src.EdsClusterConfig.ServiceName] = true + if src.GetEdsClusterConfig() != nil && src.GetEdsClusterConfig().GetServiceName() != "" { + endpoints[src.GetEdsClusterConfig().GetServiceName()] = true } else { - endpoints[src.Name] = true + endpoints[src.GetName()] = true } } } @@ -201,8 +201,8 @@ func getListenerReferences(src *listener.Listener, out map[resource.Type]map[str routes := map[string]bool{} // Extract route configuration names from HTTP connection manager. - for _, chain := range src.FilterChains { - for _, filter := range chain.Filters { + for _, chain := range src.GetFilterChains() { + for _, filter := range chain.GetFilters() { config := resource.GetHTTPConnectionManager(filter) if config == nil { continue @@ -215,7 +215,7 @@ func getListenerReferences(src *listener.Listener, out map[resource.Type]map[str // If the scoped route mapping is embedded, add the referenced route resource names. for _, s := range config.GetScopedRoutes().GetScopedRouteConfigurationsList().GetScopedRouteConfigurations() { - routes[s.RouteConfigurationName] = true + routes[s.GetRouteConfigurationName()] = true } } } @@ -233,7 +233,7 @@ func getScopedRouteReferences(src *route.ScopedRouteConfiguration, out map[resou routes := map[string]bool{} // For a scoped route configuration, the dependent resource is the RouteConfigurationName. - routes[src.RouteConfigurationName] = true + routes[src.GetRouteConfigurationName()] = true if len(routes) > 0 { if _, ok := out[resource.RouteType]; !ok { diff --git a/pkg/cache/v3/resource_test.go b/pkg/cache/v3/resource_test.go index 65286363f7..6927a933b6 100644 --- a/pkg/cache/v3/resource_test.go +++ b/pkg/cache/v3/resource_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" @@ -57,15 +58,15 @@ var ( ) func TestValidate(t *testing.T) { - assert.NoError(t, testEndpoint.Validate()) - assert.NoError(t, testCluster.Validate()) - assert.NoError(t, testRoute.Validate()) - assert.NoError(t, testScopedRoute.Validate()) - assert.NoError(t, testVirtualHost.Validate()) - assert.NoError(t, testListener.Validate()) - assert.NoError(t, testScopedListener.Validate()) - assert.NoError(t, testRuntime.Validate()) - assert.NoError(t, testExtensionConfig.Validate()) + require.NoError(t, testEndpoint.Validate()) + require.NoError(t, testCluster.Validate()) + require.NoError(t, testRoute.Validate()) + require.NoError(t, testScopedRoute.Validate()) + require.NoError(t, testVirtualHost.Validate()) + require.NoError(t, testListener.Validate()) + require.NoError(t, testScopedListener.Validate()) + require.NoError(t, testRuntime.Validate()) + require.NoError(t, testExtensionConfig.Validate()) invalidRoute := &route.RouteConfiguration{ Name: "test", @@ -78,7 +79,7 @@ func TestValidate(t *testing.T) { if err := invalidRoute.Validate(); err == nil { t.Error("expected an error") } - if err := invalidRoute.VirtualHosts[0].Validate(); err == nil { + if err := invalidRoute.GetVirtualHosts()[0].Validate(); err == nil { t.Error("expected an error") } } @@ -163,8 +164,10 @@ func TestGetResourceReferences(t *testing.T) { out: map[rsrc.Type]map[string]bool{rsrc.EndpointType: {clusterName: true}}, }, { - in: &cluster.Cluster{Name: clusterName, ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, - EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{ServiceName: "test"}}, + in: &cluster.Cluster{ + Name: clusterName, ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_EDS}, + EdsClusterConfig: &cluster.Cluster_EdsClusterConfig{ServiceName: "test"}, + }, out: map[rsrc.Type]map[string]bool{rsrc.EndpointType: {"test": true}}, }, { @@ -215,6 +218,7 @@ func TestGetResourceReferences(t *testing.T) { } } } + func TestGetAllResourceReferencesReturnsExpectedRefs(t *testing.T) { expected := map[rsrc.Type]map[string]bool{ rsrc.RouteType: {routeName: true, embeddedRouteName: true}, diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index c34eca3cff..ebf63f5b6f 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -193,8 +193,8 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { info.mu.Lock() for id, watch := range info.watches { // Respond with the current version regardless of whether the version has changed. - version := snapshot.GetVersion(watch.Request.TypeUrl) - resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) + version := snapshot.GetVersion(watch.Request.GetTypeUrl()) + resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) // TODO(snowp): Construct this once per type instead of once per watch. resourcesWithTTL := map[string]types.ResourceWithTTL{} @@ -207,7 +207,7 @@ func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { if len(resourcesWithTTL) == 0 { continue } - cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version) + cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.GetResourceNames(), version) err := cache.respond(ctx, watch.Request, watch.Response, resourcesWithTTL, version, true) if err != nil { cache.log.Errorf("received error when attempting to respond to watches: %v", err) @@ -248,10 +248,10 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error { // responder callback for SOTW watches respond := func(watch ResponseWatch, id int64) error { - version := snapshot.GetVersion(watch.Request.TypeUrl) - if version != watch.Request.VersionInfo { - cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version) - resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl) + version := snapshot.GetVersion(watch.Request.GetTypeUrl()) + if version != watch.Request.GetVersionInfo() { + cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.GetTypeUrl(), watch.Request.GetResourceNames(), version) + resources := snapshot.GetResourcesAndTTL(watch.Request.GetTypeUrl()) err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false) if err != nil { return err @@ -386,14 +386,14 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL) // CreateWatch returns a watch for an xDS request. A nil function may be // returned if an error occurs. func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.StreamState, value chan Response) func() { - nodeID := cache.hash.ID(request.Node) + nodeID := cache.hash.ID(request.GetNode()) cache.mu.Lock() defer cache.mu.Unlock() info, ok := cache.status[nodeID] if !ok { - info = newStatusInfo(request.Node) + info = newStatusInfo(request.GetNode()) cache.status[nodeID] = info } @@ -405,28 +405,28 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str var version string snapshot, exists := cache.snapshots[nodeID] if exists { - version = snapshot.GetVersion(request.TypeUrl) + version = snapshot.GetVersion(request.GetTypeUrl()) } if exists { - knownResourceNames := streamState.GetKnownResourceNames(request.TypeUrl) + knownResourceNames := streamState.GetKnownResourceNames(request.GetTypeUrl()) diff := []string{} - for _, r := range request.ResourceNames { + for _, r := range request.GetResourceNames() { if _, ok := knownResourceNames[r]; !ok { diff = append(diff, r) } } cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID, - request.TypeUrl, request.ResourceNames, knownResourceNames, diff) + request.GetTypeUrl(), request.GetResourceNames(), knownResourceNames, diff) if len(diff) > 0 { - resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) for _, name := range diff { if _, exists := resources[name]; exists { if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { - cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, - request.ResourceNames, nodeID, err) + cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), + request.GetResourceNames(), nodeID, err) return nil } return func() {} @@ -436,9 +436,9 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } // if the requested version is up-to-date or missing a response, leave an open watch - if !exists || request.VersionInfo == version { + if !exists || request.GetVersionInfo() == version { watchID := cache.nextWatchID() - cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.TypeUrl, request.ResourceNames, nodeID, request.VersionInfo) + cache.log.Debugf("open watch %d for %s%v from nodeID %q, version %q", watchID, request.GetTypeUrl(), request.GetResourceNames(), nodeID, request.GetVersionInfo()) info.mu.Lock() info.watches[watchID] = ResponseWatch{Request: request, Response: value} info.mu.Unlock() @@ -446,10 +446,10 @@ func (cache *snapshotCache) CreateWatch(request *Request, streamState stream.Str } // otherwise, the watch may be responded immediately - resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) if err := cache.respond(context.Background(), request, value, resources, version, false); err != nil { - cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.TypeUrl, - request.ResourceNames, nodeID, err) + cache.log.Errorf("failed to send a response for %s%v to nodeID %q: %s", request.GetTypeUrl(), + request.GetResourceNames(), nodeID, err) return nil } @@ -479,14 +479,14 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { func (cache *snapshotCache) respond(ctx context.Context, request *Request, value chan Response, resources map[string]types.ResourceWithTTL, version string, heartbeat bool) error { // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request - if len(request.ResourceNames) != 0 && cache.ads { - if err := superset(nameSet(request.ResourceNames), resources); err != nil { - cache.log.Warnf("ADS mode: not responding to request %s%v: %v", request.TypeUrl, request.ResourceNames, err) + if len(request.GetResourceNames()) != 0 && cache.ads { + if err := superset(nameSet(request.GetResourceNames()), resources); err != nil { + cache.log.Warnf("ADS mode: not responding to request %s%v: %v", request.GetTypeUrl(), request.GetResourceNames(), err) return nil } } - cache.log.Debugf("respond %s%v version %q with version %q", request.TypeUrl, request.ResourceNames, request.VersionInfo, version) + cache.log.Debugf("respond %s%v version %q with version %q", request.GetTypeUrl(), request.GetResourceNames(), request.GetVersionInfo(), version) select { case value <- createResponse(ctx, request, resources, version, heartbeat): @@ -502,8 +502,8 @@ func createResponse(ctx context.Context, request *Request, resources map[string] // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. It is ok to reply with the same version // on separate streams since requests do not share their response versions. - if len(request.ResourceNames) != 0 { - set := nameSet(request.ResourceNames) + if len(request.GetResourceNames()) != 0 { + set := nameSet(request.GetResourceNames()) for name, resource := range resources { if set[name] { filtered = append(filtered, resource) @@ -526,7 +526,7 @@ func createResponse(ctx context.Context, request *Request, resources map[string] // CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache. func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() { - nodeID := cache.hash.ID(request.Node) + nodeID := cache.hash.ID(request.GetNode()) t := request.GetTypeUrl() cache.mu.Lock() @@ -534,7 +534,7 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream info, ok := cache.status[nodeID] if !ok { - info = newStatusInfo(request.Node) + info = newStatusInfo(request.GetNode()) cache.status[nodeID] = info } @@ -581,9 +581,9 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream // Respond to a delta watch with the provided snapshot value. If the response is nil, there has been no state change. func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceSnapshot, request *DeltaRequest, value chan DeltaResponse, state stream.StreamState) (*RawDeltaResponse, error) { resp := createDeltaResponse(ctx, request, state, resourceContainer{ - resourceMap: snapshot.GetResources(request.TypeUrl), - versionMap: snapshot.GetVersionMap(request.TypeUrl), - systemVersion: snapshot.GetVersion(request.TypeUrl), + resourceMap: snapshot.GetResources(request.GetTypeUrl()), + versionMap: snapshot.GetVersionMap(request.GetTypeUrl()), + systemVersion: snapshot.GetVersion(request.GetTypeUrl()), }) // Only send a response if there were changes @@ -592,7 +592,7 @@ func (cache *snapshotCache) respondDelta(ctx context.Context, snapshot ResourceS if len(resp.Resources) > 0 || len(resp.RemovedResources) > 0 || (state.IsWildcard() && state.IsFirst()) { if cache.log != nil { cache.log.Debugf("node: %s, sending delta response for typeURL %s with resources: %v removed resources: %v with wildcard: %t", - request.GetNode().GetId(), request.TypeUrl, GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) + request.GetNode().GetId(), request.GetTypeUrl(), GetResourceNames(resp.Resources), resp.RemovedResources, state.IsWildcard()) } select { case value <- resp: @@ -624,7 +624,7 @@ func (cache *snapshotCache) cancelDeltaWatch(nodeID string, watchID int64) func( // Fetch implements the cache fetch function. // Fetch is called on multiple streams, so responding to individual names with the same version works. func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Response, error) { - nodeID := cache.hash.ID(request.Node) + nodeID := cache.hash.ID(request.GetNode()) cache.mu.RLock() defer cache.mu.RUnlock() @@ -632,13 +632,13 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon if snapshot, exists := cache.snapshots[nodeID]; exists { // Respond only if the request version is distinct from the current snapshot state. // It might be beneficial to hold the request since Envoy will re-attempt the refresh. - version := snapshot.GetVersion(request.TypeUrl) - if request.VersionInfo == version { + version := snapshot.GetVersion(request.GetTypeUrl()) + if request.GetVersionInfo() == version { cache.log.Warnf("skip fetch: version up to date") return nil, &types.SkipFetchError{} } - resources := snapshot.GetResourcesAndTTL(request.TypeUrl) + resources := snapshot.GetResourcesAndTTL(request.GetTypeUrl()) out := createResponse(ctx, request, resources, version, false) return out, nil } diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 5fb9bbc62d..eba4cf96d9 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -47,7 +47,7 @@ const ( func (group) ID(node *core.Node) string { if node != nil { - return node.Id + return node.GetId() } return key } @@ -172,7 +172,7 @@ func TestSnapshotCacheWithTTL(t *testing.T) { updatesByType[typ]++ - streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().ResourceNames) + streamState.SetKnownResourceNamesAsList(typ, out.GetRequest().GetResourceNames()) case <-end: cancel() return @@ -403,7 +403,7 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { defer cancel() err := c.SetSnapshot(ctx, key, fixture.snapshot()) - assert.EqualError(t, err, context.Canceled.Error()) + require.EqualError(t, err, context.Canceled.Error()) // Now reset the snapshot with a consuming channel. This verifies that if setting the snapshot fails, // we can retry by setting the same snapshot. In other words, we keep the watch open even if we failed @@ -416,13 +416,13 @@ func TestSnapshotCacheWatchTimeout(t *testing.T) { }() err = c.SetSnapshot(context.WithValue(context.Background(), testKey{}, "bar"), key, fixture.snapshot()) - assert.NoError(t, err) + require.NoError(t, err) // The channel should get closed due to the watch trigger. select { case response := <-watchTriggeredCh: // Verify that we pass the context through. - assert.Equal(t, response.GetContext().Value(testKey{}), "bar") + assert.Equal(t, "bar", response.GetContext().Value(testKey{})) case <-time.After(time.Second): t.Fatalf("timed out") } @@ -471,8 +471,10 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { go func() { state := stream.NewStreamState(false, map[string]string{}) state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}}) - c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, state, watch) + c.CreateWatch(&discovery.DiscoveryRequest{ + TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, + ResourceNames: []string{clusterName, clusterName2}, + }, state, watch) }() select { @@ -490,8 +492,10 @@ func TestSnapshotCreateWatchWithResourcePreviouslyNotRequested(t *testing.T) { // Repeat request for with same version and make sure a watch is created state := stream.NewStreamState(false, map[string]string{}) state.SetKnownResourceNames(rsrc.EndpointType, map[string]struct{}{clusterName: {}, clusterName2: {}}) - if cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, - ResourceNames: []string{clusterName, clusterName2}}, state, watch); cancel == nil { + if cancel := c.CreateWatch(&discovery.DiscoveryRequest{ + TypeUrl: rsrc.EndpointType, VersionInfo: fixture.version, + ResourceNames: []string{clusterName, clusterName2}, + }, state, watch); cancel == nil { t.Fatal("Should create a watch") } else { cancel() @@ -591,8 +595,8 @@ func TestSnapshotSingleResourceFetch(t *testing.T) { resp, err := c.Fetch(context.Background(), &discovery.DiscoveryRequest{ TypeUrl: durationTypeURL, - ResourceNames: []string{"one-second"}}, - ) + ResourceNames: []string{"one-second"}, + }) require.NoError(t, err) vers, err := resp.GetVersion() @@ -602,7 +606,7 @@ func TestSnapshotSingleResourceFetch(t *testing.T) { discoveryResponse, err := resp.GetDiscoveryResponse() require.NoError(t, err) assert.Equal(t, durationTypeURL, discoveryResponse.GetTypeUrl()) - require.Equal(t, 1, len(discoveryResponse.GetResources())) + require.Len(t, discoveryResponse.GetResources(), 1) assert.Equal(t, "", cmp.Diff( unwrapResource(discoveryResponse.GetResources()[0]).GetResource(), anyDuration(time.Second), diff --git a/pkg/cache/v3/snapshot_test.go b/pkg/cache/v3/snapshot_test.go index 07a87a9050..c32f492076 100644 --- a/pkg/cache/v3/snapshot_test.go +++ b/pkg/cache/v3/snapshot_test.go @@ -54,8 +54,8 @@ func TestClusterWithMissingEndpointIsInconsistent(t *testing.T) { func TestListenerWithMissingRoutesIsInconsistent(t *testing.T) { if snap, _ := cache.NewSnapshot(fixture.version, map[rsrc.Type][]types.Resource{ - rsrc.ListenerType: {testListener}}, - ); snap.Consistent() == nil { + rsrc.ListenerType: {testListener}, + }); snap.Consistent() == nil { t.Errorf("got consistent snapshot %#v", snap) } } @@ -110,7 +110,7 @@ func TestScopedRouteListenerWithScopedRouteAndRouteIsConsistent(t *testing.T) { }, }) - assert.NoError(t, snap.Consistent(), "got inconsistent snapshot %#v", snap) + require.NoError(t, snap.Consistent(), "got inconsistent snapshot %#v", snap) } func TestScopedRouteListenerWithInlineScopedRouteAndRouteIsConsistent(t *testing.T) { @@ -124,7 +124,7 @@ func TestScopedRouteListenerWithInlineScopedRouteAndRouteIsConsistent(t *testing }) require.NoError(t, err) - assert.NoError(t, snap.Consistent()) + require.NoError(t, snap.Consistent()) } func TestScopedRouteListenerWithInlineScopedRouteAndNoRouteIsInconsistent(t *testing.T) { @@ -138,7 +138,7 @@ func TestScopedRouteListenerWithInlineScopedRouteAndNoRouteIsInconsistent(t *tes }) require.NoError(t, err) - assert.Error(t, snap.Consistent()) + require.Error(t, snap.Consistent()) } func TestMultipleListenersWithScopedRouteAndRouteIsConsistent(t *testing.T) { @@ -188,6 +188,6 @@ func TestNewSnapshotBadType(t *testing.T) { }) // Should receive an error from an unknown type - assert.Error(t, err) + require.Error(t, err) assert.Nil(t, snap) } diff --git a/pkg/cache/v3/status.go b/pkg/cache/v3/status.go index e50f85beff..dca93e02ff 100644 --- a/pkg/cache/v3/status.go +++ b/pkg/cache/v3/status.go @@ -37,7 +37,7 @@ func (IDHash) ID(node *core.Node) string { if node == nil { return "" } - return node.Id + return node.GetId() } var _ NodeHash = IDHash{} @@ -169,7 +169,7 @@ func (info *statusInfo) orderResponseWatches() { for id, watch := range info.watches { info.orderedWatches[index] = key{ ID: id, - TypeURL: watch.Request.TypeUrl, + TypeURL: watch.Request.GetTypeUrl(), } index++ } @@ -188,7 +188,7 @@ func (info *statusInfo) orderResponseDeltaWatches() { for id, deltaWatch := range info.deltaWatches { info.orderedDeltaWatches[index] = key{ ID: id, - TypeURL: deltaWatch.Request.TypeUrl, + TypeURL: deltaWatch.Request.GetTypeUrl(), } index++ } diff --git a/pkg/cache/v3/status_test.go b/pkg/cache/v3/status_test.go index 0299483d5f..def8346118 100644 --- a/pkg/cache/v3/status_test.go +++ b/pkg/cache/v3/status_test.go @@ -24,7 +24,7 @@ import ( func TestIDHash(t *testing.T) { node := &core.Node{Id: "test"} if got := (IDHash{}).ID(node); got != "test" { - t.Errorf("IDHash.ID(%v) => got %s, want %s", node, got, node.Id) + t.Errorf("IDHash.ID(%v) => got %s, want %s", node, got, node.GetId()) } if got := (IDHash{}).ID(nil); got != "" { t.Errorf("IDHash.ID(nil) => got %s, want empty", got) diff --git a/pkg/client/sotw/v3/client_test.go b/pkg/client/sotw/v3/client_test.go index 198f2d91cd..bf942d7fbf 100644 --- a/pkg/client/sotw/v3/client_test.go +++ b/pkg/client/sotw/v3/client_test.go @@ -22,6 +22,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/server/v3" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestFetch(t *testing.T) { @@ -31,16 +32,16 @@ func TestFetch(t *testing.T) { snapCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil) go func() { err := startAdsServer(ctx, snapCache) - assert.NoError(t, err) + require.NoError(t, err) }() conn, err := grpc.Dial(":18001", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) - assert.NoError(t, err) + require.NoError(t, err) defer conn.Close() c := client.NewADSClient(ctx, &core.Node{Id: "node_1"}, resource.ClusterType) err = c.InitConnect(conn) - assert.NoError(t, err) + require.NoError(t, err) t.Run("Test initial fetch", testInitialFetch(ctx, snapCache, c)) t.Run("Test next fetch", testNextFetch(ctx, snapCache, c)) @@ -54,17 +55,17 @@ func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c clie go func() { // watch for configs resp, err := c.Fetch() - assert.NoError(t, err) - assert.Equal(t, 3, len(resp.Resources)) + require.NoError(t, err) + assert.Len(t, resp.Resources, 3) for _, r := range resp.Resources { cluster := &clusterv3.Cluster{} err := anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{}) - assert.NoError(t, err) - assert.Contains(t, []string{"cluster_1", "cluster_2", "cluster_3"}, cluster.Name) + require.NoError(t, err) + assert.Contains(t, []string{"cluster_1", "cluster_2", "cluster_3"}, cluster.GetName()) } err = c.Ack() - assert.NoError(t, err) + require.NoError(t, err) wg.Done() }() @@ -75,13 +76,13 @@ func testInitialFetch(ctx context.Context, snapCache cache.SnapshotCache, c clie &clusterv3.Cluster{Name: "cluster_3"}, }, }) - assert.NoError(t, err) + require.NoError(t, err) err = snapshot.Consistent() - assert.NoError(t, err) + require.NoError(t, err) err = snapCache.SetSnapshot(ctx, "node_1", snapshot) wg.Wait() - assert.NoError(t, err) + require.NoError(t, err) } } @@ -93,17 +94,17 @@ func testNextFetch(ctx context.Context, snapCache cache.SnapshotCache, c client. go func() { // watch for configs resp, err := c.Fetch() - assert.NoError(t, err) - assert.Equal(t, 2, len(resp.Resources)) + require.NoError(t, err) + assert.Len(t, resp.Resources, 2) for _, r := range resp.Resources { cluster := &clusterv3.Cluster{} err = anypb.UnmarshalTo(r, cluster, proto.UnmarshalOptions{}) - assert.NoError(t, err) - assert.Contains(t, []string{"cluster_2", "cluster_4"}, cluster.Name) + require.NoError(t, err) + assert.Contains(t, []string{"cluster_2", "cluster_4"}, cluster.GetName()) } err = c.Ack() - assert.NoError(t, err) + require.NoError(t, err) wg.Done() }() @@ -113,12 +114,12 @@ func testNextFetch(ctx context.Context, snapCache cache.SnapshotCache, c client. &clusterv3.Cluster{Name: "cluster_4"}, }, }) - assert.NoError(t, err) + require.NoError(t, err) err = snapshot.Consistent() - assert.NoError(t, err) + require.NoError(t, err) err = snapCache.SetSnapshot(ctx, "node_1", snapshot) - assert.NoError(t, err) + require.NoError(t, err) wg.Wait() } } diff --git a/pkg/conversion/struct_test.go b/pkg/conversion/struct_test.go index 4e2bbbbc02..0c6fef869b 100644 --- a/pkg/conversion/struct_test.go +++ b/pkg/conversion/struct_test.go @@ -43,8 +43,8 @@ func TestConversion(t *testing.T) { }, }}}, } - if !cmp.Equal(st.Fields, pbst, cmp.Comparer(proto.Equal)) { - t.Errorf("MessageToStruct(%v) => got %v, want %v", pb, st.Fields, pbst) + if !cmp.Equal(st.GetFields(), pbst, cmp.Comparer(proto.Equal)) { + t.Errorf("MessageToStruct(%v) => got %v, want %v", pb, st.GetFields(), pbst) } out := &discovery.DiscoveryRequest{} diff --git a/pkg/integration/ttl_integration_test.go b/pkg/integration/ttl_integration_test.go index b81fa769f4..b2bdb318c5 100644 --- a/pkg/integration/ttl_integration_test.go +++ b/pkg/integration/ttl_integration_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" @@ -41,19 +42,19 @@ func TestTTLResponse(t *testing.T) { endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server) l, err := net.Listen("tcp", ":9999") // nolint:gosec - assert.NoError(t, err) + require.NoError(t, err) go func() { - assert.NoError(t, grpcServer.Serve(l)) + require.NoError(t, grpcServer.Serve(l)) }() defer grpcServer.Stop() conn, err := grpc.Dial(":9999", grpc.WithTransportCredentials(insecure.NewCredentials())) - assert.NoError(t, err) + require.NoError(t, err) client := endpointservice.NewEndpointDiscoveryServiceClient(conn) sclient, err := client.StreamEndpoints(ctx) - assert.NoError(t, err) + require.NoError(t, err) err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{ Node: &envoy_config_core_v3.Node{ @@ -62,7 +63,7 @@ func TestTTLResponse(t *testing.T) { ResourceNames: []string{"resource"}, TypeUrl: resource.EndpointType, }) - assert.NoError(t, err) + require.NoError(t, err) oneSecond := time.Second cla := &envoy_config_endpoint_v3.ClusterLoadAssignment{ClusterName: "resource"} @@ -74,7 +75,7 @@ func TestTTLResponse(t *testing.T) { }) err = snapshotCache.SetSnapshot(context.Background(), "test", snap) - assert.NoError(t, err) + require.NoError(t, err) timeout := time.NewTimer(5 * time.Second) awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse { @@ -83,7 +84,7 @@ func TestTTLResponse(t *testing.T) { go func() { r, err := sclient.Recv() - assert.NoError(t, err) + require.NoError(t, err) doneCh <- r }() @@ -107,9 +108,9 @@ func TestTTLResponse(t *testing.T) { ResourceNames: []string{"resource"}, TypeUrl: resource.EndpointType, VersionInfo: "1", - ResponseNonce: response.Nonce, + ResponseNonce: response.GetNonce(), }) - assert.NoError(t, err) + require.NoError(t, err) response = awaitResponse() isHeartbeatResponseWithTTL(t, response) @@ -118,25 +119,25 @@ func TestTTLResponse(t *testing.T) { func isFullResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) { t.Helper() - assert.Len(t, response.Resources, 1) - r := response.Resources[0] + require.Len(t, response.GetResources(), 1) + r := response.GetResources()[0] resource := &envoy_service_discovery_v3.Resource{} err := anypb.UnmarshalTo(r, resource, proto.UnmarshalOptions{}) - assert.NoError(t, err) + require.NoError(t, err) - assert.NotNil(t, resource.Ttl) - assert.NotNil(t, resource.Resource) + assert.NotNil(t, resource.GetTtl()) + assert.NotNil(t, resource.GetResource()) } func isHeartbeatResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) { t.Helper() - assert.Len(t, response.Resources, 1) - r := response.Resources[0] + require.Len(t, response.GetResources(), 1) + r := response.GetResources()[0] resource := &envoy_service_discovery_v3.Resource{} err := anypb.UnmarshalTo(r, resource, proto.UnmarshalOptions{}) - assert.NoError(t, err) + require.NoError(t, err) - assert.NotNil(t, resource.Ttl) - assert.Nil(t, resource.Resource) + assert.NotNil(t, resource.GetTtl()) + assert.Nil(t, resource.GetResource()) } diff --git a/pkg/log/log_test.go b/pkg/log/log_test.go index 636e2b9088..7b7e25190c 100644 --- a/pkg/log/log_test.go +++ b/pkg/log/log_test.go @@ -53,10 +53,10 @@ func TestLoggerFuncs(t *testing.T) { xdsLogger.Warnf("warn") xdsLogger.Errorf("err") - assert.Equal(t, debug, 1) - assert.Equal(t, info, 1) - assert.Equal(t, warn, 1) - assert.Equal(t, err, 1) + assert.Equal(t, 1, debug) + assert.Equal(t, 1, info) + assert.Equal(t, 1, warn) + assert.Equal(t, 1, err) } func TestNilLoggerFuncs(_ *testing.T) { diff --git a/pkg/server/delta/v3/server.go b/pkg/server/delta/v3/server.go index 74f13e3505..d4828b64c3 100644 --- a/pkg/server/delta/v3/server.go +++ b/pkg/server/delta/v3/server.go @@ -74,7 +74,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // a collection of stack allocated watches per request type watches := newWatches() - var node = &core.Node{} + node := &core.Node{} defer func() { watches.Cancel() @@ -100,7 +100,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), response) } - return response.Nonce, str.Send(response) + return response.GetNonce(), str.Send(response) } // process a single delta response @@ -184,18 +184,18 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De // The node information might only be set on the first incoming delta discovery request, so store it here so we can // reset it on subsequent requests that omit it. - if req.Node != nil { - node = req.Node + if req.GetNode() != nil { + node = req.GetNode() } else { req.Node = node } // type URL is required for ADS but is implicit for any other xDS stream if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { + if req.GetTypeUrl() == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } - } else if req.TypeUrl == "" { + } else if req.GetTypeUrl() == "" { req.TypeUrl = defaultTypeURL } diff --git a/pkg/server/sotw/v3/ads.go b/pkg/server/sotw/v3/ads.go index 1efb97ba4f..bbb6dd4b20 100644 --- a/pkg/server/sotw/v3/ads.go +++ b/pkg/server/sotw/v3/ads.go @@ -27,7 +27,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe return err } - sw.watches.responders[resp.GetRequest().TypeUrl].nonce = nonce + sw.watches.responders[resp.GetRequest().GetTypeUrl()].nonce = nonce return nil } @@ -43,7 +43,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe select { // We watch the multiplexed ADS channel for incoming responses. case res := <-sw.watches.responders[resource.AnyType].response: - if res.GetRequest().TypeUrl != typeURL { + if res.GetRequest().GetTypeUrl() != typeURL { if err := process(res); err != nil { return err } @@ -79,8 +79,8 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe } // Only first request is guaranteed to hold node info so if it's missing, reassign. - if req.Node != nil { - sw.node = req.Node + if req.GetNode() != nil { + sw.node = req.GetNode() } else { req.Node = sw.node } @@ -90,7 +90,7 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe // type URL is required for ADS but is implicit for xDS if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { + if req.GetTypeUrl() == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } } @@ -101,10 +101,10 @@ func (s *server) processADS(sw *streamWrapper, reqCh chan *discovery.DiscoveryRe } } - if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok { + if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) } } diff --git a/pkg/server/sotw/v3/server.go b/pkg/server/sotw/v3/server.go index 0813a655c7..f5be0c57a9 100644 --- a/pkg/server/sotw/v3/server.go +++ b/pkg/server/sotw/v3/server.go @@ -111,20 +111,20 @@ func (s *streamWrapper) send(resp cache.Response) (string, error) { out.Nonce = strconv.FormatInt(atomic.AddInt64(&s.nonce, 1), 10) lastResponse := lastDiscoveryResponse{ - nonce: out.Nonce, + nonce: out.GetNonce(), resources: make(map[string]struct{}), } - for _, r := range resp.GetRequest().ResourceNames { + for _, r := range resp.GetRequest().GetResourceNames() { lastResponse.resources[r] = struct{}{} } - s.lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse + s.lastDiscoveryResponses[resp.GetRequest().GetTypeUrl()] = lastResponse // Register with the callbacks provided that we are sending the response. if s.callbacks != nil { s.callbacks.OnStreamResponse(resp.GetContext(), s.ID, resp.GetRequest(), out) } - return out.Nonce, s.stream.Send(out) + return out.GetNonce(), s.stream.Send(out) } // Shutdown closes all open watches, and notifies API consumers the stream has closed. diff --git a/pkg/server/sotw/v3/xds.go b/pkg/server/sotw/v3/xds.go index 145fba9c0f..3b24dec409 100644 --- a/pkg/server/sotw/v3/xds.go +++ b/pkg/server/sotw/v3/xds.go @@ -70,8 +70,8 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } // Only first request is guaranteed to hold node info so if it's missing, reassign. - if req.Node != nil { - sw.node = req.Node + if req.GetNode() != nil { + sw.node = req.GetNode() } else { req.Node = sw.node } @@ -81,7 +81,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque // type URL is required for ADS but is implicit for xDS if defaultTypeURL == resource.AnyType { - if req.TypeUrl == "" { + if req.GetTypeUrl() == "" { return status.Errorf(codes.InvalidArgument, "type URL is required for ADS") } @@ -106,7 +106,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque // on successful completion. return s.processADS(&sw, reqCh, defaultTypeURL) } - } else if req.TypeUrl == "" { + } else if req.GetTypeUrl() == "" { req.TypeUrl = defaultTypeURL } @@ -116,10 +116,10 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque } } - if lastResponse, ok := sw.lastDiscoveryResponses[req.TypeUrl]; ok { + if lastResponse, ok := sw.lastDiscoveryResponses[req.GetTypeUrl()]; ok { if lastResponse.nonce == "" || lastResponse.nonce == nonce { // Let's record Resource names that a client has received. - sw.streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources) + sw.streamState.SetKnownResourceNames(req.GetTypeUrl(), lastResponse.resources) } } @@ -160,7 +160,7 @@ func (s *server) process(str stream.Stream, reqCh chan *discovery.DiscoveryReque return err } - sw.watches.responders[res.GetRequest().TypeUrl].nonce = nonce + sw.watches.responders[res.GetRequest().GetTypeUrl()].nonce = nonce } } } diff --git a/pkg/server/v3/delta_test.go b/pkg/server/v3/delta_test.go index 870b0a85fd..aaefc4ca11 100644 --- a/pkg/server/v3/delta_test.go +++ b/pkg/server/v3/delta_test.go @@ -10,6 +10,7 @@ import ( "google.golang.org/grpc" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" @@ -21,10 +22,10 @@ import ( ) func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() { - config.deltaCounts[req.TypeUrl] = config.deltaCounts[req.TypeUrl] + 1 + config.deltaCounts[req.GetTypeUrl()] = config.deltaCounts[req.GetTypeUrl()] + 1 // This is duplicated from pkg/cache/v3/delta.go as private there - resourceMap := config.deltaResources[req.TypeUrl] + resourceMap := config.deltaResources[req.GetTypeUrl()] versionMap := map[string]string{} for name, resource := range resourceMap { marshaledResource, _ := cache.MarshalResource(resource) @@ -111,21 +112,21 @@ func (stream *mockDeltaStream) Context() context.Context { func (stream *mockDeltaStream) Send(resp *discovery.DeltaDiscoveryResponse) error { // Check that nonce is incremented by one stream.nonce = stream.nonce + 1 - if resp.Nonce != fmt.Sprintf("%d", stream.nonce) { - stream.t.Errorf("Nonce => got %q, want %d", resp.Nonce, stream.nonce) + if resp.GetNonce() != fmt.Sprintf("%d", stream.nonce) { + stream.t.Errorf("Nonce => got %q, want %d", resp.GetNonce(), stream.nonce) } // Check that resources are non-empty - if len(resp.Resources) == 0 { + if len(resp.GetResources()) == 0 { stream.t.Error("Resources => got none, want non-empty") } - if resp.TypeUrl == "" { + if resp.GetTypeUrl() == "" { stream.t.Error("TypeUrl => got none, want non-empty") } // Check that the per resource TypeURL is correctly set. - for _, res := range resp.Resources { - if res.Resource.TypeUrl != resp.TypeUrl { - stream.t.Errorf("TypeUrl => got %q, want %q", res.Resource.TypeUrl, resp.TypeUrl) + for _, res := range resp.GetResources() { + if res.GetResource().GetTypeUrl() != resp.GetTypeUrl() { + stream.t.Errorf("TypeUrl => got %q, want %q", res.GetResource().GetTypeUrl(), resp.GetTypeUrl()) } } @@ -159,29 +160,29 @@ func makeDeltaResources() map[string]map[string]types.Resource { endpoint.GetClusterName(): endpoint, }, rsrc.ClusterType: { - cluster.Name: cluster, + cluster.GetName(): cluster, }, rsrc.RouteType: { - route.Name: route, + route.GetName(): route, }, rsrc.ScopedRouteType: { - scopedRoute.Name: scopedRoute, + scopedRoute.GetName(): scopedRoute, }, rsrc.VirtualHostType: { - virtualHost.Name: virtualHost, + virtualHost.GetName(): virtualHost, }, rsrc.ListenerType: { - httpListener.Name: httpListener, - httpScopedListener.Name: httpScopedListener, + httpListener.GetName(): httpListener, + httpScopedListener.GetName(): httpScopedListener, }, rsrc.SecretType: { - secret.Name: secret, + secret.GetName(): secret, }, rsrc.RuntimeType: { - runtime.Name: runtime, + runtime.GetName(): runtime, }, rsrc.ExtensionConfigType: { - extensionConfig.Name: extensionConfig, + extensionConfig.GetName(): extensionConfig, }, // Pass-through type (types without explicit handling) opaqueType: { @@ -231,7 +232,7 @@ func TestDeltaResponseHandlersWildcard(t *testing.T) { go func() { err := process(typ, resp, s) - assert.NoError(t, err) + require.NoError(t, err) }() select { @@ -264,7 +265,7 @@ func TestDeltaResponseHandlers(t *testing.T) { go func() { err := process(typ, resp, s) - assert.NoError(t, err) + require.NoError(t, err) }() select { @@ -297,7 +298,7 @@ func TestSendDeltaError(t *testing.T) { // check that response fails since we expect an error to come through err := s.DeltaAggregatedResources(resp) - assert.Error(t, err) + require.Error(t, err) close(resp.recv) }) @@ -350,7 +351,7 @@ func TestDeltaAggregatedHandlers(t *testing.T) { s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) go func() { err := s.DeltaAggregatedResources(resp) - assert.NoError(t, err) + require.NoError(t, err) }() count := 0 @@ -369,7 +370,8 @@ func TestDeltaAggregatedHandlers(t *testing.T) { rsrc.ScopedRouteType: 1, rsrc.VirtualHostType: 1, rsrc.ListenerType: 1, - rsrc.SecretType: 1}, + rsrc.SecretType: 1, + }, config.deltaCounts, ) return @@ -474,14 +476,14 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { t.Helper() select { case response := <-replies: - assert.Equal(t, rsrc.EndpointType, response.TypeUrl) - if assert.Equal(t, len(expectedResources), len(response.Resources)) { + assert.Equal(t, rsrc.EndpointType, response.GetTypeUrl()) + if assert.Equal(t, len(expectedResources), len(response.GetResources())) { var names []string - for _, resource := range response.Resources { - names = append(names, resource.Name) + for _, resource := range response.GetResources() { + names = append(names, resource.GetName()) } assert.ElementsMatch(t, names, expectedResources) - assert.ElementsMatch(t, response.RemovedResources, expectedRemovedResources) + assert.ElementsMatch(t, response.GetRemovedResources(), expectedRemovedResources) } case <-time.After(1 * time.Second): t.Fatalf("got no response") @@ -501,7 +503,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) go func() { err := s.DeltaAggregatedResources(resp) - assert.NoError(t, err) + require.NoError(t, err) }() resp.recv <- &discovery.DeltaDiscoveryRequest{ @@ -530,7 +532,6 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { ResourceNamesUnsubscribe: []string{"*"}, } validateResponse(t, resp.sent, []string{"endpoints0"}, nil) - }) t.Run("* subscription/unsubscription support", func(t *testing.T) { @@ -539,7 +540,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) go func() { err := s.DeltaAggregatedResources(resp) - assert.NoError(t, err) + require.NoError(t, err) }() updateResources(1234) @@ -584,7 +585,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) go func() { err := s.DeltaAggregatedResources(resp) - assert.NoError(t, err) + require.NoError(t, err) }() updateResources(1234) @@ -614,5 +615,4 @@ func TestDeltaWildcardSubscriptions(t *testing.T) { } validateResponse(t, resp.sent, []string{"endpoints2"}, []string{"endpoints4"}) }) - } diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 4f281cf15c..b8c4c2ddff 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -26,6 +26,7 @@ import ( "google.golang.org/grpc" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" @@ -50,10 +51,10 @@ type mockConfigWatcher struct { } func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ stream.StreamState, out chan cache.Response) func() { - config.counts[req.TypeUrl] = config.counts[req.TypeUrl] + 1 - if len(config.responses[req.TypeUrl]) > 0 { - out <- config.responses[req.TypeUrl][0] - config.responses[req.TypeUrl] = config.responses[req.TypeUrl][1:] + config.counts[req.GetTypeUrl()] = config.counts[req.GetTypeUrl()] + 1 + if len(config.responses[req.GetTypeUrl()]) > 0 { + out <- config.responses[req.GetTypeUrl()][0] + config.responses[req.GetTypeUrl()] = config.responses[req.GetTypeUrl()][1:] } else { config.watches++ return func() { @@ -64,9 +65,9 @@ func (config *mockConfigWatcher) CreateWatch(req *discovery.DiscoveryRequest, _ } func (config *mockConfigWatcher) Fetch(_ context.Context, req *discovery.DiscoveryRequest) (cache.Response, error) { - if len(config.responses[req.TypeUrl]) > 0 { - out := config.responses[req.TypeUrl][0] - config.responses[req.TypeUrl] = config.responses[req.TypeUrl][1:] + if len(config.responses[req.GetTypeUrl()]) > 0 { + out := config.responses[req.GetTypeUrl()][0] + config.responses[req.GetTypeUrl()] = config.responses[req.GetTypeUrl()][1:] return out, nil } return nil, errors.New("missing") @@ -97,16 +98,16 @@ func (stream *mockStream) Context() context.Context { func (stream *mockStream) Send(resp *discovery.DiscoveryResponse) error { // check that nonce is monotonically incrementing stream.nonce = stream.nonce + 1 - assert.Equal(stream.t, resp.Nonce, fmt.Sprintf("%d", stream.nonce)) + assert.Equal(stream.t, resp.GetNonce(), fmt.Sprintf("%d", stream.nonce)) // check that version is set - assert.NotEmpty(stream.t, resp.VersionInfo) + assert.NotEmpty(stream.t, resp.GetVersionInfo()) // check resources are non-empty - assert.NotEmpty(stream.t, resp.Resources) + assert.NotEmpty(stream.t, resp.GetResources()) // check that type URL matches in resources - assert.NotEmpty(stream.t, resp.TypeUrl) + assert.NotEmpty(stream.t, resp.GetTypeUrl()) - for _, res := range resp.Resources { - assert.Equal(stream.t, res.TypeUrl, resp.TypeUrl) + for _, res := range resp.GetResources() { + assert.Equal(stream.t, res.GetTypeUrl(), resp.GetTypeUrl()) } stream.sent <- resp @@ -340,7 +341,7 @@ func TestResponseHandlers(t *testing.T) { case opaqueType: err = s.StreamAggregatedResources(resp) } - assert.NoError(t, err) + require.NoError(t, err) close(done) }(typ) @@ -391,91 +392,91 @@ func TestFetch(t *testing.T) { s := server.NewServer(context.Background(), config, cb) out, err := s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.NotNil(t, out) - assert.NoError(t, err) + require.NoError(t, err) out, err = s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.NotNil(t, out) - assert.NoError(t, err) + require.NoError(t, err) out, err = s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.NotNil(t, out) - assert.NoError(t, err) + require.NoError(t, err) out, err = s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.NotNil(t, out) - assert.NoError(t, err) + require.NoError(t, err) out, err = s.FetchSecrets(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.NotNil(t, out) - assert.NoError(t, err) + require.NoError(t, err) out, err = s.FetchRuntime(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.NotNil(t, out) - assert.NoError(t, err) + require.NoError(t, err) // try again and expect empty results out, err = s.FetchEndpoints(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchClusters(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchRoutes(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchListeners(context.Background(), &discovery.DiscoveryRequest{Node: node}) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) // try empty requests: not valid in a real gRPC server out, err = s.FetchEndpoints(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchClusters(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchRoutes(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchListeners(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchSecrets(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchRuntime(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) // send error from callback callbackError = true out, err = s.FetchEndpoints(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchClusters(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchRoutes(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) out, err = s.FetchListeners(context.Background(), nil) assert.Nil(t, out) - assert.Error(t, err) + require.Error(t, err) // verify fetch callbacks - assert.Equal(t, requestCount, 10) - assert.Equal(t, responseCount, 6) + assert.Equal(t, 10, requestCount) + assert.Equal(t, 6, responseCount) } func TestSendError(t *testing.T) { @@ -495,7 +496,7 @@ func TestSendError(t *testing.T) { // check that response fails since send returns error err := s.StreamAggregatedResources(resp) - assert.Error(t, err) + require.Error(t, err) close(resp.recv) }) @@ -517,9 +518,9 @@ func TestStaleNonce(t *testing.T) { stop := make(chan struct{}) go func() { err := s.StreamAggregatedResources(resp) - assert.NoError(t, err) + require.NoError(t, err) // should be two watches called - assert.False(t, !reflect.DeepEqual(map[string]int{typ: 2}, config.counts)) + assert.True(t, reflect.DeepEqual(map[string]int{typ: 2}, config.counts)) close(stop) }() select { @@ -585,7 +586,7 @@ func TestAggregatedHandlers(t *testing.T) { s := server.NewServer(context.Background(), config, server.CallbackFuncs{}, sotw.WithOrderedADS()) go func() { err := s.StreamAggregatedResources(resp) - assert.NoError(t, err) + require.NoError(t, err) }() count := 0 @@ -596,7 +597,7 @@ func TestAggregatedHandlers(t *testing.T) { count++ if count >= expectedCount { close(resp.recv) - assert.False(t, !reflect.DeepEqual(map[string]int{ + assert.True(t, reflect.DeepEqual(map[string]int{ rsrc.EndpointType: 1, rsrc.ClusterType: 1, rsrc.RouteType: 1, @@ -621,7 +622,7 @@ func TestAggregateRequestType(t *testing.T) { resp := makeMockStream(t) resp.recv <- &discovery.DiscoveryRequest{Node: node} err := s.StreamAggregatedResources(resp) - assert.Error(t, err) + require.Error(t, err) } func TestCancellations(t *testing.T) { @@ -636,8 +637,8 @@ func TestCancellations(t *testing.T) { close(resp.recv) s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) err := s.StreamAggregatedResources(resp) - assert.NoError(t, err) - assert.Equal(t, config.watches, 0) + require.NoError(t, err) + assert.Equal(t, 0, config.watches) } func TestOpaqueRequestsChannelMuxing(t *testing.T) { @@ -654,8 +655,8 @@ func TestOpaqueRequestsChannelMuxing(t *testing.T) { close(resp.recv) s := server.NewServer(context.Background(), config, server.CallbackFuncs{}) err := s.StreamAggregatedResources(resp) - assert.NoError(t, err) - assert.Equal(t, config.watches, 0) + require.NoError(t, err) + assert.Equal(t, 0, config.watches) } func TestCallbackError(t *testing.T) { @@ -679,7 +680,7 @@ func TestCallbackError(t *testing.T) { // check that response fails since stream open returns error err := s.StreamAggregatedResources(resp) - assert.Error(t, err) + require.Error(t, err) close(resp.recv) }) diff --git a/pkg/test/main/main.go b/pkg/test/main/main.go index 3072e16578..763a3ad3ed 100644 --- a/pkg/test/main/main.go +++ b/pkg/test/main/main.go @@ -155,7 +155,6 @@ func init() { // Enable use of the pprof profiler flag.BoolVar(&pprofEnabled, "pprof", false, "Enable use of the pprof profiler") - } // main returns code 1 if any of the batches failed to pass all requests @@ -186,7 +185,7 @@ func main() { if mux { configCache = &cache.MuxCache{ Classify: func(req *cache.Request) string { - if req.TypeUrl == typeURL { + if req.GetTypeUrl() == typeURL { return "eds" } return "default" diff --git a/pkg/test/resource/v3/resource.go b/pkg/test/resource/v3/resource.go index 942f8c7beb..afd8369196 100644 --- a/pkg/test/resource/v3/resource.go +++ b/pkg/test/resource/v3/resource.go @@ -65,10 +65,8 @@ const ( DeltaAds = "delta-ads" ) -var ( - // RefreshDelay for the polling config source. - RefreshDelay = 500 * time.Millisecond -) +// RefreshDelay for the polling config source. +var RefreshDelay = 500 * time.Millisecond // MakeEndpoint creates a localhost endpoint on a given port. func MakeEndpoint(clusterName string, port uint32) *endpoint.ClusterLoadAssignment { @@ -155,7 +153,7 @@ func MakeScopedRouteConfig(scopedRouteName string, routeConfigurationName string StringKey: key, }, } - k.Fragments = append(k.Fragments, fragment) + k.Fragments = append(k.GetFragments(), fragment) } return &route.ScopedRouteConfiguration{ @@ -652,7 +650,7 @@ func (ts *TestSnapshot) generateTCPListeners(numListeners int, clusters []types. func (ts *TestSnapshot) addTLS(l *listener.Listener) { if ts.TLS { - for i, chain := range l.FilterChains { + for i, chain := range l.GetFilterChains() { tlsc := &auth.DownstreamTlsContext{ CommonTlsContext: &auth.CommonTlsContext{ TlsCertificateSdsSecretConfigs: []*auth.SdsSecretConfig{{ diff --git a/pkg/test/server.go b/pkg/test/server.go index 28096956e8..edf464dcc0 100644 --- a/pkg/test/server.go +++ b/pkg/test/server.go @@ -119,7 +119,6 @@ func RunManagementGateway(ctx context.Context, srv server.Server, port uint) { func (h *HTTPGateway) ServeHTTP(resp http.ResponseWriter, req *http.Request) { bytes, code, err := h.Gateway.ServeHTTP(req) - if err != nil { http.Error(resp, err.Error(), code) return diff --git a/pkg/test/v3/accesslog.go b/pkg/test/v3/accesslog.go index 9202df369f..0fa74f1606 100644 --- a/pkg/test/v3/accesslog.go +++ b/pkg/test/v3/accesslog.go @@ -44,16 +44,16 @@ func (svc *AccessLogService) StreamAccessLogs(stream accessloggrpc.AccessLogServ if err != nil { return err } - if msg.Identifier != nil { - logName = msg.Identifier.LogName + if msg.GetIdentifier() != nil { + logName = msg.GetIdentifier().GetLogName() } - switch entries := msg.LogEntries.(type) { + switch entries := msg.GetLogEntries().(type) { case *accessloggrpc.StreamAccessLogsMessage_HttpLogs: - for _, entry := range entries.HttpLogs.LogEntry { + for _, entry := range entries.HttpLogs.GetLogEntry() { if entry != nil { - common := entry.CommonProperties - req := entry.Request - resp := entry.Response + common := entry.GetCommonProperties() + req := entry.GetRequest() + resp := entry.GetResponse() if common == nil { common = &alf.AccessLogCommon{} } @@ -64,19 +64,19 @@ func (svc *AccessLogService) StreamAccessLogs(stream accessloggrpc.AccessLogServ resp = &alf.HTTPResponseProperties{} } svc.log(fmt.Sprintf("[%s%s] %s %s %s %d %s %s", - logName, time.Now().Format(time.RFC3339), req.Authority, req.Path, req.Scheme, - resp.ResponseCode.GetValue(), req.RequestId, common.UpstreamCluster)) + logName, time.Now().Format(time.RFC3339), req.GetAuthority(), req.GetPath(), req.GetScheme(), + resp.GetResponseCode().GetValue(), req.GetRequestId(), common.GetUpstreamCluster())) } } case *accessloggrpc.StreamAccessLogsMessage_TcpLogs: - for _, entry := range entries.TcpLogs.LogEntry { + for _, entry := range entries.TcpLogs.GetLogEntry() { if entry != nil { - common := entry.CommonProperties + common := entry.GetCommonProperties() if common == nil { common = &alf.AccessLogCommon{} } svc.log(fmt.Sprintf("[%s%s] tcp %s %s", - logName, time.Now().Format(time.RFC3339), common.UpstreamLocalAddress, common.UpstreamCluster)) + logName, time.Now().Format(time.RFC3339), common.GetUpstreamLocalAddress(), common.GetUpstreamCluster())) } } }