Skip to content

Commit

Permalink
feat: 调整查询计算平台指标发现功能 (#369)
Browse files Browse the repository at this point in the history
  • Loading branch information
bellke authored Jun 12, 2024
1 parent 1c8311d commit 43c57ba
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 10 deletions.
2 changes: 1 addition & 1 deletion pkg/bk-monitor-worker/broker/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,7 @@ redis.call("ZREM", KEYS[3], ARGV[1])
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
redis.call("HSET", KEYS[1], "msg", ARGV[2], "state", "archived")
redis.call("DEL", KEYS[1])
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[6])
Expand Down
6 changes: 6 additions & 0 deletions pkg/bk-monitor-worker/config/metadata_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ var (
MetadataMetricDimensionKeyPrefix string
// MetadataMetricDimensionMaxMetricFetchStep config of metadata.refreshMetric task
MetadataMetricDimensionMaxMetricFetchStep int
// MetadataMetricDimensionByBkData refresh metric dimension by bkdata
MetadataMetricDimensionByBkData bool
// MetadataTableIdListForBkDataTsMetrics refresh metadata table_id dimension by bkdata
BkDataTableIdListRedisPath string

// BcsEnableBcsGray 是否启用BCS集群灰度模式
BcsEnableBcsGray bool
Expand Down Expand Up @@ -119,6 +123,8 @@ func initMetadataVariables() {
MetadataMetricDimensionMetricKeyPrefix = GetValue("taskConfig.metadata.metricDimension.metricKeyPrefix", "bkmonitor:metrics_")
MetadataMetricDimensionKeyPrefix = GetValue("taskConfig.metadata.metricDimension.metricDimensionKeyPrefix", "bkmonitor:metric_dimensions_")
MetadataMetricDimensionMaxMetricFetchStep = GetValue("taskConfig.metadata.metricDimension.maxMetricsFetchStep", 500)
MetadataMetricDimensionByBkData = GetValue("taskConfig.metadata.metricDimension.metadataMetricDimensionByBkData", false)
BkDataTableIdListRedisPath = GetValue("taskConfig.metadata.metricDimension.BkDataTableIdListRedisPath", "metadata:query_metric:table_id_list")

BcsEnableBcsGray = GetValue("taskConfig.metadata.bcs.enableBcsGray", false)
BcsGrayClusterIdList = GetValue("taskConfig.metadata.bcs.grayClusterIdList", []string{})
Expand Down
64 changes: 64 additions & 0 deletions pkg/bk-monitor-worker/internal/api/bkdata/bkdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,3 +405,67 @@ func (c *Client) RestartDataFlow(opts ...define.OperationOption) define.Operatio
Path: path,
}, opts...)
}

// QueryMetrics for bkdata resource query_metrics
func (c *Client) QueryMetrics(opts ...define.OperationOption) define.Operation {
/*
@params
storage | string | 存储类型 | required
result_table_id | string | 计算平台结果表 | required
*/
path := "/v3/dd/metrics/"
return c.BkApiClient.NewOperation(bkapi.OperationConfig{
Name: "get_dd_metrics",
Method: "GET",
Path: path,
}, opts...)
}

// QueryDimension for bkdata resource query_dimension
func (c *Client) QueryDimension(opts ...define.OperationOption) define.Operation {
/*
@params
storage | string | 存储类型 | required
result_table_id | string | 计算平台结果表|
metric| string | 指标名称 | required
*/
path := "/v3/dd/dimensions/"
return c.BkApiClient.NewOperation(bkapi.OperationConfig{
Name: "get_dd_dimensions",
Method: "GET",
Path: path,
}, opts...)
}

// QueryDimensionValue for bkdata resource query
func (c *Client) QueryDimensionValue(opts ...define.OperationOption) define.Operation {
/*
@params
storage | string | 存储类型 | required
result_table_id | string | 计算平台结果表|
metric| string | 指标名称 | required
dimension| string | 维度名称 | required
*/
path := "/v3/dd/values/"
return c.BkApiClient.NewOperation(bkapi.OperationConfig{
Name: "get_dd_values",
Method: "GET",
Path: path,
}, opts...)
}

