Skip to content

Commit

Permalink
Merge pull request #141 from taosdata/feat/TD-30682
Browse files Browse the repository at this point in the history
 support  sub table name readable
  • Loading branch information
sheyanjie-qq authored Sep 19, 2024
2 parents b0475e6 + 5a70af6 commit b701f00
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 8 deletions.
15 changes: 13 additions & 2 deletions api/adapter2.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,19 @@ func (a *Adapter) parseSql(report AdapterReport) string {
}

func (a *Adapter) tableName(endpoint string, reqType adapterReqType) string {
sum := md5.Sum([]byte(fmt.Sprintf("%s%d", endpoint, reqType)))
return fmt.Sprintf("t_%s", hex.EncodeToString(sum[:]))
var tbname string
if reqType == rest {
tbname = fmt.Sprintf("adapter_req_%s_%s", endpoint, "rest")
} else {
tbname = fmt.Sprintf("adapter_req_%s_%s", endpoint, "ws")
}

if len(tbname) <= util.MAX_TABLE_NAME_LEN {
return util.ToValidTableName(tbname)
} else {
sum := md5.Sum([]byte(fmt.Sprintf("%s%d", endpoint, reqType)))
return fmt.Sprintf("adapter_req_%s", hex.EncodeToString(sum[:]))
}
}

func (a *Adapter) createDatabase() error {
Expand Down
125 changes: 124 additions & 1 deletion api/gen_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ var gmLogger = log.GetLogger("GEN")

var MAX_SQL_LEN = 1000000

var STABLE_NAME_KEY = "priv_stn"

type ColumnSeq struct {
tagNames []string
metricNames []string
Expand Down Expand Up @@ -168,7 +170,7 @@ func NewGeneralMetric(conf *config.Config) *GeneralMetric {
Scheme: protocol,
Host: fmt.Sprintf("%s:%d", conf.TDengine.Host, conf.TDengine.Port),
Path: "/influxdb/v1/write",
RawQuery: fmt.Sprintf("db=%s&precision=ms", conf.Metrics.Database.Name),
RawQuery: fmt.Sprintf("db=%s&precision=ms&table_name_key=%s", conf.Metrics.Database.Name, STABLE_NAME_KEY),
},
}
return imp
Expand Down Expand Up @@ -499,6 +501,127 @@ func writeTags(tags []Tag, stbName string, buf *bytes.Buffer) {
buf.WriteString(fmt.Sprintf(",%s=%s", name, "unknown"))
}
}

// have sub table name
if _, ok := tagMap[STABLE_NAME_KEY]; ok {
return
}

subTableName := get_sub_table_name_valid(stbName, tagMap)
if subTableName != "" {
buf.WriteString(fmt.Sprintf(",%s=%s", STABLE_NAME_KEY, subTableName))
} else {
gmLogger.Errorf("get sub stable name error, stable name:%s, tag map:%v", stbName, tagMap)
}
}

func checkKeysExist(data map[string]string, keys ...string) bool {
for _, key := range keys {
_, ok := data[key]
if !ok {
return false
}
}
return true
}

func get_sub_table_name_valid(stbName string, tagMap map[string]string) string {
stbName = get_sub_table_name(stbName, tagMap)
return util.ToValidTableName(stbName)
}

