From 43c57ba567bbeabd084d5acf8921548c78750175 Mon Sep 17 00:00:00 2001
From: bellke
Date: Wed, 12 Jun 2024 14:16:10 +0800
Subject: [PATCH] =?UTF-8?q?feat:=20=E8=B0=83=E6=95=B4=E6=9F=A5=E8=AF=A2?=
 =?UTF-8?q?=E8=AE=A1=E7=AE=97=E5=B9=B3=E5=8F=B0=E6=8C=87=E6=A0=87=E5=8F=91?=
 =?UTF-8?q?=E7=8E=B0=E5=8A=9F=E8=83=BD=20(#369)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pkg/bk-monitor-worker/broker/redis/redis.go   |   2 +-
 .../config/metadata_config.go                 |   6 +
 .../internal/api/bkdata/bkdata.go             |  64 ++++++++
 .../internal/metadata/apiservice/bkdata.go    | 142 ++++++++++++++++++
 .../metadata/service/timeseriesgroup.go       |  25 ++-
 .../metadata/service/timeseriesgroup_test.go  |   4 +-
 .../internal/metadata/task/customreport.go    |  53 ++++++-
 .../service/scheduler/periodic/periodic.go    |   3 +-
 8 files changed, 289 insertions(+), 10 deletions(-)

diff --git a/pkg/bk-monitor-worker/broker/redis/redis.go b/pkg/bk-monitor-worker/broker/redis/redis.go
index 9c5ce4590..cd3519857 100644
--- a/pkg/bk-monitor-worker/broker/redis/redis.go
+++ b/pkg/bk-monitor-worker/broker/redis/redis.go
@@ -781,7 +781,7 @@ redis.call("ZREM", KEYS[3], ARGV[1])
 redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
 redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
 redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
-redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")
+redis.call("DEL", KEYS[1])
 local n = redis.call("INCR", KEYS[5])
 if tonumber(n) == 1 then
 	redis.call("EXPIREAT", KEYS[5], ARGV[6])
diff --git a/pkg/bk-monitor-worker/config/metadata_config.go b/pkg/bk-monitor-worker/config/metadata_config.go
index 7fc8a8bb2..1a105c6b5 100644
--- a/pkg/bk-monitor-worker/config/metadata_config.go
+++ b/pkg/bk-monitor-worker/config/metadata_config.go
@@ -22,6 +22,10 @@ var (
 	MetadataMetricDimensionKeyPrefix string
 	// MetadataMetricDimensionMaxMetricFetchStep config of metadata.refreshMetric task
 	MetadataMetricDimensionMaxMetricFetchStep int
+	// MetadataMetricDimensionByBkData refresh metric dimension by bkdata
+	MetadataMetricDimensionByBkData bool
+	// BkDataTableIdListRedisPath redis key of the table_id whitelist whose metric dimension is refreshed by bkdata
+	BkDataTableIdListRedisPath string
 
 	// BcsEnableBcsGray 是否启用BCS集群灰度模式
 	BcsEnableBcsGray bool
@@ -119,6 +123,8 @@ func initMetadataVariables() {
 	MetadataMetricDimensionMetricKeyPrefix = GetValue("taskConfig.metadata.metricDimension.metricKeyPrefix", "bkmonitor:metrics_")
 	MetadataMetricDimensionKeyPrefix = GetValue("taskConfig.metadata.metricDimension.metricDimensionKeyPrefix", "bkmonitor:metric_dimensions_")
 	MetadataMetricDimensionMaxMetricFetchStep = GetValue("taskConfig.metadata.metricDimension.maxMetricsFetchStep", 500)
+	MetadataMetricDimensionByBkData = GetValue("taskConfig.metadata.metricDimension.metadataMetricDimensionByBkData", false)
+	BkDataTableIdListRedisPath = GetValue("taskConfig.metadata.metricDimension.BkDataTableIdListRedisPath", "metadata:query_metric:table_id_list")
 
 	BcsEnableBcsGray = GetValue("taskConfig.metadata.bcs.enableBcsGray", false)
 	BcsGrayClusterIdList = GetValue("taskConfig.metadata.bcs.grayClusterIdList", []string{})
diff --git a/pkg/bk-monitor-worker/internal/api/bkdata/bkdata.go b/pkg/bk-monitor-worker/internal/api/bkdata/bkdata.go
index 63a800e36..e3451dbfa 100644
--- a/pkg/bk-monitor-worker/internal/api/bkdata/bkdata.go
+++ b/pkg/bk-monitor-worker/internal/api/bkdata/bkdata.go
@@ -405,3 +405,67 @@ func (c *Client) RestartDataFlow(opts ...define.OperationOption) define.Operation {
 		Path:   path,
 	}, opts...)
 }
+
+// QueryMetrics for bkdata resource query_metrics
+func (c *Client) QueryMetrics(opts ...define.OperationOption) define.Operation {
+	/*
+		@params
+		storage | string | 存储类型 | required
+		result_table_id | string | 计算平台结果表 | required
+	*/
+	path := "/v3/dd/metrics/"
+	return c.BkApiClient.NewOperation(bkapi.OperationConfig{
+		Name:   "get_dd_metrics",
+		Method: "GET",
+		Path:   path,
+	}, opts...)
+}
+
+// QueryDimension for bkdata resource query_dimension
+func (c *Client) QueryDimension(opts ...define.OperationOption) define.Operation {
+	/*
+		@params
+		storage | string | 存储类型 | required
+		result_table_id | string | 计算平台结果表 |
+		metric | string | 指标名称 | required
+	*/
+	path := "/v3/dd/dimensions/"
+	return c.BkApiClient.NewOperation(bkapi.OperationConfig{
+		Name:   "get_dd_dimensions",
+		Method: "GET",
+		Path:   path,
+	}, opts...)
+}
+
+// QueryDimensionValue for bkdata resource query
+func (c *Client) QueryDimensionValue(opts ...define.OperationOption) define.Operation {
+	/*
+		@params
+		storage | string | 存储类型 | required
+		result_table_id | string | 计算平台结果表 |
+		metric | string | 指标名称 | required
+		dimension | string | 维度名称 | required
+	*/
+	path := "/v3/dd/values/"
+	return c.BkApiClient.NewOperation(bkapi.OperationConfig{
+		Name:   "get_dd_values",
+		Method: "GET",
+		Path:   path,
+	}, opts...)
+}
+
+// QueryMetricAndDimension for bkdata resource query_metric_and_dimension
+func (c *Client) QueryMetricAndDimension(opts ...define.OperationOption) define.Operation {
+	/*
+		@params
+		storage | string | 存储类型 | required
+		result_table_id | string | 计算平台结果表 | required
+		no_value | bool | 是否不返回维度对应的值 | required
+	*/
+	path := "/v4/dd/"
+	return c.BkApiClient.NewOperation(bkapi.OperationConfig{
+		Name:   "get_dd_metrics_and_dimensions",
+		Method: "GET",
+		Path:   path,
+	}, opts...)
+}
diff --git a/pkg/bk-monitor-worker/internal/metadata/apiservice/bkdata.go b/pkg/bk-monitor-worker/internal/metadata/apiservice/bkdata.go
index 6054eb82a..029aa4cdd 100644
--- a/pkg/bk-monitor-worker/internal/metadata/apiservice/bkdata.go
+++ b/pkg/bk-monitor-worker/internal/metadata/apiservice/bkdata.go
@@ -18,6 +18,7 @@ import (
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api/bkdata"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/jsonx"
+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
 )
 
 var Bkdata BkdataService
@@ -393,3 +394,144 @@ func (s BkdataService) RestartDataFlow(flowId int, consumingMode, clusterGroup s
 	}
 	return resp.Data, nil
 }
+
+// QueryMetrics 查询指标数据
+func (s BkdataService) QueryMetrics(storage string, rt string) (*map[string]float64, error) {
+	bkdataApi, err := api.GetBkdataApi()
+	if err != nil {
+		return nil, errors.Wrap(err, "get bkdata api failed")
+	}
+
+	var resp bkdata.CommonListResp
+	if _, err = bkdataApi.QueryMetrics().SetQueryParams(map[string]string{"storage": storage, "result_table_id": rt}).SetResult(&resp).Request(); err != nil {
+		return nil, errors.Wrapf(err, "query metrics error by bkdata: %s, table_id: %s", storage, rt)
+	}
+	if err := resp.Err(); err != nil {
+		return nil, errors.Wrapf(err, "query metrics error by bkdata: %s, table_id: %s", storage, rt)
+	}
+	// parse metrics
+	metrics := make(map[string]float64)
+	for _, data := range resp.Data {
+		metricInfo, ok := data.([]interface{})
+		if !ok || len(metricInfo) < 2 {
+			logger.Errorf("parse metrics data error, metric_info: %v", data)
+			continue
+		}
+		metric, ok := metricInfo[0].(string)
+		if !ok {
+			logger.Errorf("parse metrics data error, metric: %v", metricInfo[0])
+			continue
+		}
+		// NOTE: 如果时间戳不符合预期,则忽略该指标
+		timestamp, ok := metricInfo[1].(float64)
+		if !ok {
+			logger.Errorf("parse metrics data error, timestamp: %v", metricInfo[1])
+			continue
+		}
+		metrics[metric] = timestamp
+	}
+	return &metrics, nil
+}
+
+// QueryDimension 查询维度数据
+func (s BkdataService) QueryDimension(storage string, rt string, metric string) (*[]map[string]interface{}, error) {
+	bkdataApi, err := api.GetBkdataApi()
+	if err != nil {
+		return nil, errors.Wrap(err, "get bkdata api failed")
+	}
+
+	var resp bkdata.CommonListResp
+	if _, err = bkdataApi.QueryDimension().SetQueryParams(map[string]string{"storage": storage, "result_table_id": rt, "metric": metric}).SetResult(&resp).Request(); err != nil {
+		return nil, errors.Wrapf(err, "query dimension error by bkdata: %s, table_id: %s", storage, rt)
+	}
+	if err := resp.Err(); err != nil {
+		return nil, errors.Wrapf(err, "query dimension error by bkdata: %s, table_id: %s", storage, rt)
+	}
+	// parse dimension
+	var dimensions []map[string]interface{}
+	for _, data := range resp.Data {
+		dimensionInfo, ok := data.([]interface{})
+		if !ok || len(dimensionInfo) < 2 {
+			logger.Errorf("parse dimension data error, dimension_info: %v", data)
+			continue
+		}
+		dimension, ok := dimensionInfo[0].(string)
+		if !ok {
+			logger.Errorf("parse dimension data error, dimension: %v", dimensionInfo[0])
+			continue
+		}
+		// NOTE: 如果时间戳不符合预期,则忽略该维度
+		timestamp, ok := dimensionInfo[1].(float64)
+		if !ok {
+			logger.Errorf("parse dimension data error, timestamp: %v", dimensionInfo[1])
+			continue
+		}
+		dimensions = append(dimensions, map[string]interface{}{dimension: map[string]interface{}{"last_update_time": timestamp}})
+	}
+	return &dimensions, nil
+}
+
+// QueryMetricAndDimension 查询指标和维度数据
+func (s BkdataService) QueryMetricAndDimension(storage string, rt string) ([]map[string]interface{}, error) {
+	bkdataApi, err := api.GetBkdataApi()
+	if err != nil {
+		return nil, errors.Wrap(err, "get bkdata api failed")
+	}
+	var resp bkdata.CommonMapResp
+	// NOTE: 设置no_value=true,不需要返回维度对应的 value
+	params := map[string]string{"storage": storage, "result_table_id": rt, "no_value": "true"}
+	if _, err = bkdataApi.QueryMetricAndDimension().SetQueryParams(params).SetResult(&resp).Request(); err != nil {
+		return nil, errors.Wrapf(err, "query metrics and dimension error by bkdata: %s, table_id: %s", storage, rt)
+	}
+	if err := resp.Err(); err != nil {
+		return nil, errors.Wrapf(err, "query metrics and dimension error by bkdata: %s, table_id: %s", storage, rt)
+	}
+
+	metrics := resp.Data["metrics"]
+	metricInfo, ok := metrics.([]interface{})
+	if !ok || len(metricInfo) == 0 {
+		logger.Errorf("query bkdata metrics error, params: %v, metrics: %v", params, metrics)
+		return nil, errors.New("query metrics error, no data")
+	}
+
+	// parse metrics and dimensions
+	var metricsDimensions []map[string]interface{}
+	for _, dataInfo := range metricInfo {
+		data, ok := dataInfo.(map[string]interface{})
+		if !ok {
+			logger.Errorf("metric data is not map[string]interface{}, data: %v", dataInfo)
+			continue
+		}
+		lastModifyTime, _ := data["update_time"].(float64)
+		dimensions, _ := data["dimensions"].([]interface{})
+		tagValueList := make(map[string]interface{})
+		for _, dimInfo := range dimensions {
+			dim, ok := dimInfo.(map[string]interface{})
+			if !ok {
+				logger.Errorf("dimension data is not map[string]interface{}, dimInfo: %v", dimInfo)
+				continue
+			}
+			// 判断值为 string
+			tagName, ok := dim["name"].(string)
+			if !ok {
+				logger.Errorf("dimension name: %v is not string", dim["name"])
+				continue
+			}
+			// 判断值为 float64
+			tagUpdateTime, ok := dim["update_time"].(float64)
+			if !ok {
+				logger.Errorf("dimension: %v update_time is not float64", dim["name"])
+				continue
+			}
+			tagValueList[tagName] = map[string]interface{}{"last_update_time": tagUpdateTime / 1000}
+		}
+
+		item := map[string]interface{}{
+			"field_name":       data["name"],
+			"last_modify_time": lastModifyTime / 1000,
+			"tag_value_list":   tagValueList,
+		}
+		metricsDimensions = append(metricsDimensions, item)
+	}
+	return metricsDimensions, nil
+}
diff --git a/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go b/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go
index 347a8688c..a4b9685f7 100644
--- a/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go
+++ b/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup.go
@@ -19,6 +19,7 @@ import (
 	"github.com/pkg/errors"
 
 	cfg "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/config"
+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/apiservice"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/customreport"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/resulttable"
@@ -55,7 +56,16 @@ func NewTimeSeriesGroupSvc(obj *customreport.TimeSeriesGroup) TimeSeriesGroupSvc
 }
 
 // UpdateTimeSeriesMetrics 从远端存储中同步TS的指标和维度对应关系
-func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics() (bool, error) {
+func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, isInRtList bool) (bool, error) {
+	// 如果在白名单中,则通过计算平台获取指标数据
+	if isInRtList {
+		// 获取 vm rt 及 metric
+		vmMetrics, err := s.QueryMetricAndDimension(vmRt)
+		if err != nil {
+			return false, err
+		}
+		return s.UpdateMetrics(*vmMetrics)
+	}
 	// 获取 redis 中数据,用于后续指标及tag的更新
 	metricInfo, err := s.GetRedisData(cfg.GlobalFetchTimeSeriesMetricIntervalSeconds)
 	if err != nil {
@@ -68,6 +78,19 @@ func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics() (bool, error) {
 	return s.UpdateMetrics(metricInfo)
 }
 
+// QueryMetricAndDimension 通过计算平台查询指标和维度数据
+func (s *TimeSeriesGroupSvc) QueryMetricAndDimension(vmRt string) (vmRtMetrics *[]map[string]interface{}, err error) {
+	// NOTE: 现阶段仅支持 vm 存储
+	vmStorage := "vm"
+
+	metricAndDimension, err := apiservice.Bkdata.QueryMetricAndDimension(vmStorage, vmRt)
+	if err != nil {
+		return nil, err
+	}
+
+	return &metricAndDimension, nil
+}
+
 // GetRedisData get data from redis
 func (s *TimeSeriesGroupSvc) GetRedisData(expiredTime int) ([]map[string]interface{}, error) {
 	/*
diff --git a/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup_test.go b/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup_test.go
index d3be303f1..b0a652afe 100644
--- a/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup_test.go
+++ b/pkg/bk-monitor-worker/internal/metadata/service/timeseriesgroup_test.go
@@ -59,7 +59,7 @@ func TestTimeSeriesGroupSvc_UpdateTimeSeriesMetrics(t *testing.T) {
 	svc := NewTimeSeriesGroupSvc(&tsm)
 
 	// 测试新增
-	updated, err := svc.UpdateTimeSeriesMetrics()
+	updated, err := svc.UpdateTimeSeriesMetrics("", false)
 	assert.NoError(t, err)
 	assert.True(t, updated)
 	// metric
@@ -109,7 +109,7 @@ func TestTimeSeriesGroupSvc_UpdateTimeSeriesMetrics(t *testing.T) {
 	assert.NoError(t, err)
 
 	// 测试修改
-	updated, err = svc.UpdateTimeSeriesMetrics()
+	updated, err = svc.UpdateTimeSeriesMetrics("", false)
 	assert.NoError(t, err)
 	assert.True(t, updated)
 
diff --git a/pkg/bk-monitor-worker/internal/metadata/task/customreport.go b/pkg/bk-monitor-worker/internal/metadata/task/customreport.go
index 43d93e035..8754e7136 100644
--- a/pkg/bk-monitor-worker/internal/metadata/task/customreport.go
+++ b/pkg/bk-monitor-worker/internal/metadata/task/customreport.go
@@ -15,10 +15,15 @@ import (
 
 	"github.com/pkg/errors"
 
+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/config"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/customreport"
+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/storage"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/service"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/mysql"
+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/store/redis"
 	t "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/task"
+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/jsonx"
+	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/slicex"
 	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
 )
 
@@ -29,11 +34,39 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error {
 			logger.Errorf("RefreshTimeSeriesMetric Runtime panic caught: %v", err)
 		}
 	}()
+	logger.Info("start to refresh time series metric")
 	db := mysql.GetDBSession().DB
 	var tsGroupList []customreport.TimeSeriesGroup
 	if err := customreport.NewTimeSeriesGroupQuerySet(db).IsEnableEq(true).IsDeleteEq(false).All(&tsGroupList); err != nil {
 		return errors.Wrap(err, "find ts group record error")
 	}
+
+	// 获取结果表对应的计算平台结果表
+	var tableIdList []string
+	for _, tg := range tsGroupList {
+		tableIdList = append(tableIdList, tg.TableID)
+	}
+	rtMapVmRt := make(map[string]string)
+	for _, chunkTableIds := range slicex.ChunkSlice(tableIdList, 0) {
+		var tempList []storage.AccessVMRecord
+		if err := storage.NewAccessVMRecordQuerySet(db).Select(storage.AccessVMRecordDBSchema.ResultTableId, storage.AccessVMRecordDBSchema.VmResultTableId).ResultTableIdIn(chunkTableIds...).All(&tempList); err != nil {
+			logger.Errorf("get vm table id by monitor table id error, %s", err)
+			continue
+		}
+		for _, rtInfo := range tempList {
+			rtMapVmRt[rtInfo.ResultTableId] = rtInfo.VmResultTableId
+		}
+	}
+
+	// 获取 redis 中的白名单数据
+	client := redis.GetStorageRedisInstance()
+	wlTableIdList := make([]string, 0)
+	if wlTableIdByte, err := client.Get(config.BkDataTableIdListRedisPath); err == nil && wlTableIdByte != nil {
+		if err := jsonx.Unmarshal(wlTableIdByte, &wlTableIdList); err != nil {
+			logger.Errorf("get white list table id from redis failed, %v", err)
+		}
+	}
+
 	// 收集需要更新推送redis的table_id
 	tableIdChan := make(chan string, GetGoroutineLimit("refresh_time_series_metric"))
 	var updatedTableIds []string
@@ -53,36 +86,46 @@ func RefreshTimeSeriesMetric(ctx context.Context, t *t.Task) error {
 	wg := sync.WaitGroup{}
 	wg.Add(len(tsGroupList))
 	for _, eg := range tsGroupList {
+		vmRt, ok := rtMapVmRt[eg.TableID]
+		if !ok {
+			logger.Errorf("cannot find vm result table id by monitor table id: %s", eg.TableID)
+			wg.Done()
+			continue
+		}
 		ch <- true
-		go func(ts customreport.TimeSeriesGroup, tableIdChan chan string, wg *sync.WaitGroup, ch chan bool) {
+		// 判断是否在白名单中
+		isInRtList := slicex.IsExistItem(wlTableIdList, eg.TableID)
+		go func(ts customreport.TimeSeriesGroup, tableIdChan chan string, wg *sync.WaitGroup, ch chan bool, vmRt string, isInRtList bool) {
 			defer func() {
 				<-ch
 				wg.Done()
 			}()
 			svc := service.NewTimeSeriesGroupSvc(&ts)
-			updated, err := svc.UpdateTimeSeriesMetrics()
+			updated, err := svc.UpdateTimeSeriesMetrics(vmRt, isInRtList)
 			if err != nil {
-				logger.Errorf("time_series_group: [%s] try to update metrics from redis failed, %v", ts.TableID, err)
+				logger.Errorf("time_series_group: [%s] try to update metrics from bkdata or redis failed, %v", ts.TableID, err)
 				return
 			}
-			logger.Infof("time_series_group: [%s] metric update from redis success", ts.TableID)
+			logger.Infof("time_series_group: [%s] metric update from bkdata or redis success, updated: %v", ts.TableID, updated)
 			if updated {
 				tableIdChan <- svc.TableID
 			}
-		}(eg, tableIdChan, &wg, ch)
+		}(eg, tableIdChan, &wg, ch, vmRt, isInRtList)
 	}
 	wg.Wait()
 	close(tableIdChan)
 	// 防止数据没有读完
 	wgReceive.Wait()
 
 	if len(updatedTableIds) != 0 {
+		logger.Info("start to push table id to redis")
 		pusher := service.NewSpacePusher()
 		if err := pusher.PushTableIdDetail(updatedTableIds, true); err != nil {
 			return errors.Wrapf(err, "metric update to push table id detail for [%v] failed", updatedTableIds)
 		}
 		logger.Infof("metric updated of table_id [%v]", updatedTableIds)
 	}
+	logger.Info("refresh time series metric success")
 	return nil
 }
diff --git a/pkg/bk-monitor-worker/service/scheduler/periodic/periodic.go b/pkg/bk-monitor-worker/service/scheduler/periodic/periodic.go
index fe85a7663..585b4dac7 100644
--- a/pkg/bk-monitor-worker/service/scheduler/periodic/periodic.go
+++ b/pkg/bk-monitor-worker/service/scheduler/periodic/periodic.go
@@ -64,8 +64,9 @@ func getPeriodicTasks() map[string]PeriodicTask {
 
 	return map[string]PeriodicTask{
 		refreshTsMetric: {
-			Cron:    "*/3 * * * *",
+			Cron:    "*/5 * * * *",
 			Handler: metadataTask.RefreshTimeSeriesMetric,
+			Option:  []task.Option{task.Timeout(600 * time.Second)},
 		},
 		refreshEventDimension: {
 			Cron:    "*/3 * * * *",
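
Reviewer note (not part of the patch): the standalone sketch below illustrates how the new apiservice QueryMetricAndDimension shapes the bkdata query_metric_and_dimension response into the metric-info items that TimeSeriesGroupSvc.UpdateMetrics consumes. The field names ("metrics", "name", "update_time", "dimensions", "field_name", "last_modify_time", "tag_value_list") and the millisecond-to-second conversion follow the patch; the concrete payload values and the program itself are illustrative assumptions, not part of the bkdata API contract.

// transform_sketch.go: minimal, stdlib-only illustration of the parsing done
// in apiservice.BkdataService.QueryMetricAndDimension (hypothetical payload).
package main

import "fmt"

func main() {
	// Hypothetical fragment of resp.Data["metrics"] as returned by bkdata.
	metrics := []interface{}{
		map[string]interface{}{
			"name":        "cpu_usage",       // metric name
			"update_time": 1718160000000.0,   // milliseconds
			"dimensions": []interface{}{
				map[string]interface{}{"name": "bk_biz_id", "update_time": 1718160000000.0},
			},
		},
	}

	var items []map[string]interface{}
	for _, raw := range metrics {
		data, ok := raw.(map[string]interface{})
		if !ok {
			continue
		}
		lastModifyTime, _ := data["update_time"].(float64)
		dims, _ := data["dimensions"].([]interface{})
		tagValueList := make(map[string]interface{})
		for _, rawDim := range dims {
			dim, ok := rawDim.(map[string]interface{})
			if !ok {
				continue
			}
			name, ok := dim["name"].(string)
			if !ok {
				continue
			}
			updateTime, _ := dim["update_time"].(float64)
			// bkdata reports milliseconds; metadata stores seconds.
			tagValueList[name] = map[string]interface{}{"last_update_time": updateTime / 1000}
		}
		items = append(items, map[string]interface{}{
			"field_name":       data["name"],
			"last_modify_time": lastModifyTime / 1000,
			"tag_value_list":   tagValueList,
		})
	}
	fmt.Printf("%+v\n", items)
}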