// QueryMetricAndDimension for bkdata resource query_metric_and_dimension
func (c *Client) QueryMetricAndDimension(opts ...define.OperationOption) define.Operation {
/*
@params
storage | string | 存储类型 | required
result_table_id | string | 计算平台结果表|required|
no_value| bool | 是否返回值 | required|
*/
path := "/v4/dd/"
return c.BkApiClient.NewOperation(bkapi.OperationConfig{
Name: "get_dd_metrics_and_dimensions",
Method: "GET",
Path: path,
}, opts...)
}
142 changes: 142 additions & 0 deletions pkg/bk-monitor-worker/internal/metadata/apiservice/bkdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/api/bkdata"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/utils/jsonx"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/utils/logger"
)

var Bkdata BkdataService
Expand Down Expand Up @@ -393,3 +394,144 @@ func (s BkdataService) RestartDataFlow(flowId int, consumingMode, clusterGroup s
}
return resp.Data, nil
}

// QueryMetrics 查询指标数据
func (s BkdataService) QueryMetrics(storage string, rt string) (*map[string]float64, error) {
bkdataApi, err := api.GetBkdataApi()
if err != nil {
return nil, errors.Wrap(err, "get bkdata api failed")
}

var resp bkdata.CommonListResp
if _, err = bkdataApi.QueryMetrics().SetQueryParams(map[string]string{"storage": storage, "result_table_id": rt}).SetResult(&resp).Request(); err != nil {
return nil, errors.Wrapf(err, "query metrics error by bkdata: %s, table_id: %s", storage, rt)
}
if err := resp.Err(); err != nil {
return nil, errors.Wrapf(err, "query metrics error by bkdata: %s, table_id: %s", storage, rt)
}
// parse metrics
metrics := make(map[string]float64)
for _, data := range resp.Data {
metricInfo, ok := data.([]interface{})
if !ok {
logger.Errorf("parse metrics data error, metric_info: %v", metricInfo)
continue
}
metric, ok := metricInfo[0].(string)
if !ok {
logger.Errorf("parse metrics data error, metric: %v", metric)
continue
}
// NOTE: 如果时间戳不符合预期,则忽略该指标
timestamp, ok := metricInfo[1].(float64)
if !ok {
logger.Errorf("parse metrics data error, timestamp: %v", timestamp)
continue
}
metrics[metric] = timestamp
}
return &metrics, nil
}

// QueryDimension 查询维度数据
func (s BkdataService) QueryDimension(storage string, rt string, metric string) (*[]map[string]interface{}, error) {
bkdataApi, err := api.GetBkdataApi()
if err != nil {
return nil, errors.Wrap(err, "get bkdata api failed")
}

var resp bkdata.CommonListResp
if _, err = bkdataApi.QueryDimension().SetQueryParams(map[string]string{"storage": storage, "result_table_id": rt, "metric": metric}).SetResult(&resp).Request(); err != nil {
return nil, errors.Wrapf(err, "query dimension error by bkdata: %s, table_id: %s", storage, rt)
}
if err := resp.Err(); err != nil {
return nil, errors.Wrapf(err, "query dimension error by bkdata: %s, table_id: %s", storage, rt)
}
// parse dimension
var dimensions []map[string]interface{}
for _, data := range resp.Data {
dimensionInfo, ok := data.([]interface{})
if !ok {
logger.Errorf("parse dimension data error, dimension_info: %v", dimensionInfo)
continue
}
dimension, ok := dimensionInfo[0].(string)
if !ok {
logger.Errorf("parse dimension data error, dimension: %v", dimension)
continue
}
// NOTE: 如果时间戳不符合预期,则忽略该指标
timestamp, ok := dimensionInfo[1].(float64)
if !ok {
logger.Errorf("parse dimension data error, timestamp: %v", timestamp)
continue
}
dimensions = append(dimensions, map[string]interface{}{dimension: map[string]interface{}{"last_update_time": timestamp}})
}
return &dimensions, nil
}

