From e02de1b84699c584ec315da425662aa7eadeecf6 Mon Sep 17 00:00:00 2001 From: Airren Date: Thu, 16 May 2024 11:05:52 +0800 Subject: [PATCH 1/2] improve: fix go lint Signed-off-by: Airren --- README.md | 2 + cmd/dedinic/main.go | 3 +- cmd/octopus/main.go | 13 +- doc/build-images.md | 16 +++ go.mod | 16 +-- go.sum | 16 +++ pkg/controller/agent.go | 89 +++++++------- pkg/controller/endpoint/endpoint.go | 52 +++++--- pkg/controller/endpointslice/endpointslice.go | 40 +++--- pkg/controller/peer_controller.go | 32 ++--- pkg/controller/tunnel.go | 19 +-- pkg/controller/wireguard_tools.go | 10 +- pkg/dedinic/cni_handler.go | 52 ++++---- pkg/dedinic/config.go | 8 +- pkg/dedinic/controller.go | 88 +++++--------- pkg/dedinic/delayqueue.go | 3 +- pkg/dedinic/kubelet.go | 7 +- pkg/dedinic/nri.go | 2 +- pkg/dedinic/oob.go | 115 +++++++++--------- pkg/dedinic/ovs.go | 67 ++++++---- 20 files changed, 357 insertions(+), 293 deletions(-) create mode 100644 doc/build-images.md diff --git a/README.md b/README.md index 70162c36..aaf93055 100644 --- a/README.md +++ b/README.md @@ -197,3 +197,5 @@ helm install nauti-agent mcs/nauti-agent --namespace nauti-system --create-nam done ``` + + diff --git a/cmd/dedinic/main.go b/cmd/dedinic/main.go index e3c14ec4..c46b7e70 100644 --- a/cmd/dedinic/main.go +++ b/cmd/dedinic/main.go @@ -13,7 +13,6 @@ import ( ) func main() { - defer klog.Flush() _ = dedinic.InitConfig() @@ -38,7 +37,7 @@ func main() { go dedinic.InitNRIPlugin(dedinic.Conf, ctl) - //todo if nri is invalid + // todo if nri is invalid if _, err := os.Stat("/var/run/nri/nri.sock"); os.IsNotExist(err) { klog.Infof("nri is not enabled, start with oob mode") go dedinic.InitOOb() diff --git a/cmd/octopus/main.go b/cmd/octopus/main.go index 73cdc436..d87a498d 100644 --- a/cmd/octopus/main.go +++ b/cmd/octopus/main.go @@ -17,7 +17,6 @@ import ( syncerConfig "github.com/nauti-io/nauti/pkg/config" "github.com/nauti-io/nauti/pkg/controller" octopusClientset "github.com/nauti-io/nauti/pkg/generated/clientset/versioned" - "github.com/nauti-io/nauti/pkg/generated/informers/externalversions" kubeinformers "github.com/nauti-io/nauti/pkg/generated/informers/externalversions" "github.com/nauti-io/nauti/pkg/known" ) @@ -28,7 +27,8 @@ var ( ) func init() { - flag.StringVar(&localKubeconfig, "kubeconfig", "", "Path to kubeconfig of local cluster. Only required if out-of-cluster.") + flag.StringVar(&localKubeconfig, "kubeconfig", "", + "Path to kubeconfig of local cluster. Only required if out-of-cluster.") flag.StringVar(&localMasterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") } @@ -68,6 +68,9 @@ func main() { } // wait until secret is ready. 
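// A minimal sketch of what "wait until secret is ready" can look like, assuming a
// hypothetical secret name ("hub-kubeconfig"); the actual wait-and-retry logic lives
// inside syncerConfig.GetHubConfig and may differ (wait is k8s.io/apimachinery/pkg/util/wait):
//
//	err = wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true,
//		func(ctx context.Context) (bool, error) {
//			_, getErr := k8sClient.CoreV1().Secrets(agentSpec.ShareNamespace).
//				Get(ctx, "hub-kubeconfig", metav1.GetOptions{})
//			return getErr == nil, nil
//		})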
hubKubeConfig, err = syncerConfig.GetHubConfig(k8sClient, &agentSpec) + if err != nil { + klog.Fatalf("get hub kubeconfig failed: %v", err) + } // syncer only works at cluster level if agent, errSyncerController := controller.New(&agentSpec, known.SyncerConfig{ @@ -102,8 +105,12 @@ klog.Fatal(err) return } - hubInformerFactory := externalversions.NewSharedInformerFactoryWithOptions(oClient, known.DefaultResync, kubeinformers.WithNamespace(agentSpec.ShareNamespace)) + hubInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(oClient, known.DefaultResync, + kubeinformers.WithNamespace(agentSpec.ShareNamespace)) peerController, err := controller.NewPeerController(agentSpec, w, hubInformerFactory) + if err != nil { + klog.Fatalf("start peer controller failed: %v", err) + } peerController.Start(ctx) <-ctx.Done() diff --git a/doc/build-images.md b/doc/build-images.md new file mode 100644 index 00000000..7f9899b8 --- /dev/null +++ b/doc/build-images.md @@ -0,0 +1,16 @@ +# How to build Nauti + +## Build the Nauti image +### Prerequisites +- Docker / Podman + +Clone the repo to a local directory: +```bash +git clone https://github.com/nauti-io/nauti.git + +cd nauti +``` +### DediNic Image +```bash +docker build -t nauti . +``` diff --git a/go.mod b/go.mod index bc410397..f0bd8b96 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( github.com/ovn-org/libovsdb v0.0.0-20230711201130-6785b52d4020 github.com/prometheus-community/pro-bing v0.3.0 github.com/sirupsen/logrus v1.9.3 - golang.org/x/sys v0.16.0 + golang.org/x/sys v0.20.0 golang.org/x/time v0.5.0 golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6 k8s.io/api v0.28.4 @@ -65,7 +65,7 @@ require ( github.com/vishvananda/netlink v1.2.1-beta.2 github.com/vishvananda/netns v0.0.4 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect - golang.org/x/sync v0.6.0 // indirect + golang.org/x/sync v0.7.0 // indirect gopkg.in/k8snetworkplumbingwg/multus-cni.v4 v4.0.2 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect ) @@ -158,13 +158,13 @@ require ( go.uber.org/zap v1.25.0 // indirect go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect - golang.org/x/crypto v0.18.0 // indirect - golang.org/x/mod v0.14.0 // indirect - golang.org/x/net v0.20.0 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/mod v0.17.0 // indirect + golang.org/x/net v0.25.0 // indirect golang.org/x/oauth2 v0.14.0 // indirect - golang.org/x/term v0.16.0 // indirect - golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.17.0 // indirect + golang.org/x/term v0.20.0 // indirect + golang.org/x/text v0.15.0 // indirect + golang.org/x/tools v0.21.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect golang.zx2c4.com/wireguard v0.0.0-20230325221338-052af4a8072b // indirect google.golang.org/appengine v1.6.8 // indirect diff --git a/go.sum b/go.sum index 5ab6627d..6b07cb80 100644 --- a/go.sum +++ b/go.sum @@ -1115,6 +1115,8 @@ golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto
v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1164,6 +1166,8 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91 golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= +golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1229,6 +1233,8 @@ golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1268,6 +1274,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1383,6 +1391,8 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term 
v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1390,6 +1400,8 @@ golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/term v0.16.0 h1:m+B6fahuftsE9qjo0VWp2FW0mB3MTJvR0BaMQrq0pmE= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.20.0 h1:VnkxpohqXaOBYJtBmEppKUG6mXpi+4O6purfc2+sMhw= +golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1404,6 +1416,8 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1517,6 +1531,8 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw= +golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/controller/agent.go b/pkg/controller/agent.go index e8f18954..bad57f59 100644 --- a/pkg/controller/agent.go +++ b/pkg/controller/agent.go @@ -2,13 +2,11 @@ package controller import ( "context" - "crypto/md5" + "crypto/sha256" "encoding/hex" "fmt" "time" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" validations "k8s.io/apimachinery/pkg/util/validation" "k8s.io/apimachinery/pkg/util/wait" @@ -16,7 +14,6 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" - mcsv1a1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" mcsclientset "sigs.k8s.io/mcs-api/pkg/client/clientset/versioned" mcsInformers "sigs.k8s.io/mcs-api/pkg/client/informers/externalversions" @@ -54,15 +51,23 @@ func New(spec *known.Specification, syncerConf known.SyncerConfig, kubeConfig *r kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClientSet, known.DefaultResync) mcsInformerFactory := 
mcsInformers.NewSharedInformerFactory(mcsClientSet, known.DefaultResync) - serviceExportController, err := mcs.NewServiceExportController(spec.ClusterID, kubeInformerFactory.Discovery().V1().EndpointSlices(), mcsClientSet, mcsInformerFactory) + serviceExportController, err := mcs.NewServiceExportController(spec.ClusterID, + kubeInformerFactory.Discovery().V1().EndpointSlices(), mcsClientSet, mcsInformerFactory) if err != nil { return nil, err } hubK8sClient := kubernetes.NewForConfigOrDie(kubeConfig) - hubInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(hubK8sClient, known.DefaultResync, kubeinformers.WithNamespace(spec.ShareNamespace)) - epsController, err := mcs.NewEpsController(spec.ClusterID, syncerConf.LocalNamespace, hubInformerFactory.Discovery().V1().EndpointSlices(), + hubInformerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(hubK8sClient, known.DefaultResync, + kubeinformers.WithNamespace(spec.ShareNamespace)) + + var epsController *mcs.EpsController + epsController, err = mcs.NewEpsController(spec.ClusterID, syncerConf.LocalNamespace, + hubInformerFactory.Discovery().V1().EndpointSlices(), kubeClientSet, hubInformerFactory, serviceExportController, mcsClientSet) + if err != nil { + klog.Errorf("failed to create eps controller: %v", err) + } syncerConf.LocalNamespace = spec.LocalNamespace syncerConf.LocalClusterID = spec.ClusterID @@ -100,41 +105,41 @@ func (a *Syncer) Start(ctx context.Context) error { return nil } -func (a *Syncer) newServiceImport(name, namespace string) *mcsv1a1.ServiceImport { - return &mcsv1a1.ServiceImport{ - ObjectMeta: metav1.ObjectMeta{ - Name: a.getObjectNameWithClusterID(name, namespace), - Annotations: map[string]string{ - known.OriginName: name, - known.OriginNamespace: namespace, - }, - Labels: map[string]string{ - known.LabelSourceName: name, - known.LabelSourceNamespace: namespace, - known.LabelSourceCluster: a.ClusterID, - known.LabelOriginNameSpace: namespace, - }, - }, - } -} - -func (a *Syncer) getPortsForService(service *corev1.Service) []mcsv1a1.ServicePort { - mcsPorts := make([]mcsv1a1.ServicePort, 0, len(service.Spec.Ports)) - - for _, port := range service.Spec.Ports { - mcsPorts = append(mcsPorts, mcsv1a1.ServicePort{ - Name: port.Name, - Protocol: port.Protocol, - Port: port.Port, - }) - } - - return mcsPorts -} +// func (a *Syncer) newServiceImport(name, namespace string) *mcsv1a1.ServiceImport { +// return &mcsv1a1.ServiceImport{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: a.getObjectNameWithClusterID(name, namespace), +// Annotations: map[string]string{ +// known.OriginName: name, +// known.OriginNamespace: namespace, +// }, +// Labels: map[string]string{ +// known.LabelSourceName: name, +// known.LabelSourceNamespace: namespace, +// known.LabelSourceCluster: a.ClusterID, +// known.LabelOriginNameSpace: namespace, +// }, +// }, +// } +// } + +// func (a *Syncer) getPortsForService(service *corev1.Service) []mcsv1a1.ServicePort { +// mcsPorts := make([]mcsv1a1.ServicePort, 0, len(service.Spec.Ports)) +// +// for _, port := range service.Spec.Ports { +// mcsPorts = append(mcsPorts, mcsv1a1.ServicePort{ +// Name: port.Name, +// Protocol: port.Protocol, +// Port: port.Port, +// }) +// } +// +// return mcsPorts +// } func generateSliceName(clusterName, namespace, name string) string { clusterName = fmt.Sprintf("%s%s%s", clusterName, namespace, name) - hasher := md5.New() + hasher := sha256.New() hasher.Write([]byte(clusterName)) var namespacePart, namePart string if len(namespace) > 
known.MaxNamespaceLength { @@ -154,6 +159,6 @@ func generateSliceName(clusterName, namespace, name string) string { return fmt.Sprintf("%s-%s-%s", namespacePart, namePart, hashPart[8:24]) } -func (a *Syncer) getObjectNameWithClusterID(name, namespace string) string { - return generateSliceName(a.ClusterID, namespace, name) -} +// func (a *Syncer) getObjectNameWithClusterID(name, namespace string) string { +// return generateSliceName(a.ClusterID, namespace, name) +// } diff --git a/pkg/controller/endpoint/endpoint.go b/pkg/controller/endpoint/endpoint.go index 63ea56de..b9050cac 100644 --- a/pkg/controller/endpoint/endpoint.go +++ b/pkg/controller/endpoint/endpoint.go @@ -85,27 +85,36 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme workerLoopPeriod: 60 * time.Second, } - serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: e.onServiceUpdate, UpdateFunc: func(old, cur interface{}) { e.onServiceUpdate(cur) }, DeleteFunc: e.onServiceDelete, - }) + }); err != nil { + klog.Errorf("failed to add event handler for service informer: %v", err) + return nil + } e.serviceLister = serviceInformer.Lister() e.servicesSynced = serviceInformer.Informer().HasSynced - podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: e.addPod, UpdateFunc: e.updatePod, DeleteFunc: e.deletePod, - }) + }); err != nil { + klog.Errorf("failed to add event handler for pod informer: %v", err) + return nil + } e.podLister = podInformer.Lister() e.podsSynced = podInformer.Informer().HasSynced - endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ DeleteFunc: e.onEndpointsDelete, - }) + }); err != nil { + klog.Errorf("failed to add event handler for endpoints informer: %v", err) + return nil + } e.endpointsLister = endpointsInformer.Lister() e.endpointsSynced = endpointsInformer.Informer().HasSynced @@ -280,10 +289,10 @@ func (e *Controller) updatePod(old, cur interface{}) { // obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item. func (e *Controller) deletePod(obj interface{}) { pod := endpointsliceutil.GetPodFromDeleteAction(obj) - klog.Infof("Del pod: %v", pod.Name) - if pod != nil { - e.addPod(pod) - } + if pod != nil { + klog.Infof("del pod: %v", pod.Name) + e.addPod(pod) + } } // onServiceUpdate updates the Service Selector in the cache and queues the Service for processing.
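// The informer changes in this patch all follow one pattern: since client-go v0.26,
// AddEventHandler returns a (ResourceEventHandlerRegistration, error) pair, so the
// linter requires the error to be consumed. A minimal, self-contained sketch of that
// pattern (names are illustrative, not from this repo; imports: fmt, k8s.io/client-go/tools/cache):
func addHandlerChecked(informer cache.SharedIndexInformer, onAdd func(obj interface{})) error {
	// Register only an AddFunc; the caller decides whether a failed
	// registration is fatal (here it is surfaced as an error).
	if _, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: onAdd,
	}); err != nil {
		return fmt.Errorf("failed to add event handler: %w", err)
	}
	return nil
}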
@@ -300,7 +307,7 @@ func (e *Controller) onServiceUpdate(obj interface{}) { func (e *Controller) onServiceDelete(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } e.queue.Add(key) @@ -309,7 +316,7 @@ func (e *Controller) onServiceDelete(obj interface{}) { func (e *Controller) onEndpointsDelete(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } e.queue.Add(key) @@ -543,15 +550,24 @@ func (e *Controller) syncService(ctx context.Context, key string) error { newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService) } - logger.V(4).Info("Update endpoints", "service", klog.KObj(service), + logger.V(4).Info("update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{}) + if err != nil { + klog.Errorf("create failed: %v", err) + } _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("update failed: %v", err) + } } else { // Pre-existing _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("update failed: %v", err) + } } if err != nil { if createEndpoints && errors.IsForbidden(err) { @@ -605,7 +621,7 @@ func (e *Controller) checkLeftoverEndpoints() { } key, err := controller.KeyFunc(ep) if err != nil { - utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep)) + utilruntime.HandleError(fmt.Errorf("unable to get key for endpoint %#v", ep)) continue } e.queue.Add(key) @@ -681,14 +697,14 @@ func truncateEndpoints(endpoints *v1.Endpoints) bool { } truncateReady := false - max := maxCapacity - totalReady + m := maxCapacity - totalReady numTotal := totalNotReady if totalReady > maxCapacity { truncateReady = true - max = maxCapacity + m = maxCapacity numTotal = totalReady } - canBeAdded := max + canBeAdded := m for i := range endpoints.Subsets { subset := endpoints.Subsets[i] @@ -701,7 +717,7 @@ func truncateEndpoints(endpoints *v1.Endpoints) bool { // in this subset versus the total number of endpoints. The proportion of endpoints // will be rounded up which most likely will lead to the last subset having less // endpoints than the expected proportion. 
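// Worked example for the proportional truncation below (values illustrative): with
// maxCapacity = 1000 and totalReady = 1100, truncateReady is set, m = 1000 and
// numTotal = 1100; a subset holding 550 ready endpoints then gets
// toBeAdded = ceil((550 / 1100) * 1000) = 500, and canBeAdded shrinks by what was added.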
- toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(max))) + toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(m))) // If there is not enough endpoints for the last subset, ensure only the number up // to the capacity are added if toBeAdded > canBeAdded { diff --git a/pkg/controller/endpointslice/endpointslice.go b/pkg/controller/endpointslice/endpointslice.go index 96baa130..ebb087a6 100644 --- a/pkg/controller/endpointslice/endpointslice.go +++ b/pkg/controller/endpointslice/endpointslice.go @@ -108,21 +108,25 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer, workerLoopPeriod: time.Second, } - serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.onServiceUpdate, UpdateFunc: func(old, cur interface{}) { c.onServiceUpdate(cur) }, DeleteFunc: c.onServiceDelete, - }) + }); err != nil { + return nil + } c.serviceLister = serviceInformer.Lister() c.servicesSynced = serviceInformer.Informer().HasSynced - podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addPod, UpdateFunc: c.updatePod, DeleteFunc: c.deletePod, - }) + }); err != nil { + return nil + } c.podLister = podInformer.Lister() c.podsSynced = podInformer.Informer().HasSynced @@ -130,13 +134,15 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer, c.nodesSynced = nodeInformer.Informer().HasSynced logger := klog.FromContext(ctx) - endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := endpointSliceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.onEndpointSliceAdd, UpdateFunc: func(oldObj, newObj interface{}) { c.onEndpointSliceUpdate(logger, oldObj, newObj) }, DeleteFunc: c.onEndpointSliceDelete, - }) + }); err != nil { + return nil + } c.endpointSliceLister = endpointSliceInformer.Lister() c.endpointSlicesSynced = endpointSliceInformer.Informer().HasSynced @@ -152,7 +158,7 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer, c.endpointUpdatesBatchPeriod = endpointUpdatesBatchPeriod if utilfeature.DefaultFeatureGate.Enabled(features.TopologyAwareHints) { - nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + if _, err := nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { c.addNode(logger, obj) }, @@ -162,7 +168,9 @@ func NewController(ctx context.Context, podInformer coreinformers.PodInformer, DeleteFunc: func(obj interface{}) { c.deleteNode(logger, obj) }, - }) + }); err != nil { + return nil + } c.topologyCache = topologycache.NewTopologyCache() } @@ -407,7 +415,7 @@ func (c *Controller) syncService(logger klog.Logger, key string) error { func (c *Controller) onServiceUpdate(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } @@ -418,7 +426,7 @@ func (c *Controller) onServiceUpdate(obj interface{}) { func (c *Controller) onServiceDelete(obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + 
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err)) return } @@ -431,7 +439,7 @@ func (c *Controller) onEndpointSliceAdd(obj interface{}) { endpointSlice := obj.(*discovery.EndpointSlice) if endpointSlice == nil { - utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceAdd()")) + utilruntime.HandleError(fmt.Errorf("invalid EndpointSlice provided to onEndpointSliceAdd()")) return } if c.reconciler.ManagedByController(endpointSlice) && c.endpointSliceTracker.ShouldSync(endpointSlice) { @@ -447,7 +455,7 @@ prevEndpointSlice := prevObj.(*discovery.EndpointSlice) endpointSlice := obj.(*discovery.EndpointSlice) if endpointSlice == nil || prevEndpointSlice == nil { - utilruntime.HandleError(fmt.Errorf("Invalid EndpointSlice provided to onEndpointSliceUpdate()")) + utilruntime.HandleError(fmt.Errorf("invalid EndpointSlice provided to onEndpointSliceUpdate()")) return } // EndpointSlice generation does not change when labels change. Although the @@ -488,7 +496,7 @@ func (c *Controller) onEndpointSliceDelete(obj interface{}) { func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.EndpointSlice) { key, err := endpointslicerec.ServiceControllerKey(endpointSlice) if err != nil { - utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err)) + utilruntime.HandleError(fmt.Errorf("couldn't get key for endpointSlice %v: %v", endpointSlice, err)) return } @@ -505,7 +513,7 @@ func (c *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) services, err := endpointsliceutil.GetPodServiceMemberships(c.serviceLister, pod) if err != nil { - utilruntime.HandleError(fmt.Errorf("Unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) + utilruntime.HandleError(fmt.Errorf("unable to get pod %s/%s's service memberships: %v", pod.Namespace, pod.Name, err)) return } for key := range services { @@ -602,12 +610,12 @@ func getEndpointSliceFromDeleteAction(obj interface{}) *discovery.EndpointSlice // If we reached here it means the pod was deleted but its final state is unrecorded. tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - utilruntime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj)) return nil } endpointSlice, ok := tombstone.Obj.(*discovery.EndpointSlice) if !ok { - utilruntime.HandleError(fmt.Errorf("Tombstone contained object that is not a EndpointSlice: %#v", obj)) + utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not an EndpointSlice: %#v", obj)) return nil } return endpointSlice diff --git a/pkg/controller/peer_controller.go b/pkg/controller/peer_controller.go index 03e444f3..0f461baf 100644 --- a/pkg/controller/peer_controller.go +++ b/pkg/controller/peer_controller.go @@ -31,11 +31,11 @@ type PeerController struct { // specific namespace.
peerLister v1alpha1.PeerLister octopusFactory octopusinformers.SharedInformerFactory - tunnel *wireguard + tunnel *Wireguard spec *known.Specification } -func NewPeerController(spec known.Specification, w *wireguard, +func NewPeerController(spec known.Specification, w *Wireguard, octopusFactory octopusinformers.SharedInformerFactory) (*PeerController, error) { peerController := &PeerController{ peerLister: octopusFactory.Octopus().V1alpha1().Peers().Lister(), @@ -55,7 +55,7 @@ func NewPeerController(spec known.Specification, w *wireguard, } else { tempObj = oldObj } - //klog.Infof("we got a peer connection %v", tempObj) + // klog.Infof("we got a peer connection %v", tempObj) if tempObj != nil { newPeer := tempObj.(*v1alpha1app.Peer) // hub connects with non-hub, non-hub connects with hub. @@ -78,7 +78,7 @@ return peerController, nil } -func (c *PeerController) Handle(obj interface{}) (requeueAfter *time.Duration, err error) { +func (p *PeerController) Handle(obj interface{}) (requeueAfter *time.Duration, err error) { failedPeriod := 2 * time.Second key := obj.(string) namespace, peerName, err := cache.SplitMetaNamespaceKey(key) @@ -89,7 +89,7 @@ func (c *PeerController) Handle(obj interface{}) (requeueAfter *time.Duration, e noCIDR := false hubNotExist := false - cachedPeer, err := c.peerLister.Peers(namespace).Get(peerName) + cachedPeer, err := p.peerLister.Peers(namespace).Get(peerName) if err != nil { if errors.IsNotFound(err) { utilruntime.HandleError(fmt.Errorf("peer '%s' in hub work queue no longer exists,"+ @@ -109,7 +109,7 @@ func (c *PeerController) Handle(obj interface{}) (requeueAfter *time.Duration, e klog.Infof("can't find key for %s with key %s", peerName, cachedPeer.Spec.PublicKey) return &failedPeriod, err } - if c.tunnel.RemovePeer(&oldKey) != nil { + if p.tunnel.RemovePeer(&oldKey) != nil { return &failedPeriod, err } if errRemoveRoute := configHostRoutingRules(cachedPeer.Spec.PodCIDR, known.Delete); errRemoveRoute != nil { @@ -120,7 +120,7 @@ return nil, nil } - if !c.spec.IsHub { + if !p.spec.IsHub { // just a member cluster: only wait if the incoming peer has no cidr. if len(cachedPeer.Spec.PodCIDR) == 0 || len(cachedPeer.Spec.PodCIDR[0]) == 0 { return &failedPeriod, errors.NewServiceUnavailable("cidr is not allocated.") @@ -130,7 +130,7 @@ // prepare data... existingCIDR := make([]string, 0) noCIDR = true - if peerList, errListPeer := c.peerLister.Peers(namespace).List(labels.Everything()); errListPeer != nil { + if peerList, errListPeer := p.peerLister.Peers(namespace).List(labels.Everything()); errListPeer == nil { for _, item := range peerList { if item.Name != "hub" && len(item.Spec.PodCIDR) != 0 { existingCIDR = append(existingCIDR, item.Spec.PodCIDR[0]) @@ -139,7 +139,7 @@ } // cidr allocation here.
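// (Illustrative only, assuming util.FindAvailableCIDR picks the next child subnet of
// the hub-wide CIDR that overlaps none of existingCIDR: e.g. with
// spec.CIDR[0] = "10.112.0.0/12" and existingCIDR = ["10.112.0.0/16"] it could return
// "10.113.0.0/16". The real semantics live in the util package.)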
cachedPeer.Spec.PodCIDR = make([]string, 1) - cachedPeer.Spec.PodCIDR[0], err = util.FindAvailableCIDR(c.spec.CIDR[0], existingCIDR) + cachedPeer.Spec.PodCIDR[0], err = util.FindAvailableCIDR(p.spec.CIDR[0], existingCIDR) if err != nil { klog.Infof("allocate peer cidr failed %v", err) return &failedPeriod, err @@ -147,7 +147,7 @@ } } - if errAddPeer := c.tunnel.AddPeer(cachedPeer); errAddPeer != nil { + if errAddPeer := p.tunnel.AddPeer(cachedPeer); errAddPeer != nil { klog.Infof("add peer failed %v", cachedPeer) return &failedPeriod, errAddPeer } @@ -161,7 +161,7 @@ // write the allocated CIDR back to the peer if noCIDR { - _, err = c.tunnel.octopusClient.OctopusV1alpha1().Peers(namespace).Update(context.TODO(), + _, err = p.tunnel.octopusClient.OctopusV1alpha1().Peers(namespace).Update(context.TODO(), cachedPeer, metav1.UpdateOptions{}) if err != nil { return &failedPeriod, err @@ -170,9 +170,9 @@ return nil, nil } -func (c *PeerController) Run(ctx context.Context) error { - c.octopusFactory.Start(ctx.Done()) - c.yachtController.Run(ctx) +func (p *PeerController) Run(ctx context.Context) error { + p.octopusFactory.Start(ctx.Done()) + p.yachtController.Run(ctx) return nil } @@ -188,7 +188,7 @@ <-ctx.Done() } -func configHostRoutingRules(CIDRs []string, operation known.RouteOperation) error { +func configHostRoutingRules(cidrs []string, operation known.RouteOperation) error { var ifaceIndex int if wg, err := net.InterfaceByName(DefaultDeviceName); err == nil { ifaceIndex = wg.Index @@ -197,7 +197,7 @@ func configHostRoutingRules(CIDRs []string, operation known.RouteOperation) erro return err } - for _, cidr := range CIDRs { + for _, cidr := range cidrs { _, dst, err := net.ParseCIDR(cidr) if err != nil { klog.Errorf("Can't parse cidr %s as route dst", cidr) diff --git a/pkg/controller/tunnel.go b/pkg/controller/tunnel.go index 41f3c7c5..eac891dc 100644 --- a/pkg/controller/tunnel.go +++ b/pkg/controller/tunnel.go @@ -9,7 +9,7 @@ import ( "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "k8s.io/klog/v2" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" "github.com/nauti-io/nauti/pkg/apis/octopus.io/v1alpha1" "github.com/nauti-io/nauti/pkg/generated/clientset/versioned" @@ -25,7 +25,7 @@ type managedKeys struct { publicKey wgtypes.Key } -type wireguard struct { +type Wireguard struct { connections map[string]*v1alpha1.Peer // clusterID -> remote ep connection mutex sync.Mutex link netlink.Link // your link @@ -36,10 +36,11 @@ type wireguard struct { octopusClient *versioned.Clientset } -func NewTunnel(octopusClient *versioned.Clientset, spec *known.Specification, done <-chan struct{}) (*wireguard, error) { +func NewTunnel(octopusClient *versioned.Clientset, spec *known.Specification, done <-chan struct{}, +) (*Wireguard, error) { var err error - w := &wireguard{ + w := &Wireguard{ connections: make(map[string]*v1alpha1.Peer), StopCh: done, octopusClient: octopusClient, @@ -63,8 +64,8 @@ func NewTunnel(octopusClient *versioned.Clientset, spec *known.Specification, do defer func() { if err != nil { if e := w.client.Close(); e != nil { + klog.Errorf("failed to close wgctrl client: %v", e) } - w.client = nil } }() @@ -77,7 +78,7 @@ func NewTunnel(octopusClient *versioned.Clientset, spec *known.Specification,
do peerConfigs := make([]wgtypes.PeerConfig, 0) cfg := wgtypes.Config{ PrivateKey: &w.keys.privateKey, - ListenPort: pointer.Int(UDPPort), + ListenPort: ptr.To(UDPPort), FirewallMark: nil, ReplacePeers: true, Peers: peerConfigs, } @@ -90,7 +91,7 @@ return w, err } -func (w *wireguard) Init() error { +func (w *Wireguard) Init() error { w.mutex.Lock() defer w.mutex.Unlock() @@ -130,8 +131,8 @@ func (w *wireguard) Init() error { return utils.ApplyPeerWithRetry(w.octopusClient, peer) } -func (w *wireguard) Cleanup() error { - //return utils.DeletePeerWithRetry(w.octopusClient, w.spec.ClusterID, w.spec.ShareNamespace) +func (w *Wireguard) Cleanup() error { + // return utils.DeletePeerWithRetry(w.octopusClient, w.spec.ClusterID, w.spec.ShareNamespace) // it is pretty hard to handle this case when we update the deployment of the CNF pod, due to the rolling-update mechanism. return nil } diff --git a/pkg/controller/wireguard_tools.go b/pkg/controller/wireguard_tools.go index 2956669a..022b4add 100644 --- a/pkg/controller/wireguard_tools.go +++ b/pkg/controller/wireguard_tools.go @@ -20,7 +20,7 @@ const ( ) // Create new wg link and assign addr from local subnets. -func (w *wireguard) setWGLink() error { +func (w *Wireguard) setWGLink() error { // delete existing wg device if needed if link, err := netlink.LinkByName(DefaultDeviceName); err == nil { // delete existing device @@ -46,7 +46,7 @@ return nil } -func (w *wireguard) RemovePeer(key *wgtypes.Key) error { +func (w *Wireguard) RemovePeer(key *wgtypes.Key) error { klog.Infof("Removing WireGuard peer with key %s", key) peerCfg := []wgtypes.PeerConfig{ @@ -68,7 +68,7 @@ return nil } -func (w *wireguard) AddPeer(peer *v1alpha1.Peer) error { +func (w *Wireguard) AddPeer(peer *v1alpha1.Peer) error { var endpoint *net.UDPAddr if w.spec.ClusterID == peer.Spec.ClusterID { klog.Infof("Will not connect to self") @@ -104,7 +104,7 @@ func (w *wireguard) AddPeer(peer *v1alpha1.Peer) error { // Delete or update old peers for ClusterID. oldCon, found := w.connections[peer.Spec.ClusterID] if found { - if oldKey, err := wgtypes.ParseKey(oldCon.Spec.PublicKey); err == nil { + if oldKey, e := wgtypes.ParseKey(oldCon.Spec.PublicKey); e == nil { // because we change the public key every time. if oldKey.String() == remoteKey.String() { // Existing connection, update status and skip. @@ -147,7 +147,7 @@ return nil } -func (w *wireguard) setKeyPair() error { +func (w *Wireguard) setKeyPair() error { var err error // Generate local keys and set public key in BackendConfig.
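// A sketch of the wgtypes calls key generation typically involves, assuming the
// upstream wgctrl API (error handling elided; the declaration below is the real code):
//
//	psk, err = wgtypes.GenerateKey()           // optional pre-shared key
//	priKey, err = wgtypes.GeneratePrivateKey() // device private key
//	pubKey = priKey.PublicKey()                // public key derived from it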
var psk, priKey, pubKey wgtypes.Key diff --git a/pkg/dedinic/cni_handler.go b/pkg/dedinic/cni_handler.go index 03cd50b5..1102e89e 100644 --- a/pkg/dedinic/cni_handler.go +++ b/pkg/dedinic/cni_handler.go @@ -21,7 +21,7 @@ import ( ) type cniHandler struct { - //Config *Configuration + // Config *Configuration KubeClient kubernetes.Interface Controller *Controller } @@ -35,13 +35,11 @@ func createCniHandler(config *Configuration, controller *Controller) *cniHandler } func (ch cniHandler) handleAdd(podRequest *request.CniRequest) error { - klog.Infof("add port request: %v", podRequest) - var gatewayCheckMode int var ( - macAddr, ip, ipAddr, cidr, gw, subnet, ingress, egress, ifName, nicType string - podNicName, latency, limit, loss, jitter, u2oInterconnectionIP, oldPodName string + macAddr, ip, ipAddr, cidr, gw, subnet, ingress, egress, ifName, nicType string + podNicName, latency, limit, loss, jitter, u2oInterconnectionIP string ) var routes []request.Route var isDefaultRoute bool @@ -53,13 +51,14 @@ klog.Error(errMsg) return errMsg } - if pod.Annotations[fmt.Sprintf(known.AllocatedAnnotationTemplate, podRequest.Provider)] != "true" { - klog.Infof("wait address for pod %s/%s provider %s", podRequest.PodNamespace, podRequest.PodName, podRequest.Provider) + if pod.Annotations[fmt.Sprintf(known.AllocatedAnnotationTemplate, podRequest.Provider)] != known.NautiTrue { + klog.Infof("wait address for pod %s/%s provider %s", + podRequest.PodNamespace, podRequest.PodName, podRequest.Provider) time.Sleep(1 * time.Second) continue } - if err := util.ValidatePodNetwork(pod.Annotations); err != nil { + if err = util.ValidatePodNetwork(pod.Annotations); err != nil { klog.Errorf("validate pod %s/%s failed, %v", podRequest.PodNamespace, podRequest.PodName, err) // wait for the controller to assign an address time.Sleep(1 * time.Second) continue } @@ -71,7 +70,7 @@ subnet = pod.Annotations[fmt.Sprintf(known.LogicalSwitchAnnotationTemplate, podRequest.Provider)] ipAddr = util.GetIPAddrWithMask(ip, cidr) - oldPodName = podRequest.PodName + // oldPodName = podRequest.PodName if s := pod.Annotations[fmt.Sprintf(known.RoutesAnnotationTemplate, podRequest.Provider)]; s != "" { if err = json.Unmarshal([]byte(s), &routes); err != nil { errMsg := fmt.Errorf("invalid routes for pod %s/%s: %v", pod.Namespace, pod.Name, err) @@ -95,7 +94,8 @@ } if pod.Annotations[fmt.Sprintf(known.AllocatedAnnotationTemplate, podRequest.Provider)] != "true" { - err := fmt.Errorf("no address allocated to pod %s/%s provider %s, please see kube-ovn-controller logs to find errors", pod.Namespace, pod.Name, podRequest.Provider) + err = fmt.Errorf("no address allocated to pod %s/%s provider %s, "+ "please see kube-ovn-controller logs to find errors", pod.Namespace, pod.Name, podRequest.Provider) klog.Error(err) return err } @@ -103,13 +103,17 @@ if strings.HasSuffix(podRequest.Provider, known.NautiPrefix) && subnet != "" { detectIPConflict := false var mtu int - //mtu = ch.Config.MTU + // mtu = ch.Config.MTU routes = append(podRequest.Routes, routes...)
macAddr = pod.Annotations[fmt.Sprintf(known.MacAddressAnnotationTemplate, podRequest.Provider)] - klog.Infof("create container interface %s mac %s, ip %s, cidr %s, gw %s, custom routes %v", ifName, macAddr, ipAddr, cidr, gw, routes) + klog.Infof("create container interface %s mac %s, ip %s, cidr %s, gw %s, custom routes %v", + ifName, macAddr, ipAddr, cidr, gw, routes) podNicName = ifName - err = ch.configureNic(podRequest.PodName, podRequest.PodNamespace, "ovn", podRequest.NetNs, podRequest.ContainerID, podRequest.VfDriver, ifName, macAddr, mtu, ipAddr, gw, isDefaultRoute, detectIPConflict, routes, podRequest.DNS.Nameservers, podRequest.DNS.Search, ingress, egress, podRequest.DeviceID, nicType, latency, limit, loss, jitter, gatewayCheckMode, u2oInterconnectionIP, oldPodName) + err = ch.configureNic(podRequest.PodName, podRequest.PodNamespace, "ovn", podRequest.NetNs, podRequest.ContainerID, + ifName, macAddr, mtu, ipAddr, gw, isDefaultRoute, detectIPConflict, routes, podRequest.DNS.Nameservers, + podRequest.DNS.Search, ingress, egress, nicType, latency, limit, loss, jitter, + gatewayCheckMode, u2oInterconnectionIP) if err != nil { errMsg := fmt.Errorf("configure nic failed %v", err) klog.Error(errMsg) @@ -131,12 +135,15 @@ func (ch cniHandler) handleAdd(podRequest *request.CniRequest) error { } func (ch cniHandler) configureNic(podName, podNamespace, provider, netns, containerID, - vfDriver, ifName, mac string, mtu int, ip, gateway string, isDefaultRoute, + ifName, mac string, mtu int, ip, gateway string, isDefaultRoute, detectIPConflict bool, routes []request.Route, _, _ []string, ingress, egress, - deviceID, nicType, latency, limit, loss, jitter string, gwCheckMode int, u2oInterconnectionIP, oldPodName string) error { + nicType, latency, limit, loss, jitter string, gwCheckMode int, u2oInterconnectionIP string) error { var err error var hostNicName, containerNicName string + klog.V(5).Infof("configure nic for pod %s/%s, ip %s, gateway %s, routes %v, mac %s, mtu %d, nicType %s, "+ + "gwCheckMode %d, u2oInterconnectionIP %s", + podNamespace, podName, ip, gateway, routes, mac, mtu, nicType, gwCheckMode, u2oInterconnectionIP) hostNicName, containerNicName, err = setupVethPair(containerID, ifName, mtu) if err != nil { klog.Errorf("failed to create veth pair %v", err) @@ -193,7 +200,8 @@ func (ch cniHandler) configureNic(podName, podNamespace, provider, netns, contai if err != nil { return fmt.Errorf("failed to open netns %q: %v", netns, err) } - return configureContainerNic(containerNicName, ifName, ip, gateway, isDefaultRoute, detectIPConflict, routes, macAddr, podNS, mtu, nicType, gwCheckMode, u2oInterconnectionIP) + return configureContainerNic(containerNicName, ifName, ip, gateway, isDefaultRoute, detectIPConflict, routes, + macAddr, podNS, mtu, nicType, gwCheckMode, u2oInterconnectionIP) } func configureHostNic(nicName string) error { @@ -215,7 +223,6 @@ func configureHostNic(nicName string) error { } func (ch cniHandler) handleDel(podRequest *request.CniRequest) error { - pod, err := ch.Controller.podsLister.Pods(podRequest.PodNamespace).Get(podRequest.PodName) if err != nil { if k8serrors.IsNotFound(err) { @@ -228,14 +235,9 @@ func (ch cniHandler) handleDel(podRequest *request.CniRequest) error { } klog.Infof("del port request: %v", podRequest) - if pod.Annotations != nil && (podRequest.Provider == known.NautiPrefix || podRequest.CniType == CniTypeName) { - - var nicType string - - nicType = pod.Annotations[fmt.Sprintf(known.PodNicAnnotationTemplate, podRequest.Provider)] - - err = 
ch.deleteNic(podRequest.PodName, podRequest.PodNamespace, podRequest.ContainerID, podRequest.NetNs, podRequest.DeviceID, podRequest.IfName, nicType, podRequest.Provider) + nicType := pod.Annotations[fmt.Sprintf(known.PodNicAnnotationTemplate, podRequest.Provider)] + err = ch.deleteNic(podRequest.PodName, podRequest.PodNamespace, podRequest.ContainerID, podRequest.IfName, nicType) if err != nil { errMsg := fmt.Errorf("del nic failed %v", err) klog.Error(errMsg) @@ -246,7 +248,7 @@ return err } -func (ch cniHandler) deleteNic(podName, podNamespace, containerID, netns, deviceID, ifName, nicType, provider string) error { +func (ch cniHandler) deleteNic(podName, podNamespace, containerID, ifName, nicType string) error { var nicName string hostNicName, containerNicName := generateNicName(containerID, ifName) diff --git a/pkg/dedinic/config.go b/pkg/dedinic/config.go index b7d5cff6..86487c9a 100644 --- a/pkg/dedinic/config.go +++ b/pkg/dedinic/config.go @@ -3,6 +3,10 @@ package dedinic import ( "context" "fmt" + "net" + "os" + "strings" + "github.com/kubeovn/kube-ovn/pkg/util" "github.com/vishvananda/netlink" corev1 "k8s.io/api/core/v1" @@ -10,10 +14,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" - "net" - "os" "sigs.k8s.io/controller-runtime/pkg/manager/signals" - "strings" ) var ( @@ -43,7 +44,6 @@ type Configuration struct { var Conf *Configuration func InitConfig() error { - Conf = &Configuration{ tunnelIface: "", Iface: "", diff --git a/pkg/dedinic/controller.go b/pkg/dedinic/controller.go index 7a1e1f9c..6e369fec 100644 --- a/pkg/dedinic/controller.go +++ b/pkg/dedinic/controller.go @@ -1,8 +1,9 @@ package dedinic import ( - "fmt" - "github.com/kubeovn/kube-ovn/pkg/ovs" + "os/exec" + "time" + "github.com/kubeovn/kube-ovn/pkg/util" v1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -16,8 +17,6 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" k8sexec "k8s.io/utils/exec" - "os/exec" - "time" ) // Controller watches pod and namespace changes to update iptables, ipset and ovs qos type Controller struct { @@ -37,7 +36,8 @@ } // NewController initializes a daemon controller -func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFactory, nodeInformerFactory informers.SharedInformerFactory) (*Controller, error) { +func NewController(config *Configuration, stopCh <-chan struct{}, podInformerFactory, + nodeInformerFactory informers.SharedInformerFactory) (*Controller, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: config.KubeClient.CoreV1().Events("")}) @@ -79,7 +79,6 @@ func (c *Controller) loopEncapIPCheck() { return } klog.V(5).Infof("encapip check node: %s", node.Annotations) - } // Run starts controller @@ -87,35 +86,12 @@ func (c *Controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer c.podQueue.ShutDown() - //go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh) + // go wait.Until(ovs.CleanLostInterface, time.Minute, stopCh) go wait.Until(recompute, 10*time.Minute, stopCh) go wait.Until(rotateLog, 1*time.Hour, stopCh) - //go wait.Until(c.operateMod, 10*time.Second, stopCh) - - //if err := c.setIPSet(); err != nil { - // util.LogFatalAndExit(err, "failed to set ipsets") - //} klog.Info("Started workers") - //go wait.Until(c.loopOvn0Check, 5*time.Second, stopCh) - //go
wait.Until(c.loopOvnExt0Check, 5*time.Second, stopCh) - //go wait.Until(c.runAddOrUpdateProviderNetworkWorker, time.Second, stopCh) - //go wait.Until(c.runDeleteProviderNetworkWorker, time.Second, stopCh) - //go wait.Until(c.runSubnetWorker, time.Second, stopCh) - //go wait.Until(c.runPodWorker, time.Second, stopCh) - //go wait.Until(c.runGateway, 3*time.Second, stopCh) go wait.Until(c.loopEncapIPCheck, 3*time.Second, stopCh) - //go wait.Until(c.ovnMetricsUpdate, 3*time.Second, stopCh) - //go wait.Until(func() { - // if err := c.reconcileRouters(nil); err != nil { - // klog.Errorf("failed to reconcile ovn0 routes: %v", err) - // } - //}, 3*time.Second, stopCh) - //go wait.Until(func() { - // if err := c.markAndCleanInternalPort(); err != nil { - // klog.Errorf("gc ovs port error: %v", err) - // } - //}, 5*time.Minute, stopCh) <-stopCh klog.Info("Shutting down workers") @@ -142,29 +118,29 @@ func rotateLog() { } } -var lastNoPodOvsPort map[string]bool - -func (c *Controller) markAndCleanInternalPort() error { - klog.V(4).Infof("start to gc ovs internal ports") - residualPorts := ovs.GetResidualInternalPorts() - if len(residualPorts) == 0 { - return nil - } - - noPodOvsPort := map[string]bool{} - for _, portName := range residualPorts { - if !lastNoPodOvsPort[portName] { - noPodOvsPort[portName] = true - } else { - klog.Infof("gc ovs internal port %s", portName) - // Remove ovs port - output, err := ovs.Exec(ovs.IfExists, "--with-iface", "del-port", "br-int", portName) - if err != nil { - return fmt.Errorf("failed to delete ovs port %v, %q", err, output) - } - } - } - lastNoPodOvsPort = noPodOvsPort - - return nil -} +// var lastNoPodOvsPort map[string]bool + +// func (c *Controller) markAndCleanInternalPort() error { +// klog.V(4).Infof("start to gc ovs internal ports") +// residualPorts := ovs.GetResidualInternalPorts() +// if len(residualPorts) == 0 { +// return nil +// } +// +// noPodOvsPort := map[string]bool{} +// for _, portName := range residualPorts { +// if !lastNoPodOvsPort[portName] { +// noPodOvsPort[portName] = true +// } else { +// klog.Infof("gc ovs internal port %s", portName) +// // Remove ovs port +// output, err := ovs.Exec(ovs.IfExists, "--with-iface", "del-port", "br-int", portName) +// if err != nil { +// return fmt.Errorf("failed to delete ovs port %v, %q", err, output) +// } +// } +// } +// lastNoPodOvsPort = noPodOvsPort +// +// return nil +// } diff --git a/pkg/dedinic/delayqueue.go b/pkg/dedinic/delayqueue.go index 200409de..505bb0dd 100644 --- a/pkg/dedinic/delayqueue.go +++ b/pkg/dedinic/delayqueue.go @@ -2,10 +2,11 @@ package dedinic import ( "context" + "time" + "github.com/cfanbo/delayqueue" "github.com/kubeovn/kube-ovn/pkg/request" "k8s.io/klog/v2" - "time" ) var DelayQueue *delayqueue.Queue diff --git a/pkg/dedinic/kubelet.go b/pkg/dedinic/kubelet.go index 52508079..0c58d19b 100644 --- a/pkg/dedinic/kubelet.go +++ b/pkg/dedinic/kubelet.go @@ -1,6 +1,7 @@ package dedinic import ( + "context" "crypto/tls" "encoding/json" "fmt" @@ -16,7 +17,7 @@ import ( "k8s.io/klog/v2" ) -var defaultAPIAuthTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" +var defaultAPIAuthTokenFile = "/var/run/secrets/kubernetes.io/serviceaccount/token" // #nosec G101 type KubeletStub interface { GetAllPods() (corev1.PodList, error) @@ -39,7 +40,7 @@ func (k kubeletStub) GetAllPods() (corev1.PodList, error) { podList := corev1.PodList{} var bearer = "Bearer " + k.token - req, err := http.NewRequest("GET", urlStr.String(), nil) + req, err := 
http.NewRequestWithContext(context.TODO(), "GET", urlStr.String(), nil) if err != nil { klog.Errorf("Construct http request failed, %v", err) } @@ -80,7 +81,7 @@ func NewKubeletStub(addr string, port int, scheme string, timeout time.Duration) client := &http.Client{ Timeout: timeout, Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // #nosec G402 }, } diff --git a/pkg/dedinic/nri.go b/pkg/dedinic/nri.go index e3e34f46..8d36e11a 100644 --- a/pkg/dedinic/nri.go +++ b/pkg/dedinic/nri.go @@ -76,7 +76,7 @@ func (p *CNIPlugin) Synchronize(pods []*api.PodSandbox, containers []*api.Contai } func (p *CNIPlugin) Shutdown() { - //dump("Shutdown") + // dump("Shutdown") } func (p *CNIPlugin) RunPodSandbox(pod *api.PodSandbox) (err error) { diff --git a/pkg/dedinic/oob.go b/pkg/dedinic/oob.go index 1ab18ee9..f85cbc40 100644 --- a/pkg/dedinic/oob.go +++ b/pkg/dedinic/oob.go @@ -44,14 +44,14 @@ const ( ContainerAdded = 0 ContainerDeleted = 1 - ContainerTaskIdDone = 2 + ContainerTaskIDDone = 2 ) type OobImpl struct { cgroupRootPath string podWatcher *inotify.Watcher containerWatcher *inotify.Watcher - taskIdWatcher *inotify.Watcher + taskIDWatcher *inotify.Watcher podEvents chan *PodEvent containerEvents chan *ContainerEvent kubeletStub KubeletStub @@ -74,16 +74,16 @@ func InitOOb() { type PodEvent struct { eventType int - podId string + podID string cgroupPath string } type ContainerEvent struct { eventType int - podId string - containerId string + podID string + containerID string cgroupPath string - netns string + // netns string } func NewOobServer(cgroupRootPath string) (*OobImpl, error) { @@ -101,7 +101,7 @@ func NewOobServer(cgroupRootPath string) (*OobImpl, error) { klog.Error("create container watcher failed", err) } - taskIdWatcher, err := inotify.NewWatcher() + taskIDWatcher, err := inotify.NewWatcher() if err != nil { klog.Error("create taskId watcher failed", err) } @@ -110,7 +110,7 @@ func NewOobServer(cgroupRootPath string) (*OobImpl, error) { cgroupRootPath: cgroupRootPath, podWatcher: podWatcher, containerWatcher: containerWatcher, - taskIdWatcher: taskIdWatcher, + taskIDWatcher: taskIDWatcher, kubeletStub: stub, podEvents: make(chan *PodEvent, 128), containerEvents: make(chan *ContainerEvent, 128), @@ -155,7 +155,7 @@ func GetNetNs(ctx context.Context, cgroupPath string) string { return "" default: file, err := os.Open(cgroupPath) - defer file.Close() + // defer file.Close() if err != nil { klog.Infof("no cgroup path now, %v %s", err, cgroupPath) return "" @@ -212,29 +212,30 @@ func (o *OobImpl) runEventHandler(stoptCh <-chan struct{}) { case event := <-o.podEvents: switch event.eventType { case PodAdded: - klog.Infof("PodAdded, %s", event.podId) + klog.Infof("PodAdded, %s", event.podID) _, err := o.GetAllPods() if err != nil { klog.Errorf("Get all pods failed %v", err) } case PodDeleted: - klog.Infof("PodDeleted, %s", event.podId) + klog.Infof("PodDeleted, %s", event.podID) } case event := <-o.containerEvents: switch event.eventType { case ContainerAdded: - klog.Infof("ContainerAdded, %s %s", event.podId, event.containerId) + klog.Infof("ContainerAdded, %s %s", event.podID, event.containerID) case ContainerDeleted: - klog.Infof("ContainerDeleted, %s %s", event.podId, event.containerId) - case ContainerTaskIdDone: + klog.Infof("ContainerDeleted, %s %s", event.podID, event.containerID) + case ContainerTaskIDDone: ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer 
cancel() + // defer cancel() _, err := o.GetAllPods() if err != nil { klog.Errorf("Get all pods failed %v", err) } netns := GetNetNs(ctx, event.cgroupPath) - if pod, ok := o.pods[event.podId]; ok { + cancel() + if pod, ok := o.pods[event.podID]; ok { klog.Infof("add dedinic to the pod: %v", pod) podRequest := &request.CniRequest{ CniType: "kube-ovn", @@ -247,10 +248,10 @@ } DelayQueue.Put(time.Now().Add(time.Second*3), podRequest) } else { - klog.Errorf("cant find the pod info: %v", event.podId) + klog.Errorf("can't find the pod info: %v", event.podID) } klog.Infof("netns is %s", netns) - klog.Infof("ContainerTaskIdDone, %s %s %s", event.podId, event.containerId, netns) + klog.Infof("ContainerTaskIDDone, %s %s %s", event.podID, event.containerID, netns) } case <-stoptCh: return @@ -301,7 +302,7 @@ func (o *OobImpl) Run(stopCh <-chan struct{}) { case event := <-o.podWatcher.Event: switch TypeOf(event) { case DirCreated: - podId, err := ParsePodId(filepath.Base(event.Name)) + podID, err := ParsePodID(filepath.Base(event.Name)) if err != nil { klog.Errorf("failed to parse pod id from %s", event.Name) } @@ -309,9 +310,9 @@ if err != nil { klog.Errorf("failed to watch path %s, err %v", event.Name, err) } - o.podEvents <- newPodEvent(podId, PodAdded, event.Name) + o.podEvents <- newPodEvent(podID, PodAdded, event.Name) case DirRemoved: - podId, err := ParsePodId(filepath.Base(event.Name)) + podID, err := ParsePodID(filepath.Base(event.Name)) if err != nil { klog.Errorf("failed to parse pod id from %s", event.Name) } @@ -319,7 +320,7 @@ if err != nil { klog.Errorf("failed to remove watch path %s, err %v", event.Name, err) } - o.podEvents <- newPodEvent(podId, PodDeleted, event.Name) + o.podEvents <- newPodEvent(podID, PodDeleted, event.Name) klog.Infof("dir delete, %s", event.Name) default: klog.Infof("Unknown type") } case event := <-o.containerWatcher.Event: switch TypeOf(event) { case DirCreated: - containerId, err := ParseContainerId(filepath.Base(event.Name)) + containerID, err := ParseContainerID(filepath.Base(event.Name)) if err != nil { - klog.Infof("get containerId failed") + klog.Infof("get containerID failed") continue } - podId, err := ParsePodId(filepath.Base(filepath.Dir(event.Name))) + podID, err := ParsePodID(filepath.Base(filepath.Dir(event.Name))) if err != nil { - klog.Infof("get podId failed, %v", err) + klog.Infof("get podID failed, %v", err) continue } - err = o.taskIdWatcher.AddWatch(path.Join(event.Name, "cgroup.procs"), inotify.InCreate|inotify.InModify|inotify.InAllEvents) + err = o.taskIDWatcher.AddWatch(path.Join(event.Name, "cgroup.procs"), + inotify.InCreate|inotify.InModify|inotify.InAllEvents) if err != nil { klog.Errorf("failed to watch path %s, err %v", event.Name+"/cgroup.procs", err) } - o.containerEvents <- newContainerEvent(podId, containerId, ContainerAdded, event.Name) - klog.Infof("dir create, %s", event.Name, podId, containerId) + o.containerEvents <- newContainerEvent(podID, containerID, ContainerAdded, event.Name) + klog.Infof("dir create: %v, %v, %v", event.Name, podID, containerID) case DirRemoved: - containerId, err := ParseContainerId(filepath.Base(event.Name)) + containerID, err := ParseContainerID(filepath.Base(event.Name)) if err != nil { - klog.Infof("get containerId failed") + klog.Infof("get containerID failed") continue } -
+ podID, err := ParsePodID(filepath.Base(filepath.Dir(event.Name)))
if err != nil {
- klog.Infof("get podId failed")
+ klog.Infof("get podID failed")
continue
}
- o.containerEvents <- newContainerEvent(podId, containerId, ContainerDeleted, event.Name)
+ o.containerEvents <- newContainerEvent(podID, containerID, ContainerDeleted, event.Name)
klog.Infof("dir delete, %s", event.Name)
default:
klog.Infof("Unkown type")
}
- case event := <-o.taskIdWatcher.Event:
+ case event := <-o.taskIDWatcher.Event:
switch TypeOf(event) {
case FileCreated:
klog.Infof("cgroup.procs file created %v", event)
containerDir := filepath.Dir(event.Name)
- containerId, err := ParseContainerId(filepath.Base(containerDir))
+ containerID, err := ParseContainerID(filepath.Base(containerDir))
if err != nil {
- klog.Infof("get containerId failed")
+ klog.Infof("get containerID failed")
continue
}
- podId, err := ParsePodId(filepath.Base(filepath.Dir(containerDir)))
+ podID, err := ParsePodID(filepath.Base(filepath.Dir(containerDir)))
if err != nil {
- klog.Infof("get podId failed, %v", err)
+ klog.Infof("get podID failed, %v", err)
continue
}
- o.containerEvents <- newContainerEvent(podId, containerId, ContainerTaskIdDone, event.Name)
- klog.Infof("dir create, %s", event.Name, podId, containerId)
+ o.containerEvents <- newContainerEvent(podID, containerID, ContainerTaskIDDone, event.Name)
+ klog.Infof("cgroup.procs created: %v, %v, %v", event.Name, podID, containerID)
case FileUpdated:
klog.Infof("cgroup.procs file updated %v", event)
containerDir := filepath.Dir(event.Name)
- containerId, err := ParseContainerId(filepath.Base(containerDir))
+ containerID, err := ParseContainerID(filepath.Base(containerDir))
if err != nil {
- klog.Infof("get containerId failed")
+ klog.Infof("get containerID failed")
continue
}
- podId, err := ParsePodId(filepath.Base(filepath.Dir(containerDir)))
+ podID, err := ParsePodID(filepath.Base(filepath.Dir(containerDir)))
if err != nil {
- klog.Infof("get podId failed, %v", err)
+ klog.Infof("get podID failed, %v", err)
continue
}
- o.containerEvents <- newContainerEvent(podId, containerId, ContainerTaskIdDone, event.Name)
- err = o.taskIdWatcher.RemoveWatch(event.Name)
+ o.containerEvents <- newContainerEvent(podID, containerID, ContainerTaskIDDone, event.Name)
+ err = o.taskIDWatcher.RemoveWatch(event.Name)
if err != nil {
klog.Errorf("failed to remove watch path %s, err %v", event.Name, err)
}
- klog.Infof("dir create, %s", event.Name, podId, containerId)
+ klog.Infof("cgroup.procs updated: %v, %v, %v", event.Name, podID, containerID)
}
case <-stopCh:
@@ -407,7 +409,7 @@ func (o *OobImpl) Run(stopCh <-chan struct{}) {
}
}
-func ParsePodId(basename string) (string, error) {
+func ParsePodID(basename string) (string, error) {
patterns := []struct {
prefix string
suffix string
@@ -429,15 +431,14 @@ func ParsePodId(basename string) (string, error) {
for i := range patterns {
if strings.HasPrefix(basename, patterns[i].prefix) &&
strings.HasSuffix(basename, patterns[i].suffix) {
- podIdStr := basename[len(patterns[i].prefix) : len(basename)-len(patterns[i].suffix)]
- return strings.ReplaceAll(podIdStr, "_", "-"), nil
-
+ podIDStr := basename[len(patterns[i].prefix) : len(basename)-len(patterns[i].suffix)]
+ return strings.ReplaceAll(podIDStr, "_", "-"), nil
}
}
return "", fmt.Errorf("fail to parse pod id: %v", basename)
}
-func ParseContainerId(basename string) (string, error) {
+func ParseContainerID(basename string) (string, error) {
patterns :=
[]struct { prefix string suffix string @@ -460,12 +461,12 @@ func ParseContainerId(basename string) (string, error) { return "", fmt.Errorf("fail to parse container id: %v", basename) } -func newPodEvent(podId string, eventType int, cgroupPath string) *PodEvent { - return &PodEvent{eventType: eventType, podId: podId, cgroupPath: cgroupPath} +func newPodEvent(podID string, eventType int, cgroupPath string) *PodEvent { + return &PodEvent{eventType: eventType, podID: podID, cgroupPath: cgroupPath} } -func newContainerEvent(podId string, containerId string, eventType int, cgroupPath string) *ContainerEvent { - return &ContainerEvent{podId: podId, containerId: containerId, eventType: eventType, cgroupPath: cgroupPath} +func newContainerEvent(podID string, containerID string, eventType int, cgroupPath string) *ContainerEvent { + return &ContainerEvent{podID: podID, containerID: containerID, eventType: eventType, cgroupPath: cgroupPath} } func (o *OobImpl) GetAllPods() (corev1.PodList, error) { diff --git a/pkg/dedinic/ovs.go b/pkg/dedinic/ovs.go index b9c9cae0..bf5b1fd2 100644 --- a/pkg/dedinic/ovs.go +++ b/pkg/dedinic/ovs.go @@ -38,7 +38,7 @@ func setupVethPair(containerID, ifName string, mtu int) (string, string, error) veth.MTU = mtu } if err = netlink.LinkAdd(&veth); err != nil { - if err := netlink.LinkDel(&veth); err != nil { + if err = netlink.LinkDel(&veth); err != nil { klog.Errorf("failed to delete veth %v", err) return "", "", err } @@ -54,9 +54,11 @@ func generateNicName(containerID, ifname string) (string, string) { // The nic name is 14 length and have prefix pod in the Kubevirt v1.0.0 if strings.HasPrefix(ifname, "pod") && len(ifname) == 14 { ifname = ifname[3 : len(ifname)-4] - return fmt.Sprintf("%s_%s_h", containerID[0:12-len(ifname)], ifname), fmt.Sprintf("%s_%s_c", containerID[0:12-len(ifname)], ifname) + return fmt.Sprintf("%s_%s_h", containerID[0:12-len(ifname)], ifname), + fmt.Sprintf("%s_%s_c", containerID[0:12-len(ifname)], ifname) } - return fmt.Sprintf("%s_%s_h", containerID[0:12-len(ifname)], ifname), fmt.Sprintf("%s_%s_c", containerID[0:12-len(ifname)], ifname) + return fmt.Sprintf("%s_%s_h", containerID[0:12-len(ifname)], ifname), + fmt.Sprintf("%s_%s_c", containerID[0:12-len(ifname)], ifname) } func turnOffNicTxChecksum(nicName string) (err error) { @@ -71,14 +73,17 @@ func turnOffNicTxChecksum(nicName string) (err error) { return nil } -func configureContainerNic(nicName, ifName, ipAddr, gateway string, isDefaultRoute, detectIPConflict bool, routes []request.Route, macAddr net.HardwareAddr, netns ns.NetNS, mtu int, nicType string, gwCheckMode int, u2oInterconnectionIP string) error { +func configureContainerNic(nicName, ifName, ipAddr, gateway string, isDefaultRoute, detectIPConflict bool, + routes []request.Route, macAddr net.HardwareAddr, netns ns.NetNS, mtu int, nicType string, gwCheckMode int, + u2oInterconnectionIP string) error { + var err error containerLink, err := netlink.LinkByName(nicName) if err != nil { return fmt.Errorf("can not find container nic %s: %v", nicName, err) } // Set link alias to its origin link name for fastpath to recognize and bypass netfilter - if err := netlink.LinkSetAlias(containerLink, nicName); err != nil { + if err = netlink.LinkSetAlias(containerLink, nicName); err != nil { klog.Errorf("failed to set link alias for container nic %s: %v", nicName, err) return err } @@ -98,7 +103,8 @@ func configureContainerNic(nicName, ifName, ipAddr, gateway string, isDefaultRou // For docker version >=17.x the "none" network will disable ipv6 
by default. // We have to enable ipv6 here to add v6 address and gateway. // See https://github.com/containernetworking/cni/issues/531 - value, err := sysctl.Sysctl("net.ipv6.conf.all.disable_ipv6") + var value string + value, err = sysctl.Sysctl("net.ipv6.conf.all.disable_ipv6") if err != nil { return fmt.Errorf("failed to get sysctl net.ipv6.conf.all.disable_ipv6: %v", err) } @@ -228,35 +234,39 @@ func configureNic(link, ip string, macAddr net.HardwareAddr, mtu int, detectIPCo continue } - ipAddr, err := netlink.ParseAddr(ipStr) + var ipAddr *netlink.Addr + ipAddr, err = netlink.ParseAddr(ipStr) if err != nil { return fmt.Errorf("can not parse address %s: %v", ipStr, err) } ipAddMap[ipStr] = *ipAddr } - for ip, addr := range ipDelMap { + for ip, address := range ipDelMap { + addr := address klog.Infof("delete ip address %s on %s", ip, link) if err = netlink.AddrDel(nodeLink, &addr); err != nil { return fmt.Errorf("delete address %s: %v", addr, err) } } - for ip, addr := range ipAddMap { + for ip, address := range ipAddMap { + addr := address if detectIPConflict && addr.IP.To4() != nil { - ip := addr.IP.String() - mac, err := util.ArpDetectIPConflict(link, ip, macAddr) + ipStr := addr.IP.String() + var mac net.HardwareAddr + mac, err = util.ArpDetectIPConflict(link, ipStr, macAddr) if err != nil { - err = fmt.Errorf("failed to detect address conflict for %s on link %s: %v", ip, link, err) + err = fmt.Errorf("failed to detect address conflict for %s on link %s: %v", ipStr, link, err) klog.Error(err) return err } if mac != nil { - return fmt.Errorf("IP address %s has already been used by host with MAC %s", ip, mac) + return fmt.Errorf("IP address %s has already been used by host with MAC %s", ipStr, mac) } } if addr.IP.To4() != nil && !detectIPConflict { // when detectIPConflict is true, free arp is already broadcast in the step of announcement - if err := util.AnnounceArpAddress(link, addr.IP.String(), macAddr, 1, 1*time.Second); err != nil { + if err = util.AnnounceArpAddress(link, addr.IP.String(), macAddr, 1, 1*time.Second); err != nil { klog.Warningf("failed to broadcast free arp with err %v ", err) } } @@ -291,13 +301,15 @@ func waitNetworkReady(nic, ipAddr, gateway string, underlayGateway, verbose bool if underlayGateway && util.CheckProtocol(gw) == ProtocolIPv4 { mac, count, err := util.ArpResolve(nic, src, gw, time.Second, maxRetry) if err != nil { - err = fmt.Errorf("network %s with gateway %s is not ready for interface %s after %d checks: %v", ips[i], gw, nic, count, err) + err = fmt.Errorf("network %s with gateway %s is not ready for interface %s after %d checks: %v", + ips[i], gw, nic, count, err) klog.Warning(err) return err } if verbose { klog.Infof("MAC addresses of gateway %s is %s", gw, mac.String()) - klog.Infof("network %s with gateway %s is ready for interface %s after %d checks", ips[i], gw, nic, count) + klog.Infof("network %s with gateway %s is ready for interface %s after %d checks", + ips[i], gw, nic, count) } } else { _, err := pingGateway(gw, src, verbose, maxRetry) @@ -349,8 +361,9 @@ func pingGateway(gw, src string, verbose bool, maxRetry int) (count int, err err func setEncapIP(ip string) error { klog.V(5).Infof("setEncapIP: %s", ip) + encapIP := fmt.Sprintf("external-ids:ovn-encap-ip=%s", ip) raw, err := exec.Command( - "ovs-vsctl", "set", "open", ".", fmt.Sprintf("external-ids:ovn-encap-ip=%s", ip)).CombinedOutput() + "ovs-vsctl", "set", "open", ".", encapIP).CombinedOutput() klog.V(5).Infof("setEncapIP: %s", string(raw)) if err != nil { return 
fmt.Errorf("failed to set ovn-encap-ip, %s", string(raw)) @@ -358,13 +371,13 @@ func setEncapIP(ip string) error { return nil } -func disableChecksum() error { - klog.V(5).Info("disableChecksum") - raw, err := exec.Command( - "ovs-vsctl", "set", "open", ".", "external-ids:ovn-encap-csum=false").CombinedOutput() - klog.V(5).Infof("disableChecksum command: %s", string(raw)) - if err != nil { - return fmt.Errorf("failed to set ovn-encap-csum, %s", string(raw)) - } - return nil -} +// func disableChecksum() error { +// klog.V(5).Info("disableChecksum") +// raw, err := exec.Command( +// "ovs-vsctl", "set", "open", ".", "external-ids:ovn-encap-csum=false").CombinedOutput() +// klog.V(5).Infof("disableChecksum command: %s", string(raw)) +// if err != nil { +// return fmt.Errorf("failed to set ovn-encap-csum, %s", string(raw)) +// } +// return nil +// } From 8b772749320616332844319f25c77c995162e851 Mon Sep 17 00:00:00 2001 From: Airren Date: Thu, 16 May 2024 15:28:06 +0800 Subject: [PATCH 2/2] improve: refactor makefile Signed-off-by: Airren --- Makefile | 35 +++++++++++++++++++++++------------ doc/build-images.md | 39 ++++++++++++++++++++++++++++++++++++--- 2 files changed, 59 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 743ea3ee..a68c58d7 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,10 @@ IMG ?= ovnmaster:latest # Produce CRDs that work back to Kubernetes 1.11 (no version conversion) CRD_OPTIONS ?= "crd:crdVersions=v1,generateEmbeddedObjectMeta=true" +IMAGE_TAG := $(shell git rev-parse --short HEAD) +IMAGE_REPOSITORY := ghcr.io/nauti-io + + # Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) ifeq (,$(shell go env GOBIN)) GOBIN=$(shell go env GOPATH)/bin @@ -14,14 +18,7 @@ endif lint: golangci-lint golangci-lint run -c .golangci.yaml --timeout=10m -ovnmaster: - CGO_ENABLED=0 GOOS=linux go build -ldflags "-w -s" -a -installsuffix cgo -o cmd/ovnmaster/ovnmaster cmd/ovnmaster/main.go - -crossdns: - CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -a -installsuffix cgo -o cmd/crossdns/crossdns cmd/crossdns/main.go -octopus: - CGO_ENABLED=0 GOOS=linux go build -ldflags "-w -s" -a -installsuffix cgo -o cmd/octopus/octopus cmd/octopus/main.go # Generate manifests e.g. CRD, RBAC etc. 
manifests: controller-gen
@@ -44,17 +41,31 @@
else
CONTROLLER_GEN=$(shell which controller-gen)
endif
+
+ovnmaster:
+	CGO_ENABLED=0 GOOS=linux go build -ldflags "-w -s" -a -installsuffix cgo -o cmd/ovnmaster/ovnmaster cmd/ovnmaster/main.go
+crossdns:
+	CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -a -installsuffix cgo -o cmd/crossdns/crossdns cmd/crossdns/main.go
+octopus:
+	CGO_ENABLED=0 GOOS=linux go build -ldflags "-w -s" -a -installsuffix cgo -o cmd/octopus/octopus cmd/octopus/main.go
dedinic:
	CGO_ENABLED=0 GOARCH=amd64 GOOS=linux go build -ldflags "-w -s" -a -installsuffix cgo -o bin/dedinic cmd/dedinic/main.go
-
ep-controller:
	CGO_ENABLED=0 GOARCH=amd64 GOOS=linux go build -ldflags "-w -s" -a -installsuffix cgo -o bin/ep-controller cmd/ep-controller/main.go

images:
-	docker build -f ./build/dedinic.Dockerfile ./ -t docker.io/airren/dedinic:v1.13.0-debug
-	docker build -f ./build/ep-controller.Dockerfile ./ -t docker.io/airren/ep-controller:latest
-	docker push docker.io/airren/dedinic:v1.13.0-debug
-	docker push docker.io/airren/ep-controller:latest
+	docker build -f ./build/dedinic.Dockerfile ./ -t ${IMAGE_REPOSITORY}/dedinic:${IMAGE_TAG}
+	docker build -f ./build/ep-controller.Dockerfile ./ -t ${IMAGE_REPOSITORY}/ep-controller:${IMAGE_TAG}
+	docker push ${IMAGE_REPOSITORY}/dedinic:${IMAGE_TAG}
+	docker push ${IMAGE_REPOSITORY}/ep-controller:${IMAGE_TAG}
+
+dedinic-image:
+	docker build -f ./build/dedinic.Dockerfile ./ -t ${IMAGE_REPOSITORY}/dedinic:${IMAGE_TAG}
+	docker push ${IMAGE_REPOSITORY}/dedinic:${IMAGE_TAG}
+ep-controller-image:
+	docker build -f ./build/ep-controller.Dockerfile ./ -t ${IMAGE_REPOSITORY}/ep-controller:${IMAGE_TAG}
+	docker push ${IMAGE_REPOSITORY}/ep-controller:${IMAGE_TAG}
+
# find or download golangci-lint
# download golangci-lint if necessary
diff --git a/doc/build-images.md b/doc/build-images.md
index 7f9899b8..8759abdf 100644
--- a/doc/build-images.md
+++ b/doc/build-images.md
@@ -1,16 +1,49 @@
# How to build Nauti

## Build the Nauti image
+
+If you prefer not to build the `Nauti` images yourself, you can pull them directly from the [ghcr.io/nauti-io](https://github.com/orgs/nauti-io/packages) registry.
+
### Prerequisites
+
- Docker / Podman

-clone the repo to local directory
+Contributors need to log in to the `ghcr.io` registry.
+Get a GitHub token with `read:packages` and `write:packages` permissions.
+On the dev environment, log in to the `ghcr.io` registry with the following command:
+
```bash
-git clonehttps://github.com/nauti-io/nauti.git
+# Log in to the ghcr.io registry
+# GHCR_USER is the GitHub username
+# GHCR_PAT is the GitHub personal access token
+echo $GHCR_PAT | docker login ghcr.io -u $GHCR_USER --password-stdin
+```
+
+Clone the repo to a local directory:
+```bash
+git clone https://github.com/nauti-io/nauti.git

cd nauti
```
+
+### Build all the images and push them to the registry (ghcr.io)
+```bash
+make images
+```
+
### DediNic Image
+
+```bash
+make dedinic-image
+```
+
+### Ep-Controller Image
+
```bash
-docker build -t nauti .
+make ep-controller-image
```
+
+
+## Build the Nauti Binary
+
+- todo
\ No newline at end of file
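
Note on the trailing "Build the Nauti Binary" section: it is left as a todo in this patch. As a rough sketch of what it could eventually say, the Makefile targets added above already build each binary individually; assuming a local Go toolchain, something like the following should work from the repo root (target names and output paths are taken from the Makefile in this patch):

```bash
# Build each binary via the Makefile targets defined in this patch.
# Output paths are the ones set by the corresponding go build commands.
make ovnmaster      # -> cmd/ovnmaster/ovnmaster
make crossdns       # -> cmd/crossdns/crossdns
make octopus        # -> cmd/octopus/octopus
make dedinic        # -> bin/dedinic
make ep-controller  # -> bin/ep-controller
```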