func get_sub_table_name(stbName string, tagMap map[string]string) string {
if strings.HasPrefix(stbName, "taosx") {
switch stbName {
case "taosx_sys":
if checkKeysExist(tagMap, "taosx_id") {
return fmt.Sprintf("sys_%s", tagMap["taosx_id"])
}
case "taosx_agent":
if checkKeysExist(tagMap, "taosx_id", "agent_id") {
return fmt.Sprintf("agent_%s_%s", tagMap["taosx_id"], tagMap["agent_id"])
}
case "taosx_connector":
if checkKeysExist(tagMap, "taosx_id", "ds_name", "task_id") {
return fmt.Sprintf("connector_%s_%s_%s", tagMap["taosx_id"], tagMap["ds_name"], tagMap["task_id"])
}
default:
if strings.HasPrefix(stbName, "taosx_task_") {
ds_name := stbName[len("taosx_task_"):]
if checkKeysExist(tagMap, "taosx_id", "task_id") {
return fmt.Sprintf("task_%s_%s_%s", tagMap["taosx_id"], ds_name, tagMap["task_id"])
}
}
return ""
}
}

switch stbName {
case "taosd_cluster_info":
if checkKeysExist(tagMap, "cluster_id") {
return fmt.Sprintf("cluster_%s", tagMap["cluster_id"])
}
case "taosd_vgroups_info":
if checkKeysExist(tagMap, "cluster_id", "vgroup_id", "database_name") {
return fmt.Sprintf("vginfo_%s_vgroup_%s_cluster_%s", tagMap["database_name"], tagMap["vgroup_id"], tagMap["cluster_id"])
}
case "taosd_dnodes_info":
if checkKeysExist(tagMap, "cluster_id", "dnode_id") {
return fmt.Sprintf("dinfo_%s_cluster_%s", tagMap["dnode_id"], tagMap["cluster_id"])
}
case "taosd_dnodes_status":
if checkKeysExist(tagMap, "cluster_id", "dnode_id") {
return fmt.Sprintf("dstatus_%s_cluster_%s", tagMap["dnode_id"], tagMap["cluster_id"])
}
case "taosd_dnodes_log_dirs":
if checkKeysExist(tagMap, "cluster_id", "dnode_id", "data_dir_name") {
subTableName := fmt.Sprintf("dlog_%s_%s_cluster_%s", tagMap["dnode_id"], tagMap["data_dir_name"], tagMap["cluster_id"])
if len(subTableName) <= util.MAX_TABLE_NAME_LEN {
return subTableName
}
return fmt.Sprintf("dlog_%s_%s_cluster_%s", tagMap["dnode_id"],
util.GetMd5HexStr(tagMap["data_dir_name"]),
tagMap["cluster_id"])
}
case "taosd_dnodes_data_dirs":
if checkKeysExist(tagMap, "cluster_id", "dnode_id", "data_dir_name", "data_dir_level") {
subTableName := fmt.Sprintf("ddata_%s_%s_level_%s_cluster_%s", tagMap["dnode_id"], tagMap["data_dir_name"], tagMap["data_dir_level"], tagMap["cluster_id"])
if len(subTableName) <= util.MAX_TABLE_NAME_LEN {
return subTableName
}
return fmt.Sprintf("ddata_%s_%s_level_%s_cluster_%s", tagMap["dnode_id"],
util.GetMd5HexStr(tagMap["data_dir_name"]),
tagMap["data_dir_level"],
tagMap["cluster_id"])
}
case "taosd_mnodes_info":
if checkKeysExist(tagMap, "cluster_id", "mnode_id") {
return fmt.Sprintf("minfo_%s_cluster_%s", tagMap["mnode_id"], tagMap["cluster_id"])
}
case "taosd_vnodes_info":
if checkKeysExist(tagMap, "cluster_id", "database_name", "vgroup_id", "dnode_id") {
return fmt.Sprintf("vninfo_%s_dnode_%s_vgroup_%s_cluster_%s", tagMap["database_name"], tagMap["dnode_id"], tagMap["vgroup_id"], tagMap["cluster_id"])
}
case "taosd_sql_req":
if checkKeysExist(tagMap, "username", "sql_type", "result", "dnode_id", "vgroup_id", "cluster_id") {
return fmt.Sprintf("taosdsql_%s_%s_%s_%s_vgroup_%s_cluster_%s", tagMap["username"],
tagMap["sql_type"], tagMap["result"], tagMap["dnode_id"], tagMap["vgroup_id"], tagMap["cluster_id"])
}
case "taos_sql_req":
if checkKeysExist(tagMap, "username", "sql_type", "result", "cluster_id") {
return fmt.Sprintf("taossql_%s_%s_%s_cluster_%s", tagMap["username"],
tagMap["sql_type"], tagMap["result"], tagMap["cluster_id"])
}
case "taos_slow_sql":
if checkKeysExist(tagMap, "username", "duration", "result", "cluster_id") {
return fmt.Sprintf("slowsql_%s_%s_%s_cluster_%s", tagMap["username"],
tagMap["duration"], tagMap["result"], tagMap["cluster_id"])
}

default:
return ""
}
return ""
}

