Skip to content

Commit

Permalink
feat: support service and more fields in K8s meta (#1895)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc authored Nov 27, 2024
1 parent 6cc36ae commit f7fe832
Show file tree
Hide file tree
Showing 9 changed files with 510 additions and 100 deletions.
4 changes: 1 addition & 3 deletions docs/cn/plugins/input/service-kubernetesmeta-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

| 参数 | 类型,默认值 | 说明 |
| - | - | - |
| Type | String,无默认值(必填) | 插件类型,固定为`service_syslog`|
| Domain | String,默认值为空(必填) | 实体域,ACK集群填写"acs", 普通集群填写"infra"。 |
| Type | String,无默认值(必填) | 插件类型,固定为`service_kubernetes_meta`|
| Interval | int, 30 | 采集间隔时间,单位为秒。 |
| Pod | bool, false | 是否采集Pod元数据。 |
| Node | bool, false | 是否采集Node元数据。 |
Expand All @@ -24,7 +23,6 @@
| DaemonSet | bool, false | 是否采集DaemonSet元数据。 |
| StatefulSet | bool, false | 是否采集StatefulSet元数据。 |
| Configmap | bool, false | 是否采集ConfigMap元数据。 |
| Secret | bool, false | 是否采集Secret元数据。 |
| Job | bool, false | 是否采集Job元数据。 |
| CronJob | bool, false | 是否采集CronJob元数据。 |
| Namespace | bool, false | 是否采集Namespace元数据。 |
Expand Down
31 changes: 30 additions & 1 deletion pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ func (m *k8sMetaCache) List() []*ObjectWrapper {
return m.metaStore.List()
}

func (m *k8sMetaCache) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
return m.metaStore.Filter(filterFunc, limit)
}

func (m *k8sMetaCache) RegisterSendFunc(key string, sendFunc SendFunc, interval int) {
m.metaStore.RegisterSendFunc(key, sendFunc, interval)
logger.Debug(context.Background(), "register send func", m.resourceType)
}

func (m *k8sMetaCache) UnRegisterSendFunc(key string) {
Expand Down Expand Up @@ -186,6 +191,8 @@ func getIdxRules(resourceType string) []IdxFunc {
return []IdxFunc{generateNodeKey}
case POD:
return []IdxFunc{generateCommonKey, generatePodIPKey, generateContainerIDKey, generateHostIPKey}
case SERVICE:
return []IdxFunc{generateCommonKey, generateServiceIPKey}
default:
return []IdxFunc{generateCommonKey}
}
Expand Down Expand Up @@ -274,7 +281,7 @@ func generateContainerIDKey(obj interface{}) ([]string, error) {
}
result := make([]string, len(pod.Status.ContainerStatuses))
for i, containerStatus := range pod.Status.ContainerStatuses {
result[i] = containerStatus.ContainerID
result[i] = truncateContainerID(containerStatus.ContainerID)
}
return result, nil
}
Expand All @@ -286,3 +293,25 @@ func generateHostIPKey(obj interface{}) ([]string, error) {
}
return []string{pod.Status.HostIP}, nil
}

func generateServiceIPKey(obj interface{}) ([]string, error) {
svc, ok := obj.(*v1.Service)
if !ok {
return []string{}, fmt.Errorf("object is not a service")
}
results := make([]string, 0)
for _, ip := range svc.Spec.ClusterIPs {
if ip != "" {
results = append(results, ip)
}
}
for _, ip := range svc.Spec.ExternalIPs {
if ip != "" {
results = append(results, ip)
}
}
if svc.Spec.LoadBalancerIP != "" {
results = append(results, svc.Spec.LoadBalancerIP)
}
return results, nil
}
9 changes: 7 additions & 2 deletions pkg/helper/k8smeta/k8s_meta_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ const (
)

type PodMetadata struct {
PodName string `json:"podName"`
StartTime int64 `json:"startTime"`
Namespace string `json:"namespace"`
WorkloadName string `json:"workloadName"`
WorkloadKind string `json:"workloadKind"`
ServiceName string `json:"serviceName"`
Labels map[string]string `json:"labels"`
Envs map[string]string `json:"envs"`
Images map[string]string `json:"images"`
IsDeleted bool `json:"-"`

ServiceName string `json:"serviceName,omitempty"`
ContainerIDs []string `json:"containerIDs,omitempty"`
PodIP string `json:"podIP,omitempty"`
IsDeleted bool `json:"-"`
}
85 changes: 58 additions & 27 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type DeferredDeletionMetaStore struct {
lock sync.RWMutex

// timer
gracePeriod int64
sendFuncs sync.Map
gracePeriod int64
registerLock sync.RWMutex
sendFuncs map[string]*SendFuncWithStopCh
}

