Skip to content

Commit

Permalink
Add a constructor for vl3.IPAM (#1597)
Browse files Browse the repository at this point in the history
* Add a constructor for vl3.IPAM

Signed-off-by: NikitaSkrynnik <[email protected]>

* use the constructor everywhere

Signed-off-by: NikitaSkrynnik <[email protected]>

* fix go linter issues

Signed-off-by: NikitaSkrynnik <[email protected]>

* Fix event system in vl3 client

Signed-off-by: NikitaSkrynnik <[email protected]>

* fix go linter issues

Signed-off-by: NikitaSkrynnik <[email protected]>

* fix race condition

Signed-off-by: NikitaSkrynnik <[email protected]>

* reduce param count

Signed-off-by: NikitaSkrynnik <[email protected]>

* use linked list to store ipam's subscriptions

Signed-off-by: NikitaSkrynnik <[email protected]>

* cleanup

Signed-off-by: NikitaSkrynnik <[email protected]>

* fix go linter issues

Signed-off-by: NikitaSkrynnik <[email protected]>

* cleanup

Signed-off-by: NikitaSkrynnik <[email protected]>

* fix go linter issues

Signed-off-by: NikitaSkrynnik <[email protected]>

---------

Signed-off-by: NikitaSkrynnik <[email protected]>
  • Loading branch information
NikitaSkrynnik authored Mar 25, 2024
1 parent 368383f commit d803d10
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 89 deletions.
43 changes: 18 additions & 25 deletions pkg/networkservice/chains/nsmgr/vl3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ func Test_NSC_ConnectsTo_vl3NSE(t *testing.T) {
dnsServerIPCh := make(chan net.IP, 1)
dnsServerIPCh <- net.ParseIP("127.0.0.1")

var ipam vl3.IPAM
ipam.Reset(ctx, "10.0.0.1/24", []string{})
ipam := vl3.NewIPAM("10.0.0.1/24")

_ = domain.Nodes[0].NewEndpoint(
ctx,
Expand All @@ -86,7 +85,7 @@ func Test_NSC_ConnectsTo_vl3NSE(t *testing.T) {
dnsServerIPCh,
vl3dns.WithDomainSchemes("{{ index .Labels \"podName\" }}.{{ .NetworkService }}."),
vl3dns.WithDNSPort(40053)),
vl3.NewServer(ctx, &ipam),
vl3.NewServer(ctx, ipam),
)

resolver := net.Resolver{
Expand Down Expand Up @@ -159,8 +158,7 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) {
dnsServerIPCh := make(chan net.IP, 1)
dnsServerIPCh <- net.ParseIP("0.0.0.0")

var serverIpam vl3.IPAM
serverIpam.Reset(ctx, "10.0.0.1/24", []string{})
serverIpam := vl3.NewIPAM("10.0.0.1/24")

_ = domain.Nodes[0].NewEndpoint(
ctx,
Expand All @@ -175,7 +173,7 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) {
vl3dns.WithConfigs(dnsConfigs),
vl3dns.WithDNSPort(40053),
),
vl3.NewServer(ctx, &serverIpam),
vl3.NewServer(ctx, serverIpam),
)

resolver := net.Resolver{
Expand All @@ -186,9 +184,8 @@ func Test_vl3NSE_ConnectsTo_vl3NSE(t *testing.T) {
},
}

var clientIpam vl3.IPAM
clientIpam.Reset(ctx, "127.0.0.1/32", []string{})
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(vl3dns.NewClient(net.ParseIP("127.0.0.1"), dnsConfigs), vl3.NewClient(ctx, &clientIpam)))
clientIpam := vl3.NewIPAM("127.0.0.1/32")
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithAdditionalFunctionality(vl3dns.NewClient(net.ParseIP("127.0.0.1"), dnsConfigs), vl3.NewClient(ctx, clientIpam)))

req := defaultRequest(nsReg.Name)
req.Connection.Id = uuid.New().String()
Expand Down Expand Up @@ -244,8 +241,7 @@ func Test_NSC_GetsVl3DnsAddressDelay(t *testing.T) {
nseReg := defaultRegistryEndpoint(nsReg.Name)
dnsServerIPCh := make(chan net.IP, 1)

var ipam vl3.IPAM
ipam.Reset(ctx, "10.0.0.1/24", []string{})
ipam := vl3.NewIPAM("10.0.0.1/24")

_ = domain.Nodes[0].NewEndpoint(
ctx,
Expand All @@ -255,7 +251,7 @@ func Test_NSC_GetsVl3DnsAddressDelay(t *testing.T) {
dnsServerIPCh,
vl3dns.WithDomainSchemes("{{ index .Labels \"podName\" }}.{{ .NetworkService }}."),
vl3dns.WithDNSPort(40053)),
vl3.NewServer(ctx, &ipam))
vl3.NewServer(ctx, ipam))

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)

