diff --git a/pkg/helper/k8smeta/k8s_meta_cache.go b/pkg/helper/k8smeta/k8s_meta_cache.go index 1a4fe444ce..c1245d6a76 100644 --- a/pkg/helper/k8smeta/k8s_meta_cache.go +++ b/pkg/helper/k8smeta/k8s_meta_cache.go @@ -69,8 +69,13 @@ func (m *k8sMetaCache) List() []*ObjectWrapper { return m.metaStore.List() } +func (m *k8sMetaCache) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper { + return m.metaStore.Filter(filterFunc, limit) +} + func (m *k8sMetaCache) RegisterSendFunc(key string, sendFunc SendFunc, interval int) { m.metaStore.RegisterSendFunc(key, sendFunc, interval) + logger.Info(context.Background(), "register send func", m.resourceType) } func (m *k8sMetaCache) UnRegisterSendFunc(key string) { @@ -186,6 +191,8 @@ func getIdxRules(resourceType string) []IdxFunc { return []IdxFunc{generateNodeKey} case POD: return []IdxFunc{generateCommonKey, generatePodIPKey, generateContainerIDKey, generateHostIPKey} + case SERVICE: + return []IdxFunc{generateCommonKey, generateServiceIPKey} default: return []IdxFunc{generateCommonKey} } @@ -274,7 +281,7 @@ func generateContainerIDKey(obj interface{}) ([]string, error) { } result := make([]string, len(pod.Status.ContainerStatuses)) for i, containerStatus := range pod.Status.ContainerStatuses { - result[i] = containerStatus.ContainerID + result[i] = truncateContainerID(containerStatus.ContainerID) } return result, nil } @@ -286,3 +293,25 @@ func generateHostIPKey(obj interface{}) ([]string, error) { } return []string{pod.Status.HostIP}, nil } + +func generateServiceIPKey(obj interface{}) ([]string, error) { + svc, ok := obj.(*v1.Service) + if !ok { + return []string{}, fmt.Errorf("object is not a service") + } + results := make([]string, 0) + for _, ip := range svc.Spec.ClusterIPs { + if ip != "" { + results = append(results, ip) + } + } + for _, ip := range svc.Spec.ExternalIPs { + if ip != "" { + results = append(results, ip) + } + } + if svc.Spec.LoadBalancerIP != "" { + results = append(results, svc.Spec.LoadBalancerIP) + } + return results, nil +} diff --git a/pkg/helper/k8smeta/k8s_meta_const.go b/pkg/helper/k8smeta/k8s_meta_const.go index 2797d621be..a1db357527 100644 --- a/pkg/helper/k8smeta/k8s_meta_const.go +++ b/pkg/helper/k8smeta/k8s_meta_const.go @@ -124,12 +124,17 @@ const ( ) type PodMetadata struct { + PodName string `json:"podName"` + StartTime int64 `json:"startTime"` Namespace string `json:"namespace"` WorkloadName string `json:"workloadName"` WorkloadKind string `json:"workloadKind"` - ServiceName string `json:"serviceName"` Labels map[string]string `json:"labels"` Envs map[string]string `json:"envs"` Images map[string]string `json:"images"` - IsDeleted bool `json:"-"` + + ServiceName string `json:"serviceName,omitempty"` + ContainerIDs []string `json:"containerIDs,omitempty"` + PodIP string `json:"podIP,omitempty"` + IsDeleted bool `json:"-"` } diff --git a/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go b/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go index 4265dbc7d1..afa82097f3 100644 --- a/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go +++ b/pkg/helper/k8smeta/k8s_meta_deferred_deletion_meta_store.go @@ -65,9 +65,12 @@ func (m *DeferredDeletionMetaStore) Get(key []string) map[string][]*ObjectWrappe for _, k := range key { realKeys, ok := m.Index[k] if !ok { - return nil + continue } for _, realKey := range realKeys { + if _, ok := result[k]; !ok { + result[k] = make([]*ObjectWrapper, 0) + } result[k] = append(result[k], m.Items[realKey]) } } @@ -84,6 +87,25 @@ func (m *DeferredDeletionMetaStore) List() []*ObjectWrapper { return result } +func (m *DeferredDeletionMetaStore) Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper { + m.lock.RLock() + defer m.lock.RUnlock() + result := make([]*ObjectWrapper, 0) + for _, item := range m.Items { + if filterFunc != nil { + if filterFunc(item) { + result = append(result, item) + } + } else { + result = append(result, item) + } + if limit > 0 && len(result) >= limit { + break + } + } + return result +} + func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, interval int) { sendFuncWithStopCh := &SendFuncWithStopCh{ SendFunc: f, @@ -117,6 +139,10 @@ func (m *DeferredDeletionMetaStore) RegisterSendFunc(key string, f SendFunc, int case <-ticker.C: m.eventCh <- event case <-sendFuncWithStopCh.StopCh: + for _, v := range m.Items { + logger.Info(context.Background(), "stop send func", key, "cache", v.ResourceType) + break + } return } } @@ -137,23 +163,51 @@ func (m *DeferredDeletionMetaStore) handleEvent() { case event := <-m.eventCh: switch event.EventType { case EventTypeAdd: + for _, v := range m.Items { + logger.Info(context.Background(), "handle add event", "type", event.EventType, "cache", v.ResourceType) + break + } m.handleAddEvent(event) case EventTypeUpdate: + for _, v := range m.Items { + logger.Info(context.Background(), "handle update event", "type", event.EventType, "cache", v.ResourceType) + break + } m.handleUpdateEvent(event) case EventTypeDelete: + for _, v := range m.Items { + logger.Info(context.Background(), "handle delete event", "type", event.EventType, "cache", v.ResourceType) + break + } m.handleDeleteEvent(event) case EventTypeDeferredDelete: + for _, v := range m.Items { + logger.Info(context.Background(), "handle deferred delete event", "type", event.EventType, "cache", v.ResourceType) + break + } m.handleDeferredDeleteEvent(event) case EventTypeTimer: + for _, v := range m.Items { + logger.Info(context.Background(), "handle timer event", "type", event.EventType, "cache", v.ResourceType) + break + } m.handleTimerEvent(event) default: logger.Error(context.Background(), "unknown event type", event.EventType) } + for _, v := range m.Items { + logger.Info(context.Background(), "handle event", "type", event.EventType, "cache", v.ResourceType) + break + } case <-m.stopCh: m.sendFuncs.Range(func(key, value interface{}) bool { close(value.(*SendFuncWithStopCh).StopCh) return true }) + for _, v := range m.Items { + logger.Info(context.Background(), "stop handle event", "cache", v.ResourceType) + break + } return } } @@ -282,7 +336,6 @@ func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) { sendFuncWithStopCh := f.(*SendFuncWithStopCh) allItems := make([]*K8sMetaEvent, 0) m.lock.RLock() - defer m.lock.RUnlock() for _, obj := range m.Items { if !obj.Deleted { obj.LastObservedTime = time.Now().Unix() @@ -292,6 +345,7 @@ func (m *DeferredDeletionMetaStore) handleTimerEvent(event *K8sMetaEvent) { }) } } + m.lock.RUnlock() sendFuncWithStopCh.SendFunc(allItems) } } diff --git a/pkg/helper/k8smeta/k8s_meta_http_server.go b/pkg/helper/k8smeta/k8s_meta_http_server.go index 50fa757bab..258b6156dd 100644 --- a/pkg/helper/k8smeta/k8s_meta_http_server.go +++ b/pkg/helper/k8smeta/k8s_meta_http_server.go @@ -11,6 +11,7 @@ import ( app "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "github.com/alibaba/ilogtail/pkg/logger" ) @@ -46,8 +47,8 @@ func (m *metadataHandler) K8sServerRun(stopCh <-chan struct{}) error { mux := http.NewServeMux() // TODO: add port in ip endpoint - mux.HandleFunc("/metadata/ip", m.handler(m.handlePodMetaByUniqueID)) - mux.HandleFunc("/metadata/containerid", m.handler(m.handlePodMetaByUniqueID)) + mux.HandleFunc("/metadata/ip", m.handler(m.handlePodMetaByPodIP)) + mux.HandleFunc("/metadata/containerid", m.handler(m.handlePodMetaByContainerID)) mux.HandleFunc("/metadata/host", m.handler(m.handlePodMetaByHostIP)) server.Handler = mux logger.Info(context.Background(), "k8s meta server", "started", "port", port) @@ -74,7 +75,65 @@ func (m *metadataHandler) handler(handleFunc func(w http.ResponseWriter, r *http } } -func (m *metadataHandler) handlePodMetaByUniqueID(w http.ResponseWriter, r *http.Request) { +func (m *metadataHandler) handlePodMetaByPodIP(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + var rBody requestBody + // Decode the JSON data into the struct + err := json.NewDecoder(r.Body).Decode(&rBody) + if err != nil { + http.Error(w, "Error parsing JSON: "+err.Error(), http.StatusBadRequest) + return + } + + // Get the metadata + metadata := make(map[string]*PodMetadata) + for _, key := range rBody.Keys { + objs := m.metaManager.cacheMap[POD].Get([]string{key}) + if len(objs) == 0 { + // try service IP + svcObjs := m.metaManager.cacheMap[SERVICE].Get([]string{key}) + for key, svcObj := range svcObjs { + service, ok := svcObj[0].Raw.(*v1.Service) + if !ok { + continue + } + lm := newLabelMatcher(service, labels.SelectorFromSet(service.Spec.Selector)) + podObjs := m.metaManager.cacheMap[POD].Filter(func(ow *ObjectWrapper) bool { + pod := ow.Raw.(*v1.Pod) + if pod.Namespace != service.Namespace { + return false + } + return lm.selector.Matches(labels.Set(pod.Labels)) + }, 1) + if len(podObjs) != 0 { + podMetadata := m.convertObj2PodResponse(podObjs[0]) + podMetadata.ServiceName = service.Name + metadata[key] = podMetadata + } + } + } else { + for key, obj := range objs { + podMetadata := m.convertObj2PodResponse(obj[0]) + metadata[key] = podMetadata + } + } + } + wrapperResponse(w, metadata) +} + +func (m *metadataHandler) convertObj2PodResponse(obj *ObjectWrapper) *PodMetadata { + pod := obj.Raw.(*v1.Pod) + podMetadata := m.getCommonPodMetadata(pod) + containerIDs := make([]string, 0) + for _, container := range pod.Status.ContainerStatuses { + containerIDs = append(containerIDs, truncateContainerID(container.ContainerID)) + } + podMetadata.ContainerIDs = containerIDs + podMetadata.PodIP = pod.Status.PodIP + return podMetadata +} + +func (m *metadataHandler) handlePodMetaByContainerID(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() var rBody requestBody // Decode the JSON data into the struct @@ -88,28 +147,26 @@ func (m *metadataHandler) handlePodMetaByUniqueID(w http.ResponseWriter, r *http metadata := make(map[string]*PodMetadata) objs := m.metaManager.cacheMap[POD].Get(rBody.Keys) for key, obj := range objs { - podMetadata := m.convertObj2PodMetadata(obj) + podMetadata := m.convertObjs2ContainerResponse(obj) if len(podMetadata) > 1 { - logger.Warning(context.Background(), "Multiple pods found for unique ID", key) + logger.Warning(context.Background(), "Multiple pods found for unique container ID", key) } if len(podMetadata) > 0 { metadata[key] = podMetadata[0] } } - // Convert metadata to JSON - metadataJSON, err := json.Marshal(metadata) - if err != nil { - http.Error(w, "Error converting metadata to JSON: "+err.Error(), http.StatusInternalServerError) - return - } - // Set the response content type to application/json - w.Header().Set("Content-Type", "application/json") - // Write the metadata JSON to the response body - _, err = w.Write(metadataJSON) - if err != nil { - http.Error(w, "Error writing response: "+err.Error(), http.StatusInternalServerError) - return + wrapperResponse(w, metadata) +} + +func (m *metadataHandler) convertObjs2ContainerResponse(objs []*ObjectWrapper) []*PodMetadata { + metadatas := make([]*PodMetadata, 0) + for _, obj := range objs { + pod := obj.Raw.(*v1.Pod) + podMetadata := m.getCommonPodMetadata(pod) + podMetadata.PodIP = pod.Status.PodIP + metadatas = append(metadatas, podMetadata) } + return metadatas } func (m *metadataHandler) handlePodMetaByHostIP(w http.ResponseWriter, r *http.Request) { @@ -126,12 +183,85 @@ func (m *metadataHandler) handlePodMetaByHostIP(w http.ResponseWriter, r *http.R metadata := make(map[string]*PodMetadata) objs := m.metaManager.cacheMap[POD].Get(rBody.Keys) for _, obj := range objs { - podMetadata := m.convertObj2PodMetadata(obj) + podMetadata := m.convertObjs2HostResponse(obj) for i, meta := range podMetadata { pod := obj[i].Raw.(*v1.Pod) metadata[pod.Status.PodIP] = meta } } + wrapperResponse(w, metadata) +} + +func (m *metadataHandler) convertObjs2HostResponse(objs []*ObjectWrapper) []*PodMetadata { + metadatas := make([]*PodMetadata, 0) + for _, obj := range objs { + pod := obj.Raw.(*v1.Pod) + podMetadata := m.getCommonPodMetadata(pod) + containerIDs := make([]string, 0) + for _, container := range pod.Status.ContainerStatuses { + containerIDs = append(containerIDs, truncateContainerID(container.ContainerID)) + } + podMetadata.ContainerIDs = containerIDs + metadatas = append(metadatas, podMetadata) + } + return metadatas +} + +func (m *metadataHandler) getCommonPodMetadata(pod *v1.Pod) *PodMetadata { + images := make(map[string]string) + envs := make(map[string]string) + for _, container := range pod.Spec.Containers { + images[container.Name] = container.Image + for _, env := range container.Env { + envs[env.Name] = env.Value + } + } + podMetadata := &PodMetadata{ + PodName: pod.Name, + StartTime: pod.CreationTimestamp.Time.Unix(), + Namespace: pod.Namespace, + Labels: pod.Labels, + Images: images, + Envs: envs, + IsDeleted: false, + } + if len(pod.GetOwnerReferences()) == 0 { + podMetadata.WorkloadName = "" + podMetadata.WorkloadKind = "" + logger.Warning(context.Background(), "Pod has no owner", pod.Name) + } else { + podMetadata.WorkloadName = pod.GetOwnerReferences()[0].Name + podMetadata.WorkloadKind = strings.ToLower(pod.GetOwnerReferences()[0].Kind) + if podMetadata.WorkloadKind == REPLICASET { + // replicaset -> deployment + replicasetKey := generateNameWithNamespaceKey(pod.Namespace, podMetadata.WorkloadName) + replicasets := m.metaManager.cacheMap[REPLICASET].Get([]string{replicasetKey}) + for _, replicaset := range replicasets[replicasetKey] { + logger.Warning(context.Background(), "ReplicaSet has no owner1", podMetadata.WorkloadName) + if len(replicaset.Raw.(*app.ReplicaSet).OwnerReferences) > 0 { + podMetadata.WorkloadName = replicaset.Raw.(*app.ReplicaSet).OwnerReferences[0].Name + podMetadata.WorkloadKind = strings.ToLower(replicaset.Raw.(*app.ReplicaSet).OwnerReferences[0].Kind) + break + } + } + if podMetadata.WorkloadKind == "replicaset" { + logger.Warning(context.Background(), "ReplicaSet has no owner", podMetadata.WorkloadName) + } + } + } + return podMetadata +} + +func truncateContainerID(containerID string) string { + sep := "://" + separated := strings.SplitN(containerID, sep, 2) + if len(separated) < 2 { + return "" + } + return separated[1] +} + +func wrapperResponse(w http.ResponseWriter, metadata map[string]*PodMetadata) { // Convert metadata to JSON metadataJSON, err := json.Marshal(metadata) if err != nil { @@ -147,48 +277,3 @@ func (m *metadataHandler) handlePodMetaByHostIP(w http.ResponseWriter, r *http.R return } } - -func (m *metadataHandler) convertObj2PodMetadata(objs []*ObjectWrapper) []*PodMetadata { - result := make([]*PodMetadata, 0) - for _, obj := range objs { - pod := obj.Raw.(*v1.Pod) - images := make(map[string]string) - for _, container := range pod.Spec.Containers { - images[container.Name] = container.Image - } - envs := make(map[string]string) - for _, container := range pod.Spec.Containers { - for _, env := range container.Env { - envs[env.Name] = env.Value - } - } - podMetadata := &PodMetadata{ - Namespace: pod.Namespace, - Labels: pod.Labels, - Images: images, - Envs: envs, - IsDeleted: false, - } - if len(pod.GetOwnerReferences()) == 0 { - podMetadata.WorkloadName = "" - podMetadata.WorkloadKind = "" - logger.Warning(context.Background(), "Pod has no owner", pod.Name) - } else { - podMetadata.WorkloadName = pod.GetOwnerReferences()[0].Name - podMetadata.WorkloadKind = strings.ToLower(pod.GetOwnerReferences()[0].Kind) - if podMetadata.WorkloadKind == "replicaset" { - // replicaset -> deployment - replicasets := m.metaManager.cacheMap[REPLICASET].Get([]string{podMetadata.WorkloadName}) - for _, replicaset := range replicasets[podMetadata.WorkloadName] { - if len(replicaset.Raw.(*app.ReplicaSet).OwnerReferences) > 0 { - podMetadata.WorkloadName = replicaset.Raw.(*app.ReplicaSet).OwnerReferences[0].Name - podMetadata.WorkloadKind = strings.ToLower(replicaset.Raw.(*app.ReplicaSet).OwnerReferences[0].Kind) - break - } - } - } - } - result = append(result, podMetadata) - } - return result -} diff --git a/pkg/helper/k8smeta/k8s_meta_manager.go b/pkg/helper/k8smeta/k8s_meta_manager.go index 1432b1561e..f75c32dba3 100644 --- a/pkg/helper/k8smeta/k8s_meta_manager.go +++ b/pkg/helper/k8smeta/k8s_meta_manager.go @@ -28,6 +28,7 @@ type MetaCache interface { GetSize() int GetQueueSize() int List() []*ObjectWrapper + Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper RegisterSendFunc(key string, sendFunc SendFunc, interval int) UnRegisterSendFunc(key string) init(*kubernetes.Clientset) @@ -210,8 +211,10 @@ func GetMetaManagerMetrics() []map[string]string { cacheSize := 0 for _, cache := range manager.cacheMap { queueSize += cache.GetQueueSize() + if cache.GetQueueSize() > 0 { + logger.Warning(context.Background(), "k8s meta event delay in queue", "resourceType", cache, "queueSize", cache.GetQueueSize()) + } cacheSize += cache.GetSize() - } manager.queueSizeGauge.Set(float64(queueSize)) manager.cacheResourceGauge.Set(float64(cacheSize))