type TimerEvent struct {
Expand All @@ -49,7 +50,7 @@ func NewDeferredDeletionMetaStore(eventCh chan *K8sMetaEvent, stopCh <-chan stru
Index: make(map[string][]string),

gracePeriod: gracePeriod,
sendFuncs: sync.Map{},
sendFuncs: make(map[string]*SendFuncWithStopCh),
}
return m
}
Expand All @@ -65,7 +66,7 @@ func (m *DeferredDeletionMetaStore) Get(key []string) map[string][]*ObjectWrappe
for _, k := range key {
realKeys, ok := m.Index[k]
if !ok {
return nil
continue
}
for _, realKey := range realKeys {
result[k] = append(result[k], m.Items[realKey])
Expand All @@ -84,12 +85,33 @@ func (m *DeferredDeletionMetaStore) List() []*ObjectWrapper {
return result
}

func (m *DeferredDeletionMetaStore) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
m.lock.RLock()
defer m.lock.RUnlock()
result := make([]*ObjectWrapper, 0)
for _, item := range m.Items {
if filterFunc != nil {
if filterFunc(item) {
result = append(result, item)
}
} else {
result = append(result, item)
}
if limit > 0 && len(result) >= limit {
break
}
}
return result
}

func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, interval int) {
sendFuncWithStopCh := &SendFuncWithStopCh{
SendFunc: f,
StopCh: make(chan struct{}),
}
m.sendFuncs.Store(key, sendFuncWithStopCh)
m.registerLock.Lock()
m.sendFuncs[key] = sendFuncWithStopCh
m.registerLock.Unlock()
go func() {
defer panicRecover()
event := &K8sMetaEvent{
Expand Down Expand Up @@ -123,9 +145,12 @@ func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, int
}

func (m *DeferredDeletionMetaStore) UnRegisterSendFunc(key string) {
if stopCh, ok := m.sendFuncs.LoadAndDelete(key); ok {
close(stopCh.(*SendFuncWithStopCh).StopCh)
m.registerLock.Lock()
if stopCh, ok := m.sendFuncs[key]; ok {
close(stopCh.StopCh)
}
delete(m.sendFuncs, key)
m.registerLock.Unlock()
}

// realtime events (add, update, delete) and timer events are handled sequentially
Expand All @@ -149,10 +174,11 @@ func (m *DeferredDeletionMetaStore) handleEvent() {
logger.Error(context.Background(), "unknown event type", event.EventType)
}
case <-m.stopCh:
m.sendFuncs.Range(func(key, value interface{}) bool {
close(value.(*SendFuncWithStopCh).StopCh)
return true
})
m.registerLock.Lock()
for _, f := range m.sendFuncs {
close(f.StopCh)
}
m.registerLock.Unlock()
return
}
}
Expand All @@ -174,10 +200,11 @@ func (m *DeferredDeletionMetaStore) handleAddEvent(event *K8sMetaEvent) {
m.Index[idxKey] = append(m.Index[idxKey], key)
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
}

func (m *DeferredDeletionMetaStore) handleUpdateEvent(event *K8sMetaEvent) {
Expand All @@ -199,10 +226,11 @@ func (m *DeferredDeletionMetaStore) handleUpdateEvent(event *K8sMetaEvent) {
m.Index[idxKey] = append(m.Index[idxKey], key)
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
}

func (m *DeferredDeletionMetaStore) handleDeleteEvent(event *K8sMetaEvent) {
Expand All @@ -217,10 +245,11 @@ func (m *DeferredDeletionMetaStore) handleDeleteEvent(event *K8sMetaEvent) {
event.Object.FirstObservedTime = obj.FirstObservedTime
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
go func() {
// wait and add a deferred delete event
time.Sleep(time.Duration(m.gracePeriod) * time.Second)
Expand Down Expand Up @@ -277,9 +306,11 @@ func (m *DeferredDeletionMetaStore) handleDeferredDeleteEvent(event *K8sMetaEven

func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
timerEvent := event.Object.Raw.(*TimerEvent)
if f, ok := m.sendFuncs.Load(timerEvent.ConfigName); ok {
sendFuncWithStopCh := f.(*SendFuncWithStopCh)
m.registerLock.RLock()
defer m.registerLock.RUnlock()
if f, ok := m.sendFuncs[timerEvent.ConfigName]; ok {
allItems := make([]*K8sMetaEvent, 0)
m.lock.RLock()
for _, obj := range m.Items {
if !obj.Deleted {
obj.LastObservedTime = time.Now().Unix()
Expand All @@ -289,9 +320,9 @@ func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
})
}
}
sendFuncWithStopCh.SendFunc(allItems)
m.lock.RUnlock()
f.SendFunc(allItems)
}

}

func (m *DeferredDeletionMetaStore) getIdxKeys(obj *ObjectWrapper) []string {
Expand Down
46 changes: 46 additions & 0 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -108,3 +109,48 @@ func TestTimerSend(t *testing.T) {
}
}
}

func TestFilter(t *testing.T) {
eventCh := make(chan *K8sMetaEvent)
stopCh := make(chan struct{})
gracePeriod := 1
cache := NewDeferredDeletionMetaStore(eventCh, stopCh, int64(gracePeriod), cache.MetaNamespaceKeyFunc)
cache.Items["default/test"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
Labels: map[string]string{
"app": "test",
},
},
},
}
cache.Items["default/test2"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: "default",
Labels: map[string]string{
"app": "test2",
},
},
},
}
cache.Items["default/test3"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test3",
Namespace: "default",
Labels: map[string]string{
"app": "test2",
},
},
},
}
objs := cache.Filter(func(obj *ObjectWrapper) bool {
return obj.Raw.(*corev1.Pod).Labels["app"] == "test2"
}, 1)
assert.Len(t, objs, 1)
assert.Equal(t, "test2", objs[0].Raw.(*corev1.Pod).Labels["app"])
}
Loading

0 comments on commit f7fe832

Please sign in to comment.