Skip to content

Commit

Permalink
feat: BMW: 空间路由任务,空间路由链路结果表过滤支持ES类型 --story=121683817 (#686)
Browse files Browse the repository at this point in the history
  • Loading branch information
EASYGOING45 authored Jan 15, 2025
1 parent c0a6c48 commit d9c762b
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
32 changes: 28 additions & 4 deletions pkg/bk-monitor-worker/internal/metadata/service/spaceredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func (s *SpacePusher) getAllDataLabelTableId() (map[string][]string, error) {
return dataLabelTableIdMap, nil
}

// 提取写入到influxdb或vm的结果表数据
// 提取具备VM、ES、InfluxDB链路的结果表
func (s *SpacePusher) refineTableIds(tableIdList []string) ([]string, error) {
db := mysql.GetDBSession().DB
// 过滤写入 influxdb 的结果表
Expand Down Expand Up @@ -382,13 +382,38 @@ func (s *SpacePusher) refineTableIds(tableIdList []string) ([]string, error) {
return nil, err
}
}

// 过滤写入 ES 的结果表
var esStorageList []storage.ESStorage
qs3 := storage.NewESStorageQuerySet(db).Select(storage.ESStorageDBSchema.TableID)
if len(tableIdList) != 0 {
for _, chunkTableIdList := range slicex.ChunkSlice(tableIdList, 0) {
var tempList []storage.ESStorage
qsTemp := qs3.TableIDIn(chunkTableIdList...)
if err := qsTemp.All(&tempList); err != nil {
return nil, err
}
esStorageList = append(esStorageList, tempList...)
}
} else {
if err := qs3.All(&esStorageList); err != nil {
return nil, err
}
}

// 合并所有表 ID
var tableIds []string
for _, i := range influxdbStorageList {
tableIds = append(tableIds, i.TableID)
}
for _, i := range vmRecordList {
tableIds = append(tableIds, i.ResultTableId)
}
for _, i := range esStorageList {
tableIds = append(tableIds, i.TableID)
}

// 去重
tableIds = slicex.RemoveDuplicate(&tableIds)
return tableIds, nil
}
Expand Down Expand Up @@ -1536,7 +1561,6 @@ func (s *SpacePusher) composeData(spaceType, spaceId string, tableIdList []strin
ops.Set("fromAuthorization", need)
}
tableIdDataId, err := s.GetSpaceTableIdDataId(spaceType, spaceId, tableIdList, nil, ops)
logger.Infof("composeData: GetSpaceTableIdDataId success,space_type [%s], space_id [%s],tableIdDataId[%v]", spaceType, spaceId, tableIdDataId)
if err != nil {
logger.Errorf("composeData: GetSpaceTableIdDataId failed,space_type [%s], space_id [%s],err[%v]", spaceType, spaceId, err)
return nil, err
Expand All @@ -1551,9 +1575,9 @@ func (s *SpacePusher) composeData(spaceType, spaceId string, tableIdList []strin
for tableId := range tableIdDataId {
tableIds = append(tableIds, tableId)
}
// 提取仅包含写入 influxdb 和 vm 的结果表
// 提取具备VM、ES、InfluxDB的链路结果表
tableIds, err = s.refineTableIds(tableIds)
// 再一次过滤,过滤到有链路的结果表,并且写入 influxdb 或 vm 的数据
// 再一次过滤,过滤到有链路的结果表,并且写入 influxdb&vm&es 的数据
tableIdDataIdMap := make(map[string]uint)
var dataIdList []uint
for _, tableId := range tableIds {
Expand Down
20 changes: 18 additions & 2 deletions pkg/bk-monitor-worker/internal/metadata/service/spaceredis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,11 @@ func TestSpacePusher_getTableIdClusterId(t *testing.T) {
}

func TestSpacePusher_refineTableIds(t *testing.T) {
// 初始化测试数据库配置
mocker.InitTestDBConfig("../../../bmw_test.yaml")
db := mysql.GetDBSession().DB

// 创建 Influxdb 表数据
itableName := "i_table_test.dbname"
iTable := storage.InfluxdbStorage{TableID: itableName, RealTableName: "i_table_test", Database: "dbname"}
db.Delete(&iTable)
Expand All @@ -300,16 +303,29 @@ func TestSpacePusher_refineTableIds(t *testing.T) {
err = iTable1.Create(db)
assert.NoError(t, err)

// 创建 VM 表数据
vmTableName := "vm_table_name"
vmTable := storage.AccessVMRecord{ResultTableId: vmTableName}
db.Delete(&vmTable)
err = vmTable.Create(db)
assert.NoError(t, err)

// 创建 ES 表数据
esTableName := "es_table_name"
esTable := storage.ESStorage{TableID: esTableName, NeedCreateIndex: true}
db.Delete(&esTable)
err = esTable.Create(db)
assert.NoError(t, err)

// 不存在的表
notExistTable := "not_exist_rt"

ids, err := NewSpacePusher().refineTableIds([]string{itableName, itableName1, notExistTable, vmTableName})
assert.ElementsMatch(t, []string{itableName, itableName1, vmTableName}, ids)
// 调用 refineTableIds 方法
ids, err := NewSpacePusher().refineTableIds([]string{itableName, itableName1, notExistTable, vmTableName, esTableName})

// 断言结果,期望返回正确的表 ID
assert.NoError(t, err)
assert.ElementsMatch(t, []string{itableName, itableName1, vmTableName, esTableName}, ids)
}

func TestSpacePusher_refineEsTableIds(t *testing.T) {
Expand Down

0 comments on commit d9c762b

Please sign in to comment.