Expand Down Expand Up @@ -290,8 +286,7 @@ func Test_vl3NSE_ConnectsTo_Itself(t *testing.T) {
nseReg := defaultRegistryEndpoint(nsReg.Name)
dnsServerIPCh := make(chan net.IP, 1)

var ipam vl3.IPAM
ipam.Reset(ctx, "10.0.0.1/24", []string{})
ipam := vl3.NewIPAM("10.0.0.1/24")

_ = domain.Nodes[0].NewEndpoint(
ctx,
Expand All @@ -300,7 +295,7 @@ func Test_vl3NSE_ConnectsTo_Itself(t *testing.T) {
vl3dns.NewServer(ctx,
dnsServerIPCh,
vl3dns.WithDNSPort(40053)),
vl3.NewServer(ctx, &ipam))
vl3.NewServer(ctx, ipam))

// Connection to itself. This allows us to assign a dns address to ourselves.
nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken, client.WithName(nseReg.Name))
Expand Down Expand Up @@ -343,11 +338,10 @@ func Test_Interdomain_vl3_dns(t *testing.T) {
dnsServerIPCh := make(chan net.IP, 1)
dnsServerIPCh <- net.ParseIP("127.0.0.1")

var ipam vl3.IPAM
ipam.Reset(ctx, "10.0.0.1/24", []string{})
ipam := vl3.NewIPAM("10.0.0.1/24")

cluster2.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken,
vl3.NewServer(ctx, &ipam),
vl3.NewServer(ctx, ipam),
vl3dns.NewServer(ctx,
dnsServerIPCh,
vl3dns.WithDNSPort(40053),
Expand Down Expand Up @@ -443,11 +437,10 @@ func Test_FloatingInterdomain_vl3_dns(t *testing.T) {
dnsServerIPCh := make(chan net.IP, 1)
dnsServerIPCh <- net.ParseIP("127.0.0.1")

var ipam vl3.IPAM
ipam.Reset(ctx, "10.0.0.1/24", []string{})
ipam := vl3.NewIPAM("10.0.0.1/24")

cluster2.Nodes[0].NewEndpoint(ctx, nseReg, sandbox.GenerateTestToken,
vl3.NewServer(ctx, &ipam),
vl3.NewServer(ctx, ipam),
vl3dns.NewServer(ctx,
dnsServerIPCh,
vl3dns.WithDNSPort(40053),
Expand Down Expand Up @@ -521,14 +514,13 @@ func Test_NSC_ConnectsTo_vl3NSE_With_Invalid_IpContext(t *testing.T) {
prefix1 := "10.0.0.0/24"
prefix2 := "10.10.0.0/24"

var serverIpam vl3.IPAM
serverIpam.Reset(ctx, prefix1, []string{})
serverIpam := vl3.NewIPAM(prefix1)

_ = domain.Nodes[0].NewEndpoint(
ctx,
nseReg,
sandbox.GenerateTestToken,
strictvl3ipam.NewServer(ctx, vl3.NewServer, &serverIpam),
strictvl3ipam.NewServer(ctx, vl3.NewServer, serverIpam),
)

nsc := domain.Nodes[0].NewClient(ctx, sandbox.GenerateTestToken)
Expand All @@ -539,7 +531,8 @@ func Test_NSC_ConnectsTo_vl3NSE_With_Invalid_IpContext(t *testing.T) {

require.True(t, checkIPContext(conn.Context.IpContext, prefix1))

serverIpam.Reset(ctx, prefix2, []string{})
err = serverIpam.Reset(prefix2)
require.NoError(t, err)

req.Connection = conn
conn, err = nsc.Request(ctx, req)
Expand Down
29 changes: 5 additions & 24 deletions pkg/networkservice/connectioncontext/ipcontext/vl3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/pkg/errors"

"github.com/edwarnicke/serialize"
"github.com/golang/protobuf/ptypes/empty"
"github.com/networkservicemesh/api/pkg/api/networkservice"
"google.golang.org/grpc"
Expand All @@ -33,10 +32,8 @@ import (
)

type vl3Client struct {
pool *IPAM
chainContext context.Context
executor serialize.Executor
subscriptions []chan struct{}
pool *IPAM
chainContext context.Context
}

// NewClient - returns a new vL3 client instance that manages connection.context.ipcontext for vL3 scenario.
Expand Down Expand Up @@ -74,32 +71,16 @@ func (n *vl3Client) Request(ctx context.Context, request *networkservice.Network

storeCancel(ctx, cancel)

notifyCh := make(chan struct{})

n.executor.AsyncExec(func() {
n.subscriptions = append(n.subscriptions, notifyCh)
unsubscribe := n.pool.Subscribe(func() {
eventFactory.Request(begin.CancelContext(cancelCtx))
})

go func() {
defer func() {
n.executor.AsyncExec(func() {
for i, sub := range n.subscriptions {
if sub == notifyCh {
n.subscriptions = append(n.subscriptions[:i], n.subscriptions[i+1:]...)
close(notifyCh)
return
}
}
})
}()
defer unsubscribe()

select {
case <-n.chainContext.Done():
return
case <-cancelCtx.Done():
return
case <-notifyCh:
eventFactory.Request(begin.CancelContext(cancelCtx))
}
}()

Expand Down
41 changes: 16 additions & 25 deletions pkg/networkservice/connectioncontext/ipcontext/vl3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ func Test_Client_ConnectsToVl3NSE(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var ipamPool vl3.IPAM
ipamPool.Reset(ctx, "10.0.0.1/24", []string{})
ipam := vl3.NewIPAM("10.0.0.1/24")

var server = next.NewNetworkServiceServer(
adapters.NewClientToServer(
Expand All @@ -54,7 +53,7 @@ func Test_Client_ConnectsToVl3NSE(t *testing.T) {
),
),
metadata.NewServer(),
vl3.NewServer(ctx, &ipamPool),
vl3.NewServer(ctx, ipam),
)

resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: t.Name()}})
Expand Down Expand Up @@ -91,22 +90,19 @@ func Test_VL3NSE_ConnectsToVl3NSE(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var clientIpamPool vl3.IPAM
var serverIpamPool vl3.IPAM

clientIpamPool.Reset(ctx, "10.0.1.0/24", []string{})
serverIpamPool.Reset(ctx, "10.0.0.1/24", []string{})
clientIpam := vl3.NewIPAM("10.0.1.0/24")
serverIpam := vl3.NewIPAM("10.0.0.1/24")

var server = next.NewNetworkServiceServer(
adapters.NewClientToServer(
next.NewNetworkServiceClient(
begin.NewClient(),
metadata.NewClient(),
vl3.NewClient(ctx, &clientIpamPool),
vl3.NewClient(ctx, clientIpam),
),
),
metadata.NewServer(),
vl3.NewServer(ctx, &serverIpamPool),
vl3.NewServer(ctx, serverIpam),
)

resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: t.Name()}})
Expand Down Expand Up @@ -145,22 +141,19 @@ func Test_VL3NSE_ConnectsToVl3NSE_ChangePrefix(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var clientIpamPool vl3.IPAM
var serverIpamPool vl3.IPAM

clientIpamPool.Reset(ctx, "10.0.1.0/24", []string{})
serverIpamPool.Reset(ctx, "10.0.0.1/24", []string{})
clientIpam := vl3.NewIPAM("10.0.1.0/24")
serverIpam := vl3.NewIPAM("10.0.0.1/24")

var server = next.NewNetworkServiceServer(
adapters.NewClientToServer(
next.NewNetworkServiceClient(
begin.NewClient(),
metadata.NewClient(),
vl3.NewClient(ctx, &clientIpamPool),
vl3.NewClient(ctx, clientIpam),
),
),
metadata.NewServer(),
vl3.NewServer(ctx, &serverIpamPool),
vl3.NewServer(ctx, serverIpam),
)

resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: t.Name()}})
Expand All @@ -176,7 +169,8 @@ func Test_VL3NSE_ConnectsToVl3NSE_ChangePrefix(t *testing.T) {
require.Equal(t, "10.0.1.0/32", resp.GetContext().GetIpContext().GetDstRoutes()[0].GetPrefix())
require.Equal(t, "10.0.1.0/24", resp.GetContext().GetIpContext().GetDstRoutes()[1].GetPrefix())

clientIpamPool.Reset(ctx, "10.0.5.0/24", []string{})
err = clientIpam.Reset("10.0.5.0/24")
require.NoError(t, err)

// refresh
for i := 0; i < 10; i++ {
Expand All @@ -203,22 +197,19 @@ func Test_VL3NSE_ConnectsToVl3NSE_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

var clientIpamPool vl3.IPAM
var serverIpamPool vl3.IPAM

clientIpamPool.Reset(ctx, "10.0.1.0/24", []string{})
serverIpamPool.Reset(ctx, "10.0.0.1/24", []string{})
clientIpam := vl3.NewIPAM("10.0.1.0/24")
serverIpam := vl3.NewIPAM("10.0.0.1/24")

var server = next.NewNetworkServiceServer(
adapters.NewClientToServer(
next.NewNetworkServiceClient(
begin.NewClient(),
metadata.NewClient(),
vl3.NewClient(ctx, &clientIpamPool),
vl3.NewClient(ctx, clientIpam),
),
),
metadata.NewServer(),
vl3.NewServer(ctx, &serverIpamPool),
vl3.NewServer(ctx, serverIpam),
)

resp, err := server.Request(ctx, &networkservice.NetworkServiceRequest{Connection: &networkservice.Connection{Id: uuid.New().String()}})
Expand Down
Loading

0 comments on commit d803d10

Please sign in to comment.