From 0b3e94de61139e6ec6fcc16dfd2ed30a3bdca482 Mon Sep 17 00:00:00 2001 From: ctenetlau <74183504+EASYGOING45@users.noreply.github.com> Date: Tue, 10 Sep 2024 16:04:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8C=87=E6=A0=87=E5=8F=91=E7=8E=B0&?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E5=85=9C=E5=BA=95=E4=BB=BB=E5=8A=A1=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E4=BF=AE=E5=A4=8D=EF=BC=8C=E9=80=82=E9=85=8Dbkbase?= =?UTF-8?q?=E6=96=B0=E9=93=BE=E8=B7=AF=EF=BC=8C=E6=97=A5=E5=BF=97=E8=A1=A5?= =?UTF-8?q?=E5=AE=8C=EF=BC=8C=E5=8D=95=E6=B5=8B=E8=A1=A5=E5=85=85=20--stor?= =?UTF-8?q?y=3D119625124=20(#526)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../models/resulttable/resulttable.go | 1 + .../metadata/service/timeseriesgroup.go | 6 +- .../internal/metadata/task/customreport.go | 54 ++++++--- .../metadata/task/customreport_test.go | 109 ++++++++++++++++++ 4 files changed, 150 insertions(+), 20 deletions(-) create mode 100644 pkg/bk-monitor-worker/internal/metadata/task/customreport_test.go diff --git a/pkg/bk-monitor-worker/internal/metadata/models/resulttable/resulttable.go b/pkg/bk-monitor-worker/internal/metadata/models/resulttable/resulttable.go index 1c76b1e51..75319028a 100644 --- a/pkg/bk-monitor-worker/internal/metadata/models/resulttable/resulttable.go +++ b/pkg/bk-monitor-worker/internal/metadata/models/resulttable/resulttable.go @@ -34,6 +34,7 @@ type ResultTable struct { Label string `gorm:"label;size:128" json:"label"` IsEnable bool `gorm:"is_enable" json:"is_enable"` DataLabel *string `gorm:"data_label;size:128" json:"data_label"` + IsBuiltin bool `gorm:"column:is_builtin" json:"is_builtin"` } // BeforeCreate 新建前时间字段设置为当前时间 diff --git a/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go b/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go index a4b9685f7..14ccaf4ce 100644 --- a/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go +++ b/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go @@ -56,9 +56,10 @@ func NewTimeSeriesGroupSvc(obj *customreport.TimeSeriesGroup) TimeSeriesGroupSvc } // UpdateTimeSeriesMetrics 从远端存储中同步TS的指标和维度对应关系 -func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, isInRtList bool) (bool, error) { +func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, queryFromBkData bool) (bool, error) { + logger.Info("UpdateTimeSeriesMetrics stated, vm_rt: %s, query_from_bkdata: %v", vmRt, queryFromBkData) // 如果在白名单中,则通过计算平台获取指标数据 - if isInRtList { + if queryFromBkData { // 获取 vm rt及metric vmMetrics, err := s.QueryMetricAndDimension(vmRt) if err != nil { @@ -75,6 +76,7 @@ func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, isInRtList boo return false, nil } // 记录是否有更新,然后推送redis并发布通知 + logger.Info("UpdateTimeSeriesMetrics get redis data for vm_rt: %s, metric_info: %v", vmRt, metricInfo) return s.UpdateMetrics(metricInfo) } diff --git a/pkg/bk-monitor-worker/internal/metadata/task/customreport.go b/pkg/bk-monitor-worker/internal/metadata/task/customreport.go index 91b420ff3..d23496cdc 100644 --- a/pkg/bk-monitor-worker/internal/metadata/task/customreport.go +++ b/pkg/bk-monitor-worker/internal/metadata/task/customreport.go @@ -12,11 +12,14 @@ package task import ( "context" "sync" + "time" "github.com/pkg/errors" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/common" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/config" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/customreport" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/resulttable" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/storage" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/service" "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/mysql" @@ -27,14 +30,15 @@ import ( "github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger" ) -// RefreshTimeSeriesMetric : update ts metrics from redis +// RefreshTimeSeriesMetric : update ts metrics from redis or bkdata func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error { defer func() { if err := recover(); err != nil { logger.Errorf("RefreshTimeSeriesMetric Runtime panic caught: %v", err) } }() - logger.Info("start to refresh time series metric") + startTime := time.Now() // 记录开始时间 + logger.Info("RefreshTimeSeriesMetric started!") db := mysql.GetDBSession().DB var tsGroupList []customreport.TimeSeriesGroup if err := customreport.NewTimeSeriesGroupQuerySet(db).IsEnableEq(true).IsDeleteEq(false).All(&tsGroupList); err != nil { @@ -50,7 +54,7 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error { for _, chunkDataLabels := range slicex.ChunkSlice(tableIdList, 0) { var tempList []storage.AccessVMRecord if err := storage.NewAccessVMRecordQuerySet(db).Select(storage.AccessVMRecordDBSchema.ResultTableId, storage.AccessVMRecordDBSchema.VmResultTableId).ResultTableIdIn(chunkDataLabels...).All(&tempList); err != nil { - logger.Errorf("get vm table id by monitor table id error, %s", err) + logger.Errorf("RefreshTimeSeriesMetric get vm table id by monitor table id error, %s", err) continue } for _, rtInfo := range tempList { @@ -63,7 +67,7 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error { wlTableIdList := make([]string, 0) if wlTableIdByte, err := client.Get(config.BkDataTableIdListRedisPath); err == nil && wlTableIdByte != nil { if err := jsonx.Unmarshal(wlTableIdByte, &wlTableIdList); err != nil { - logger.Errorf("get white list table id from redis failed, %v", err) + logger.Errorf("RefreshTimeSeriesMetric get white list table id from redis failed, %v", err) } } @@ -88,48 +92,62 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error { for _, eg := range tsGroupList { ch <- struct{}{} // 默认不在白名单中 - isInRtList := false + queryFromBkdata := false // 如果不存在 vm rt, 则不会通过bkbase查询 vmRt, ok := rtMapVmRt[eg.TableID] + + //var ds resulttable.DataSource + + var ds resulttable.DataSource + if err := resulttable.NewDataSourceQuerySet(db).BkDataIdEq(eg.BkDataID).One(&ds); err != nil { + logger.Errorf("RefreshTimeSeriesMetric:table_id %s found datasource record error, %v", eg.TableID, err) + } + if !ok { - logger.Errorf("can not find vm result table id by monitor table id: %s", eg.TableID) - isInRtList = false + logger.Errorf("RefreshTimeSeriesMetric:can not find vm result table id by monitor table id: %s", eg.TableID) + queryFromBkdata = false } else if slicex.IsExistItem(wlTableIdList, eg.TableID) { // 判断是否在白名单中 - isInRtList = true + logger.Infof("RefreshTimeSeriesMetric:table_id %s ,data_id %v in white list, will query metrics from bkdata", eg.TableID, eg.BkDataID) + queryFromBkdata = true + } else if ds.CreatedFrom == common.DataIdFromBkData { + logger.Infof("RefreshTimeSeriesMetric:table_id %s ,data_id %v created from bkbase, will query metrics from bkdata", eg.TableID, eg.BkDataID) + // 如果TSGroup的创建来源是计算平台,则需从计算平台获取相应的指标 + queryFromBkdata = true } - go func(ts customreport.TimeSeriesGroup, tableIdChan chan string, wg *sync.WaitGroup, ch chan struct{}, vmRt string, isInRtList bool) { + go func(ts customreport.TimeSeriesGroup, tableIdChan chan string, wg *sync.WaitGroup, ch chan struct{}, vmRt string, queryFromBkdata bool) { defer func() { <-ch wg.Done() }() svc := service.NewTimeSeriesGroupSvc(&ts) - updated, err := svc.UpdateTimeSeriesMetrics(vmRt, isInRtList) + updated, err := svc.UpdateTimeSeriesMetrics(vmRt, queryFromBkdata) if err != nil { - logger.Errorf("time_series_group: [%s] try to update metrics from bkdata or redis failed, %v", ts.TableID, err) + logger.Errorf("RefreshTimeSeriesMetric: time_series_group: [%s] try to update metrics from bkdata or redis failed, %v", ts.TableID, err) return } - logger.Infof("time_series_group: [%s] metric update from bkdata or redis success, updated: %v", ts.TableID, updated) + logger.Infof("RefreshTimeSeriesMetric: time_series_group: [%s] metric update from bkdata or redis success, updated: %v", ts.TableID, updated) if updated { tableIdChan <- svc.TableID } - }(eg, tableIdChan, &wg, ch, vmRt, isInRtList) + }(eg, tableIdChan, &wg, ch, vmRt, queryFromBkdata) } + wg.Wait() close(tableIdChan) // 防止数据没有读完 wgReceive.Wait() if len(updatedTableIds) != 0 { - logger.Info("start to push table id to redis") + logger.Info("RefreshTimeSeriesMetric,start to push table id to redis, updatedTableIds %v", updatedTableIds) pusher := service.NewSpacePusher() if err := pusher.PushTableIdDetail(updatedTableIds, true, false); err != nil { - return errors.Wrapf(err, "metric update to push table id detaild for [%v] failed", updatedTableIds) + return errors.Wrapf(err, "RefreshTimeSeriesMetric,metric update to push table id detaild for [%v] failed", updatedTableIds) } - logger.Infof("metric updated of table_id [%v]", updatedTableIds) + logger.Infof("RefreshTimeSeriesMetric,metric updated of table_id [%v]", updatedTableIds) } - logger.Info("refresh time series metric success") - + elapsedTime := time.Since(startTime) // 计算耗时 + logger.Infof("RefreshTimeSeriesMetric finished succuessfully, took %s", elapsedTime) return nil } diff --git a/pkg/bk-monitor-worker/internal/metadata/task/customreport_test.go b/pkg/bk-monitor-worker/internal/metadata/task/customreport_test.go new file mode 100644 index 000000000..5a1e902cc --- /dev/null +++ b/pkg/bk-monitor-worker/internal/metadata/task/customreport_test.go @@ -0,0 +1,109 @@ +// Tencent is pleased to support the open source community by making +// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available. +// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved. +// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +// You may obtain a copy of the License at http://opensource.org/licenses/MIT +// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. + +package task + +import ( + "context" + "testing" + "time" + + goRedis "github.com/go-redis/redis/v8" + "github.com/jinzhu/gorm" + "github.com/stretchr/testify/assert" + + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/common" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/customreport" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/resulttable" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/storage" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/mysql" + ta "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/task" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/jsonx" + "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/mocker" +) + +func TestRefreshTimeSeriesMetric_CreatedFromBkData(t *testing.T) { + // 初始化模拟数据库配置 + mocker.InitTestDBConfig("../../../bmw_test.yaml") + db := mysql.GetDBSession().DB + + // 准备数据 + tsGroup := customreport.TimeSeriesGroup{ + CustomGroupBase: customreport.CustomGroupBase{ + BkDataID: 22112, + TableID: "test_for_metric_update.base", + IsEnable: true, + }, + TimeSeriesGroupID: 3343, + TimeSeriesGroupName: "test_for_metric_update_group", + } + db.Delete(&tsGroup, "bk_data_id = ?", tsGroup.BkDataID) + err := tsGroup.Create(db) + assert.NoError(t, err) + + ds := resulttable.DataSource{ + BkDataId: 22112, + DataName: "test_for_metric_update_name", + CreatedFrom: common.DataIdFromBkData, + } + db.Delete(&ds, "bk_data_id = ?", ds.BkDataId) + err = db.Create(&ds).Error + assert.NoError(t, err) + + // AccessVMRecord + vmTableName := "vm_table_name" + vmTable := storage.AccessVMRecord{ + ResultTableId: "test_for_metric_update.base", + VmResultTableId: vmTableName, + } + db.Delete(&vmTable) + err = vmTable.Create(db) + assert.NoError(t, err) + + // Mock Redis + mockerClient, redisPatch := mocker.DependenceRedisMocker() + defer redisPatch.Reset() + mockerClient.ZcountValue = 2 + mockerClient.ZRangeByScoreWithScoresValue = append(mockerClient.ZRangeByScoreWithScoresValue, []goRedis.Z{ + {Score: float64(time.Now().Add(-600 * time.Second).Unix()), Member: "metric_a"}, + {Score: float64(time.Now().Add(-600 * time.Second).Unix()), Member: "metric_b"}, + {Score: float64(time.Now().Add(-100000 * time.Second).Unix()), Member: "metric_expired"}, + }...) + mockerClient.HMGetValue = append(mockerClient.HMGetValue, []interface{}{ + "{\"dimensions\":{\"d1\":{\"last_update_time\":1685503141,\"values\":[]},\"d2\":{\"last_update_time\":1685503141,\"values\":[]}}}", + "{\"dimensions\":{\"d3\":{\"last_update_time\":1685503141,\"values\":[]},\"d4\":{\"last_update_time\":1685503141,\"values\":[]}}}", + }...) + //mockerClient.GetValue = []byte(`["test_for_metric_update.base"]`) + + // 直接调用方法 + ctx := context.TODO() + task := &ta.Task{} + err = RefreshTimeSeriesMetric(ctx, task) + assert.NoError(t, err) + + // 验证结果 + var metricA, metricB, metricExpired customreport.TimeSeriesMetric + var tagListA, tagListB []string + err = customreport.NewTimeSeriesMetricQuerySet(db).GroupIDEq(tsGroup.TimeSeriesGroupID).FieldNameEq("metric_a").One(&metricA) + assert.NoError(t, err) + err = jsonx.UnmarshalString(metricA.TagList, &tagListA) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"d1", "d2", "target"}, tagListA) + assert.Equal(t, "test_for_metric_update.metric_a", metricA.TableID) + + err = customreport.NewTimeSeriesMetricQuerySet(db).GroupIDEq(tsGroup.TimeSeriesGroupID).FieldNameEq("metric_b").One(&metricB) + assert.NoError(t, err) + err = jsonx.UnmarshalString(metricB.TagList, &tagListB) + assert.NoError(t, err) + assert.ElementsMatch(t, []string{"d3", "d4", "target"}, tagListB) + assert.Equal(t, "test_for_metric_update.metric_b", metricB.TableID) + + err = customreport.NewTimeSeriesMetricQuerySet(db).GroupIDEq(tsGroup.TimeSeriesGroupID).FieldNameEq("metric_expired").One(&metricExpired) + assert.ErrorIs(t, gorm.ErrRecordNotFound, err) +}