
Commit

feat: optimize bksql query logic and add thedate condition filtering --story=121169369 (#664)
shamcleren authored Dec 18, 2024
1 parent de933ca commit 82b37ec
Showing 10 changed files with 228 additions and 39 deletions.
4 changes: 2 additions & 2 deletions pkg/unify-query/Makefile
@@ -45,8 +45,8 @@ lint:

.PHONY: addlicense
addlicense:
find ./ -type f \( -iname \*.go -o -iname \*.py -iname \*.sh \)|xargs addlicense -v -f ../../scripts/license.txt -ignore vendor/*
find ./ -type f \( -iname \*.go -o -iname \*.py -iname \*.sh \) | xargs addlicense -v -f ../../scripts/license.txt -ignore vendor/*

.PHONY: imports
imports: addlicense
goimports-reviser -rm-unused -set-alias -format ./...
goimports-reviser -rm-unused -set-alias -format -project-name "github.com/TencentBlueKing/bkmonitor-datalink/pkg" ./...
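
For context: the added -project-name flag tells goimports-reviser which import path prefix counts as this project's own group, so imports end up ordered as standard library, third-party, then project packages. A minimal sketch of the layout it enforces (the package name and chosen imports are illustrative, not taken from the commit):

package example

import (
	// standard library
	"context"
	"time"

	// third-party modules
	"github.com/prometheus/prometheus/model/labels"

	// project imports, grouped by the -project-name prefix
	"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata"
)

// referenced only so the illustrative file compiles
func _() {
	_ = context.Background()
	_ = time.Now()
	_ = labels.Matcher{}
	_ = metadata.Query{}
}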
6 changes: 6 additions & 0 deletions pkg/unify-query/influxdb/router_mock.go
@@ -49,6 +49,12 @@ func MockSpaceRouter(ctx context.Context) {
"false": false
},
"targeting": [
{
"query": "spaceUID in [\"bkdata\"]",
"percentage": {
"false": 100
}
}
],
"defaultRule": {
"variation": "true"
39 changes: 39 additions & 0 deletions pkg/unify-query/internal/function/function.go
@@ -10,6 +10,8 @@
package function

import (
"time"

"github.com/prometheus/prometheus/model/labels"
)

@@ -24,3 +26,40 @@ func MatcherToMetricName(matchers ...*labels.Matcher) string {

return ""
}

func RangeDateWithUnit(unit string, start, end time.Time, step int) (dates []string) {
var (
addYear int
addMonth int
addDay int
toDate func(t time.Time) time.Time
format string
)

switch unit {
case "year":
addYear = step
format = "2006"
toDate = func(t time.Time) time.Time {
return time.Date(t.Year(), 1, 1, 0, 0, 0, 0, t.Location())
}
case "month":
addMonth = step
format = "200601"
toDate = func(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location())
}
default:
addDay = step
format = "20060102"
toDate = func(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
}

for d := toDate(start); !d.After(toDate(end)); d = d.AddDate(addYear, addMonth, addDay) {
dates = append(dates, d.Format(format))
}

return dates
}
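
A minimal usage sketch of the new helper (the time window mirrors the bksql test cases later in this commit; results are shown in comments):

loc, _ := time.LoadLocation("Asia/Shanghai")

// 2024-12-07 21:36:40 UTC, i.e. 2024-12-08 05:36:40 in UTC+8
start := time.Unix(1733607400, 0).In(loc)
// 2024-12-11 17:49:35 UTC, i.e. 2024-12-12 01:49:35 in UTC+8
end := time.Unix(1733939375, 0).In(loc)

// Both ends are truncated to local midnight and stepped one day at a time:
// ["20241208" "20241209" "20241210" "20241211" "20241212"]
days := RangeDateWithUnit("day", start, end, 1)
_ = days

// For the same window, "month" would yield ["202412"] and "year" ["2024"].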
3 changes: 2 additions & 1 deletion pkg/unify-query/metadata/featureFlag.go
@@ -83,7 +83,8 @@ func GetMustVmQueryFeatureFlag(ctx context.Context, tableID string) bool {

span.Set("ff-user-custom", ffUser.GetCustom())

status := featureFlag.BoolVariation(ctx, ffUser, "must-vm-query", false)
// If nothing matches, default to querying vm
status := featureFlag.BoolVariation(ctx, ffUser, "must-vm-query", true)

// Check whether the query time range satisfies the current time configuration
vmDataTime := featureFlag.IntVariation(ctx, ffUser, "range-vm-query", 0)
45 changes: 45 additions & 0 deletions pkg/unify-query/metadata/featureFlag_test.go
@@ -19,6 +19,51 @@ import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log"
)

func TestGetBkDataTableIDCheck(t *testing.T) {
ctx := InitHashID(context.Background())
InitMetadata()

featureFlag.MockFeatureFlag(ctx, `{
"bk-data-table-id-auth": {
"variations": {
"Default": true,
"true": true,
"false": false
},
"targeting": [
{
"query": "spaceUid in [\"bkdata\", \"bkdata_1\"]",
"percentage": {
"false": 100,
"true": 0
}
}
],
"defaultRule": {
"variation": "Default"
}
}
}`)

var actual bool

SetUser(ctx, "", "bkdata_1", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, false, actual)

SetUser(ctx, "", "bkmonitor", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, true, actual)

SetUser(ctx, "", "bkdata_1_1", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, true, actual)

SetUser(ctx, "", "bkdata", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, false, actual)
}

func TestGetMustVmQueryFeatureFlag(t *testing.T) {
ctx := context.Background()

36 changes: 36 additions & 0 deletions pkg/unify-query/tsdb/bksql/format.go
@@ -19,6 +19,7 @@ import (

"github.com/prometheus/prometheus/prompb"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/internal/function"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata"
)

@@ -131,6 +132,31 @@ func (f *QueryFactory) ParserQuery() (err error) {
return
}

func (f *QueryFactory) getTheDateFilters() (theDateFilter string, err error) {
// bkbase converts to thedate using the UTC+8 (Asia/Shanghai) timezone
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
return
}

start := f.start.In(loc)
end := f.end.In(loc)

dates := function.RangeDateWithUnit("day", start, end, 1)

if len(dates) == 0 {
return
}

if len(dates) == 1 {
theDateFilter = fmt.Sprintf("`%s` = '%s'", theDate, dates[0])
return
}

theDateFilter = fmt.Sprintf("`%s` >= '%s' AND `%s` <= '%s'", theDate, dates[0], theDate, dates[len(dates)-1])
return
}
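
When the converted start and end fall on the same UTC+8 calendar day, the filter above collapses to an equality check; otherwise it becomes a closed range over the first and last day. Illustrative outputs, matching the test expectations below:

`thedate` = '20240531'
`thedate` >= '20241208' AND `thedate` <= '20241212'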

func (f *QueryFactory) SQL() (sql string, err error) {
f.sql.Reset()
err = f.ParserQuery()
@@ -151,6 +177,16 @@ func (f *QueryFactory) SQL() (sql string, err error) {
f.write(db)
f.write("WHERE")
f.write(fmt.Sprintf("`%s` >= %d AND `%s` < %d", f.timeField, f.start.UnixMilli(), f.timeField, f.end.UnixMilli()))

theDateFilter, err := f.getTheDateFilters()
if err != nil {
return
}
if theDateFilter != "" {
f.write("AND")
f.write(theDateFilter)
}

if f.query.BkSqlCondition != "" {
f.write("AND")
f.write("(" + f.query.BkSqlCondition + ")")
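
SQL() places this filter immediately after the millisecond time-range predicate and before any BkSqlCondition, so a generated WHERE clause takes the shape (taken from a test expectation below):

WHERE `dtEventTimeStamp` >= 1733607400000 AND `dtEventTimeStamp` < 1733939375000 AND `thedate` >= '20241208' AND `thedate` <= '20241212' AND (gseIndex > 0)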
46 changes: 41 additions & 5 deletions pkg/unify-query/tsdb/bksql/format_test.go
@@ -16,6 +16,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata"
)

@@ -26,6 +27,9 @@ func TestNewSqlFactory(t *testing.T) {
for name, c := range map[string]struct {
query *metadata.Query
expected string

start time.Time
end time.Time
}{
"sum-count_over_time-with-promql-1": {
query: &metadata.Query{
@@ -46,7 +50,7 @@ func TestNewSqlFactory(t *testing.T) {
Size: 0,
Orders: metadata.Orders{"ip": true},
},
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC, `ip` ASC",
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND `thedate` = '20240531' AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC, `ip` ASC",
},
"sum-with-promql-1": {
query: &metadata.Query{
@@ -66,9 +70,33 @@ func TestNewSqlFactory(t *testing.T) {
Size: 10,
Orders: nil,
},
expected: "SELECT `ip`, SUM(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND (gseIndex > 0) GROUP BY `ip` LIMIT 10",
expected: "SELECT `ip`, SUM(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND `thedate` = '20240531' AND (gseIndex > 0) GROUP BY `ip` LIMIT 10",
},
"count-with-count-promql-1": {
query: &metadata.Query{
DB: "100133_ieod_logsearch4_errorlog_p",
Measurement: "doris",
Field: "gseIndex",
Aggregates: metadata.Aggregates{
{
Name: "count",
Dimensions: []string{
"ip",
},
Window: time.Minute,
},
},
BkSqlCondition: "gseIndex > 0",
},
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND `thedate` = '20240531' AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC",
},
"count-without-promql-1": {
"count-with-count-promql-2": {
// 2024-12-07 21:36:40 UTC
// 2024-12-08 05:36:40 Asia/Shanghai
start: time.Unix(1733607400, 0),
// 2024-12-11 17:49:35 UTC
// 2024-12-12 01:49:35 Asia/Shanghai
end: time.Unix(1733939375, 0),
query: &metadata.Query{
DB: "100133_ieod_logsearch4_errorlog_p",
Measurement: "doris",
@@ -84,12 +112,20 @@ func TestNewSqlFactory(t *testing.T) {
},
BkSqlCondition: "gseIndex > 0",
},
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC",
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1733607400000 AND `dtEventTimeStamp` < 1733939375000 AND `thedate` >= '20241208' AND `thedate` <= '20241212' AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC",
},
} {
t.Run(name, func(t *testing.T) {
ctx := metadata.InitHashID(context.Background())
fact := NewQueryFactory(ctx, c.query).WithRangeTime(start, end)
if c.start.Unix() == 0 {
c.start = start
}
if c.end.Unix() == 0 {
c.end = end
}

log.Infof(ctx, "start: %s, end: %s", c.start, c.end)
fact := NewQueryFactory(ctx, c.query).WithRangeTime(c.start, c.end)
sql, err := fact.SQL()
assert.Nil(t, err)
assert.Equal(t, c.expected, sql)
3 changes: 3 additions & 0 deletions pkg/unify-query/tsdb/bksql/instance.go
@@ -285,6 +285,9 @@ func (i *Instance) QuerySeriesSet(ctx context.Context, query *metadata.Query, st
ctx, span := trace.NewSpan(ctx, "bk-sql-raw")
defer span.End(&err)

span.Set("query-series-set-start", start)
span.Set("query-series-set-end", end)

if start.UnixMilli() > end.UnixMilli() || start.UnixMilli() == 0 {
return storage.ErrSeriesSet(fmt.Errorf("range time is error, start: %s, end: %s ", start, end))
}
52 changes: 46 additions & 6 deletions pkg/unify-query/tsdb/bksql/instance_test.go
@@ -126,6 +126,8 @@ func TestInstance_bkSql(t *testing.T) {
end := time.UnixMilli(1718193555000)

testCases := []struct {
start time.Time
end time.Time
query *metadata.Query

expected string
@@ -143,7 +145,7 @@ func TestInstance_bkSql(t *testing.T) {
},
},
},
expected: "SELECT `namespace`, COUNT(`login_rate`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000))) AS `_timestamp_` FROM `132_lol_new_login_queue_login_1min` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND (namespace REGEXP '^(bgp2\\-new|gz100)$') GROUP BY `namespace`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000)) ORDER BY `_timestamp_` ASC",
expected: "SELECT `namespace`, COUNT(`login_rate`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000))) AS `_timestamp_` FROM `132_lol_new_login_queue_login_1min` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' AND (namespace REGEXP '^(bgp2\\-new|gz100)$') GROUP BY `namespace`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000)) ORDER BY `_timestamp_` ASC",
},
{
query: &metadata.Query{
@@ -156,7 +158,7 @@ func TestInstance_bkSql(t *testing.T) {
},
},

expected: "SELECT SUM(`value`) AS `_value_` FROM `132_hander_opmon_avg` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000",
expected: "SELECT SUM(`value`) AS `_value_` FROM `132_hander_opmon_avg` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612'",
},
{
query: &metadata.Query{
@@ -165,7 +167,7 @@ func TestInstance_bkSql(t *testing.T) {
Field: "value",
Size: 5,
},
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 LIMIT 5",
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' LIMIT 5",
},
{
query: &metadata.Query{
@@ -176,7 +178,7 @@ func TestInstance_bkSql(t *testing.T) {
"_time": false,
},
},
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 ORDER BY `_timestamp_` DESC",
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' ORDER BY `_timestamp_` DESC",
},
{
query: &metadata.Query{
@@ -194,14 +196,52 @@ func TestInstance_bkSql(t *testing.T) {
Size: 5,
},

expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 GROUP BY `ip` LIMIT 5",
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' GROUP BY `ip` LIMIT 5",
},
{
start: time.Unix(1733756400, 0),
end: time.Unix(1733846399, 0),
query: &metadata.Query{
DB: "101068_MatchFullLinkTimeConsumptionFlow_CostTime",
Field: "matchstep_start_to_fail_0_100",
Aggregates: metadata.Aggregates{
{
Name: "count",
},
},
},

expected: "SELECT COUNT(`matchstep_start_to_fail_0_100`) AS `_value_` FROM `101068_MatchFullLinkTimeConsumptionFlow_CostTime` WHERE `dtEventTimeStamp` >= 1733756400000 AND `dtEventTimeStamp` < 1733846399000 AND `thedate` >= '20241209' AND `thedate` <= '20241210'",
},
{
start: time.Unix(1733756400, 0),
end: time.Unix(1733846399, 0),
query: &metadata.Query{
DB: "101068_MatchFullLinkTimeConsumptionFlow_CostTime",
Field: "matchstep_start_to_fail_0_100",
Aggregates: metadata.Aggregates{
{
Name: "count",
Window: time.Hour,
},
},
},

expected: "SELECT COUNT(`matchstep_start_to_fail_0_100`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 3600000))) AS `_timestamp_` FROM `101068_MatchFullLinkTimeConsumptionFlow_CostTime` WHERE `dtEventTimeStamp` >= 1733756400000 AND `dtEventTimeStamp` < 1733846399000 AND `thedate` >= '20241209' AND `thedate` <= '20241210' GROUP BY (`dtEventTimeStamp` - (`dtEventTimeStamp` % 3600000)) ORDER BY `_timestamp_` ASC",
},
}

for i, c := range testCases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ctx := metadata.InitHashID(context.Background())
sql, err := NewQueryFactory(ctx, c.query).WithRangeTime(start, end).SQL()
if c.start.Unix() == 0 {
c.start = start
}
if c.end.Unix() == 0 {
c.end = end
}

sql, err := NewQueryFactory(ctx, c.query).WithRangeTime(c.start, c.end).SQL()
assert.Nil(t, err)
if err == nil {
assert.Equal(t, c.expected, sql)