Skip to content

Commit

Permalink
feat: add more fields in K8s meta server
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Nov 19, 2024
1 parent 257aad1 commit f8203d4
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 70 deletions.
31 changes: 30 additions & 1 deletion pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,13 @@ func (m *k8sMetaCache) List() []*ObjectWrapper {
return m.metaStore.List()
}

func (m *k8sMetaCache) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
return m.metaStore.Filter(filterFunc, limit)
}

func (m *k8sMetaCache) RegisterSendFunc(key string, sendFunc SendFunc, interval int) {
m.metaStore.RegisterSendFunc(key, sendFunc, interval)
logger.Info(context.Background(), "register send func", m.resourceType)
}

func (m *k8sMetaCache) UnRegisterSendFunc(key string) {
Expand Down Expand Up @@ -186,6 +191,8 @@ func getIdxRules(resourceType string) []IdxFunc {
return []IdxFunc{generateNodeKey}
case POD:
return []IdxFunc{generateCommonKey, generatePodIPKey, generateContainerIDKey, generateHostIPKey}
case SERVICE:
return []IdxFunc{generateCommonKey, generateServiceIPKey}
default:
return []IdxFunc{generateCommonKey}
}
Expand Down Expand Up @@ -274,7 +281,7 @@ func generateContainerIDKey(obj interface{}) ([]string, error) {
}
result := make([]string, len(pod.Status.ContainerStatuses))
for i, containerStatus := range pod.Status.ContainerStatuses {
result[i] = containerStatus.ContainerID
result[i] = truncateContainerID(containerStatus.ContainerID)
}
return result, nil
}
Expand All @@ -286,3 +293,25 @@ func generateHostIPKey(obj interface{}) ([]string, error) {
}
return []string{pod.Status.HostIP}, nil
}

func generateServiceIPKey(obj interface{}) ([]string, error) {
svc, ok := obj.(*v1.Service)
if !ok {
return []string{}, fmt.Errorf("object is not a service")
}
results := make([]string, 0)
for _, ip := range svc.Spec.ClusterIPs {
if ip != "" {
results = append(results, ip)
}
}
for _, ip := range svc.Spec.ExternalIPs {
if ip != "" {
results = append(results, ip)
}
}
if svc.Spec.LoadBalancerIP != "" {
results = append(results, svc.Spec.LoadBalancerIP)
}
return results, nil
}
9 changes: 7 additions & 2 deletions pkg/helper/k8smeta/k8s_meta_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,17 @@ const (
)

type PodMetadata struct {
PodName string `json:"podName"`
StartTime int64 `json:"startTime"`
Namespace string `json:"namespace"`
WorkloadName string `json:"workloadName"`
WorkloadKind string `json:"workloadKind"`
ServiceName string `json:"serviceName"`
Labels map[string]string `json:"labels"`
Envs map[string]string `json:"envs"`
Images map[string]string `json:"images"`
IsDeleted bool `json:"-"`

ServiceName string `json:"serviceName,omitempty"`
ContainerIDs []string `json:"containerIDs,omitempty"`
PodIP string `json:"podIP,omitempty"`
IsDeleted bool `json:"-"`
}
58 changes: 56 additions & 2 deletions pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,12 @@ func (m *DeferredDeletionMetaStore) Get(key []string) map[string][]*ObjectWrappe
for _, k := range key {
realKeys, ok := m.Index[k]
if !ok {
return nil
continue
}
for _, realKey := range realKeys {
if _, ok := result[k]; !ok {
result[k] = make([]*ObjectWrapper, 0)
}
result[k] = append(result[k], m.Items[realKey])
}
}
Expand All @@ -84,6 +87,25 @@ func (m *DeferredDeletionMetaStore) List() []*ObjectWrapper {
return result
}

func (m *DeferredDeletionMetaStore) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper {
m.lock.RLock()
defer m.lock.RUnlock()
result := make([]*ObjectWrapper, 0)
for _, item := range m.Items {
if filterFunc != nil {
if filterFunc(item) {
result = append(result, item)
}
} else {
result = append(result, item)
}
if limit > 0 && len(result) >= limit {
break
}
}
return result
}

func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, interval int) {
sendFuncWithStopCh := &SendFuncWithStopCh{
SendFunc: f,
Expand Down Expand Up @@ -117,6 +139,10 @@ func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, int
case <-ticker.C:
m.eventCh <- event
case <-sendFuncWithStopCh.StopCh:
for _, v := range m.Items {
logger.Info(context.Background(), "stop send func", key, "cache", v.ResourceType)
break
}
return
}
}
Expand All @@ -137,23 +163,51 @@ func (m *DeferredDeletionMetaStore) handleEvent() {
case event := <-m.eventCh:
switch event.EventType {
case EventTypeAdd:
for _, v := range m.Items {
logger.Info(context.Background(), "handle add event", "type", event.EventType, "cache", v.ResourceType)
break
}
m.handleAddEvent(event)
case EventTypeUpdate:
for _, v := range m.Items {
logger.Info(context.Background(), "handle update event", "type", event.EventType, "cache", v.ResourceType)
break
}
m.handleUpdateEvent(event)
case EventTypeDelete:
for _, v := range m.Items {
logger.Info(context.Background(), "handle delete event", "type", event.EventType, "cache", v.ResourceType)
break
}
m.handleDeleteEvent(event)
case EventTypeDeferredDelete:
for _, v := range m.Items {
logger.Info(context.Background(), "handle deferred delete event", "type", event.EventType, "cache", v.ResourceType)
break
}
m.handleDeferredDeleteEvent(event)
case EventTypeTimer:
for _, v := range m.Items {
logger.Info(context.Background(), "handle timer event", "type", event.EventType, "cache", v.ResourceType)
break
}
m.handleTimerEvent(event)
default:
logger.Error(context.Background(), "unknown event type", event.EventType)
}
for _, v := range m.Items {
logger.Info(context.Background(), "handle event", "type", event.EventType, "cache", v.ResourceType)
break
}
case <-m.stopCh:
m.sendFuncs.Range(func(key, value interface{}) bool {
close(value.(*SendFuncWithStopCh).StopCh)
return true
})
for _, v := range m.Items {
logger.Info(context.Background(), "stop handle event", "cache", v.ResourceType)
break
}
return
}
}
Expand Down Expand Up @@ -282,7 +336,6 @@ func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
sendFuncWithStopCh := f.(*SendFuncWithStopCh)
allItems := make([]*K8sMetaEvent, 0)
m.lock.RLock()
defer m.lock.RUnlock()
for _, obj := range m.Items {
if !obj.Deleted {
obj.LastObservedTime = time.Now().Unix()
Expand All @@ -292,6 +345,7 @@ func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) {
})
}
}
m.lock.RUnlock()
sendFuncWithStopCh.SendFunc(allItems)
}
}
Expand Down
Loading

0 comments on commit f8203d4

Please sign in to comment.