diff --git a/pkg/networkservice/chains/nsmgr/vl3_test.go b/pkg/networkservice/chains/nsmgr/vl3_test.go index ab53172c5..67fc1a0d4 100644 --- a/pkg/networkservice/chains/nsmgr/vl3_test.go +++ b/pkg/networkservice/chains/nsmgr/vl3_test.go @@ -75,8 +75,7 @@ func Test_NSC_ConnectsTo_vl3NSE(t *testing.T) { dnsServerIPCh := make(chan net.IP, 1) dnsServerIPCh <- net.ParseIP("127.0.0.1") - var ipam vl3.IPAM - ipam.Reset(ctx, "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") _ = domain.Nodes[0].NewEndpoint( ctx, @@ -86,7 +85,7 @@ func Test_NSC_ConnectsTo_vl3NSE(t *testing.T) { dnsServerIPCh, vl3dns.WithDomainSchemes("{{ index .Labels \"podName\" }}.{{ .NetworkService }}."), vl3dns.WithDNSPort(40053)), - vl3.NewServer(ctx, &ipam), + vl3.NewServer(ctx, ipam), ) resolver := net.Resolver{ @@ -159,8 +158,7 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) { dnsServerIPCh := make(chan net.IP, 1) dnsServerIPCh <- net.ParseIP("0.0.0.0") - var serverIpam vl3.IPAM - serverIpam.Reset(ctx, "10.0.0.1/24", []string{}) + serverIpam := vl3.NewIPAM("10.0.0.1/24") _ = domain.Nodes[0].NewEndpoint( ctx, @@ -175,7 +173,7 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) { vl3dns.WithConfigs(dnsConfigs), vl3dns.WithDNSPort(40053), ), - vl3.NewServer(ctx, &serverIpam), + vl3.NewServer(ctx, serverIpam), ) resolver := net.Resolver{ @@ -186,9 +184,8 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) { }, } - var clientIpam vl3.IPAM - clientIpam.Reset(ctx, "127.0.0.1/32", []string{}) - nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(vl3dns.NewClient(net.ParseIP("127.0.0.1"), dnsConfigs), vl3.NewClient(ctx, &clientIpam))) + clientIpam := vl3.NewIPAM("127.0.0.1/32") + nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(vl3dns.NewClient(net.ParseIP("127.0.0.1"), dnsConfigs), vl3.NewClient(ctx, clientIpam))) req := defaultRequest(nsReg.Name) req.Connection.Id = uuid.New().String() @@ -244,8 +241,7 @@ func Test_NSC_GetsVl3DnsAddressDelay(t *testing.T) { nseReg := defaultRegistryEndpoint(nsReg.Name) dnsServerIPCh := make(chan net.IP, 1) - var ipam vl3.IPAM - ipam.Reset(ctx, "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") _ = domain.Nodes[0].NewEndpoint( ctx, @@ -255,7 +251,7 @@ func Test_NSC_GetsVl3DnsAddressDelay(t *testing.T) { dnsServerIPCh, vl3dns.WithDomainSchemes("{{ index .Labels \"podName\" }}.{{ .NetworkService }}."), vl3dns.WithDNSPort(40053)), - vl3.NewServer(ctx, &ipam)) + vl3.NewServer(ctx, ipam)) nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) @@ -290,8 +286,7 @@ func Test_vl3NSE_ConnectsTo_Itself(t *testing.T) { nseReg := defaultRegistryEndpoint(nsReg.Name) dnsServerIPCh := make(chan net.IP, 1) - var ipam vl3.IPAM - ipam.Reset(ctx, "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") _ = domain.Nodes[0].NewEndpoint( ctx, @@ -300,7 +295,7 @@ func Test_vl3NSE_ConnectsTo_Itself(t *testing.T) { vl3dns.NewServer(ctx, dnsServerIPCh, vl3dns.WithDNSPort(40053)), - vl3.NewServer(ctx, &ipam)) + vl3.NewServer(ctx, ipam)) // Connection to itself. This allows us to assign a dns address to ourselves. nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithName(nseReg.Name)) @@ -343,11 +338,10 @@ func Test_Interdomain_vl3_dns(t *testing.T) { dnsServerIPCh := make(chan net.IP, 1) dnsServerIPCh <- net.ParseIP("127.0.0.1") - var ipam vl3.IPAM - ipam.Reset(ctx, "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") cluster2.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, - vl3.NewServer(ctx, &ipam), + vl3.NewServer(ctx, ipam), vl3dns.NewServer(ctx, dnsServerIPCh, vl3dns.WithDNSPort(40053), @@ -443,11 +437,10 @@ func Test_FloatingInterdomain_vl3_dns(t *testing.T) { dnsServerIPCh := make(chan net.IP, 1) dnsServerIPCh <- net.ParseIP("127.0.0.1") - var ipam vl3.IPAM - ipam.Reset(ctx, "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") cluster2.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken, - vl3.NewServer(ctx, &ipam), + vl3.NewServer(ctx, ipam), vl3dns.NewServer(ctx, dnsServerIPCh, vl3dns.WithDNSPort(40053), @@ -521,14 +514,13 @@ func Test_NSC_ConnectsTo_vl3NSE_With_Invalid_IpContext(t *testing.T) { prefix1 := "10.0.0.0/24" prefix2 := "10.10.0.0/24" - var serverIpam vl3.IPAM - serverIpam.Reset(ctx, prefix1, []string{}) + serverIpam := vl3.NewIPAM(prefix1) _ = domain.Nodes[0].NewEndpoint( ctx, nseReg, sandbox.GenerateTestToken, - strictvl3ipam.NewServer(ctx, vl3.NewServer, &serverIpam), + strictvl3ipam.NewServer(ctx, vl3.NewServer, serverIpam), ) nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken) @@ -539,7 +531,8 @@ func Test_NSC_ConnectsTo_vl3NSE_With_Invalid_IpContext(t *testing.T) { require.True(t, checkIPContext(conn.Context.IpContext, prefix1)) - serverIpam.Reset(ctx, prefix2, []string{}) + err = serverIpam.Reset(prefix2) + require.NoError(t, err) req.Connection = conn conn, err = nsc.Request(ctx, req) diff --git a/pkg/networkservice/connectioncontext/ipcontext/vl3/client.go b/pkg/networkservice/connectioncontext/ipcontext/vl3/client.go index 3a252978a..6326091e8 100644 --- a/pkg/networkservice/connectioncontext/ipcontext/vl3/client.go +++ b/pkg/networkservice/connectioncontext/ipcontext/vl3/client.go @@ -23,7 +23,6 @@ import ( "github.com/pkg/errors" - "github.com/edwarnicke/serialize" "github.com/golang/protobuf/ptypes/empty" "github.com/networkservicemesh/api/pkg/api/networkservice" "google.golang.org/grpc" @@ -33,10 +32,8 @@ import ( ) type vl3Client struct { - pool *IPAM - chainContext context.Context - executor serialize.Executor - subscriptions []chan struct{} + pool *IPAM + chainContext context.Context } // NewClient - returns a new vL3 client instance that manages connection.context.ipcontext for vL3 scenario. @@ -74,32 +71,16 @@ func (n *vl3Client) Request(ctx context.Context, request *networkservice.Network storeCancel(ctx, cancel) - notifyCh := make(chan struct{}) - - n.executor.AsyncExec(func() { - n.subscriptions = append(n.subscriptions, notifyCh) + unsubscribe := n.pool.Subscribe(func() { + eventFactory.Request(begin.CancelContext(cancelCtx)) }) go func() { - defer func() { - n.executor.AsyncExec(func() { - for i, sub := range n.subscriptions { - if sub == notifyCh { - n.subscriptions = append(n.subscriptions[:i], n.subscriptions[i+1:]...) - close(notifyCh) - return - } - } - }) - }() + defer unsubscribe() select { case <-n.chainContext.Done(): - return case <-cancelCtx.Done(): - return - case <-notifyCh: - eventFactory.Request(begin.CancelContext(cancelCtx)) } }() diff --git a/pkg/networkservice/connectioncontext/ipcontext/vl3/client_test.go b/pkg/networkservice/connectioncontext/ipcontext/vl3/client_test.go index 0ea0a6393..8780a8602 100644 --- a/pkg/networkservice/connectioncontext/ipcontext/vl3/client_test.go +++ b/pkg/networkservice/connectioncontext/ipcontext/vl3/client_test.go @@ -42,8 +42,7 @@ func Test_Client_ConnectsToVl3NSE(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - var ipamPool vl3.IPAM - ipamPool.Reset(ctx, "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") var server = next.NewNetworkServiceServer( adapters.NewClientToServer( @@ -54,7 +53,7 @@ func Test_Client_ConnectsToVl3NSE(t *testing.T) { ), ), metadata.NewServer(), - vl3.NewServer(ctx, &ipamPool), + vl3.NewServer(ctx, ipam), ) resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: t.Name()}}) @@ -91,22 +90,19 @@ func Test_VL3NSE_ConnectsToVl3NSE(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - var clientIpamPool vl3.IPAM - var serverIpamPool vl3.IPAM - - clientIpamPool.Reset(ctx, "10.0.1.0/24", []string{}) - serverIpamPool.Reset(ctx, "10.0.0.1/24", []string{}) + clientIpam := vl3.NewIPAM("10.0.1.0/24") + serverIpam := vl3.NewIPAM("10.0.0.1/24") var server = next.NewNetworkServiceServer( adapters.NewClientToServer( next.NewNetworkServiceClient( begin.NewClient(), metadata.NewClient(), - vl3.NewClient(ctx, &clientIpamPool), + vl3.NewClient(ctx, clientIpam), ), ), metadata.NewServer(), - vl3.NewServer(ctx, &serverIpamPool), + vl3.NewServer(ctx, serverIpam), ) resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: t.Name()}}) @@ -145,22 +141,19 @@ func Test_VL3NSE_ConnectsToVl3NSE_ChangePrefix(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - var clientIpamPool vl3.IPAM - var serverIpamPool vl3.IPAM - - clientIpamPool.Reset(ctx, "10.0.1.0/24", []string{}) - serverIpamPool.Reset(ctx, "10.0.0.1/24", []string{}) + clientIpam := vl3.NewIPAM("10.0.1.0/24") + serverIpam := vl3.NewIPAM("10.0.0.1/24") var server = next.NewNetworkServiceServer( adapters.NewClientToServer( next.NewNetworkServiceClient( begin.NewClient(), metadata.NewClient(), - vl3.NewClient(ctx, &clientIpamPool), + vl3.NewClient(ctx, clientIpam), ), ), metadata.NewServer(), - vl3.NewServer(ctx, &serverIpamPool), + vl3.NewServer(ctx, serverIpam), ) resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: t.Name()}}) @@ -176,7 +169,8 @@ func Test_VL3NSE_ConnectsToVl3NSE_ChangePrefix(t *testing.T) { require.Equal(t, "10.0.1.0/32", resp.GetContext().GetIpContext().GetDstRoutes()[0].GetPrefix()) require.Equal(t, "10.0.1.0/24", resp.GetContext().GetIpContext().GetDstRoutes()[1].GetPrefix()) - clientIpamPool.Reset(ctx, "10.0.5.0/24", []string{}) + err = clientIpam.Reset("10.0.5.0/24") + require.NoError(t, err) // refresh for i := 0; i < 10; i++ { @@ -203,22 +197,19 @@ func Test_VL3NSE_ConnectsToVl3NSE_Close(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - var clientIpamPool vl3.IPAM - var serverIpamPool vl3.IPAM - - clientIpamPool.Reset(ctx, "10.0.1.0/24", []string{}) - serverIpamPool.Reset(ctx, "10.0.0.1/24", []string{}) + clientIpam := vl3.NewIPAM("10.0.1.0/24") + serverIpam := vl3.NewIPAM("10.0.0.1/24") var server = next.NewNetworkServiceServer( adapters.NewClientToServer( next.NewNetworkServiceClient( begin.NewClient(), metadata.NewClient(), - vl3.NewClient(ctx, &clientIpamPool), + vl3.NewClient(ctx, clientIpam), ), ), metadata.NewServer(), - vl3.NewServer(ctx, &serverIpamPool), + vl3.NewServer(ctx, serverIpam), ) resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: uuid.New().String()}}) diff --git a/pkg/networkservice/connectioncontext/ipcontext/vl3/ipam.go b/pkg/networkservice/connectioncontext/ipcontext/vl3/ipam.go index e31129403..cf1e9b74e 100644 --- a/pkg/networkservice/connectioncontext/ipcontext/vl3/ipam.go +++ b/pkg/networkservice/connectioncontext/ipcontext/vl3/ipam.go @@ -17,12 +17,12 @@ package vl3 import ( + "container/list" "context" "net" "sync" "github.com/networkservicemesh/sdk/pkg/tools/ippool" - "github.com/networkservicemesh/sdk/pkg/tools/log" ) // IPAM manages vl3 prefixes @@ -32,6 +32,40 @@ type IPAM struct { ipPool *ippool.IPPool excludedPrefixes map[string]struct{} clientMask uint8 + subscriptions list.List +} + +// NewIPAM creates a new vl3 ipam with specified prefix and excluded prefixes +func NewIPAM(prefix string, excludedPrefixes ...string) *IPAM { + ipam := new(IPAM) + err := ipam.Reset(prefix, excludedPrefixes...) + if err != nil { + panic(err) + } + return ipam +} + +// Subscribe creates a subscription for receiving events about changed prefixes +func (p *IPAM) Subscribe(action func()) context.CancelFunc { + defer p.Unlock() + p.Lock() + + node := p.subscriptions.PushBack(action) + + return func() { + p.Lock() + defer p.Unlock() + + p.subscriptions.Remove(node) + } +} + +func (p *IPAM) notify() { + for node := p.subscriptions.Front(); node != nil; node = node.Next() { + if action, ok := node.Value.(func()); ok { + action() + } + } } func (p *IPAM) isInitialized() bool { @@ -59,6 +93,7 @@ func (p *IPAM) selfPrefix() *net.IPNet { r := p.self return &r } + func (p *IPAM) globalIPNet() *net.IPNet { p.Lock() defer p.Unlock() @@ -111,16 +146,15 @@ func (p *IPAM) isExcluded(ipNet string) bool { } // Reset resets IPAM's ippol by setting new prefix -func (p *IPAM) Reset(ctx context.Context, prefix string, excludePrefies []string) { +func (p *IPAM) Reset(prefix string, excludePrefies ...string) error { p.Lock() defer p.Unlock() _, ipNet, err := net.ParseCIDR(prefix) + if err != nil { - log.FromContext(ctx).Error(err.Error()) - return + return err } - p.self = *ipNet p.ipPool = ippool.NewWithNet(ipNet) p.excludedPrefixes = make(map[string]struct{}) @@ -142,6 +176,9 @@ func (p *IPAM) Reset(ctx context.Context, prefix string, excludePrefies []string p.ipPool.ExcludeString(excludePrefix) p.excludedPrefixes[excludePrefix] = struct{}{} } + + p.notify() + return nil } // ContainsNetString checks if ippool contains net diff --git a/pkg/networkservice/connectioncontext/ipcontext/vl3/ipam_test.go b/pkg/networkservice/connectioncontext/ipcontext/vl3/ipam_test.go new file mode 100644 index 000000000..994c2b503 --- /dev/null +++ b/pkg/networkservice/connectioncontext/ipcontext/vl3/ipam_test.go @@ -0,0 +1,50 @@ +// Copyright (c) 2024 Cisco and/or its affiliates. +// +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package vl3_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/networkservicemesh/sdk/pkg/networkservice/connectioncontext/ipcontext/vl3" +) + +func TestSubscribtions(t *testing.T) { + counter := 0 + ipam := new(vl3.IPAM) + unsub1 := ipam.Subscribe(func() { + counter++ + }) + unsub2 := ipam.Subscribe(func() { + counter += 2 + }) + + err := ipam.Reset("10.0.0.1/24") + require.NoError(t, err) + require.Equal(t, counter, 3) + + unsub2() + err = ipam.Reset("10.0.0.1/24") + require.NoError(t, err) + require.Equal(t, counter, 4) + unsub1() + + err = ipam.Reset("10.0.0.1/24") + require.NoError(t, err) + require.Equal(t, counter, 4) +} diff --git a/pkg/networkservice/connectioncontext/ipcontext/vl3/server_test.go b/pkg/networkservice/connectioncontext/ipcontext/vl3/server_test.go index a6b2444b9..09a646629 100644 --- a/pkg/networkservice/connectioncontext/ipcontext/vl3/server_test.go +++ b/pkg/networkservice/connectioncontext/ipcontext/vl3/server_test.go @@ -35,12 +35,11 @@ func Test_NSC_ConnectsToVl3NSE(t *testing.T) { t.Cleanup(func() { goleak.VerifyNone(t) }) - var ipam vl3.IPAM - ipam.Reset(context.Background(), "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") var server = next.NewNetworkServiceServer( metadata.NewServer(), - vl3.NewServer(context.Background(), &ipam), + vl3.NewServer(context.Background(), ipam), ) resp, err := server.Request(context.Background(), new(networkservice.NetworkServiceRequest)) @@ -75,12 +74,11 @@ func Test_NSC_ConnectsToVl3NSE_PrefixHasChanged(t *testing.T) { goleak.VerifyNone(t) }) - var ipam vl3.IPAM - ipam.Reset(context.Background(), "12.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("12.0.0.1/24") var server = next.NewNetworkServiceServer( metadata.NewServer(), - vl3.NewServer(context.Background(), &ipam), + vl3.NewServer(context.Background(), ipam), ) resp, err := server.Request(context.Background(), new(networkservice.NetworkServiceRequest)) @@ -95,7 +93,8 @@ func Test_NSC_ConnectsToVl3NSE_PrefixHasChanged(t *testing.T) { require.Equal(t, "12.0.0.0/16", resp.GetContext().GetIpContext().GetSrcRoutes()[2].GetPrefix()) require.Equal(t, "12.0.0.1/32", resp.GetContext().GetIpContext().GetDstRoutes()[0].GetPrefix()) - ipam.Reset(context.Background(), "11.0.0.1/24", []string{}) + err = ipam.Reset("11.0.0.1/24") + require.NoError(t, err) // refresh for i := 0; i < 10; i++ { @@ -118,12 +117,11 @@ func Test_NSC_ConnectsToVl3NSE_Close(t *testing.T) { goleak.VerifyNone(t) }) - var ipam vl3.IPAM - ipam.Reset(context.Background(), "10.0.0.1/24", []string{}) + ipam := vl3.NewIPAM("10.0.0.1/24") var server = next.NewNetworkServiceServer( metadata.NewServer(), - vl3.NewServer(context.Background(), &ipam), + vl3.NewServer(context.Background(), ipam), ) for i := 0; i < 10; i++ {