Skip to content

Commit

Permalink
feat: unifyquery 适配查询 es 集群的无损切换 --story=121259459 (#672)
Browse files Browse the repository at this point in the history
  • Loading branch information
shamcleren authored Jan 2, 2025
1 parent d98b081 commit 6536611
Show file tree
Hide file tree
Showing 29 changed files with 1,366 additions and 389 deletions.
31 changes: 30 additions & 1 deletion pkg/unify-query/influxdb/router_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"context"
"fmt"
"sync"
"time"

goRedis "github.com/go-redis/redis/v8"
"github.com/spf13/viper"
Expand Down Expand Up @@ -107,11 +108,13 @@ func MockSpaceRouter(ctx context.Context) {
"pod_with_replicaset_relation",
"apm_service_instance_with_pod_relation",
"apm_service_instance_with_system_relation",
"kubelet_info",
}
influxdbFields := []string{
"kube_pod_info",
"kube_node_info",
"kube_node_status_condition",
"kubelet_cluster_request_total",
}

tsdb.SetStorage(
Expand Down Expand Up @@ -183,6 +186,16 @@ func MockSpaceRouter(ctx context.Context) {
},
},
ir.ResultTableDetailInfo{
"result_table.kubelet_info": &ir.ResultTableDetail{
StorageId: 2,
TableId: "result_table.kubelet_info",
VmRt: "2_bcs_prom_computation_result_table",
Fields: vmFiedls,
DB: "other",
Measurement: "kubelet_info",
BcsClusterID: "BCS-K8S-00000",
MeasurementType: redis.BkSplitMeasurement,
},
"system.cpu_summary": &ir.ResultTableDetail{
StorageId: 2,
TableId: "system.cpu_summary",
Expand Down Expand Up @@ -228,6 +241,18 @@ func MockSpaceRouter(ctx context.Context) {
ResultTableEs: &ir.ResultTableDetail{
StorageId: 3,
TableId: ResultTableEs,
StorageClusterRecords: []ir.Record{
{
StorageID: 3,
// 2019-12-02 08:00:00
EnableTime: 1575244800,
},
{
StorageID: 4,
// 2019-11-02 08:00:00
EnableTime: 1572652800,
},
},
},
ResultTableBkSQL: &ir.ResultTableDetail{
StorageId: 4,
Expand All @@ -251,12 +276,16 @@ func MockSpaceRouter(ctx context.Context) {
func setSpaceTsDbMockData(ctx context.Context, bkAppSpace ir.BkAppSpace, spaceInfo ir.SpaceInfo, rtInfo ir.ResultTableDetailInfo, fieldInfo ir.FieldToResultTable, dataLabelInfo ir.DataLabelToResultTable) {
mockRedisOnce.Do(func() {
setRedisClient(ctx)

})

sr, err := SetSpaceTsDbRouter(ctx, "mock", "mock", "", 100)
mockPath := "mock" + time.Now().String()
sr, err := SetSpaceTsDbRouter(ctx, mockPath, mockPath, "", 5, false)
if err != nil {
panic(err)
}
sr.cache.Clear()

for bkApp, spaceUidList := range bkAppSpace {
err = sr.Add(ctx, ir.BkAppToSpaceKey, bkApp, spaceUidList)
if err != nil {
Expand Down
15 changes: 9 additions & 6 deletions pkg/unify-query/influxdb/spacetsdb_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,15 @@ type SpaceTsDbRouter struct {
kvBucketName string
kvPath string
kvClient kvstore.KVStore
cache memcache.Cache
hasInit bool
batchSize int

isCache bool
cache memcache.Cache
hasInit bool
batchSize int
}

// SetSpaceTsDbRouter 设置全局可用的 Router 单例,用于管理空间数据
func SetSpaceTsDbRouter(ctx context.Context, kvPath string, kvBucketName string, routerPrefix string, batchSize int) (*SpaceTsDbRouter, error) {
func SetSpaceTsDbRouter(ctx context.Context, kvPath string, kvBucketName string, routerPrefix string, batchSize int, isCache bool) (*SpaceTsDbRouter, error) {
globalSpaceTsDbRouterLock.Lock()
defer globalSpaceTsDbRouterLock.Unlock()
if globalSpaceTsDbRouter != nil {
Expand All @@ -67,6 +69,7 @@ func SetSpaceTsDbRouter(ctx context.Context, kvPath string, kvBucketName string,
kvPath: kvPath,
routerPrefix: routerPrefix,
batchSize: batchSize,
isCache: isCache,
}
err := globalSpaceTsDbRouter.initRouter(ctx)
if err != nil {
Expand Down Expand Up @@ -178,7 +181,7 @@ func (r *SpaceTsDbRouter) Get(ctx context.Context, stoPrefix string, stoKey stri
log.Warnf(ctx, "Fail to new generic value, %s", err)
return nil
}
if cached {
if cached && r.isCache {
data, exist := r.cache.Get(stoKey)
if exist {
// 存入缓存的数据可能有 nil 情况,需要兼容
Expand Down Expand Up @@ -209,7 +212,7 @@ func (r *SpaceTsDbRouter) Get(ctx context.Context, stoPrefix string, stoKey stri
}
}
// 添加缓存
if cached {
if cached && r.isCache {
// NOTE: 暂时使用 20 作为随机
expiredTime := viper.GetInt64(memcache.RistrettoExpiredTimePath) + rand.Int63n(viper.GetInt64(memcache.RistrettoExpiredTimeFluxValuePath))
r.cache.SetWithTTL(stoKey, stoVal, 0, time.Duration(expiredTime)*time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion pkg/unify-query/influxdb/spacetsdb_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *TestSuite) SetupTest() {
"script_hhb_test.group3",
"{\"storage_id\":2,\"cluster_name\":\"default\",\"db\":\"script_hhb_test\",\"measurement\":\"group3\",\"vm_rt\":\"\",\"tags_key\":[],\"fields\":[\"disk_usage30\",\"disk_usage8\",\"disk_usage27\",\"disk_usage4\",\"disk_usage24\",\"disk_usage11\",\"disk_usage7\",\"disk_usage5\",\"disk_usage20\",\"disk_usage25\",\"disk_usage10\",\"disk_usage6\",\"disk_usage19\",\"disk_usage18\",\"disk_usage17\",\"disk_usage15\",\"disk_usage22\",\"disk_usage28\",\"disk_usage21\",\"disk_usage26\",\"disk_usage13\",\"disk_usage14\",\"disk_usage12\",\"disk_usage23\",\"disk_usage3\",\"disk_usage16\",\"disk_usage9\"],\"measurement_type\":\"bk_exporter\",\"bcs_cluster_id\":\"\",\"data_label\":\"script_hhb_test\",\"bk_data_id\": 11}")

router, err := SetSpaceTsDbRouter(s.ctx, "spacetsdb_test.db", "spacetsdb_test", "bkmonitorv3:spaces", 100)
router, err := SetSpaceTsDbRouter(s.ctx, "spacetsdb_test.db", "spacetsdb_test", "bkmonitorv3:spaces", 100, false)
if err != nil {
panic(err)
}
Expand Down
118 changes: 118 additions & 0 deletions pkg/unify-query/internal/function/samples.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Tencent is pleased to support the open source community by making
// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available.
// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved.
// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
// You may obtain a copy of the License at http://opensource.org/licenses/MIT
// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

package function

import (
"sort"
"strings"

"github.com/prometheus/prometheus/prompb"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/internal/set"
)

const (
Min = "min"
Max = "max"
Avg = "avg"
Sum = "sum"
Count = "count"
)

// MergeSamplesWithFuncAndSort 合并 samples 数据,如果相同时间的进行函数处理,并且按照时间排序
func MergeSamplesWithFuncAndSort(name string) func(samplesList ...[]prompb.Sample) []prompb.Sample {
return func(samplesList ...[]prompb.Sample) []prompb.Sample {
var (
aggFunc func(i, j float64) float64
)
switch strings.ToLower(name) {
case Min:
aggFunc = func(i, j float64) float64 {
if i < j {
return i
}
return j
}
case Max:
aggFunc = func(i, j float64) float64 {
if i > j {
return i
}
return j
}
default:
aggFunc = func(i, j float64) float64 {
return i + j
}
}

// 生成 sampleMap 用户合并计算
sampleMap := make(map[int64]float64)
countMap := make(map[int64]float64)

// 生成时间 set 用于排序
timestampSet := set.New[int64]()

for _, samples := range samplesList {
for _, sample := range samples {
timestampSet.Add(sample.GetTimestamp())

if v, ok := sampleMap[sample.GetTimestamp()]; ok {
sampleMap[sample.GetTimestamp()] = aggFunc(v, sample.GetValue())
} else {
sampleMap[sample.GetTimestamp()] = sample.GetValue()
}
countMap[sample.GetTimestamp()] += 1
}
}

out := make([]prompb.Sample, timestampSet.Size())

// 正序
timestamps := timestampSet.ToArray()
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
})

for i, timestamp := range timestamps {
var value float64
switch name {
// Avg 方法需要等所有的数据合并了之后,再做计算
case Avg:
if countMap[timestamp] > 0 {
value = sampleMap[timestamp] / countMap[timestamp]
} else {
value = 0
}
default:
value = sampleMap[timestamp]
}

out[i] = prompb.Sample{
Timestamp: timestamp,
Value: value,
}
}
return out
}
}

// MergeSamplesWithUnionAndSort 合并 samples 数据,如果相同时间的则追加,并且按照时间排序
func MergeSamplesWithUnionAndSort(samplesList ...[]prompb.Sample) []prompb.Sample {
var out []prompb.Sample
for _, samples := range samplesList {
out = append(out, samples...)
}

sort.Slice(out, func(i, j int) bool {
return out[i].GetTimestamp() < out[j].GetTimestamp()
})
return out
}
133 changes: 133 additions & 0 deletions pkg/unify-query/internal/function/samples_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Tencent is pleased to support the open source community by making
// 蓝鲸智云 - 监控平台 (BlueKing - Monitor) available.
// Copyright (C) 2022 THL A29 Limited, a Tencent company. All rights reserved.
// Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
// You may obtain a copy of the License at http://opensource.org/licenses/MIT
// Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
// an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.

package function

import (
"testing"

"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
)

func TestMergeSamples(t *testing.T) {
t1 := []prompb.Sample{
{
Timestamp: 1734462839000,
Value: 5,
},
{
Timestamp: 1734462719000,
Value: 0.1,
},
{
Timestamp: 1734462719000,
Value: 0.2,
},
}

t2 := []prompb.Sample{
{
Timestamp: 1734462779000,
Value: 2,
},
{
Timestamp: 1734462719000,
Value: 3,
},
}

t3 := MergeSamplesWithFuncAndSort(Sum)(t1, t2)
assert.Equal(t, []prompb.Sample{
{
Timestamp: 1734462719000,
Value: 3.3,
},
{
Timestamp: 1734462779000,
Value: 2,
},
{
Timestamp: 1734462839000,
Value: 5,
},
}, t3)

t4 := MergeSamplesWithFuncAndSort(Min)(t1, t2)
assert.Equal(t, []prompb.Sample{
{
Timestamp: 1734462719000,
Value: 0.1,
},
{
Timestamp: 1734462779000,
Value: 2,
},
{
Timestamp: 1734462839000,
Value: 5,
},
}, t4)

t5 := MergeSamplesWithFuncAndSort(Max)(t1, t2)
assert.Equal(t, []prompb.Sample{
{
Timestamp: 1734462719000,
Value: 3,
},
{
Timestamp: 1734462779000,
Value: 2,
},
{
Timestamp: 1734462839000,
Value: 5,
},
}, t5)

t6 := MergeSamplesWithFuncAndSort(Avg)(t1, t2)
assert.Equal(t, []prompb.Sample{
{
Timestamp: 1734462719000,
Value: 1.0999999999999999,
},
{
Timestamp: 1734462779000,
Value: 2,
},
{
Timestamp: 1734462839000,
Value: 5,
},
}, t6)

t8 := MergeSamplesWithUnionAndSort(t1, t2)
assert.Equal(t, []prompb.Sample{
{
Timestamp: 1734462719000,
Value: 0.1,
},
{
Timestamp: 1734462719000,
Value: 0.2,
},
{
Timestamp: 1734462719000,
Value: 3,
},
{
Timestamp: 1734462779000,
Value: 2,
},
{
Timestamp: 1734462839000,
Value: 5,
},
}, t8)
}
1 change: 1 addition & 0 deletions pkg/unify-query/memcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ type Cache interface {
Set(string, interface{}, int64) bool
SetWithTTL(string, interface{}, int64, time.Duration) bool
Del(string)
Clear()
}
4 changes: 4 additions & 0 deletions pkg/unify-query/memcache/ristretto.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,7 @@ func (c *Ristretto) SetWithTTL(key string, val interface{}, cost int64, t time.D
func (c *Ristretto) Del(key string) {
c.cache.Del(key)
}

func (c *Ristretto) Clear() {
c.cache.Clear()
}
Loading

0 comments on commit 6536611

Please sign in to comment.