From e432fd79d6a60691c70520eaad2fa586bc37f752 Mon Sep 17 00:00:00 2001 From: Jessie Wang Date: Mon, 30 Sep 2024 09:23:57 +1000 Subject: [PATCH] Add yurthub podenv filter --- cmd/yurthub/app/options/filters.go | 5 +- pkg/yurthub/filter/masterservice/filter.go | 1 - pkg/yurthub/filter/podenvupdater/filter.go | 78 +++++ .../filter/podenvupdater/filter_test.go | 283 ++++++++++++++++++ .../filter/responsefilter/filter_test.go | 224 ++++++++++++++ .../v3/deviceservice_client_test.go | 1 - 6 files changed, 589 insertions(+), 3 deletions(-) create mode 100644 pkg/yurthub/filter/podenvupdater/filter.go create mode 100644 pkg/yurthub/filter/podenvupdater/filter_test.go diff --git a/cmd/yurthub/app/options/filters.go b/cmd/yurthub/app/options/filters.go index b2a93512040..15544d98574 100644 --- a/cmd/yurthub/app/options/filters.go +++ b/cmd/yurthub/app/options/filters.go @@ -23,12 +23,13 @@ import ( "github.com/openyurtio/openyurt/pkg/yurthub/filter/inclusterconfig" "github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice" "github.com/openyurtio/openyurt/pkg/yurthub/filter/nodeportisolation" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/podenvupdater" "github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology" ) var ( // DisabledInCloudMode contains the filters that should be disabled when yurthub is working in cloud mode. - DisabledInCloudMode = []string{discardcloudservice.FilterName, forwardkubesvctraffic.FilterName} + DisabledInCloudMode = []string{discardcloudservice.FilterName, forwardkubesvctraffic.FilterName, podenvupdater.FilterName} // SupportedComponentsForFilter is used for specifying which components are supported by filters as default setting. SupportedComponentsForFilter = map[string]string{ @@ -38,6 +39,7 @@ var ( inclusterconfig.FilterName: "kubelet", nodeportisolation.FilterName: "kube-proxy", forwardkubesvctraffic.FilterName: "kube-proxy", + podenvupdater.FilterName: "kubelet", } ) @@ -52,4 +54,5 @@ func RegisterAllFilters(filters *base.Filters) { inclusterconfig.Register(filters) nodeportisolation.Register(filters) forwardkubesvctraffic.Register(filters) + podenvupdater.Register(filters) } diff --git a/pkg/yurthub/filter/masterservice/filter.go b/pkg/yurthub/filter/masterservice/filter.go index eab0450f143..1f32ba78dcc 100644 --- a/pkg/yurthub/filter/masterservice/filter.go +++ b/pkg/yurthub/filter/masterservice/filter.go @@ -67,7 +67,6 @@ func (msf *masterServiceFilter) SupportedResourceAndVerbs() map[string]sets.Set[ func (msf *masterServiceFilter) SetMasterServiceHost(host string) error { msf.host = host return nil - } func (msf *masterServiceFilter) SetMasterServicePort(portStr string) error { diff --git a/pkg/yurthub/filter/podenvupdater/filter.go b/pkg/yurthub/filter/podenvupdater/filter.go new file mode 100644 index 00000000000..10ee3f6fe3b --- /dev/null +++ b/pkg/yurthub/filter/podenvupdater/filter.go @@ -0,0 +1,78 @@ +package podenvupdater + +import ( + "github.com/openyurtio/openyurt/pkg/yurthub/filter" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + // FilterName filter is used to mutate the Kubernetes service host + // in order for pods on edge nodes to access kube-apiserver via Yurthub proxy + FilterName = "podenvupdater" + + envVarServiceHost = "KUBERNETES_SERVICE_HOST" + yurthubHost = "169.254.2.1" +) + +// Register registers a filter +func Register(filters *base.Filters) { + filters.Register(FilterName, func() (filter.ObjectFilter, error) { + return NewPodEnvFilter() + }) +} + +type podEnvFilter struct { + host string +} + +func NewPodEnvFilter() (*podEnvFilter, error) { + return &podEnvFilter{}, nil +} + +func (pef *podEnvFilter) Name() string { + return FilterName +} + +func (pef *podEnvFilter) SupportedResourceAndVerbs() map[string]sets.Set[string] { + return map[string]sets.Set[string]{ + "pods": sets.New("list", "watch", "get", "patch"), + } +} + +func (pef *podEnvFilter) SetMasterServiceHost(host string) error { + pef.host = host + return nil +} + +func (pef *podEnvFilter) SetMasterServicePort(portStr string) error { + // do nothing + return nil +} + +func (pef *podEnvFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { + switch v := obj.(type) { + case *corev1.Pod: + return pef.mutatePodEnv(v) + default: + return v + } +} + +func (pef *podEnvFilter) mutatePodEnv(req *corev1.Pod) *corev1.Pod { + proxyIP := pef.host + if proxyIP == "" { + proxyIP = yurthubHost + } + for i := range req.Spec.Containers { + for j, envVar := range req.Spec.Containers[i].Env { + if envVar.Name == envVarServiceHost { + req.Spec.Containers[i].Env[j].Value = proxyIP + break + } + } + } + return req +} diff --git a/pkg/yurthub/filter/podenvupdater/filter_test.go b/pkg/yurthub/filter/podenvupdater/filter_test.go new file mode 100644 index 00000000000..7cb78682a73 --- /dev/null +++ b/pkg/yurthub/filter/podenvupdater/filter_test.go @@ -0,0 +1,283 @@ +package podenvupdater + +import ( + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/openyurtio/openyurt/pkg/util" + "github.com/openyurtio/openyurt/pkg/yurthub/filter/base" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" +) + +const ( + masterHost = "169.254.2.1" +) + +func TestRegister(t *testing.T) { + filters := base.NewFilters([]string{}) + Register(filters) + if !filters.Enabled(FilterName) { + t.Errorf("couldn't register %s filter", FilterName) + } +} + +func TestName(t *testing.T) { + nif, _ := NewPodEnvFilter() + if nif.Name() != FilterName { + t.Errorf("expect %s, but got %s", FilterName, nif.Name()) + } +} + +func TestSupportedResourceAndVerbs(t *testing.T) { + nif, _ := NewPodEnvFilter() + rvs := nif.SupportedResourceAndVerbs() + if len(rvs) != 1 { + t.Errorf("supported more than one resources, %v", rvs) + } + for resource, verbs := range rvs { + if resource != "pods" { + t.Errorf("expect resource is pods, but got %s", resource) + } + + if !verbs.Equal(sets.New("list", "watch", "get", "patch")) { + t.Errorf("expect verbs are list/watch, but got %v", verbs.UnsortedList()) + } + } +} + +func TestFilterPodEnv(t *testing.T) { + tests := []struct { + name string + setMasterHost bool + requestObj runtime.Object + expectedObj runtime.Object + }{ + { + name: "KUBERNETES_SERVICE_HOST set to original value", + setMasterHost: true, + requestObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "original-value"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + expectedObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: masterHost}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + }, + { + name: "KUBERNETES_SERVICE_HOST set to correct value, should update nothing", + setMasterHost: true, + requestObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: masterHost}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + expectedObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: masterHost}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + }, + { + name: "KUBERNETES_SERVICE_HOST does not exist", + setMasterHost: true, + requestObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + expectedObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + }, + { + name: "KUBERNETES_SERVICE_HOST updates correctly in multiple containers", + setMasterHost: true, + requestObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "original-value"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + { + Name: "test-container1", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "original-value"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + expectedObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: masterHost}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + { + Name: "test-container1", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: masterHost}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + }, + { + name: "masterhost is not set - KUBERNETES_SERVICE_HOST should still update correctly", + setMasterHost: false, + requestObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "original-value"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + expectedObj: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: masterHost}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + }, + } + stopCh := make(<-chan struct{}) + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + pef, _ := NewPodEnvFilter() + if tc.setMasterHost { + pef.SetMasterServiceHost(masterHost) + } + newObj := pef.Filter(tc.requestObj, stopCh) + + if tc.expectedObj == nil { + if !util.IsNil(newObj) { + t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj) + } + } else if !reflect.DeepEqual(newObj, tc.expectedObj) { + t.Errorf("RuntimeObjectFilter got error, expected: \n%v\nbut got: \n%v\n", tc.expectedObj, newObj) + } + }) + } +} + +func TestFilterMasterPortNoop(t *testing.T) { + podReq := &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: masterHost}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + } + pef, _ := NewPodEnvFilter() + pef.SetMasterServicePort("10261") + newObj := pef.Filter(podReq, make(<-chan struct{})) + + if !reflect.DeepEqual(newObj, podReq) { + t.Errorf("RuntimeObjectFilter got error, expected: \n%v\nbut got: \n%v\n", podReq, newObj) + } +} + +func TestFilterNonPodEnv(t *testing.T) { + serviceReq := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "svc1", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.105.187", + Type: corev1.ServiceTypeClusterIP, + }, + } + pef, _ := NewPodEnvFilter() + newObj := pef.Filter(serviceReq, make(<-chan struct{})) + + if !reflect.DeepEqual(newObj, serviceReq) { + t.Errorf("RuntimeObjectFilter got error, expected: \n%v\nbut got: \n%v\n", serviceReq, newObj) + } +} diff --git a/pkg/yurthub/filter/responsefilter/filter_test.go b/pkg/yurthub/filter/responsefilter/filter_test.go index 02a21df359f..03b8da511f1 100644 --- a/pkg/yurthub/filter/responsefilter/filter_test.go +++ b/pkg/yurthub/filter/responsefilter/filter_test.go @@ -2220,6 +2220,230 @@ func TestResponseFilterForListRequest(t *testing.T) { }, }, }, + "podenvupdater: updates KUBERNETES_SERVICE_HOST env var in multiple pods": { + masterHost: masterHost, + masterPort: masterPort, + kubeClient: &k8sfake.Clientset{}, + yurtClient: &fake.FakeDynamicClient{}, + poolName: poolName, + group: "", + version: "v1", + resource: "pods", + userAgent: "kubelet", + verb: "GET", + path: "/api/v1/pods", + accept: "application/json", + inputObj: &corev1.PodList{ + TypeMeta: metav1.TypeMeta{ + Kind: "PodList", + APIVersion: "v1", + }, + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "192.0.2.1"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "192.0.2.1"}, + }, + }, + }, + }, + }, + }, + }, + expectedObj: &corev1.PodList{ + TypeMeta: metav1.TypeMeta{ + Kind: "PodList", + APIVersion: "v1", + }, + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "169.254.2.1"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "169.254.2.1"}, + }, + }, + }, + }, + }, + }, + }, + }, + "podenvupdater: updates KUBERNETES_SERVICE_HOST env var in multiple containers per pod": { + masterHost: masterHost, + masterPort: masterPort, + kubeClient: &k8sfake.Clientset{}, + yurtClient: &fake.FakeDynamicClient{}, + poolName: poolName, + group: "", + version: "v1", + resource: "pods", + userAgent: "kubelet", + verb: "GET", + path: "/api/v1/pods", + accept: "application/json", + inputObj: &corev1.PodList{ + TypeMeta: metav1.TypeMeta{ + Kind: "PodList", + APIVersion: "v1", + }, + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "192.0.2.1"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + { + Name: "test-container1", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "192.0.2.1"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "192.0.2.1"}, + }, + }, + { + Name: "test-container1", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "192.0.2.1"}, + }, + }, + }, + }, + }, + }, + }, + expectedObj: &corev1.PodList{ + TypeMeta: metav1.TypeMeta{ + Kind: "PodList", + APIVersion: "v1", + }, + Items: []corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "kube-system", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "169.254.2.1"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + { + Name: "test-container1", + Env: []corev1.EnvVar{ + {Name: "KUBERNETES_SERVICE_HOST", Value: "169.254.2.1"}, + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + }, + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "pod2", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "169.254.2.1"}, + }, + }, + { + Name: "test-container1", + Env: []corev1.EnvVar{ + {Name: "OTHER_ENV_VAR", Value: "some-value"}, + {Name: "KUBERNETES_SERVICE_HOST", Value: "169.254.2.1"}, + }, + }, + }, + }, + }, + }, + }, + }, } resolver := newTestRequestInfoResolver() diff --git a/pkg/yurtiotdock/clients/edgex-foundry/v3/deviceservice_client_test.go b/pkg/yurtiotdock/clients/edgex-foundry/v3/deviceservice_client_test.go index 0a2a8b7aec4..07717ff09fe 100644 --- a/pkg/yurtiotdock/clients/edgex-foundry/v3/deviceservice_client_test.go +++ b/pkg/yurtiotdock/clients/edgex-foundry/v3/deviceservice_client_test.go @@ -138,7 +138,6 @@ func Test_ConvertServiceSystemEvents(t *testing.T) { service, err := serviceClient.Convert(context.TODO(), dsse, clients.GetOptions{Namespace: "default"}) assert.Nil(t, err) - fmt.Println(service) assert.Equal(t, "device-virtual", service.Name) assert.Equal(t, "http://edgex-device-virtual:59900", service.Spec.BaseAddress) }