diff --git a/api/adapter2.go b/api/adapter2.go index bd69b95..467b86e 100644 --- a/api/adapter2.go +++ b/api/adapter2.go @@ -138,8 +138,19 @@ func (a *Adapter) parseSql(report AdapterReport) string { } func (a *Adapter) tableName(endpoint string, reqType adapterReqType) string { - sum := md5.Sum([]byte(fmt.Sprintf("%s%d", endpoint, reqType))) - return fmt.Sprintf("t_%s", hex.EncodeToString(sum[:])) + var tbname string + if reqType == rest { + tbname = fmt.Sprintf("adapter_req_%s_%s", endpoint, "rest") + } else { + tbname = fmt.Sprintf("adapter_req_%s_%s", endpoint, "ws") + } + + if len(tbname) <= util.MAX_TABLE_NAME_LEN { + return util.ToValidTableName(tbname) + } else { + sum := md5.Sum([]byte(fmt.Sprintf("%s%d", endpoint, reqType))) + return fmt.Sprintf("adapter_req_%s", hex.EncodeToString(sum[:])) + } } func (a *Adapter) createDatabase() error { diff --git a/api/gen_metric.go b/api/gen_metric.go index 5814a1c..5534fe4 100644 --- a/api/gen_metric.go +++ b/api/gen_metric.go @@ -30,6 +30,8 @@ var gmLogger = log.GetLogger("GEN") var MAX_SQL_LEN = 1000000 +var STABLE_NAME_KEY = "priv_stn" + type ColumnSeq struct { tagNames []string metricNames []string @@ -168,7 +170,7 @@ func NewGeneralMetric(conf *config.Config) *GeneralMetric { Scheme: protocol, Host: fmt.Sprintf("%s:%d", conf.TDengine.Host, conf.TDengine.Port), Path: "/influxdb/v1/write", - RawQuery: fmt.Sprintf("db=%s&precision=ms", conf.Metrics.Database.Name), + RawQuery: fmt.Sprintf("db=%s&precision=ms&table_name_key=%s", conf.Metrics.Database.Name, STABLE_NAME_KEY), }, } return imp @@ -499,6 +501,127 @@ func writeTags(tags []Tag, stbName string, buf *bytes.Buffer) { buf.WriteString(fmt.Sprintf(",%s=%s", name, "unknown")) } } + + // have sub table name + if _, ok := tagMap[STABLE_NAME_KEY]; ok { + return + } + + subTableName := get_sub_table_name_valid(stbName, tagMap) + if subTableName != "" { + buf.WriteString(fmt.Sprintf(",%s=%s", STABLE_NAME_KEY, subTableName)) + } else { + gmLogger.Errorf("get sub stable name error, stable name:%s, tag map:%v", stbName, tagMap) + } +} + +func checkKeysExist(data map[string]string, keys ...string) bool { + for _, key := range keys { + _, ok := data[key] + if !ok { + return false + } + } + return true +} + +func get_sub_table_name_valid(stbName string, tagMap map[string]string) string { + stbName = get_sub_table_name(stbName, tagMap) + return util.ToValidTableName(stbName) +} + +func get_sub_table_name(stbName string, tagMap map[string]string) string { + if strings.HasPrefix(stbName, "taosx") { + switch stbName { + case "taosx_sys": + if checkKeysExist(tagMap, "taosx_id") { + return fmt.Sprintf("sys_%s", tagMap["taosx_id"]) + } + case "taosx_agent": + if checkKeysExist(tagMap, "taosx_id", "agent_id") { + return fmt.Sprintf("agent_%s_%s", tagMap["taosx_id"], tagMap["agent_id"]) + } + case "taosx_connector": + if checkKeysExist(tagMap, "taosx_id", "ds_name", "task_id") { + return fmt.Sprintf("connector_%s_%s_%s", tagMap["taosx_id"], tagMap["ds_name"], tagMap["task_id"]) + } + default: + if strings.HasPrefix(stbName, "taosx_task_") { + ds_name := stbName[len("taosx_task_"):] + if checkKeysExist(tagMap, "taosx_id", "task_id") { + return fmt.Sprintf("task_%s_%s_%s", tagMap["taosx_id"], ds_name, tagMap["task_id"]) + } + } + return "" + } + } + + switch stbName { + case "taosd_cluster_info": + if checkKeysExist(tagMap, "cluster_id") { + return fmt.Sprintf("cluster_%s", tagMap["cluster_id"]) + } + case "taosd_vgroups_info": + if checkKeysExist(tagMap, "cluster_id", "vgroup_id", "database_name") { + return fmt.Sprintf("vginfo_%s_vgroup_%s_cluster_%s", tagMap["database_name"], tagMap["vgroup_id"], tagMap["cluster_id"]) + } + case "taosd_dnodes_info": + if checkKeysExist(tagMap, "cluster_id", "dnode_id") { + return fmt.Sprintf("dinfo_%s_cluster_%s", tagMap["dnode_id"], tagMap["cluster_id"]) + } + case "taosd_dnodes_status": + if checkKeysExist(tagMap, "cluster_id", "dnode_id") { + return fmt.Sprintf("dstatus_%s_cluster_%s", tagMap["dnode_id"], tagMap["cluster_id"]) + } + case "taosd_dnodes_log_dirs": + if checkKeysExist(tagMap, "cluster_id", "dnode_id", "data_dir_name") { + subTableName := fmt.Sprintf("dlog_%s_%s_cluster_%s", tagMap["dnode_id"], tagMap["data_dir_name"], tagMap["cluster_id"]) + if len(subTableName) <= util.MAX_TABLE_NAME_LEN { + return subTableName + } + return fmt.Sprintf("dlog_%s_%s_cluster_%s", tagMap["dnode_id"], + util.GetMd5HexStr(tagMap["data_dir_name"]), + tagMap["cluster_id"]) + } + case "taosd_dnodes_data_dirs": + if checkKeysExist(tagMap, "cluster_id", "dnode_id", "data_dir_name", "data_dir_level") { + subTableName := fmt.Sprintf("ddata_%s_%s_level_%s_cluster_%s", tagMap["dnode_id"], tagMap["data_dir_name"], tagMap["data_dir_level"], tagMap["cluster_id"]) + if len(subTableName) <= util.MAX_TABLE_NAME_LEN { + return subTableName + } + return fmt.Sprintf("ddata_%s_%s_level_%s_cluster_%s", tagMap["dnode_id"], + util.GetMd5HexStr(tagMap["data_dir_name"]), + tagMap["data_dir_level"], + tagMap["cluster_id"]) + } + case "taosd_mnodes_info": + if checkKeysExist(tagMap, "cluster_id", "mnode_id") { + return fmt.Sprintf("minfo_%s_cluster_%s", tagMap["mnode_id"], tagMap["cluster_id"]) + } + case "taosd_vnodes_info": + if checkKeysExist(tagMap, "cluster_id", "database_name", "vgroup_id", "dnode_id") { + return fmt.Sprintf("vninfo_%s_dnode_%s_vgroup_%s_cluster_%s", tagMap["database_name"], tagMap["dnode_id"], tagMap["vgroup_id"], tagMap["cluster_id"]) + } + case "taosd_sql_req": + if checkKeysExist(tagMap, "username", "sql_type", "result", "dnode_id", "vgroup_id", "cluster_id") { + return fmt.Sprintf("taosdsql_%s_%s_%s_%s_vgroup_%s_cluster_%s", tagMap["username"], + tagMap["sql_type"], tagMap["result"], tagMap["dnode_id"], tagMap["vgroup_id"], tagMap["cluster_id"]) + } + case "taos_sql_req": + if checkKeysExist(tagMap, "username", "sql_type", "result", "cluster_id") { + return fmt.Sprintf("taossql_%s_%s_%s_cluster_%s", tagMap["username"], + tagMap["sql_type"], tagMap["result"], tagMap["cluster_id"]) + } + case "taos_slow_sql": + if checkKeysExist(tagMap, "username", "duration", "result", "cluster_id") { + return fmt.Sprintf("slowsql_%s_%s_%s_cluster_%s", tagMap["username"], + tagMap["duration"], tagMap["result"], tagMap["cluster_id"]) + } + + default: + return "" + } + return "" } func contains(array []string, item string) bool { diff --git a/api/gen_metric_test.go b/api/gen_metric_test.go index 2bda965..4ccfa0c 100644 --- a/api/gen_metric_test.go +++ b/api/gen_metric_test.go @@ -254,3 +254,104 @@ func TestGenMetric(t *testing.T) { } }) } +func TestGetSubTableName(t *testing.T) { + tests := []struct { + stbName string + tagMap map[string]string + want string + }{ + { + stbName: "taosx_sys", + tagMap: map[string]string{"taosx_id": "123"}, + want: "sys_123", + }, + { + stbName: "taosx_agent", + tagMap: map[string]string{"taosx_id": "123", "agent_id": "456"}, + want: "agent_123_456", + }, + { + stbName: "taosx_connector", + tagMap: map[string]string{"taosx_id": "123", "ds_name": "ds", "task_id": "789"}, + want: "connector_123_ds_789", + }, + { + stbName: "taosx_task_example", + tagMap: map[string]string{"taosx_id": "123", "task_id": "789"}, + want: "task_123_example_789", + }, + { + stbName: "taosd_cluster_info", + tagMap: map[string]string{"cluster_id": "123"}, + want: "cluster_123", + }, + { + stbName: "taosd_vgroups_info", + tagMap: map[string]string{"cluster_id": "123", "vgroup_id": "456", "database_name": "db"}, + want: "vginfo_db_vgroup_456_cluster_123", + }, + { + stbName: "taosd_dnodes_info", + tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123"}, + want: "dinfo_123_cluster_123", + }, + { + stbName: "taosd_dnodes_status", + tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123"}, + want: "dstatus_123_cluster_123", + }, + { + stbName: "taosd_dnodes_log_dirs", + tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "log"}, + want: "dlog_123_log_cluster_123", + }, + { + stbName: "taosd_dnodes_log_dirs", + tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "loglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglogloglog"}, + want: "dlog_123_9cdc719961a632a27603cd5ed9f1aee2_cluster_123", + }, + { + stbName: "taosd_dnodes_data_dirs", + tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "data", "data_dir_level": "5"}, + want: "ddata_123_data_level_5_cluster_123", + }, + { + stbName: "taosd_dnodes_data_dirs", + tagMap: map[string]string{"cluster_id": "123", "dnode_id": "123", "data_dir_name": "datadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadatadata", "data_dir_level": "5"}, + want: "ddata_123_03bf8dffdf6b97e08f347c6ae795998b_level_5_cluster_123", + }, + { + stbName: "taosd_mnodes_info", + tagMap: map[string]string{"cluster_id": "123", "mnode_id": "12"}, + want: "minfo_12_cluster_123", + }, + { + stbName: "taosd_vnodes_info", + tagMap: map[string]string{"cluster_id": "123", "database_name": "db", "vgroup_id": "456", "dnode_id": "789"}, + want: "vninfo_db_dnode_789_vgroup_456_cluster_123", + }, + { + stbName: "taosd_sql_req", + tagMap: map[string]string{"username": "user", "sql_type": "select", "result": "success", "dnode_id": "123", "vgroup_id": "456", "cluster_id": "123"}, + want: "taosdsql_user_select_success_123_vgroup_456_cluster_123", + }, + { + stbName: "taos_sql_req", + tagMap: map[string]string{"username": "user", "sql_type": "select", "result": "success", "cluster_id": "123"}, + want: "taossql_user_select_success_cluster_123", + }, + { + stbName: "taos_slow_sql", + tagMap: map[string]string{"username": "user", "duration": "100ms", "result": "success", "cluster_id": "123"}, + want: "slowsql_user_100ms_success_cluster_123", + }, + } + + for _, tt := range tests { + t.Run(tt.stbName, func(t *testing.T) { + if got := get_sub_table_name_valid(tt.stbName, tt.tagMap); got != tt.want { + panic(fmt.Sprintf("get_sub_table_name() = %v, want %v", got, tt.want)) + } + }) + } +} diff --git a/monitor/monitor.go b/monitor/monitor.go index e485d69..6f3083e 100644 --- a/monitor/monitor.go +++ b/monitor/monitor.go @@ -2,8 +2,6 @@ package monitor import ( "context" - "crypto/md5" - "encoding/hex" "fmt" "os" "time" @@ -56,9 +54,15 @@ func StartMonitor(identity string, conf *config.Config, reporter *api.Reporter) reporter.GetTotalRep().Store(0) } - kn := md5.Sum([]byte(identity)) - sql := fmt.Sprintf("insert into `keeper_monitor_%s` using keeper_monitor tags ('%s') values ( now, "+ - " %f, %f, %d)", hex.EncodeToString(kn[:]), identity, cpuPercent, memPercent, totalReport) + var kn string + if len(identity) <= util.MAX_TABLE_NAME_LEN { + kn = util.ToValidTableName(identity) + } else { + kn = util.GetMd5HexStr(identity) + } + + sql := fmt.Sprintf("insert into `km_%s` using keeper_monitor tags ('%s') values ( now, "+ + " %f, %f, %d)", kn, identity, cpuPercent, memPercent, totalReport) conn, err := db.NewConnectorWithDb(conf.TDengine.Username, conf.TDengine.Password, conf.TDengine.Host, conf.TDengine.Port, conf.Metrics.Database.Name, conf.TDengine.Usessl) if err != nil { diff --git a/util/util.go b/util/util.go index bea25ed..a739c23 100644 --- a/util/util.go +++ b/util/util.go @@ -1,11 +1,14 @@ package util import ( + "crypto/md5" + "encoding/hex" "os" "strconv" "strings" "sync/atomic" "time" + "unicode" "github.com/taosdata/taoskeeper/infrastructure/config" ) @@ -14,6 +17,8 @@ import ( var globalCounter64 uint64 var globalCounter32 uint32 +var MAX_TABLE_NAME_LEN = 190 + func init() { atomic.StoreUint64(&globalCounter64, 0) atomic.StoreUint32(&globalCounter32, 0) @@ -123,3 +128,27 @@ func GetQidOwn() uint64 { qid64 := uint64(config.Conf.InstanceID)<<56 | id return qid64 } + +func GetMd5HexStr(str string) string { + sum := md5.Sum([]byte(str)) + return hex.EncodeToString(sum[:]) +} + +func isValidChar(r rune) bool { + return unicode.IsLetter(r) || unicode.IsDigit(r) || r == '_' +} + +func ToValidTableName(input string) string { + var builder strings.Builder + + for _, r := range input { + if isValidChar(r) { + builder.WriteRune(unicode.ToLower(r)) + } else { + builder.WriteRune('_') + } + } + + result := builder.String() + return result +}