Skip to content

Commit

Permalink
feat: 指标发现&同步兜底任务逻辑修复,适配bkbase新链路,日志补完,单测补充 --story=119625124 (#526)
Browse files Browse the repository at this point in the history
  • Loading branch information
EASYGOING45 authored Sep 10, 2024
1 parent 4f76538 commit 0b3e94d
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ResultTable struct {
Label string `gorm:"label;size:128" json:"label"`
IsEnable bool `gorm:"is_enable" json:"is_enable"`
DataLabel *string `gorm:"data_label;size:128" json:"data_label"`
IsBuiltin bool `gorm:"column:is_builtin" json:"is_builtin"`
}

// BeforeCreate 新建前时间字段设置为当前时间
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ func NewTimeSeriesGroupSvc(obj *customreport.TimeSeriesGroup) TimeSeriesGroupSvc
}

// UpdateTimeSeriesMetrics 从远端存储中同步TS的指标和维度对应关系
func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, isInRtList bool) (bool, error) {
func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, queryFromBkData bool) (bool, error) {
logger.Info("UpdateTimeSeriesMetrics stated, vm_rt: %s, query_from_bkdata: %v", vmRt, queryFromBkData)
// 如果在白名单中,则通过计算平台获取指标数据
if isInRtList {
if queryFromBkData {
// 获取 vm rt及metric
vmMetrics, err := s.QueryMetricAndDimension(vmRt)
if err != nil {
Expand All @@ -75,6 +76,7 @@ func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, isInRtList boo
return false, nil
}
// 记录是否有更新,然后推送redis并发布通知
logger.Info("UpdateTimeSeriesMetrics get redis data for vm_rt: %s, metric_info: %v", vmRt, metricInfo)
return s.UpdateMetrics(metricInfo)
}

Expand Down
54 changes: 36 additions & 18 deletions pkg/bk-monitor-worker/internal/metadata/task/customreport.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ package task
import (
"context"
"sync"
"time"

"github.com/pkg/errors"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/common"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/config"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/customreport"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/resulttable"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/storage"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/service"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/mysql"
Expand All @@ -27,14 +30,15 @@ import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
)

// RefreshTimeSeriesMetric : update ts metrics from redis
// RefreshTimeSeriesMetric : update ts metrics from redis or bkdata
func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error {
defer func() {
if err := recover(); err != nil {
logger.Errorf("RefreshTimeSeriesMetric Runtime panic caught: %v", err)
}
}()
logger.Info("start to refresh time series metric")
startTime := time.Now() // 记录开始时间
logger.Info("RefreshTimeSeriesMetric started!")
db := mysql.GetDBSession().DB
var tsGroupList []customreport.TimeSeriesGroup
if err := customreport.NewTimeSeriesGroupQuerySet(db).IsEnableEq(true).IsDeleteEq(false).All(&tsGroupList); err != nil {
Expand All @@ -50,7 +54,7 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error {
for _, chunkDataLabels := range slicex.ChunkSlice(tableIdList, 0) {
var tempList []storage.AccessVMRecord
if err := storage.NewAccessVMRecordQuerySet(db).Select(storage.AccessVMRecordDBSchema.ResultTableId, storage.AccessVMRecordDBSchema.VmResultTableId).ResultTableIdIn(chunkDataLabels...).All(&tempList); err != nil {
logger.Errorf("get vm table id by monitor table id error, %s", err)
logger.Errorf("RefreshTimeSeriesMetric get vm table id by monitor table id error, %s", err)
continue
}
for _, rtInfo := range tempList {
Expand All @@ -63,7 +67,7 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error {
wlTableIdList := make([]string, 0)
if wlTableIdByte, err := client.Get(config.BkDataTableIdListRedisPath); err == nil && wlTableIdByte != nil {
if err := jsonx.Unmarshal(wlTableIdByte, &wlTableIdList); err != nil {
logger.Errorf("get white list table id from redis failed, %v", err)
logger.Errorf("RefreshTimeSeriesMetric get white list table id from redis failed, %v", err)
}
}

Expand All @@ -88,48 +92,62 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error {
for _, eg := range tsGroupList {
ch <- struct{}{}
// 默认不在白名单中
isInRtList := false
queryFromBkdata := false
// 如果不存在 vm rt, 则不会通过bkbase查询
vmRt, ok := rtMapVmRt[eg.TableID]

//var ds resulttable.DataSource

var ds resulttable.DataSource
if err := resulttable.NewDataSourceQuerySet(db).BkDataIdEq(eg.BkDataID).One(&ds); err != nil {
logger.Errorf("RefreshTimeSeriesMetric:table_id %s found datasource record error, %v", eg.TableID, err)
}

if !ok {
logger.Errorf("can not find vm result table id by monitor table id: %s", eg.TableID)
isInRtList = false
logger.Errorf("RefreshTimeSeriesMetric:can not find vm result table id by monitor table id: %s", eg.TableID)
queryFromBkdata = false
} else if slicex.IsExistItem(wlTableIdList, eg.TableID) {
// 判断是否在白名单中
isInRtList = true
logger.Infof("RefreshTimeSeriesMetric:table_id %s ,data_id %v in white list, will query metrics from bkdata", eg.TableID, eg.BkDataID)
queryFromBkdata = true
} else if ds.CreatedFrom == common.DataIdFromBkData {
logger.Infof("RefreshTimeSeriesMetric:table_id %s ,data_id %v created from bkbase, will query metrics from bkdata", eg.TableID, eg.BkDataID)
// 如果TSGroup的创建来源是计算平台,则需从计算平台获取相应的指标
queryFromBkdata = true
}
go func(ts customreport.TimeSeriesGroup, tableIdChan chan string, wg *sync.WaitGroup, ch chan struct{}, vmRt string, isInRtList bool) {
go func(ts customreport.TimeSeriesGroup, tableIdChan chan string, wg *sync.WaitGroup, ch chan struct{}, vmRt string, queryFromBkdata bool) {
defer func() {
<-ch
wg.Done()
}()

svc := service.NewTimeSeriesGroupSvc(&ts)
updated, err := svc.UpdateTimeSeriesMetrics(vmRt, isInRtList)
updated, err := svc.UpdateTimeSeriesMetrics(vmRt, queryFromBkdata)
if err != nil {
logger.Errorf("time_series_group: [%s] try to update metrics from bkdata or redis failed, %v", ts.TableID, err)
logger.Errorf("RefreshTimeSeriesMetric: time_series_group: [%s] try to update metrics from bkdata or redis failed, %v", ts.TableID, err)
return
}
logger.Infof("time_series_group: [%s] metric update from bkdata or redis success, updated: %v", ts.TableID, updated)
logger.Infof("RefreshTimeSeriesMetric: time_series_group: [%s] metric update from bkdata or redis success, updated: %v", ts.TableID, updated)
if updated {
tableIdChan <- svc.TableID
}
}(eg, tableIdChan, &wg, ch, vmRt, isInRtList)
}(eg, tableIdChan, &wg, ch, vmRt, queryFromBkdata)
}

wg.Wait()
close(tableIdChan)
// 防止数据没有读完
wgReceive.Wait()
if len(updatedTableIds) != 0 {
logger.Info("start to push table id to redis")
logger.Info("RefreshTimeSeriesMetric,start to push table id to redis, updatedTableIds %v", updatedTableIds)
pusher := service.NewSpacePusher()
if err := pusher.PushTableIdDetail(updatedTableIds, true, false); err != nil {
return errors.Wrapf(err, "metric update to push table id detaild for [%v] failed", updatedTableIds)
return errors.Wrapf(err, "RefreshTimeSeriesMetric,metric update to push table id detaild for [%v] failed", updatedTableIds)
}
logger.Infof("metric updated of table_id [%v]", updatedTableIds)
logger.Infof("RefreshTimeSeriesMetric,metric updated of table_id [%v]", updatedTableIds)
}
logger.Info("refresh time series metric success")

elapsedTime := time.Since(startTime) // 计算耗时
logger.Infof("RefreshTimeSeriesMetric finished succuessfully, took %s", elapsedTime)
return nil
}

Expand Down
109 changes: 109 additions & 0 deletions pkg/bk-monitor-worker/internal/metadata/task/customreport_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Tencent is pleased to support the open source community by making
// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available.
// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved.
// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
// You may obtain a copy of the License at http://opensource.org/licenses/MIT
// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

package task

import (
"context"
"testing"
"time"

goRedis "github.com/go-redis/redis/v8"
"github.com/jinzhu/gorm"
"github.com/stretchr/testify/assert"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/common"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/customreport"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/resulttable"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/storage"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/mysql"
ta "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/task"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/jsonx"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/mocker"
)

func TestRefreshTimeSeriesMetric_CreatedFromBkData(t *testing.T) {
// 初始化模拟数据库配置
mocker.InitTestDBConfig("../../../bmw_test.yaml")
db := mysql.GetDBSession().DB

// 准备数据
tsGroup := customreport.TimeSeriesGroup{
CustomGroupBase: customreport.CustomGroupBase{
BkDataID: 22112,
TableID: "test_for_metric_update.base",
IsEnable: true,
},
TimeSeriesGroupID: 3343,
TimeSeriesGroupName: "test_for_metric_update_group",
}
db.Delete(&tsGroup, "bk_data_id = ?", tsGroup.BkDataID)
err := tsGroup.Create(db)
assert.NoError(t, err)

ds := resulttable.DataSource{
BkDataId: 22112,
DataName: "test_for_metric_update_name",
CreatedFrom: common.DataIdFromBkData,
}
db.Delete(&ds, "bk_data_id = ?", ds.BkDataId)
err = db.Create(&ds).Error
assert.NoError(t, err)

// AccessVMRecord
vmTableName := "vm_table_name"
vmTable := storage.AccessVMRecord{
ResultTableId: "test_for_metric_update.base",
VmResultTableId: vmTableName,
}
db.Delete(&vmTable)
err = vmTable.Create(db)
assert.NoError(t, err)

// Mock Redis
mockerClient, redisPatch := mocker.DependenceRedisMocker()
defer redisPatch.Reset()
mockerClient.ZcountValue = 2
mockerClient.ZRangeByScoreWithScoresValue = append(mockerClient.ZRangeByScoreWithScoresValue, []goRedis.Z{
{Score: float64(time.Now().Add(-600 * time.Second).Unix()), Member: "metric_a"},
{Score: float64(time.Now().Add(-600 * time.Second).Unix()), Member: "metric_b"},
{Score: float64(time.Now().Add(-100000 * time.Second).Unix()), Member: "metric_expired"},
}...)
mockerClient.HMGetValue = append(mockerClient.HMGetValue, []interface{}{
"{\"dimensions\":{\"d1\":{\"last_update_time\":1685503141,\"values\":[]},\"d2\":{\"last_update_time\":1685503141,\"values\":[]}}}",
"{\"dimensions\":{\"d3\":{\"last_update_time\":1685503141,\"values\":[]},\"d4\":{\"last_update_time\":1685503141,\"values\":[]}}}",
}...)
//mockerClient.GetValue = []byte(`["test_for_metric_update.base"]`)

// 直接调用方法
ctx := context.TODO()
task := &ta.Task{}
err = RefreshTimeSeriesMetric(ctx, task)
assert.NoError(t, err)

// 验证结果
var metricA, metricB, metricExpired customreport.TimeSeriesMetric
var tagListA, tagListB []string
err = customreport.NewTimeSeriesMetricQuerySet(db).GroupIDEq(tsGroup.TimeSeriesGroupID).FieldNameEq("metric_a").One(&metricA)
assert.NoError(t, err)
err = jsonx.UnmarshalString(metricA.TagList, &tagListA)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"d1", "d2", "target"}, tagListA)
assert.Equal(t, "test_for_metric_update.metric_a", metricA.TableID)

err = customreport.NewTimeSeriesMetricQuerySet(db).GroupIDEq(tsGroup.TimeSeriesGroupID).FieldNameEq("metric_b").One(&metricB)
assert.NoError(t, err)
err = jsonx.UnmarshalString(metricB.TagList, &tagListB)
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"d3", "d4", "target"}, tagListB)
assert.Equal(t, "test_for_metric_update.metric_b", metricB.TableID)

err = customreport.NewTimeSeriesMetricQuerySet(db).GroupIDEq(tsGroup.TimeSeriesGroupID).FieldNameEq("metric_expired").One(&metricExpired)
assert.ErrorIs(t, gorm.ErrRecordNotFound, err)
}

0 comments on commit 0b3e94d

Please sign in to comment.