Skip to content

Commit

Permalink
add defers to cancel stream so we have no goroutine leaks
Browse files Browse the repository at this point in the history
Signed-off-by: Zhiyan Foo <[email protected]>
  • Loading branch information
zhiyanfoo committed Apr 9, 2024
1 parent c5a1fe1 commit f8eb965
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func TestDeltaResponseHandlersWildcard(t *testing.T) {
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})

resp := makeMockDeltaStream(t)
defer resp.cancel()
// This is a wildcard request since we don't specify a list of resource subscriptions
resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node, TypeUrl: typ}

Expand Down Expand Up @@ -261,6 +262,7 @@ func TestDeltaResponseHandlers(t *testing.T) {
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})

resp := makeMockDeltaStream(t)
defer resp.cancel()
resourceNames := []string{}
for resourceName := range config.deltaResources[typ] {
resourceNames = append(resourceNames, resourceName)
Expand Down Expand Up @@ -295,6 +297,7 @@ func TestSendDeltaError(t *testing.T) {

// make a request with an error
resp := makeMockDeltaStream(t)
defer resp.cancel()
resp.sendError = true
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
Expand All @@ -314,6 +317,7 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
config := makeMockConfigWatcher()
config.deltaResources = makeDeltaResources()
resp := makeMockDeltaStream(t)
defer resp.cancel()

reqs := []*discovery.DeltaDiscoveryRequest{
{
Expand Down Expand Up @@ -388,9 +392,11 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
}

func TestDeltaAggregateRequestType(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
resp := makeMockDeltaStream(t)
defer resp.cancel()
resp.recv <- &discovery.DeltaDiscoveryRequest{Node: node}
if err := s.DeltaAggregatedResources(resp); err == nil {
t.Error("DeltaAggregatedResources() => got nil, want an error")
Expand All @@ -399,8 +405,10 @@ func TestDeltaAggregateRequestType(t *testing.T) {
}

func TestDeltaCancellations(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
resp := makeMockDeltaStream(t)
defer resp.cancel()
for _, typ := range testTypes {
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
Expand All @@ -418,8 +426,10 @@ func TestDeltaCancellations(t *testing.T) {
}

func TestDeltaOpaqueRequestsChannelMuxing(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
resp := makeMockDeltaStream(t)
defer resp.cancel()
for i := 0; i < 10; i++ {
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
Expand All @@ -440,6 +450,7 @@ func TestDeltaOpaqueRequestsChannelMuxing(t *testing.T) {
func TestDeltaCallbackError(t *testing.T) {
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
config.deltaResources = makeDeltaResources()

Expand All @@ -451,6 +462,7 @@ func TestDeltaCallbackError(t *testing.T) {

// make a request
resp := makeMockDeltaStream(t)
defer resp.cancel()
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: typ,
Expand Down Expand Up @@ -503,7 +515,9 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
}

t.Run("legacy still working", func(t *testing.T) {
defer goleak.VerifyNone(t)
resp := makeMockDeltaStream(t)
defer resp.cancel()
defer close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
Expand Down Expand Up @@ -541,6 +555,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {

t.Run("* subscription/unsubscription support", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer resp.cancel()
defer close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
Expand Down Expand Up @@ -586,6 +601,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {

t.Run("resource specific subscriptions while using wildcard", func(t *testing.T) {
resp := makeMockDeltaStream(t)
defer resp.cancel()
defer close(resp.recv)
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
Expand Down Expand Up @@ -625,9 +641,11 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
func TestDeltaMultipleStreams(t *testing.T) {
// Unit test for issue identified in https://github.com/envoyproxy/go-control-plane/issues/913
t.Run("return error to delta stream request; multiple streams.", func(t *testing.T) {
defer goleak.VerifyNone(t)
config := makeMockConfigWatcher()
resp := makeMockDeltaStream(t)
defer close(resp.recv)
defer resp.cancel()
s := server.NewServer(
context.Background(),
config,
Expand All @@ -648,7 +666,5 @@ func TestDeltaMultipleStreams(t *testing.T) {

err := s.DeltaAggregatedResources(resp)
require.Error(t, err)
resp.cancel()
defer goleak.VerifyNone(t)
})
}

0 comments on commit f8eb965

Please sign in to comment.