Skip to content

Commit

Permalink
ut
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Nov 27, 2024
1 parent 24ac349 commit d7cc27a
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 25 deletions.
4 changes: 1 addition & 3 deletions docs/cn/plugins/input/service-kubernetesmeta-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

| 参数 | 类型,默认值 | 说明 |
| - | - | - |
| Type | String,无默认值(必填) | 插件类型,固定为`service_syslog`|
| Domain | String,默认值为空(必填) | 实体域,ACK集群填写"acs", 普通集群填写"infra"。 |
| Type | String,无默认值(必填) | 插件类型,固定为`service_kubernetes_meta`|
| Interval | int, 30 | 采集间隔时间,单位为秒。 |
| Pod | bool, false | 是否采集Pod元数据。 |
| Node | bool, false | 是否采集Node元数据。 |
Expand All @@ -24,7 +23,6 @@
| DaemonSet | bool, false | 是否采集DaemonSet元数据。 |
| StatefulSet | bool, false | 是否采集StatefulSet元数据。 |
| Configmap | bool, false | 是否采集ConfigMap元数据。 |
| Secret | bool, false | 是否采集Secret元数据。 |
| Job | bool, false | 是否采集Job元数据。 |
| CronJob | bool, false | 是否采集CronJob元数据。 |
| Namespace | bool, false | 是否采集Namespace元数据。 |
Expand Down
46 changes: 46 additions & 0 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -108,3 +109,48 @@ func TestTimerSend(t *testing.T) {
}
}
}

