Skip to content

Commit

Permalink
Merge pull request #137 from taosdata/feat/TD-31615
Browse files Browse the repository at this point in the history
use uniformated log
  • Loading branch information
huskar-t authored Sep 9, 2024
2 parents 97c284f + 456ba72 commit 4b36a8d
Show file tree
Hide file tree
Showing 25 changed files with 523 additions and 265 deletions.
48 changes: 32 additions & 16 deletions api/adapter2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
"github.com/taosdata/taoskeeper/db"
"github.com/taosdata/taoskeeper/infrastructure/config"
"github.com/taosdata/taoskeeper/infrastructure/log"
"github.com/taosdata/taoskeeper/util"
)

var adapterLog = log.GetLogger("adapter")
var adapterLog = log.GetLogger("ADP")

type adapterReqType int

Expand Down Expand Up @@ -51,20 +53,26 @@ func NewAdapter(c *config.Config) *Adapter {

func (a *Adapter) Init(c gin.IRouter) error {
if err := a.createDatabase(); err != nil {
return fmt.Errorf("create database error: %s", err)
return fmt.Errorf("create database error:%s", err)
}
if err := a.initConnect(); err != nil {
return fmt.Errorf("init db connect error: %s", err)
return fmt.Errorf("init db connect error:%s", err)
}
if err := a.createTable(); err != nil {
return fmt.Errorf("create table error: %s", err)
return fmt.Errorf("create table error:%s", err)
}
c.POST("/adapter_report", a.handleFunc())
return nil
}

func (a *Adapter) handleFunc() gin.HandlerFunc {
return func(c *gin.Context) {
qid := util.GetQid(c.GetHeader("X-QID"))

adapterLog := adapterLog.WithFields(
logrus.Fields{config.ReqIDKey: qid},
)

if a.conn == nil {
adapterLog.Error("no connection")
c.JSON(http.StatusInternalServerError, gin.H{"error": "no connection"})
Expand All @@ -73,23 +81,25 @@ func (a *Adapter) handleFunc() gin.HandlerFunc {

data, err := c.GetRawData()
if err != nil {
adapterLog.WithError(err).Errorf("## get adapter report data error")
adapterLog.Errorf("get adapter report data error, msg:%s", err)
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("get adapter report data error. %s", err)})
return
}
adapterLog.Trace("## receive adapter report data", "data", string(data))
if adapterLog.Logger.IsLevelEnabled(logrus.TraceLevel) {
adapterLog.Tracef("received adapter report data:%s", string(data))
}

var report AdapterReport
if err = json.Unmarshal(data, &report); err != nil {
adapterLog.WithError(err).Errorf("## parse adapter report data %s error", string(data))
adapterLog.Errorf("parse adapter report data error, data:%s, error:%s", string(data), err)
c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Sprintf("parse adapter report data error: %s", err)})
return
}
sql := a.parseSql(report)
adapterLog.Debug("## adapter report", "sql", sql)
adapterLog.Debugf("adapter report sql:%s", sql)

if _, err = a.conn.Exec(context.Background(), sql); err != nil {
adapterLog.Error("## adapter report error", "error", err)
if _, err = a.conn.Exec(context.Background(), sql, qid); err != nil {
adapterLog.Errorf("adapter report error, msg:%s", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
Expand All @@ -100,7 +110,7 @@ func (a *Adapter) handleFunc() gin.HandlerFunc {
func (a *Adapter) initConnect() error {
conn, err := db.NewConnectorWithDb(a.username, a.password, a.host, a.port, a.db, a.usessl)
if err != nil {
adapterLog.Error("## init db connect error", "error", err)
adapterLog.Dup().Errorf("init db connect error, msg:%s", err)
return err
}
a.conn = conn
Expand Down Expand Up @@ -133,16 +143,22 @@ func (a *Adapter) tableName(endpoint string, reqType adapterReqType) string {
}

func (a *Adapter) createDatabase() error {
qid := util.GetQidOwn()

adapterLog := adapterLog.WithFields(
logrus.Fields{config.ReqIDKey: qid},
)

conn, err := db.NewConnector(a.username, a.password, a.host, a.port, a.usessl)
if err != nil {
return fmt.Errorf("connect to database error: %s", err)
return fmt.Errorf("connect to database error, msg:%s", err)
}
defer func() { _ = conn.Close() }()
sql := a.createDBSql()
adapterLog.Info("## create database", "sql", sql)
_, err = conn.Exec(context.Background(), sql)
adapterLog.Infof("create database, sql:%s", sql)
_, err = conn.Exec(context.Background(), sql, util.GetQidOwn())
if err != nil {
adapterLog.Error("## create database error", "error", err)
adapterLog.Errorf("create database error, msg:%s", err)
return err
}

Expand Down Expand Up @@ -192,7 +208,7 @@ func (a *Adapter) createTable() error {
if a.conn == nil {
return errNoConnection
}
_, err := a.conn.Exec(context.Background(), adapterTableSql)
_, err := a.conn.Exec(context.Background(), adapterTableSql, util.GetQidOwn())
return err
}

Expand Down
13 changes: 8 additions & 5 deletions api/adapter2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"github.com/stretchr/testify/assert"
"github.com/taosdata/taoskeeper/db"
"github.com/taosdata/taoskeeper/infrastructure/config"
"github.com/taosdata/taoskeeper/util"
)

func TestAdapter2(t *testing.T) {
c := &config.Config{
Port: 6043,
InstanceID: 64,
Port: 6043,
TDengine: config.TDengineRestful{
Host: "127.0.0.1",
Port: 6041,
Expand Down Expand Up @@ -43,16 +45,17 @@ func TestAdapter2(t *testing.T) {
"\"ws_other_success\": 1, \"ws_other_fail\": 2, \"ws_query_in_process\": 1, \"ws_write_in_process\": 2 }, " +
"\"endpoint\": \"adapter-1:6041\"}")
req, _ := http.NewRequest(http.MethodPost, "/adapter_report", body)
req.Header.Set("X-QID", "0x1234567890ABCD00")
router.ServeHTTP(w, req)
assert.Equal(t, 200, w.Code)

conn, err := db.NewConnectorWithDb(c.TDengine.Username, c.TDengine.Password, c.TDengine.Host, c.TDengine.Port, c.Metrics.Database.Name, c.TDengine.Usessl)
defer func() {
_, _ = conn.Query(context.Background(), "drop database if exists adapter_report_test")
_, _ = conn.Query(context.Background(), "drop database if exists adapter_report_test", util.GetQidOwn())
}()

assert.NoError(t, err)
data, err := conn.Query(context.Background(), "select * from adapter_report_test.adapter_requests where req_type=0")
data, err := conn.Query(context.Background(), "select * from adapter_report_test.adapter_requests where req_type=0", util.GetQidOwn())

assert.NoError(t, err)
assert.Equal(t, 1, len(data.Data))
Expand All @@ -72,7 +75,7 @@ func TestAdapter2(t *testing.T) {
assert.Equal(t, uint32(1), data.Data[0][14])
assert.Equal(t, uint32(2), data.Data[0][15])

data, err = conn.Query(context.Background(), "select * from adapter_report_test.adapter_requests where req_type=1")
data, err = conn.Query(context.Background(), "select * from adapter_report_test.adapter_requests where req_type=1", util.GetQidOwn())
assert.NoError(t, err)
assert.Equal(t, 1, len(data.Data))
assert.Equal(t, uint32(10), data.Data[0][1])
Expand All @@ -91,5 +94,5 @@ func TestAdapter2(t *testing.T) {
assert.Equal(t, uint32(1), data.Data[0][14])
assert.Equal(t, uint32(2), data.Data[0][15])

conn.Exec(context.Background(), "drop database "+c.Metrics.Database.Name)
conn.Exec(context.Background(), "drop database "+c.Metrics.Database.Name, util.GetQidOwn())
}
25 changes: 17 additions & 8 deletions api/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,27 @@ import (
"fmt"
"time"

"github.com/sirupsen/logrus"
"github.com/taosdata/taoskeeper/db"
"github.com/taosdata/taoskeeper/infrastructure/config"
"github.com/taosdata/taoskeeper/infrastructure/log"
"github.com/taosdata/taoskeeper/util"
)

var commonLogger = log.GetLogger("common")
var commonLogger = log.GetLogger("CMN")

func CreateDatabase(username string, password string, host string, port int, usessl bool, dbname string, databaseOptions map[string]interface{}) {
qid := util.GetQidOwn()

commonLogger := commonLogger.WithFields(
logrus.Fields{config.ReqIDKey: qid},
)

ctx := context.Background()

conn, err := db.NewConnector(username, password, host, port, usessl)
if err != nil {
commonLogger.WithError(err).Errorf("connect to adapter error")
commonLogger.Errorf("connect to adapter error, msg:%s", err)
return
}

Expand All @@ -27,8 +36,8 @@ func CreateDatabase(username string, password string, host string, port int, use
commonLogger.Warningf("create database sql: %s", createDBSql)

for i := 0; i < 3; i++ {
if _, err := conn.Exec(ctx, createDBSql); err != nil {
commonLogger.WithError(err).Errorf("try %v times: create database %s error %v", i+1, dbname, err)
if _, err := conn.Exec(ctx, createDBSql, util.GetQidOwn()); err != nil {
commonLogger.Errorf("try %v times: create database %s error, msg:%v", i+1, dbname, err)
time.Sleep(5 * time.Second)
continue
}
Expand Down Expand Up @@ -60,21 +69,21 @@ func CreatTables(username string, password string, host string, port int, usessl
ctx := context.Background()
conn, err := db.NewConnectorWithDb(username, password, host, port, dbname, usessl)
if err != nil {
commonLogger.WithError(err).Errorf("connect to database error")
commonLogger.Errorf("connect to database error, msg:%s", err)
return
}
defer closeConn(conn)

for _, createSql := range createList {
commonLogger.Infof("execute sql: %s", createSql)
if _, err = conn.Exec(ctx, createSql); err != nil {
commonLogger.Infof("execute sql:%s", createSql)
if _, err = conn.Exec(ctx, createSql, util.GetQidOwn()); err != nil {
commonLogger.Errorf("execute sql: %s, error: %s", createSql, err)
}
}
}

func closeConn(conn *db.Connector) {
if err := conn.Close(); err != nil {
commonLogger.WithError(err).Errorf("close connection error")
commonLogger.Errorf("close connection error, msg:%s", err)
}
}
23 changes: 12 additions & 11 deletions api/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/taosdata/taoskeeper/infrastructure/config"
"github.com/taosdata/taoskeeper/infrastructure/log"
"github.com/taosdata/taoskeeper/process"
"github.com/taosdata/taoskeeper/util"
)

var router *gin.Engine
Expand All @@ -35,9 +36,9 @@ func TestMain(m *testing.M) {
}
defer conn.Close()
ctx := context.Background()
conn.Query(context.Background(), fmt.Sprintf("drop database if exists %s", conf.Metrics.Database.Name))
conn.Query(context.Background(), fmt.Sprintf("drop database if exists %s", conf.Metrics.Database.Name), util.GetQidOwn())

if _, err = conn.Exec(ctx, fmt.Sprintf("create database if not exists %s", dbName)); err != nil {
if _, err = conn.Exec(ctx, fmt.Sprintf("create database if not exists %s", dbName), util.GetQidOwn()); err != nil {
logger.Errorf("execute sql: %s, error: %s", fmt.Sprintf("create database %s", dbName), err)
}
gin.SetMode(gin.ReleaseMode)
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestMain(m *testing.M) {
node := NewNodeExporter(processor)
node.Init(router)
m.Run()
if _, err = conn.Exec(ctx, fmt.Sprintf("drop database if exists %s", dbName)); err != nil {
if _, err = conn.Exec(ctx, fmt.Sprintf("drop database if exists %s", dbName), util.GetQidOwn()); err != nil {
logger.Errorf("execute sql: %s, error: %s", fmt.Sprintf("drop database %s", dbName), err)
}
}
Expand Down Expand Up @@ -227,18 +228,18 @@ func TestPutMetrics(t *testing.T) {
conn, err := db.NewConnectorWithDb(conf.TDengine.Username, conf.TDengine.Password, conf.TDengine.Host,
conf.TDengine.Port, dbName, conf.TDengine.Usessl)
if err != nil {
logger.WithError(err).Errorf("connect to database error")
logger.Errorf("connect to database error, msg:%s", err)
return
}

defer func() {
_, _ = conn.Query(context.Background(), fmt.Sprintf("drop database if exists %s", conf.Metrics.Database.Name))
_, _ = conn.Query(context.Background(), fmt.Sprintf("drop database if exists %s", conf.Metrics.Database.Name), util.GetQidOwn())
}()

ctx := context.Background()
data, err := conn.Query(ctx, "select info from log_summary")
data, err := conn.Query(ctx, "select info from log_summary", util.GetQidOwn())
if err != nil {
logger.Errorf("execute sql: %s, error: %s", "select * from log_summary", err)
logger.Errorf("execute sql:%s, error:%s", "select * from log_summary", err)
t.Fatal(err)
}
for _, info := range data.Data {
Expand Down Expand Up @@ -272,9 +273,9 @@ func TestPutMetrics(t *testing.T) {
}

for table, tableInfo := range tables {
data, err = conn.Query(ctx, fmt.Sprintf("select %s from %s", tableInfo.TsName, table))
data, err = conn.Query(ctx, fmt.Sprintf("select %s from %s", tableInfo.TsName, table), util.GetQidOwn())
if err != nil {
logger.Errorf("execute sql: %s, error: %s", "select * from "+table, err)
logger.Errorf("execute sql:%s, error:%s", "select * from "+table, err)
t.Fatal(err)
}

Expand All @@ -286,9 +287,9 @@ func TestPutMetrics(t *testing.T) {
conf.Drop = "old_taosd_metric_stables"
cmd.Process(conf)

data, err = conn.Query(ctx, "select * from information_schema.ins_stables where stable_name = 'm_info'")
data, err = conn.Query(ctx, "select * from information_schema.ins_stables where stable_name = 'm_info'", util.GetQidOwn())
if err != nil {
logger.Errorf("execute sql: %s, error: %s", "m_info is not droped", err)
logger.Errorf("execute sql:%s, error:%s", "m_info is not droped", err)
t.Fatal(err)
}
assert.Equal(t, 0, len(data.Data))
Expand Down
Loading

0 comments on commit 4b36a8d

Please sign in to comment.