// QueryMetricAndDimension 查询指标和维度数据
func (s BkdataService) QueryMetricAndDimension(storage string, rt string) ([]map[string]interface{}, error) {
bkdataApi, err := api.GetBkdataApi()
if err != nil {
return nil, errors.Wrap(err, "get bkdata api failed")
}
var resp bkdata.CommonMapResp
// NOTE: 设置no_value=true,不需要返回维度对应的 value
params := map[string]string{"storage": storage, "result_table_id": rt, "no_value": "true"}
if _, err = bkdataApi.QueryMetricAndDimension().SetQueryParams(params).SetResult(&resp).Request(); err != nil {
return nil, errors.Wrapf(err, "query metrics and dimension error by bkdata: %s, table_id: %s", storage, rt)
}
if err := resp.Err(); err != nil {
return nil, errors.Wrapf(err, "query metrics and dimension error by bkdata: %s, table_id: %s", storage, rt)
}

metrics := resp.Data["metrics"]
metricInfo, ok := metrics.([]interface{})
if !ok || len(metricInfo) == 0 {
logger.Errorf("query bkdata metrics error, params: %v, metrics: %v", params, metricInfo)
return nil, errors.New("query metrics error, no data")
}

// parse metrics and dimensions
var MetricsDimension []map[string]interface{}
for _, dataInfo := range metricInfo {
data, ok := dataInfo.(map[string]interface{})
if !ok {
logger.Errorf("metric data not map[string]interface{}, data: %v", params, metricInfo)
continue
}
lastModifyTime := data["update_time"].(float64)
dimensions := data["dimensions"].([]interface{})
tagValueList := make(map[string]interface{})
for _, dimInfo := range dimensions {
dim, ok := dimInfo.(map[string]interface{})
if !ok {
logger.Errorf("dimension data not map[string]interface{}, dimInfo: %v", dimInfo)
continue
}
// 判断值为 string
tag_name, ok := dim["name"].(string)
if !ok {
logger.Errorf("dimension: %s is not string", dim["name"])
continue
}
// 判断值为 float64
tagUpdateTime, ok := dim["update_time"].(float64)
if !ok {
logger.Errorf("dimension: %s is not string", dim["name"])
continue
}
tagValueList[tag_name] = map[string]interface{}{"last_update_time": tagUpdateTime / 1000}
}

item := map[string]interface{}{
"field_name": data["name"],
"last_modify_time": lastModifyTime / 1000,
"tag_value_list": tagValueList,
}
MetricsDimension = append(MetricsDimension, item)
}
return MetricsDimension, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pkg/errors"

cfg "github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/config"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/apiservice"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/customreport"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/bk-monitor-worker/internal/metadata/models/resulttable"
Expand Down Expand Up @@ -55,7 +56,16 @@ func NewTimeSeriesGroupSvc(obj *customreport.TimeSeriesGroup) TimeSeriesGroupSvc
}

// UpdateTimeSeriesMetrics 从远端存储中同步TS的指标和维度对应关系
func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics() (bool, error) {
func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics(vmRt string, isInRtList bool) (bool, error) {
// 如果在白名单中,则通过计算平台获取指标数据
if isInRtList {
// 获取 vm rt及metric
vmMetrics, err := s.QueryMetricAndDimension(vmRt)
if err != nil {
return false, err
}
return s.UpdateMetrics(*vmMetrics)
}
// 获取 redis 中数据,用于后续指标及tag的更新
metricInfo, err := s.GetRedisData(cfg.GlobalFetchTimeSeriesMetricIntervalSeconds)
if err != nil {
Expand All @@ -68,6 +78,19 @@ func (s *TimeSeriesGroupSvc) UpdateTimeSeriesMetrics() (bool, error) {
return s.UpdateMetrics(metricInfo)
}

// RefreshMetric 更新指标
func (s *TimeSeriesGroupSvc) QueryMetricAndDimension(vmRt string) (vmRtMetrics *[]map[string]interface{}, err error) {
// NOTE: 现阶段仅支持 vm 存储
vmStorage := "vm"

metricAndDimension, err := apiservice.Bkdata.QueryMetricAndDimension(vmStorage, vmRt)
if err != nil {
return nil, err
}

return &metricAndDimension, nil
}

// GetRedisData get data from redis
func (s *TimeSeriesGroupSvc) GetRedisData(expiredTime int) ([]map[string]interface{}, error) {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestTimeSeriesGroupSvc_UpdateTimeSeriesMetrics(t *testing.T) {

svc := NewTimeSeriesGroupSvc(&tsm)
// 测试新增
updated, err := svc.UpdateTimeSeriesMetrics()
updated, err := svc.UpdateTimeSeriesMetrics("", false)
assert.NoError(t, err)
assert.True(t, updated)
// metric
Expand Down Expand Up @@ -109,7 +109,7 @@ func TestTimeSeriesGroupSvc_UpdateTimeSeriesMetrics(t *testing.T) {
assert.NoError(t, err)

// 测试修改
updated, err = svc.UpdateTimeSeriesMetrics()
updated, err = svc.UpdateTimeSeriesMetrics("", false)
assert.NoError(t, err)
assert.True(t, updated)

Expand Down
Loading

0 comments on commit 43c57ba

Please sign in to comment.