diff --git a/docs/cn/plugins/input/service-kubernetesmeta-v2.md b/docs/cn/plugins/input/service-kubernetesmeta-v2.md index df4c4589ab..d7cb809bbb 100644 --- a/docs/cn/plugins/input/service-kubernetesmeta-v2.md +++ b/docs/cn/plugins/input/service-kubernetesmeta-v2.md @@ -24,7 +24,6 @@ | DaemonSet | bool, false | 是否采集DaemonSet元数据。 | | StatefulSet | bool, false | 是否采集StatefulSet元数据。 | | Configmap | bool, false | 是否采集ConfigMap元数据。 | -| Secret | bool, false | 是否采集Secret元数据。 | | Job | bool, false | 是否采集Job元数据。 | | CronJob | bool, false | 是否采集CronJob元数据。 | | Namespace | bool, false | 是否采集Namespace元数据。 | diff --git a/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store_test.go b/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store_test.go index 64f210599b..5e14124fb2 100644 --- a/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store_test.go +++ b/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" @@ -108,3 +109,48 @@ func TestTimerSend(t *testing.T) { } } } + +func TestFilter(t *testing.T) { + eventCh := make(chan *K8sMetaEvent) + stopCh := make(chan struct{}) + gracePeriod := 1 + cache := NewDeferredDeletionMetaStore(eventCh, stopCh, int64(gracePeriod), cache.MetaNamespaceKeyFunc) + cache.Items["default/test"] = &ObjectWrapper{ + Raw: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + Labels: map[string]string{ + "app": "test", + }, + }, + }, + } + cache.Items["default/test2"] = &ObjectWrapper{ + Raw: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test2", + Namespace: "default", + Labels: map[string]string{ + "app": "test2", + }, + }, + }, + } + cache.Items["default/test3"] = &ObjectWrapper{ + Raw: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test3", + Namespace: "default", + Labels: map[string]string{ + "app": "test2", + }, + }, + }, + } + objs := cache.Filter(func(obj *ObjectWrapper) bool { + return obj.Raw.(*corev1.Pod).Labels["app"] == "test2" + }, 1) + assert.Len(t, objs, 1) + assert.Equal(t, "test2", objs[0].Raw.(*corev1.Pod).Labels["app"]) +} diff --git a/pkg/helper/k8smeta/k8s_meta_http_server.go b/pkg/helper/k8smeta/k8s_meta_http_server.go index b9bc687f95..274704fafd 100644 --- a/pkg/helper/k8smeta/k8s_meta_http_server.go +++ b/pkg/helper/k8smeta/k8s_meta_http_server.go @@ -92,14 +92,19 @@ func (m *metadataHandler) handlePodMetaByIPPort(w http.ResponseWriter, r *http.R continue } ip := ipPort[0] + port := int32(0) + if len(ipPort) > 1 { + tmp, _ := strconv.ParseInt(ipPort[1], 10, 32) + port = int32(tmp) + } objs := m.metaManager.cacheMap[POD].Get([]string{ip}) if len(objs) == 0 { - podMetadata := m.findPodByServiceIPPort(ipPort) + podMetadata := m.findPodByServiceIPPort(ip, port) if podMetadata != nil { metadata[key] = podMetadata } } else { - podMetadata := m.findPodByPodIPPort(ipPort, objs) + podMetadata := m.findPodByPodIPPort(ip, port, objs) if podMetadata != nil { metadata[key] = podMetadata } @@ -108,27 +113,22 @@ func (m *metadataHandler) handlePodMetaByIPPort(w http.ResponseWriter, r *http.R wrapperResponse(w, metadata) } -func (m *metadataHandler) findPodByServiceIPPort(ipPort []string) *PodMetadata { - ip := ipPort[0] +func (m *metadataHandler) findPodByServiceIPPort(ip string, port int32) *PodMetadata { // try service IP svcObjs := m.metaManager.cacheMap[SERVICE].Get([]string{ip}) if len(svcObjs) == 0 { return nil } var service *v1.Service - if len(ipPort) == 2 { - expectedPort, err := strconv.Atoi(ipPort[1]) - if err != nil { - return nil - } + if port != 0 { for _, obj := range svcObjs[ip] { svc, ok := obj.Raw.(*v1.Service) if !ok { continue } portMatch := false - for _, port := range svc.Spec.Ports { - if port.Port == int32(expectedPort) { + for _, realPort := range svc.Spec.Ports { + if realPort.Port == port { portMatch = true break } @@ -139,6 +139,9 @@ func (m *metadataHandler) findPodByServiceIPPort(ipPort []string) *PodMetadata { service = svc break } + if service == nil { + return nil + } } else { for _, obj := range svcObjs[ip] { // if no port specified, use the first service @@ -168,19 +171,14 @@ func (m *metadataHandler) findPodByServiceIPPort(ipPort []string) *PodMetadata { return nil } -func (m *metadataHandler) findPodByPodIPPort(ipPort []string, objs map[string][]*ObjectWrapper) *PodMetadata { - ip := ipPort[0] - if len(ipPort) == 2 { - expectedPort, err := strconv.Atoi(ipPort[1]) - if err != nil { - return nil - } +func (m *metadataHandler) findPodByPodIPPort(ip string, port int32, objs map[string][]*ObjectWrapper) *PodMetadata { + if port != 0 { for _, obj := range objs[ip] { pod := obj.Raw.(*v1.Pod) for _, container := range pod.Spec.Containers { portMatch := false - for _, port := range container.Ports { - if port.ContainerPort == int32(expectedPort) { + for _, realPort := range container.Ports { + if realPort.ContainerPort == port { portMatch = true break } @@ -194,6 +192,9 @@ func (m *metadataHandler) findPodByPodIPPort(ipPort []string, objs map[string][] } } else { // without port + if objs[ip] == nil || len(objs[ip]) == 0 { + return nil + } podMetadata := m.convertObj2PodResponse(objs[ip][0]) return podMetadata } @@ -309,8 +310,9 @@ func (m *metadataHandler) getCommonPodMetadata(pod *v1.Pod) *PodMetadata { podMetadata.WorkloadKind = "" logger.Warning(context.Background(), "Pod has no owner", pod.Name) } else { - podMetadata.WorkloadName = pod.GetOwnerReferences()[0].Name - podMetadata.WorkloadKind = strings.ToLower(pod.GetOwnerReferences()[0].Kind) + reference := pod.GetOwnerReferences()[0] + podMetadata.WorkloadName = reference.Name + podMetadata.WorkloadKind = strings.ToLower(reference.Kind) if podMetadata.WorkloadKind == REPLICASET { // replicaset -> deployment replicasetKey := generateNameWithNamespaceKey(pod.Namespace, podMetadata.WorkloadName) diff --git a/pkg/helper/k8smeta/k8s_meta_http_server_test.go b/pkg/helper/k8smeta/k8s_meta_http_server_test.go new file mode 100644 index 0000000000..fbdc64e4c4 --- /dev/null +++ b/pkg/helper/k8smeta/k8s_meta_http_server_test.go @@ -0,0 +1,131 @@ +package k8smeta + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestFindPodByServiceIPPort(t *testing.T) { + manager := GetMetaManagerInstance() + podCache := newK8sMetaCache(make(chan struct{}), POD) + podCache.metaStore.Items["default/pod1"] = &ObjectWrapper{ + Raw: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + Labels: map[string]string{ + "app": "test", + "env": "test", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "test", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "1.1.1.1", + }, + }, + } + manager.cacheMap[POD] = podCache + serviceCache := newK8sMetaCache(make(chan struct{}), SERVICE) + serviceCache.metaStore.Items["default/service1"] = &ObjectWrapper{ + Raw: &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service1", + Namespace: "default", + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{ + "app": "test", + }, + ClusterIPs: []string{ + "2.2.2.2", + }, + Ports: []corev1.ServicePort{ + { + Port: 80, + }, + }, + }, + }, + } + serviceCache.metaStore.Index["2.2.2.2"] = []string{"default/service1"} + manager.cacheMap[SERVICE] = serviceCache + handler := newMetadataHandler(GetMetaManagerInstance()) + podMetadata := handler.findPodByServiceIPPort("2.2.2.2", 0) + assert.NotNil(t, podMetadata) + assert.Equal(t, "pod1", podMetadata.PodName) + + podMetadata = handler.findPodByServiceIPPort("2.2.2.2", 80) + assert.NotNil(t, podMetadata) + assert.Equal(t, "pod1", podMetadata.PodName) + + podMetadata = handler.findPodByServiceIPPort("2.2.2.2", 90) + assert.Nil(t, podMetadata) + + podMetadata = handler.findPodByServiceIPPort("3.3.3.3", 0) + assert.Nil(t, podMetadata) +} + +func TestFindPodByPodIPPort(t *testing.T) { + handler := newMetadataHandler(GetMetaManagerInstance()) + pods := map[string][]*ObjectWrapper{ + "1.1.1.1": { + { + Raw: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Namespace: "default", + Labels: map[string]string{ + "app": "test", + "env": "test", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "test", + Ports: []corev1.ContainerPort{ + { + ContainerPort: 80, + }, + }, + }, + }, + }, + Status: corev1.PodStatus{ + PodIP: "1.1.1.1", + }, + }, + }, + }, + } + + podMetadata := handler.findPodByPodIPPort("1.1.1.1", 0, pods) + assert.NotNil(t, podMetadata) + assert.Equal(t, "pod1", podMetadata.PodName) + + podMetadata = handler.findPodByPodIPPort("1.1.1.1", 80, pods) + assert.NotNil(t, podMetadata) + assert.Equal(t, "pod1", podMetadata.PodName) + + podMetadata = handler.findPodByPodIPPort("1.1.1.1", 90, pods) + assert.Nil(t, podMetadata) + + podMetadata = handler.findPodByPodIPPort("2.2.2.2", 0, pods) + assert.Nil(t, podMetadata) +}