func contains(array []string, item string) bool {
Expand Down
101 changes: 101 additions & 0 deletions api/gen_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,104 @@ func TestGenMetric(t *testing.T) {
}
})
}
func TestGetSubTableName(t *testing.T) {
tests := []struct {
stbName string
tagMap map[string]string
want string
}{
{
stbName: "taosx_sys",
tagMap: map[string]string{"taosx_id": "123"},
want: "sys_123",
},
{
stbName: "taosx_agent",
tagMap: map[string]string{"taosx_id": "123", "agent_id": "456"},
want: "agent_123_456",
},
{
stbName: "taosx_connector",
tagMap: map[string]string{"taosx_id": "123", "ds_name": "ds", "task_id": "789"},
want: "connector_123_ds_789",
},
{
stbName: "taosx_task_example",
tagMap: map[string]string{"taosx_id": "123", "task_id": "789"},
want: "task_123_example_789",
},
{
stbName: "taosd_cluster_info",
tagMap: map[string]string{"cluster_id": "123"},
want: "cluster_123",
},
{
stbName: "taosd_vgroups_info",
tagMap: map[string]string{"cluster_id": "123", "vgroup_id": "456", "database_name": "db"},
want: "vginfo_db_vgroup_456_cluster_123",
},
{
stbName: "taosd_dnodes_info",
tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123"},
want: "dinfo_123_cluster_123",
},
{
stbName: "taosd_dnodes_status",
tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123"},
want: "dstatus_123_cluster_123",
},
{
stbName: "taosd_dnodes_log_dirs",
tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "log"},
want: "dlog_123_log_cluster_123",
},
{
stbName: "taosd_dnodes_log_dirs",
tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "loglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglog"},
want: "dlog_123_9cdc719961a632a27603cd5ed9f1aee2_cluster_123",
},
{
stbName: "taosd_dnodes_data_dirs",
tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "data", "data_dir_level": "5"},
want: "ddata_123_data_level_5_cluster_123",
},
{
stbName: "taosd_dnodes_data_dirs",
tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "datadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadata", "data_dir_level": "5"},
want: "ddata_123_03bf8dffdf6b97e08f347c6ae795998b_level_5_cluster_123",
},
{
stbName: "taosd_mnodes_info",
tagMap: map[string]string{"cluster_id": "123", "mnode_id": "12"},
want: "minfo_12_cluster_123",
},
{
stbName: "taosd_vnodes_info",
tagMap: map[string]string{"cluster_id": "123", "database_name": "db", "vgroup_id": "456", "dnode_id": "789"},
want: "vninfo_db_dnode_789_vgroup_456_cluster_123",
},
{
stbName: "taosd_sql_req",
tagMap: map[string]string{"username": "user", "sql_type": "select", "result": "success", "dnode_id": "123", "vgroup_id": "456", "cluster_id": "123"},
want: "taosdsql_user_select_success_123_vgroup_456_cluster_123",
},
{
stbName: "taos_sql_req",
tagMap: map[string]string{"username": "user", "sql_type": "select", "result": "success", "cluster_id": "123"},
want: "taossql_user_select_success_cluster_123",
},
{
stbName: "taos_slow_sql",
tagMap: map[string]string{"username": "user", "duration": "100ms", "result": "success", "cluster_id": "123"},
want: "slowsql_user_100ms_success_cluster_123",
},
}

for _, tt := range tests {
t.Run(tt.stbName, func(t *testing.T) {
if got := get_sub_table_name_valid(tt.stbName, tt.tagMap); got != tt.want {
panic(fmt.Sprintf("get_sub_table_name() = %v, want %v", got, tt.want))
}
})
}
}
14 changes: 9 additions & 5 deletions monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package monitor

import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -56,9 +54,15 @@ func StartMonitor(identity string, conf *config.Config, reporter *api.Reporter)
reporter.GetTotalRep().Store(0)
}

kn := md5.Sum([]byte(identity))
sql := fmt.Sprintf("insert into `keeper_monitor_%s` using keeper_monitor tags ('%s') values ( now, "+
" %f, %f, %d)", hex.EncodeToString(kn[:]), identity, cpuPercent, memPercent, totalReport)
var kn string
if len(identity) <= util.MAX_TABLE_NAME_LEN {
kn = util.ToValidTableName(identity)
} else {
kn = util.GetMd5HexStr(identity)
}

sql := fmt.Sprintf("insert into `km_%s` using keeper_monitor tags ('%s') values ( now, "+
" %f, %f, %d)", kn, identity, cpuPercent, memPercent, totalReport)
conn, err := db.NewConnectorWithDb(conf.TDengine.Username, conf.TDengine.Password, conf.TDengine.Host,
conf.TDengine.Port, conf.Metrics.Database.Name, conf.TDengine.Usessl)
if err != nil {
Expand Down
29 changes: 29 additions & 0 deletions util/util.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package util

import (
"crypto/md5"
"encoding/hex"
"os"
"strconv"
"strings"
"sync/atomic"
"time"
"unicode"

"github.com/taosdata/taoskeeper/infrastructure/config"
)
Expand All @@ -14,6 +17,8 @@ import (
var globalCounter64 uint64
var globalCounter32 uint32

var MAX_TABLE_NAME_LEN = 190

func init() {
atomic.StoreUint64(&globalCounter64, 0)
atomic.StoreUint32(&globalCounter32, 0)
Expand Down Expand Up @@ -123,3 +128,27 @@ func GetQidOwn() uint64 {
qid64 := uint64(config.Conf.InstanceID)<<56 | id
return qid64
}

func GetMd5HexStr(str string) string {
sum := md5.Sum([]byte(str))
return hex.EncodeToString(sum[:])
}

func isValidChar(r rune) bool {
return unicode.IsLetter(r) || unicode.IsDigit(r) || r == '_'
}

func ToValidTableName(input string) string {
var builder strings.Builder

for _, r := range input {
if isValidChar(r) {
builder.WriteRune(unicode.ToLower(r))
} else {
builder.WriteRune('_')
}
}

result := builder.String()
return result
}

0 comments on commit b701f00

Please sign in to comment.