func TestFilter(t *testing.T) {
eventCh := make(chan *K8sMetaEvent)
stopCh := make(chan struct{})
gracePeriod := 1
cache := NewDeferredDeletionMetaStore(eventCh, stopCh, int64(gracePeriod), cache.MetaNamespaceKeyFunc)
cache.Items["default/test"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
Labels: map[string]string{
"app": "test",
},
},
},
}
cache.Items["default/test2"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: "default",
Labels: map[string]string{
"app": "test2",
},
},
},
}
cache.Items["default/test3"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test3",
Namespace: "default",
Labels: map[string]string{
"app": "test2",
},
},
},
}
objs := cache.Filter(func(obj *ObjectWrapper) bool {
return obj.Raw.(*corev1.Pod).Labels["app"] == "test2"
}, 1)
assert.Len(t, objs, 1)
assert.Equal(t, "test2", objs[0].Raw.(*corev1.Pod).Labels["app"])
}
46 changes: 24 additions & 22 deletions pkg/helper/k8smeta/k8s_meta_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,19 @@ func (m *metadataHandler) handlePodMetaByIPPort(w http.ResponseWriter, r *http.R
continue
}
ip := ipPort[0]
port := int32(0)
if len(ipPort) > 1 {
tmp, _ := strconv.ParseInt(ipPort[1], 10, 32)
port = int32(tmp)
}
objs := m.metaManager.cacheMap[POD].Get([]string{ip})
if len(objs) == 0 {
podMetadata := m.findPodByServiceIPPort(ipPort)
podMetadata := m.findPodByServiceIPPort(ip, port)
if podMetadata != nil {
metadata[key] = podMetadata
}
} else {
podMetadata := m.findPodByPodIPPort(ipPort, objs)
podMetadata := m.findPodByPodIPPort(ip, port, objs)
if podMetadata != nil {
metadata[key] = podMetadata
}
Expand All @@ -108,27 +113,22 @@ func (m *metadataHandler) handlePodMetaByIPPort(w http.ResponseWriter, r *http.R
wrapperResponse(w, metadata)
}

func (m *metadataHandler) findPodByServiceIPPort(ipPort []string) *PodMetadata {
ip := ipPort[0]
func (m *metadataHandler) findPodByServiceIPPort(ip string, port int32) *PodMetadata {
// try service IP
svcObjs := m.metaManager.cacheMap[SERVICE].Get([]string{ip})
if len(svcObjs) == 0 {
return nil
}
var service *v1.Service
if len(ipPort) == 2 {
expectedPort, err := strconv.Atoi(ipPort[1])
if err != nil {
return nil
}
if port != 0 {
for _, obj := range svcObjs[ip] {
svc, ok := obj.Raw.(*v1.Service)
if !ok {
continue
}
portMatch := false
for _, port := range svc.Spec.Ports {
if port.Port == int32(expectedPort) {
for _, realPort := range svc.Spec.Ports {
if realPort.Port == port {
portMatch = true
break
}
Expand All @@ -139,6 +139,9 @@ func (m *metadataHandler) findPodByServiceIPPort(ipPort []string) *PodMetadata {
service = svc
break
}
if service == nil {
return nil
}
} else {
for _, obj := range svcObjs[ip] {
// if no port specified, use the first service
Expand Down Expand Up @@ -168,19 +171,14 @@ func (m *metadataHandler) findPodByServiceIPPort(ipPort []string) *PodMetadata {
return nil
}

func (m *metadataHandler) findPodByPodIPPort(ipPort []string, objs map[string][]*ObjectWrapper) *PodMetadata {
ip := ipPort[0]
if len(ipPort) == 2 {
expectedPort, err := strconv.Atoi(ipPort[1])
if err != nil {
return nil
}
func (m *metadataHandler) findPodByPodIPPort(ip string, port int32, objs map[string][]*ObjectWrapper) *PodMetadata {
if port != 0 {
for _, obj := range objs[ip] {
pod := obj.Raw.(*v1.Pod)
for _, container := range pod.Spec.Containers {
portMatch := false
for _, port := range container.Ports {
if port.ContainerPort == int32(expectedPort) {
for _, realPort := range container.Ports {
if realPort.ContainerPort == port {
portMatch = true
break
}
Expand All @@ -194,6 +192,9 @@ func (m *metadataHandler) findPodByPodIPPort(ipPort []string, objs map[string][]
}
} else {
// without port
if objs[ip] == nil || len(objs[ip]) == 0 {
return nil
}
podMetadata := m.convertObj2PodResponse(objs[ip][0])
return podMetadata
}
Expand Down Expand Up @@ -309,8 +310,9 @@ func (m *metadataHandler) getCommonPodMetadata(pod *v1.Pod) *PodMetadata {
podMetadata.WorkloadKind = ""
logger.Warning(context.Background(), "Pod has no owner", pod.Name)
} else {
podMetadata.WorkloadName = pod.GetOwnerReferences()[0].Name
podMetadata.WorkloadKind = strings.ToLower(pod.GetOwnerReferences()[0].Kind)
reference := pod.GetOwnerReferences()[0]
podMetadata.WorkloadName = reference.Name
podMetadata.WorkloadKind = strings.ToLower(reference.Kind)
if podMetadata.WorkloadKind == REPLICASET {
// replicaset -> deployment
replicasetKey := generateNameWithNamespaceKey(pod.Namespace, podMetadata.WorkloadName)
Expand Down
131 changes: 131 additions & 0 deletions pkg/helper/k8smeta/k8s_meta_http_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package k8smeta

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestFindPodByServiceIPPort(t *testing.T) {
manager := GetMetaManagerInstance()
podCache := newK8sMetaCache(make(chan struct{}), POD)
podCache.metaStore.Items["default/pod1"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
Labels: map[string]string{
"app": "test",
"env": "test",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
Image: "test",
Ports: []corev1.ContainerPort{
{
ContainerPort: 80,
},
},
},
},
},
Status: corev1.PodStatus{
PodIP: "1.1.1.1",
},
},
}
manager.cacheMap[POD] = podCache
serviceCache := newK8sMetaCache(make(chan struct{}), SERVICE)
serviceCache.metaStore.Items["default/service1"] = &ObjectWrapper{
Raw: &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "service1",
Namespace: "default",
},
Spec: corev1.ServiceSpec{
Selector: map[string]string{
"app": "test",
},
ClusterIPs: []string{
"2.2.2.2",
},
Ports: []corev1.ServicePort{
{
Port: 80,
},
},
},
},
}
serviceCache.metaStore.Index["2.2.2.2"] = []string{"default/service1"}
manager.cacheMap[SERVICE] = serviceCache
handler := newMetadataHandler(GetMetaManagerInstance())
podMetadata := handler.findPodByServiceIPPort("2.2.2.2", 0)
assert.NotNil(t, podMetadata)
assert.Equal(t, "pod1", podMetadata.PodName)

podMetadata = handler.findPodByServiceIPPort("2.2.2.2", 80)
assert.NotNil(t, podMetadata)
assert.Equal(t, "pod1", podMetadata.PodName)

podMetadata = handler.findPodByServiceIPPort("2.2.2.2", 90)
assert.Nil(t, podMetadata)

podMetadata = handler.findPodByServiceIPPort("3.3.3.3", 0)
assert.Nil(t, podMetadata)
}

func TestFindPodByPodIPPort(t *testing.T) {
handler := newMetadataHandler(GetMetaManagerInstance())
pods := map[string][]*ObjectWrapper{
"1.1.1.1": {
{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod1",
Namespace: "default",
Labels: map[string]string{
"app": "test",
"env": "test",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
Image: "test",
Ports: []corev1.ContainerPort{
{
ContainerPort: 80,
},
},
},
},
},
Status: corev1.PodStatus{
PodIP: "1.1.1.1",
},
},
},
},
}

podMetadata := handler.findPodByPodIPPort("1.1.1.1", 0, pods)
assert.NotNil(t, podMetadata)
assert.Equal(t, "pod1", podMetadata.PodName)

podMetadata = handler.findPodByPodIPPort("1.1.1.1", 80, pods)
assert.NotNil(t, podMetadata)
assert.Equal(t, "pod1", podMetadata.PodName)

podMetadata = handler.findPodByPodIPPort("1.1.1.1", 90, pods)
assert.Nil(t, podMetadata)

podMetadata = handler.findPodByPodIPPort("2.2.2.2", 0, pods)
assert.Nil(t, podMetadata)
}

0 comments on commit d7cc27a

Please sign in to comment.