Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support service and more fields in K8s meta #1895

Merged
merged 6 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions docs/cn/plugins/input/service-kubernetesmeta-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

| 参数 | 类型,默认值 | 说明 |
| - | - | - |
| Type | String,无默认值(必填) | 插件类型,固定为`service_syslog`。 |
| Domain | String,默认值为空(必填) | 实体域,ACK集群填写"acs", 普通集群填写"infra"。 |
| Type | String,无默认值(必填) | 插件类型,固定为`service_kubernetes_meta`。 |
| Interval | int, 30 | 采集间隔时间,单位为秒。 |
| Pod | bool, false | 是否采集Pod元数据。 |
| Node | bool, false | 是否采集Node元数据。 |
Expand All @@ -24,7 +23,6 @@
| DaemonSet | bool, false | 是否采集DaemonSet元数据。 |
| StatefulSet | bool, false | 是否采集StatefulSet元数据。 |
| Configmap | bool, false | 是否采集ConfigMap元数据。 |
| Secret | bool, false | 是否采集Secret元数据。 |
| Job | bool, false | 是否采集Job元数据。 |
| CronJob | bool, false | 是否采集CronJob元数据。 |
| Namespace | bool, false | 是否采集Namespace元数据。 |
Expand Down
31 changes: 30 additions & 1 deletion pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ func (m *k8sMetaCache) List() []*ObjectWrapper {
return m.metaStore.List()
}

func (m *k8sMetaCache) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
return m.metaStore.Filter(filterFunc, limit)
}

func (m *k8sMetaCache) RegisterSendFunc(key string, sendFunc SendFunc, interval int) {
m.metaStore.RegisterSendFunc(key, sendFunc, interval)
logger.Debug(context.Background(), "register send func", m.resourceType)
}

func (m *k8sMetaCache) UnRegisterSendFunc(key string) {
Expand Down Expand Up @@ -186,6 +191,8 @@ func getIdxRules(resourceType string) []IdxFunc {
return []IdxFunc{generateNodeKey}
case POD:
return []IdxFunc{generateCommonKey, generatePodIPKey, generateContainerIDKey, generateHostIPKey}
case SERVICE:
return []IdxFunc{generateCommonKey, generateServiceIPKey}
default:
return []IdxFunc{generateCommonKey}
}
Expand Down Expand Up @@ -274,7 +281,7 @@ func generateContainerIDKey(obj interface{}) ([]string, error) {
}
result := make([]string, len(pod.Status.ContainerStatuses))
for i, containerStatus := range pod.Status.ContainerStatuses {
result[i] = containerStatus.ContainerID
result[i] = truncateContainerID(containerStatus.ContainerID)
}
return result, nil
}
Expand All @@ -286,3 +293,25 @@ func generateHostIPKey(obj interface{}) ([]string, error) {
}
return []string{pod.Status.HostIP}, nil
}

func generateServiceIPKey(obj interface{}) ([]string, error) {
svc, ok := obj.(*v1.Service)
if !ok {
return []string{}, fmt.Errorf("object is not a service")
}
results := make([]string, 0)
for _, ip := range svc.Spec.ClusterIPs {
if ip != "" {
results = append(results, ip)
}
}
for _, ip := range svc.Spec.ExternalIPs {
if ip != "" {
results = append(results, ip)
}
}
if svc.Spec.LoadBalancerIP != "" {
results = append(results, svc.Spec.LoadBalancerIP)
}
return results, nil
}
9 changes: 7 additions & 2 deletions pkg/helper/k8smeta/k8s_meta_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ const (
)

type PodMetadata struct {
PodName string `json:"podName"`
StartTime int64 `json:"startTime"`
Namespace string `json:"namespace"`
WorkloadName string `json:"workloadName"`
WorkloadKind string `json:"workloadKind"`
ServiceName string `json:"serviceName"`
Labels map[string]string `json:"labels"`
Envs map[string]string `json:"envs"`
Images map[string]string `json:"images"`
IsDeleted bool `json:"-"`

ServiceName string `json:"serviceName,omitempty"`
ContainerIDs []string `json:"containerIDs,omitempty"`
PodIP string `json:"podIP,omitempty"`
IsDeleted bool `json:"-"`
}
85 changes: 58 additions & 27 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ type DeferredDeletionMetaStore struct {
lock sync.RWMutex

// timer
gracePeriod int64
sendFuncs sync.Map
gracePeriod int64
registerLock sync.RWMutex
sendFuncs map[string]*SendFuncWithStopCh
}

