Skip to content

Commit

Permalink
feat: operator 优化 secrets list 内存 --story=120731727 (#624)
Browse files Browse the repository at this point in the history
  • Loading branch information
chenjiandongx authored Nov 18, 2024
1 parent eb31ad1 commit cf233a9
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 137 deletions.
17 changes: 16 additions & 1 deletion pkg/operator/operator/discover/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,14 +380,17 @@ func (d *BaseDiscover) loopHandleTargetGroup() {

case <-ticker.C:
counter++
tgList, updatedAt := shareddiscovery.FetchTargetGroups(d.UK())
// 避免 skip 情况下多申请不必要的内存
updatedAt := shareddiscovery.FetchTargetGroupsUpdatedAt(d.UK())
logger.Debugf("%s updated at: %v", d.Name(), time.Unix(updatedAt, 0))
if time.Now().Unix()-updatedAt > duration*2 && counter%resync != 0 && d.fetched {
logger.Debugf("%s found nothing changed, skip targetgourps handled", d.Name())
continue
}
d.fetched = true

// 真正需要变更时才 fetch targetgroups
tgList := shareddiscovery.FetchTargetGroups(d.UK())
for _, tg := range tgList {
if tg == nil {
continue
Expand Down Expand Up @@ -571,6 +574,12 @@ func (d *BaseDiscover) notify(source string, childConfigs []*ChildConfig) {
d.childConfigMut.Lock()
defer d.childConfigMut.Unlock()

// 如果新的 source/childconfigs 为空且之前的缓存也为空 那就无需对比处理了
if len(childConfigs) == 0 && len(d.childConfigGroups[source]) == 0 {
logger.Debugf("%s skip handle notify", d.Name())
return
}

if _, ok := d.childConfigGroups[source]; !ok {
d.childConfigGroups[source] = make(map[uint64]*ChildConfig)
}
Expand Down Expand Up @@ -609,6 +618,12 @@ func (d *BaseDiscover) notify(source string, childConfigs []*ChildConfig) {
logger.Infof("%s found targetgroup.source changed", source)
Publish()
}

// 删除事件 即后续 source 可能不会再有任何事件了
if len(d.childConfigGroups[source]) == 0 {
delete(d.childConfigGroups, source)
logger.Infof("delete source (%s), cause no childconfigs", source)
}
}

// populateLabels builds a label set from the given label set and scrape configuration.
Expand Down
36 changes: 29 additions & 7 deletions pkg/operator/operator/discover/shareddiscovery/shared_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,28 @@ type tgWithTime struct {
updatedAt int64
}

// FetchTargetGroups 获取缓存 targetgroups 以及最新更新时间
func FetchTargetGroups(uk string) ([]*targetgroup.Group, int64) {
// FetchTargetGroups 获取缓存 targetgroups
func FetchTargetGroups(uk string) []*targetgroup.Group {
sharedDiscoveryLock.Lock()
defer sharedDiscoveryLock.Unlock()

if d, ok := sharedDiscoveryMap[uk]; ok {
return d.fetch()
}

return nil, 0
return nil
}

// FetchTargetGroupsUpdatedAt 获取缓存最新更新时间
func FetchTargetGroupsUpdatedAt(uk string) int64 {
sharedDiscoveryLock.Lock()
defer sharedDiscoveryLock.Unlock()

if d, ok := sharedDiscoveryMap[uk]; ok {
return d.fetchUpdatedAt()
}

return 0
}

// Register 注册 shared discovery
Expand Down Expand Up @@ -161,6 +173,7 @@ func (sd *SharedDiscovery) start() {
var total int
for source, tg := range sd.store {
// 超过 10 分钟未更新且已经没有目标的对象需要删除
// 确保 basediscovery 已经处理了删除事件
if now-tg.updatedAt > 600 {
if tg.tg == nil || len(tg.tg.Targets) == 0 {
delete(sd.store, source)
Expand All @@ -178,17 +191,26 @@ func (sd *SharedDiscovery) start() {
}
}

func (sd *SharedDiscovery) fetch() ([]*targetgroup.Group, int64) {
func (sd *SharedDiscovery) fetch() []*targetgroup.Group {
sd.mut.RLock()
defer sd.mut.RUnlock()

ret := make([]*targetgroup.Group, 0, len(sd.store))
for _, v := range sd.store {
ret = append(ret, v.tg)
}
return ret
}

func (sd *SharedDiscovery) fetchUpdatedAt() int64 {
sd.mut.RLock()
defer sd.mut.RUnlock()

var maxTs int64 = math.MinInt64
ret := make([]*targetgroup.Group, 0, 2)
for _, v := range sd.store {
if maxTs < v.updatedAt {
maxTs = v.updatedAt
}
ret = append(ret, v.tg)
}
return ret, maxTs
return maxTs
}
24 changes: 11 additions & 13 deletions pkg/operator/operator/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,8 @@ func (c *Operator) syncNodeEndpoints(ctx context.Context) error {
},
}

nodes, err := c.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return errors.Wrap(err, "listing nodes failed")
}
logger.Debugf("Nodes retrieved from the Kubernetes API, num_nodes:%d", len(nodes.Items))
nodes := c.objectsController.NodeObjs()
logger.Debugf("nodes retrieved from the Kubernetes API, num_nodes: %d", len(nodes))

addresses, errs := getNodeAddresses(nodes)
for _, err := range errs {
Expand Down Expand Up @@ -171,7 +168,7 @@ func (c *Operator) syncNodeEndpoints(ctx context.Context) error {
},
}

err = k8sutils.CreateOrUpdateService(ctx, c.client.CoreV1().Services(cfg.Namespace), svc)
err := k8sutils.CreateOrUpdateService(ctx, c.client.CoreV1().Services(cfg.Namespace), svc)
if err != nil {
return errors.Wrap(err, "synchronizing kubelet service object failed")
}
Expand All @@ -186,23 +183,24 @@ func (c *Operator) syncNodeEndpoints(ctx context.Context) error {
return nil
}

func getNodeAddresses(nodes *corev1.NodeList) ([]corev1.EndpointAddress, []error) {
func getNodeAddresses(nodes []*corev1.Node) ([]corev1.EndpointAddress, []error) {
addresses := make([]corev1.EndpointAddress, 0)
errs := make([]error, 0)

for _, n := range nodes.Items {
address, _, err := k8sutils.GetNodeAddress(n)
for i := 0; i < len(nodes); i++ {
node := nodes[i]
address, _, err := k8sutils.GetNodeAddress(*node)
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to determine hostname for node (%s)", n.Name))
errs = append(errs, errors.Wrapf(err, "failed to determine hostname for node (%s)", node.Name))
continue
}
addresses = append(addresses, corev1.EndpointAddress{
IP: address,
TargetRef: &corev1.ObjectReference{
Kind: "Node",
Name: n.Name,
UID: n.UID,
APIVersion: n.APIVersion,
Name: node.Name,
UID: node.UID,
APIVersion: node.APIVersion,
},
})
}
Expand Down
66 changes: 31 additions & 35 deletions pkg/operator/operator/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,22 @@ import (
func TestGetNodeAddresses(t *testing.T) {
cases := []struct {
name string
nodes *corev1.NodeList
nodes []*corev1.Node
expectedAddresses []string
expectedErrors int
}{
{
name: "simple",
nodes: &corev1.NodeList{
Items: []corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Address: "127.0.0.1",
Type: corev1.NodeInternalIP,
},
nodes: []*corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Address: "127.0.0.1",
Type: corev1.NodeInternalIP,
},
},
},
Expand All @@ -50,31 +48,29 @@ func TestGetNodeAddresses(t *testing.T) {
},
{
name: "missing ip on one node",
nodes: &corev1.NodeList{
Items: []corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Address: "node-0",
Type: corev1.NodeHostName,
},
nodes: []*corev1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-0",
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Address: "node-0",
Type: corev1.NodeHostName,
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Address: "127.0.0.1",
Type: corev1.NodeInternalIP,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Address: "127.0.0.1",
Type: corev1.NodeInternalIP,
},
},
},
Expand Down
27 changes: 12 additions & 15 deletions pkg/operator/operator/objectsref/bklogconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ func (e *bkLogConfigEntity) isVCluster(matcherLabel map[string]string) bool {
return ok
}

func (e *bkLogConfigEntity) getValues(matcherLabel map[string]string, key string, defaultValue string) string {
if v, ok := matcherLabel[key]; ok {
return v
}
return defaultValue
}

func (e *bkLogConfigEntity) getWorkloadName(name string, kind string) string {
if stringx.LowerEq(kind, kindReplicaSet) {
index := strings.LastIndex(name, "-")
Expand All @@ -85,7 +78,11 @@ func (e *bkLogConfigEntity) getWorkloadName(name string, kind string) string {
return name
}

func (e *bkLogConfigEntity) MatchWorkloadName(matcherLabels, matcherAnnotations map[string]string, ownerRefs []OwnerRef) bool {
func (e *bkLogConfigEntity) MatchWorkload(labels, annotations map[string]string, ownerRefs []OwnerRef) bool {
return e.matchWorkloadType(labels, annotations, ownerRefs) && e.matchWorkloadType(labels, annotations, ownerRefs)
}

func (e *bkLogConfigEntity) matchWorkloadName(labels, annotations map[string]string, ownerRefs []OwnerRef) bool {
if e.Obj.Spec.WorkloadName == "" {
return true
}
Expand All @@ -96,9 +93,9 @@ func (e *bkLogConfigEntity) MatchWorkloadName(matcherLabels, matcherAnnotations
}

var names []string
if e.isVCluster(matcherLabels) {
name := e.getValues(matcherAnnotations, configs.G().VCluster.WorkloadNameAnnotationKey, "")
kind := e.getValues(matcherAnnotations, configs.G().VCluster.WorkloadTypeAnnotationKey, "")
if e.isVCluster(labels) {
name := annotations[configs.G().VCluster.WorkloadNameAnnotationKey]
kind := annotations[configs.G().VCluster.WorkloadTypeAnnotationKey]
names = append(names, e.getWorkloadName(name, kind))
} else {
for _, ownerReference := range ownerRefs {
Expand All @@ -117,14 +114,14 @@ func (e *bkLogConfigEntity) MatchWorkloadName(matcherLabels, matcherAnnotations
return false
}

func (e *bkLogConfigEntity) MatchWorkloadType(matcherLabels, matcherAnnotations map[string]string, ownerRefs []OwnerRef) bool {
func (e *bkLogConfigEntity) matchWorkloadType(labels, annotations map[string]string, ownerRefs []OwnerRef) bool {
if e.Obj.Spec.WorkloadType == "" {
return true
}

var kinds []string
if e.isVCluster(matcherLabels) {
kinds = append(kinds, e.getValues(matcherAnnotations, configs.G().VCluster.WorkloadTypeAnnotationKey, ""))
if e.isVCluster(labels) {
kinds = append(kinds, annotations[configs.G().VCluster.WorkloadTypeAnnotationKey])
} else {
for _, ownerReference := range ownerRefs {
kinds = append(kinds, ownerReference.Kind)
Expand Down Expand Up @@ -258,7 +255,7 @@ func (m *BkLogConfigMap) Set(e *bkLogConfigEntity) {
m.entitiesMap[e.UUID()] = e
}

func (m *BkLogConfigMap) RangeBkLogConfig(visitFunc func(e *bkLogConfigEntity)) {
func (m *BkLogConfigMap) Range(visitFunc func(e *bkLogConfigEntity)) {
m.lock.RLock()
defer m.lock.RUnlock()

Expand Down
Loading

0 comments on commit cf233a9

Please sign in to comment.