type TimerEvent struct {
Expand All @@ -49,7 +50,7 @@ func NewDeferredDeletionMetaStore(eventCh chan *K8sMetaEvent, stopCh <-chan stru
Index: make(map[string][]string),

gracePeriod: gracePeriod,
sendFuncs: sync.Map{},
sendFuncs: make(map[string]*SendFuncWithStopCh),
}
return m
}
Expand All @@ -65,7 +66,7 @@ func (m *DeferredDeletionMetaStore) Get(key []string) map[string][]*ObjectWrappe
for _, k := range key {
realKeys, ok := m.Index[k]
if !ok {
return nil
continue
}
for _, realKey := range realKeys {
result[k] = append(result[k], m.Items[realKey])
Expand All @@ -84,12 +85,33 @@ func (m *DeferredDeletionMetaStore) List() []*ObjectWrapper {
return result
}

func (m *DeferredDeletionMetaStore) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
m.lock.RLock()
defer m.lock.RUnlock()
result := make([]*ObjectWrapper, 0)
for _, item := range m.Items {
if filterFunc != nil {
if filterFunc(item) {
result = append(result, item)
}
} else {
result = append(result, item)
}
if limit > 0 && len(result) >= limit {
break
}
}
return result
}

func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, interval int) {
sendFuncWithStopCh := &SendFuncWithStopCh{
SendFunc: f,
StopCh: make(chan struct{}),
}
m.sendFuncs.Store(key, sendFuncWithStopCh)
m.registerLock.Lock()
m.sendFuncs[key] = sendFuncWithStopCh
m.registerLock.Unlock()
go func() {
defer panicRecover()
event := &K8sMetaEvent{
Expand Down Expand Up @@ -123,9 +145,12 @@ func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, int
}

func (m *DeferredDeletionMetaStore) UnRegisterSendFunc(key string) {
if stopCh, ok := m.sendFuncs.LoadAndDelete(key); ok {
close(stopCh.(*SendFuncWithStopCh).StopCh)
m.registerLock.Lock()
if stopCh, ok := m.sendFuncs[key]; ok {
close(stopCh.StopCh)
}
delete(m.sendFuncs, key)
m.registerLock.Unlock()
}

// realtime events (add, update, delete) and timer events are handled sequentially
Expand All @@ -149,10 +174,11 @@ func (m *DeferredDeletionMetaStore) handleEvent() {
logger.Error(context.Background(), "unknown event type", event.EventType)
}
case <-m.stopCh:
m.sendFuncs.Range(func(key, value interface{}) bool {
close(value.(*SendFuncWithStopCh).StopCh)
return true
})
m.registerLock.Lock()
for _, f := range m.sendFuncs {
close(f.StopCh)
}
m.registerLock.Unlock()
return
}
}
Expand All @@ -174,10 +200,11 @@ func (m *DeferredDeletionMetaStore) handleAddEvent(event *K8sMetaEvent) {
m.Index[idxKey] = append(m.Index[idxKey], key)
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
}

func (m *DeferredDeletionMetaStore) handleUpdateEvent(event *K8sMetaEvent) {
Expand All @@ -199,10 +226,11 @@ func (m *DeferredDeletionMetaStore) handleUpdateEvent(event *K8sMetaEvent) {
m.Index[idxKey] = append(m.Index[idxKey], key)
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
}

func (m *DeferredDeletionMetaStore) handleDeleteEvent(event *K8sMetaEvent) {
Expand All @@ -217,10 +245,11 @@ func (m *DeferredDeletionMetaStore) handleDeleteEvent(event *K8sMetaEvent) {
event.Object.FirstObservedTime = obj.FirstObservedTime
}
m.lock.Unlock()
m.sendFuncs.Range(func(key, value interface{}) bool {
value.(*SendFuncWithStopCh).SendFunc([]*K8sMetaEvent{event})
return true
})
m.registerLock.RLock()
for _, f := range m.sendFuncs {
f.SendFunc([]*K8sMetaEvent{event})
}
m.registerLock.RUnlock()
go func() {
// wait and add a deferred delete event
time.Sleep(time.Duration(m.gracePeriod) * time.Second)
Expand Down Expand Up @@ -277,9 +306,11 @@ func (m *DeferredDeletionMetaStore) handleDeferredDeleteEvent(event *K8sMetaEven

func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
timerEvent := event.Object.Raw.(*TimerEvent)
if f, ok := m.sendFuncs.Load(timerEvent.ConfigName); ok {
sendFuncWithStopCh := f.(*SendFuncWithStopCh)
m.registerLock.RLock()
defer m.registerLock.RUnlock()
if f, ok := m.sendFuncs[timerEvent.ConfigName]; ok {
allItems := make([]*K8sMetaEvent, 0)
m.lock.RLock()
Comment on lines +309 to +313
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

之前在遍历sendFunc时,锁的粒度不够。只能保证取到元素时是原子的。
但是取到之后,到实际调用执行之间,可能会出现配置重新加载,unregister,无法保证元素仍是有效的。

for _, obj := range m.Items {
if !obj.Deleted {
obj.LastObservedTime = time.Now().Unix()
Expand All @@ -289,9 +320,9 @@ func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
})
}
}
sendFuncWithStopCh.SendFunc(allItems)
m.lock.RUnlock()
f.SendFunc(allItems)
}

}

func (m *DeferredDeletionMetaStore) getIdxKeys(obj *ObjectWrapper) []string {
Expand Down
46 changes: 46 additions & 0 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
Expand Down Expand Up @@ -108,3 +109,48 @@ func TestTimerSend(t *testing.T) {
}
}
}

func TestFilter(t *testing.T) {
eventCh := make(chan *K8sMetaEvent)
stopCh := make(chan struct{})
gracePeriod := 1
cache := NewDeferredDeletionMetaStore(eventCh, stopCh, int64(gracePeriod), cache.MetaNamespaceKeyFunc)
cache.Items["default/test"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
Labels: map[string]string{
"app": "test",
},
},
},
}
cache.Items["default/test2"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test2",
Namespace: "default",
Labels: map[string]string{
"app": "test2",
},
},
},
}
cache.Items["default/test3"] = &ObjectWrapper{
Raw: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test3",
Namespace: "default",
Labels: map[string]string{
"app": "test2",
},
},
},
}
objs := cache.Filter(func(obj *ObjectWrapper) bool {
return obj.Raw.(*corev1.Pod).Labels["app"] == "test2"
}, 1)
assert.Len(t, objs, 1)
assert.Equal(t, "test2", objs[0].Raw.(*corev1.Pod).Labels["app"])
}
Loading
Loading