From 18ff89aff1802e446f752b38cfc5f23fc60e1a99 Mon Sep 17 00:00:00 2001 From: t_max <1172915550@qq.com> Date: Thu, 21 Mar 2024 16:55:19 +0800 Subject: [PATCH] test: add test case --- af/conn.go | 11 +- af/conn_test.go | 48 +++ af/insertstmt/stmt.go | 32 +- af/insertstmt/stmt_test.go | 113 ++++++ af/rows.go | 22 +- af/stmt.go | 80 ++-- af/stmt_test.go | 55 +++ af/tmq/consumer.go | 39 +- af/tmq/consumer_test.go | 86 ++++ common/datatype.go | 10 - common/param/column_test.go | 435 ++++++++++++++++++++ common/param/param_test.go | 654 +++++++++++++++++++++++++++++++ common/pointer/unsafe_test.go | 18 + common/stmt/field_test.go | 143 +++++++ common/tmq/config_test.go | 52 +++ common/tmq/event_test.go | 352 +++++++++++++++++ common/tmq/tmq_test.go | 129 ++++++ errors/errors_test.go | 16 +- ws/client/conn_test.go | 98 +++++ ws/schemaless/schemaless_test.go | 4 +- ws/tmq/config.go | 116 ++---- ws/tmq/consumer.go | 50 +-- ws/tmq/consumer_test.go | 273 +++++++++++++ 23 files changed, 2605 insertions(+), 231 deletions(-) create mode 100644 af/insertstmt/stmt_test.go create mode 100644 af/stmt_test.go create mode 100644 common/param/column_test.go create mode 100644 common/param/param_test.go create mode 100644 common/pointer/unsafe_test.go create mode 100644 common/stmt/field_test.go create mode 100644 common/tmq/config_test.go create mode 100644 common/tmq/event_test.go create mode 100644 ws/client/conn_test.go diff --git a/af/conn.go b/af/conn.go index 869c7c3..77a253f 100644 --- a/af/conn.go +++ b/af/conn.go @@ -58,8 +58,7 @@ func (conn *Connector) Close() error { func (conn *Connector) StmtExecute(sql string, params *param.Param) (res driver.Result, err error) { stmt := NewStmt(conn.taos) if stmt == nil { - err = &errors.TaosError{Code: 0xffff, ErrStr: "failed to init stmt"} - return + return nil, &errors.TaosError{Code: 0xffff, ErrStr: "failed to init stmt"} } defer stmt.Close() @@ -229,6 +228,11 @@ func (conn *Connector) InsertStmt() *insertstmt.InsertStmt { return insertstmt.NewInsertStmt(conn.taos) } +// Stmt Prepare stmt +func (conn *Connector) Stmt() *Stmt { + return NewStmt(conn.taos) +} + // InsertStmtWithReqID Prepare batch insert stmt with reqID func (conn *Connector) InsertStmtWithReqID(reqID int64) *insertstmt.InsertStmt { return insertstmt.NewInsertStmtWithReqID(conn.taos, reqID) @@ -240,8 +244,7 @@ func (conn *Connector) SelectDB(db string) error { code := wrapper.TaosSelectDB(conn.taos, db) locker.Unlock() if code != 0 { - err := taosError.NewError(code, wrapper.TaosErrorStr(nil)) - return err + return taosError.NewError(code, wrapper.TaosErrorStr(nil)) } return nil } diff --git a/af/conn_test.go b/af/conn_test.go index ad6d942..354771a 100644 --- a/af/conn_test.go +++ b/af/conn_test.go @@ -7,8 +7,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/taosdata/driver-go/v3/common" param2 "github.com/taosdata/driver-go/v3/common/param" + "github.com/taosdata/driver-go/v3/wrapper" ) func TestMain(m *testing.M) { @@ -925,3 +927,49 @@ func TestConnector_QueryWithReqID(t *testing.T) { t.Fatal("result is error") } } + +func TestNewConnector(t *testing.T) { + tc, err := wrapper.TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + conn, err := NewConnector(tc) + assert.NoError(t, err) + assert.Equal(t, tc, conn.taos) + _, err = NewConnector(nil) + assert.Error(t, err) +} + +func TestSelectDB(t *testing.T) { + db, err := Open("", "", "", "", 0) + assert.NoError(t, err) + _, err = db.Exec("create database if not exists test_af precision 'us' keep 36500") + assert.NoError(t, err) + rows, err := db.Query("select database()") + assert.NoError(t, err) + dest := make([]driver.Value, 1) + err = rows.Next(dest) + assert.NoError(t, err) + err = rows.Close() + assert.NoError(t, err) + assert.Nil(t, dest[0]) + err = db.SelectDB("test_af") + assert.NoError(t, err) + rows, err = db.Query("select database()") + assert.NoError(t, err) + dest = make([]driver.Value, 1) + err = rows.Next(dest) + assert.NoError(t, err) + err = rows.Close() + assert.NoError(t, err) + assert.Equal(t, "test_af", dest[0]) + err = db.Close() + assert.NoError(t, err) +} + +func TestGetTableVGroupID(t *testing.T) { + db := testDatabase(t) + _, err := db.Exec("create table test_vg (ts timestamp,v int)") + assert.NoError(t, err) + vgID, err := db.GetTableVGroupID("test_af", "test_vg") + assert.NoError(t, err) + t.Log(vgID) +} diff --git a/af/insertstmt/stmt.go b/af/insertstmt/stmt.go index aae9795..d764560 100644 --- a/af/insertstmt/stmt.go +++ b/af/insertstmt/stmt.go @@ -34,13 +34,11 @@ func (stmt *InsertStmt) Prepare(sql string) error { code := wrapper.TaosStmtPrepare(stmt.stmt, sql) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } isInsert, code := wrapper.TaosStmtIsInsert(stmt.stmt) if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } if !isInsert { return errors.New("only support insert") @@ -53,8 +51,7 @@ func (stmt *InsertStmt) SetTableName(name string) error { code := wrapper.TaosStmtSetTBName(stmt.stmt, name) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } return nil } @@ -64,8 +61,7 @@ func (stmt *InsertStmt) SetSubTableName(name string) error { code := wrapper.TaosStmtSetSubTBName(stmt.stmt, name) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } return nil } @@ -75,8 +71,7 @@ func (stmt *InsertStmt) SetTableNameWithTags(tableName string, tags *param.Param code := wrapper.TaosStmtSetTBNameTags(stmt.stmt, tableName, tags.GetValues()) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } return nil } @@ -95,8 +90,7 @@ func (stmt *InsertStmt) BindParam(params []*param.Param, bindType *param.ColumnT code := wrapper.TaosStmtBindParamBatch(stmt.stmt, data, columnTypes) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } return nil } @@ -106,8 +100,7 @@ func (stmt *InsertStmt) AddBatch() error { code := wrapper.TaosStmtAddBatch(stmt.stmt) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } return nil } @@ -117,8 +110,7 @@ func (stmt *InsertStmt) Execute() error { code := wrapper.TaosStmtExecute(stmt.stmt) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - return taosError.NewError(code, errStr) + return stmt.stmtErr(code) } return nil } @@ -133,9 +125,13 @@ func (stmt *InsertStmt) Close() error { locker.Unlock() var err error if code != 0 { - errStr := wrapper.TaosStmtErrStr(stmt.stmt) - err = taosError.NewError(code, errStr) + err = stmt.stmtErr(code) } stmt.stmt = nil return err } + +func (stmt *InsertStmt) stmtErr(code int) error { + errStr := wrapper.TaosStmtErrStr(stmt.stmt) + return taosError.NewError(code, errStr) +} diff --git a/af/insertstmt/stmt_test.go b/af/insertstmt/stmt_test.go new file mode 100644 index 0000000..0b50f39 --- /dev/null +++ b/af/insertstmt/stmt_test.go @@ -0,0 +1,113 @@ +package insertstmt + +import ( + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/assert" + "github.com/taosdata/driver-go/v3/common" + "github.com/taosdata/driver-go/v3/common/param" + taosError "github.com/taosdata/driver-go/v3/errors" + "github.com/taosdata/driver-go/v3/wrapper" +) + +func prepareEnv(conn unsafe.Pointer) error { + sqls := []string{ + "create database if not exists insert_stmt", + "use insert_stmt", + "create table test (ts timestamp, a int, b float)", + "create table stb(ts timestamp, v int) tags(a binary(10))", + } + for i := 0; i < len(sqls); i++ { + if err := exec(conn, sqls[i]); err != nil { + return err + } + } + return nil +} +func cleanEnv(conn unsafe.Pointer) error { + sqls := []string{ + "drop database if exists insert_stmt", + } + for i := 0; i < len(sqls); i++ { + if err := exec(conn, sqls[i]); err != nil { + return err + } + } + return nil +} +func TestStmt(t *testing.T) { + conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + defer wrapper.TaosClose(conn) + err = prepareEnv(conn) + assert.NoError(t, err) + defer cleanEnv(conn) + s := NewInsertStmt(conn) + defer s.Close() + err = s.Prepare("insert into ? values(?,?,?)") + assert.NoError(t, err) + err = s.SetTableName("test") + assert.NoError(t, err) + params := []*param.Param{ + param.NewParam(1).AddTimestamp(time.Now(), common.PrecisionMilliSecond), + param.NewParam(1).AddInt(1), + param.NewParam(1).AddFloat(1.1), + } + err = s.BindParam(params, param.NewColumnType(3).AddTimestamp().AddInt().AddFloat()) + assert.NoError(t, err) + err = s.AddBatch() + assert.NoError(t, err) + err = s.Execute() + assert.NoError(t, err) + affected := s.GetAffectedRows() + assert.Equal(t, int(1), affected) + + err = s.Prepare("insert into ? using stb tags(?) values(?,?)") + assert.NoError(t, err) + err = s.SetTableNameWithTags("ctb1", param.NewParam(1).AddBinary([]byte("test"))) + assert.NoError(t, err) + params = []*param.Param{ + param.NewParam(1).AddTimestamp(time.Now(), common.PrecisionMilliSecond), + param.NewParam(1).AddInt(1), + } + err = s.BindParam(params, param.NewColumnType(2).AddTimestamp().AddInt()) + assert.NoError(t, err) + err = s.AddBatch() + assert.NoError(t, err) + err = s.Execute() + assert.NoError(t, err) + affected = s.GetAffectedRows() + assert.Equal(t, int(1), affected) + + err = s.Prepare("insert into ? using stb tags('ctb2') values(?,?)") + assert.NoError(t, err) + err = s.SetSubTableName("ctb2") + assert.NoError(t, err) + params = []*param.Param{ + param.NewParam(1).AddTimestamp(time.Now(), common.PrecisionMilliSecond), + param.NewParam(1).AddInt(1), + } + err = s.BindParam(params, param.NewColumnType(2).AddTimestamp().AddInt()) + assert.NoError(t, err) + err = s.AddBatch() + assert.NoError(t, err) + err = s.Execute() + assert.NoError(t, err) + affected = s.GetAffectedRows() + assert.Equal(t, int(1), affected) + +} + +func exec(conn unsafe.Pointer, sql string) error { + res := wrapper.TaosQuery(conn, sql) + defer func() { + wrapper.TaosFreeResult(res) + }() + if code := wrapper.TaosError(res); code != 0 { + errStr := wrapper.TaosErrorStr(res) + return taosError.NewError(code, errStr) + } + return nil +} diff --git a/af/rows.go b/af/rows.go index a1702ae..75aaa82 100644 --- a/af/rows.go +++ b/af/rows.go @@ -23,6 +23,7 @@ type rows struct { blockSize int result unsafe.Pointer precision int + isStmt bool } func (rs *rows) Columns() []string { @@ -55,8 +56,7 @@ func (rs *rows) Next(dest []driver.Value) error { if rs.result == nil { return &errors.TaosError{Code: 0xffff, ErrStr: "result is nil!"} } - - if rs.block == nil { + if rs.block == nil || rs.blockOffset >= rs.blockSize { if err := rs.taosFetchBlock(); err != nil { return err } @@ -67,16 +67,6 @@ func (rs *rows) Next(dest []driver.Value) error { return io.EOF } - if rs.blockOffset >= rs.blockSize { - if err := rs.taosFetchBlock(); err != nil { - return err - } - } - if rs.blockSize == 0 { - rs.block = nil - rs.freeResult() - return io.EOF - } parser.ReadRow(dest, rs.block, rs.blockSize, rs.blockOffset, rs.rowsHeader.ColTypes, rs.precision) rs.blockOffset++ return nil @@ -111,9 +101,11 @@ func (rs *rows) asyncFetchRows() *handler.AsyncResult { func (rs *rows) freeResult() { if rs.result != nil { - locker.Lock() - wrapper.TaosFreeResult(rs.result) - locker.Unlock() + if !rs.isStmt { + locker.Lock() + wrapper.TaosFreeResult(rs.result) + locker.Unlock() + } rs.result = nil } diff --git a/af/stmt.go b/af/stmt.go index 18d52c2..dbe849b 100644 --- a/af/stmt.go +++ b/af/stmt.go @@ -2,9 +2,11 @@ package af import "C" import ( + "database/sql/driver" "fmt" "unsafe" + "github.com/taosdata/driver-go/v3/af/async" "github.com/taosdata/driver-go/v3/af/locker" "github.com/taosdata/driver-go/v3/common/param" taosError "github.com/taosdata/driver-go/v3/errors" @@ -36,22 +38,22 @@ func (s *Stmt) Prepare(sql string) error { code := wrapper.TaosStmtPrepare(s.stmt, sql) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } isInsert, code := wrapper.TaosStmtIsInsert(s.stmt) if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } s.isInsert = isInsert + return nil +} + +func (s *Stmt) NumParams() (int, error) { numParams, code := wrapper.TaosStmtNumParams(s.stmt) if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return 0, s.stmtErr(code) } - s.paramCount = numParams - return nil + return numParams, nil } func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param) error { @@ -59,8 +61,7 @@ func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param) error { code := wrapper.TaosStmtSetTBNameTags(s.stmt, tableName, tags.GetValues()) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } return nil } @@ -70,36 +71,33 @@ func (s *Stmt) SetTableName(tableName string) error { code := wrapper.TaosStmtSetTBName(s.stmt, tableName) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } return nil } func (s *Stmt) BindRow(row *param.Param) error { - if s.paramCount == 0 { - locker.Lock() - code := wrapper.TaosStmtBindParam(s.stmt, nil) - locker.Unlock() - if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + if s.isInsert { + if s.paramCount == 0 { + paramCount, err := s.NumParams() + if err != nil { + return err + } + s.paramCount = paramCount } - return nil } if row == nil { return fmt.Errorf("row param got nil") } value := row.GetValues() - if len(value) != s.paramCount { + if s.isInsert && len(value) != s.paramCount { return fmt.Errorf("row param count error : expect %d got %d", s.paramCount, len(value)) } locker.Lock() code := wrapper.TaosStmtBindParam(s.stmt, value) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } return nil } @@ -116,8 +114,7 @@ func (s *Stmt) AddBatch() error { code := wrapper.TaosStmtAddBatch(s.stmt) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } return nil } @@ -127,20 +124,45 @@ func (s *Stmt) Execute() error { code := wrapper.TaosStmtExecute(s.stmt) locker.Unlock() if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } return nil } +func (s *Stmt) UseResult() (driver.Rows, error) { + locker.Lock() + res := wrapper.TaosStmtUseResult(s.stmt) + locker.Unlock() + numFields := wrapper.TaosNumFields(res) + rowsHeader, err := wrapper.ReadColumn(res, numFields) + h := async.GetHandler() + if err != nil { + async.PutHandler(h) + return nil, err + } + precision := wrapper.TaosResultPrecision(res) + rs := &rows{ + handler: h, + rowsHeader: rowsHeader, + result: res, + precision: precision, + isStmt: true, + } + return rs, nil +} + func (s *Stmt) Close() error { locker.Lock() code := wrapper.TaosStmtClose(s.stmt) locker.Unlock() s.stmt = nil if code != 0 { - errStr := wrapper.TaosStmtErrStr(s.stmt) - return taosError.NewError(code, errStr) + return s.stmtErr(code) } return nil } + +func (s *Stmt) stmtErr(code int) error { + errStr := wrapper.TaosStmtErrStr(s.stmt) + return taosError.NewError(code, errStr) +} diff --git a/af/stmt_test.go b/af/stmt_test.go new file mode 100644 index 0000000..6a23e9f --- /dev/null +++ b/af/stmt_test.go @@ -0,0 +1,55 @@ +package af + +import ( + "database/sql/driver" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/taosdata/driver-go/v3/common" + "github.com/taosdata/driver-go/v3/common/param" +) + +func TestNewStmt(t *testing.T) { + db := testDatabase(t) + _, err := db.Exec("create table test_stmt (ts timestamp,v int)") + assert.NoError(t, err) + stmt := db.Stmt() + err = stmt.Prepare("insert into ? values(?,?)") + assert.NoError(t, err) + err = stmt.SetTableName("test_stmt") + assert.NoError(t, err) + ts := time.Now().UnixNano() / 1e3 + err = stmt.BindRow(param.NewParam(2).AddTimestamp(time.Unix(0, ts*1e3), common.PrecisionMicroSecond).AddInt(1)) + assert.NoError(t, err) + err = stmt.AddBatch() + assert.NoError(t, err) + err = stmt.Execute() + assert.NoError(t, err) + affected := stmt.GetAffectedRows() + assert.Equal(t, int(1), affected) + err = stmt.Prepare("select * from test_stmt where v = ?") + assert.NoError(t, err) + err = stmt.BindRow(param.NewParam(1).AddInt(1)) + assert.NoError(t, err) + err = stmt.AddBatch() + assert.NoError(t, err) + err = stmt.Execute() + assert.NoError(t, err) + rows, err := stmt.UseResult() + assert.NoError(t, err) + dest := make([]driver.Value, 2) + err = rows.Next(dest) + assert.NoError(t, err) + assert.Equal(t, ts, dest[0].(time.Time).UnixNano()/1e3) + assert.Equal(t, int32(1), dest[1].(int32)) + err = rows.Next(dest) + assert.ErrorIs(t, err, io.EOF) + err = rows.Close() + assert.NoError(t, err) + err = stmt.Close() + assert.NoError(t, err) + err = db.Close() + assert.NoError(t, err) +} diff --git a/af/tmq/consumer.go b/af/tmq/consumer.go index b6330d8..aab447c 100644 --- a/af/tmq/consumer.go +++ b/af/tmq/consumer.go @@ -65,14 +65,12 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err for _, topic := range topics { errCode := wrapper.TMQListAppend(topicList, topic) if errCode != 0 { - errStr := wrapper.TMQErr2Str(errCode) - return taosError.NewError(int(errCode), errStr) + return c.tmqError(errCode) } } errCode := wrapper.TMQSubscribe(c.cConsumer, topicList) if errCode != 0 { - errStr := wrapper.TMQErr2Str(errCode) - return taosError.NewError(int(errCode), errStr) + return c.tmqError(errCode) } return nil } @@ -81,8 +79,7 @@ func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) err func (c *Consumer) Unsubscribe() error { errCode := wrapper.TMQUnsubscribe(c.cConsumer) if errCode != taosError.SUCCESS { - errStr := wrapper.TMQErr2Str(errCode) - return taosError.NewError(int(errCode), errStr) + return c.tmqError(errCode) } return nil } @@ -202,8 +199,7 @@ func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) { func (c *Consumer) Commit() ([]tmq.TopicPartition, error) { errCode := wrapper.TMQCommitSync(c.cConsumer, nil) if errCode != taosError.SUCCESS { - errStr := wrapper.TMQErr2Str(errCode) - return nil, taosError.NewError(int(errCode), errStr) + return nil, c.tmqError(errCode) } partitions, err := c.Assignment() if err != nil { @@ -215,8 +211,7 @@ func (c *Consumer) Commit() ([]tmq.TopicPartition, error) { func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error) { errCode := wrapper.TMQCommitSync(c.cConsumer, message) if errCode != taosError.SUCCESS { - errStr := wrapper.TMQErr2Str(errCode) - return nil, taosError.NewError(int(errCode), errStr) + return nil, c.tmqError(errCode) } return nil, nil } @@ -224,8 +219,7 @@ func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) { errCode, list := wrapper.TMQSubscription(c.cConsumer) if errCode != taosError.SUCCESS { - errStr := wrapper.TMQErr2Str(errCode) - return nil, taosError.NewError(int(errCode), errStr) + return nil, c.tmqError(errCode) } defer wrapper.TMQListDestroy(list) size := wrapper.TMQListGetSize(list) @@ -233,8 +227,7 @@ func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) { for _, topic := range topics { errCode, assignment := wrapper.TMQGetTopicAssignment(c.cConsumer, topic) if errCode != taosError.SUCCESS { - errStr := wrapper.TMQErr2Str(errCode) - return nil, taosError.NewError(int(errCode), errStr) + return nil, c.tmqError(errCode) } for i := 0; i < len(assignment); i++ { topicName := topic @@ -251,8 +244,7 @@ func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error) { func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error { errCode := wrapper.TMQOffsetSeek(c.cConsumer, *partition.Topic, partition.Partition, int64(partition.Offset)) if errCode != taosError.SUCCESS { - errStr := wrapper.TMQErr2Str(errCode) - return taosError.NewError(int(errCode), errStr) + return c.tmqError(errCode) } return nil } @@ -263,7 +255,7 @@ func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (of cOffset := wrapper.TMQCommitted(c.cConsumer, *partitions[i].Topic, partitions[i].Partition) offset := tmq.Offset(cOffset) if !offset.Valid() { - return nil, taosError.NewError(int(offset), wrapper.TMQErr2Str(int32(offset))) + return nil, c.tmqError(int32(offset)) } offsets[i] = tmq.TopicPartition{ Topic: partitions[i].Topic, @@ -278,8 +270,7 @@ func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicParti for i := 0; i < len(offsets); i++ { errCode := wrapper.TMQCommitOffsetSync(c.cConsumer, *offsets[i].Topic, offsets[i].Partition, int64(offsets[i].Offset)) if errCode != taosError.SUCCESS { - errStr := wrapper.TMQErr2Str(errCode) - return nil, taosError.NewError(int(errCode), errStr) + return nil, c.tmqError(errCode) } } return c.Committed(offsets, 0) @@ -290,7 +281,7 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi for i := 0; i < len(partitions); i++ { position := wrapper.TMQPosition(c.cConsumer, *partitions[i].Topic, partitions[i].Partition) if position < 0 { - return nil, taosError.NewError(int(position), wrapper.TMQErr2Str(int32(position))) + return nil, c.tmqError(int32(position)) } offsets[i] = tmq.TopicPartition{ Topic: partitions[i].Topic, @@ -305,8 +296,12 @@ func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.Topi func (c *Consumer) Close() error { errCode := wrapper.TMQConsumerClose(c.cConsumer) if errCode != 0 { - errStr := wrapper.TMQErr2Str(errCode) - return taosError.NewError(int(errCode), errStr) + return c.tmqError(errCode) } return nil } + +func (c *Consumer) tmqError(errCode int32) error { + errStr := wrapper.TMQErr2Str(errCode) + return taosError.NewError(int(errCode), errStr) +} diff --git a/af/tmq/consumer_test.go b/af/tmq/consumer_test.go index 20232e0..47252b7 100644 --- a/af/tmq/consumer_test.go +++ b/af/tmq/consumer_test.go @@ -407,3 +407,89 @@ func TestMultiBlock(t *testing.T) { } } } + +func prepareMetaEnv(conn unsafe.Pointer) error { + var err error + steps := []string{ + "drop topic if exists test_tmq_meta_topic", + "drop database if exists test_tmq_meta", + "create database test_tmq_meta vgroups 1 WAL_RETENTION_PERIOD 86400", + "create topic test_tmq_meta_topic with meta as database test_tmq_meta", + } + for _, step := range steps { + err = execWithoutResult(conn, step) + if err != nil { + return err + } + } + return nil +} + +func cleanMetaEnv(conn unsafe.Pointer) error { + var err error + time.Sleep(2 * time.Second) + steps := []string{ + "drop topic if exists test_tmq_meta_topic", + "drop database if exists test_tmq_meta", + } + for _, step := range steps { + err = execWithoutResult(conn, step) + if err != nil { + return err + } + } + return nil +} + +func TestMeta(t *testing.T) { + conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + defer wrapper.TaosClose(conn) + err = prepareMetaEnv(conn) + assert.NoError(t, err) + defer cleanMetaEnv(conn) + consumer, err := NewConsumer(&tmq.ConfigMap{ + "group.id": "test", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "auto.offset.reset": "earliest", + "client.id": "test_tmq_multi_block_topic", + "enable.auto.commit": "false", + "msg.with.table.name": "true", + }) + err = consumer.Subscribe("test_tmq_meta_topic", nil) + assert.NoError(t, err) + defer func() { + consumer.Unsubscribe() + consumer.Close() + }() + go func() { + execWithoutResult(conn, "create table test_tmq_meta.st(ts timestamp,v int) tags (cn binary(20))") + execWithoutResult(conn, "create table test_tmq_meta.t1 using test_tmq_meta.st tags ('t1')") + execWithoutResult(conn, "insert into test_tmq_meta.t1 values (now,1)") + execWithoutResult(conn, "insert into test_tmq_meta.t2 using test_tmq_meta.st tags ('t1') values (now,2)") + time.Sleep(time.Second) + execWithoutResult(conn, "insert into test_tmq_meta.t1 values (now,1)") + execWithoutResult(conn, "insert into test_tmq_meta.t1 values (now,1)") + }() + for i := 0; i < 10; i++ { + event := consumer.Poll(500) + if event == nil { + continue + } + switch e := event.(type) { + case *tmq.DataMessage: + t.Log(e) + assert.Equal(t, "test_tmq_meta", e.DBName()) + case *tmq.MetaDataMessage: + assert.Equal(t, "test_tmq_meta", e.DBName()) + assert.Equal(t, "test_tmq_meta_topic", e.Topic()) + t.Log(e) + case *tmq.MetaMessage: + assert.Equal(t, "test_tmq_meta", e.DBName()) + t.Log(e) + } + } +} diff --git a/common/datatype.go b/common/datatype.go index ea85688..b5406ed 100644 --- a/common/datatype.go +++ b/common/datatype.go @@ -1,7 +1,6 @@ package common import ( - "errors" "reflect" ) @@ -288,12 +287,3 @@ var NameTypeMap = map[string]int{ TSDB_DATA_TYPE_VARBINARY_Str: TSDB_DATA_TYPE_VARBINARY, TSDB_DATA_TYPE_GEOMETRY_Str: TSDB_DATA_TYPE_GEOMETRY, } - -var NotSupportType = errors.New("not support type") - -func GetColType(colType int) (*DBType, error) { - if colType > len(allType) || colType < 0 { - return nil, NotSupportType - } - return allType[colType], nil -} diff --git a/common/param/column_test.go b/common/param/column_test.go new file mode 100644 index 0000000..a684a8c --- /dev/null +++ b/common/param/column_test.go @@ -0,0 +1,435 @@ +package param + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/taosdata/driver-go/v3/types" +) + +func TestColumnType_AddBool(t *testing.T) { + colType := NewColumnType(1) + colType.AddBool() + + expected := []*types.ColumnType{ + { + Type: types.TaosBoolType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddBool() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddTinyint(t *testing.T) { + colType := NewColumnType(1) + + colType.AddTinyint() + + expected := []*types.ColumnType{ + { + Type: types.TaosTinyintType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddTinyint() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddSmallint(t *testing.T) { + colType := NewColumnType(1) + + colType.AddSmallint() + + expected := []*types.ColumnType{ + { + Type: types.TaosSmallintType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddSmallint() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddInt(t *testing.T) { + colType := NewColumnType(1) + + colType.AddInt() + + expected := []*types.ColumnType{ + { + Type: types.TaosIntType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddInt() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddBigint(t *testing.T) { + colType := NewColumnType(1) + + colType.AddBigint() + + expected := []*types.ColumnType{ + { + Type: types.TaosBigintType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddBigint() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddUTinyint(t *testing.T) { + colType := NewColumnType(1) + + colType.AddUTinyint() + + expected := []*types.ColumnType{ + { + Type: types.TaosUTinyintType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddUTinyint() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddUSmallint(t *testing.T) { + colType := NewColumnType(1) + + colType.AddUSmallint() + + expected := []*types.ColumnType{ + { + Type: types.TaosUSmallintType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddUSmallint() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddUInt(t *testing.T) { + colType := NewColumnType(1) + + colType.AddUInt() + + expected := []*types.ColumnType{ + { + Type: types.TaosUIntType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddUInt() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddUBigint(t *testing.T) { + colType := NewColumnType(1) + + colType.AddUBigint() + + expected := []*types.ColumnType{ + { + Type: types.TaosUBigintType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddUBigint() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddFloat(t *testing.T) { + colType := NewColumnType(1) + + colType.AddFloat() + + expected := []*types.ColumnType{ + { + Type: types.TaosFloatType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddFloat() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddDouble(t *testing.T) { + colType := NewColumnType(1) + + colType.AddDouble() + + expected := []*types.ColumnType{ + { + Type: types.TaosDoubleType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddDouble() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddBinary(t *testing.T) { + colType := NewColumnType(1) + + colType.AddBinary(100) + + expected := []*types.ColumnType{ + { + Type: types.TaosBinaryType, + MaxLen: 100, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddBinary(50) + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddVarBinary(t *testing.T) { + colType := NewColumnType(1) + + colType.AddVarBinary(100) + + expected := []*types.ColumnType{ + { + Type: types.TaosVarBinaryType, + MaxLen: 100, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddVarBinary(50) + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddNchar(t *testing.T) { + colType := NewColumnType(1) + + colType.AddNchar(100) + + expected := []*types.ColumnType{ + { + Type: types.TaosNcharType, + MaxLen: 100, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddNchar(50) + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddTimestamp(t *testing.T) { + colType := NewColumnType(1) + + colType.AddTimestamp() + + expected := []*types.ColumnType{ + { + Type: types.TaosTimestampType, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddTimestamp() + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddJson(t *testing.T) { + colType := NewColumnType(1) + + colType.AddJson(100) + + expected := []*types.ColumnType{ + { + Type: types.TaosJsonType, + MaxLen: 100, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddJson(50) + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_AddGeometry(t *testing.T) { + colType := NewColumnType(1) + + colType.AddGeometry(100) + + expected := []*types.ColumnType{ + { + Type: types.TaosGeometryType, + MaxLen: 100, + }, + } + + values, err := colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) + + colType.AddGeometry(50) + + values, err = colType.GetValue() + assert.NoError(t, err) + assert.Equal(t, expected, values) +} + +func TestColumnType_GetValue(t *testing.T) { + // Initialize ColumnType with size 3 + colType := NewColumnType(3) + + // Add column types + colType.AddBool() + colType.AddTinyint() + colType.AddFloat() + + // Try to get values + values, err := colType.GetValue() + assert.NoError(t, err) + + // Check if the length of values matches the expected size + expectedSize := 3 + assert.Equal(t, expectedSize, len(values)) + + // Initialize ColumnType with size 3 + colType = NewColumnType(3) + + // Add only 2 column types + colType.AddBool() + colType.AddTinyint() + + // Try to get values + _, err = colType.GetValue() + + // Check if an error is returned due to incomplete column + assert.Error(t, err) + assert.Equal(t, "incomplete column expect 3 columns set 2 columns", err.Error()) +} + +func TestNewColumnTypeWithValue(t *testing.T) { + value := []*types.ColumnType{ + {Type: types.TaosBoolType}, + {Type: types.TaosTinyintType}, + } + + colType := NewColumnTypeWithValue(value) + + expectedSize := len(value) + assert.Equal(t, expectedSize, colType.size) + + expectedValue := value + assert.Equal(t, expectedValue, colType.value) + + expectedColumn := len(value) + assert.Equal(t, expectedColumn, colType.column) +} diff --git a/common/param/param_test.go b/common/param/param_test.go new file mode 100644 index 0000000..8bf3a18 --- /dev/null +++ b/common/param/param_test.go @@ -0,0 +1,654 @@ +package param + +import ( + "database/sql/driver" + "testing" + "time" + + "github.com/stretchr/testify/assert" + taosTypes "github.com/taosdata/driver-go/v3/types" +) + +func TestParam_SetBool(t *testing.T) { + param := NewParam(1) + param.SetBool(0, true) + + expected := []driver.Value{taosTypes.TaosBool(true)} + assert.Equal(t, expected, param.GetValues()) + + param = NewParam(0) + param.SetBool(0, true) + assert.Equal(t, 0, len(param.GetValues())) +} + +func TestParam_SetNull(t *testing.T) { + param := NewParam(1) + param.SetNull(0) + + if param.GetValues()[0] != nil { + t.Errorf("SetNull failed, expected nil, got %v", param.GetValues()[0]) + } + param = NewParam(0) + param.SetNull(0) + assert.Equal(t, 0, len(param.GetValues())) +} + +func TestParam_SetTinyint(t *testing.T) { + param := NewParam(1) + param.SetTinyint(0, 42) + + expected := []driver.Value{taosTypes.TaosTinyint(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetTinyint(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetSmallint(t *testing.T) { + param := NewParam(1) + param.SetSmallint(0, 42) + + expected := []driver.Value{taosTypes.TaosSmallint(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetSmallint(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetInt(t *testing.T) { + param := NewParam(1) + param.SetInt(0, 42) + + expected := []driver.Value{taosTypes.TaosInt(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetInt(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetBigint(t *testing.T) { + param := NewParam(1) + param.SetBigint(0, 42) + + expected := []driver.Value{taosTypes.TaosBigint(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetBigint(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetUTinyint(t *testing.T) { + param := NewParam(1) + param.SetUTinyint(0, 42) + + expected := []driver.Value{taosTypes.TaosUTinyint(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetUTinyint(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetUSmallint(t *testing.T) { + param := NewParam(1) + param.SetUSmallint(0, 42) + + expected := []driver.Value{taosTypes.TaosUSmallint(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetUSmallint(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetUInt(t *testing.T) { + param := NewParam(1) + param.SetUInt(0, 42) + + expected := []driver.Value{taosTypes.TaosUInt(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetUInt(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetUBigint(t *testing.T) { + param := NewParam(1) + param.SetUBigint(0, 42) + + expected := []driver.Value{taosTypes.TaosUBigint(42)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetUBigint(1, 42) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetFloat(t *testing.T) { + param := NewParam(1) + param.SetFloat(0, 3.14) + + expected := []driver.Value{taosTypes.TaosFloat(3.14)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetFloat(1, 3.14) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetDouble(t *testing.T) { + param := NewParam(1) + param.SetDouble(0, 3.14) + + expected := []driver.Value{taosTypes.TaosDouble(3.14)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetDouble(1, 3.14) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetBinary(t *testing.T) { + param := NewParam(1) + param.SetBinary(0, []byte{0x01, 0x02}) + + expected := []driver.Value{taosTypes.TaosBinary([]byte{0x01, 0x02})} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetBinary(1, []byte{0x01, 0x02}) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetVarBinary(t *testing.T) { + param := NewParam(1) + param.SetVarBinary(0, []byte{0x01, 0x02}) + + expected := []driver.Value{taosTypes.TaosVarBinary([]byte{0x01, 0x02})} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetVarBinary(1, []byte{0x01, 0x02}) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetNchar(t *testing.T) { + param := NewParam(1) + param.SetNchar(0, "hello") + + expected := []driver.Value{taosTypes.TaosNchar("hello")} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetNchar(1, "hello") // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetTimestamp(t *testing.T) { + timestamp := time.Date(2022, time.January, 1, 12, 0, 0, 0, time.UTC) + param := NewParam(1) + param.SetTimestamp(0, timestamp, 6) + + expected := []driver.Value{taosTypes.TaosTimestamp{T: timestamp, Precision: 6}} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetTimestamp(1, timestamp, 6) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetJson(t *testing.T) { + jsonData := []byte(`{"key": "value"}`) + param := NewParam(1) + param.SetJson(0, jsonData) + + expected := []driver.Value{taosTypes.TaosJson(jsonData)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetJson(1, jsonData) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_SetGeometry(t *testing.T) { + geometryData := []byte{0x01, 0x02, 0x03, 0x04} + param := NewParam(1) + param.SetGeometry(0, geometryData) + + expected := []driver.Value{taosTypes.TaosGeometry(geometryData)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.SetGeometry(1, geometryData) // Attempt to set at index 1 with size 1 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddBool(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a bool value + param.AddBool(true) + + expected := []driver.Value{taosTypes.TaosBool(true), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another bool value + param.AddBool(false) + + expected = []driver.Value{taosTypes.TaosBool(true), taosTypes.TaosBool(false)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddBool(true) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddNull(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a null value + param.AddNull() + + expected := []driver.Value{nil, nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another null value + param.AddNull() + + expected = []driver.Value{nil, nil} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddNull() // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddTinyint(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a tinyint value + param.AddTinyint(42) + + expected := []driver.Value{taosTypes.TaosTinyint(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another tinyint value + param.AddTinyint(84) + + expected = []driver.Value{taosTypes.TaosTinyint(42), taosTypes.TaosTinyint(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddTinyint(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddSmallint(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a smallint value + param.AddSmallint(42) + + expected := []driver.Value{taosTypes.TaosSmallint(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another smallint value + param.AddSmallint(84) + + expected = []driver.Value{taosTypes.TaosSmallint(42), taosTypes.TaosSmallint(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddSmallint(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddInt(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add an int value + param.AddInt(42) + + expected := []driver.Value{taosTypes.TaosInt(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another int value + param.AddInt(84) + + expected = []driver.Value{taosTypes.TaosInt(42), taosTypes.TaosInt(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddInt(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not mod +} + +func TestParam_AddBigint(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a bigint value + param.AddBigint(42) + + expected := []driver.Value{taosTypes.TaosBigint(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another bigint value + param.AddBigint(84) + + expected = []driver.Value{taosTypes.TaosBigint(42), taosTypes.TaosBigint(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddBigint(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddUTinyint(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a utinyint value + param.AddUTinyint(42) + + expected := []driver.Value{taosTypes.TaosUTinyint(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another utinyint value + param.AddUTinyint(84) + + expected = []driver.Value{taosTypes.TaosUTinyint(42), taosTypes.TaosUTinyint(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddUTinyint(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddUSmallint(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a usmallint value + param.AddUSmallint(42) + + expected := []driver.Value{taosTypes.TaosUSmallint(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another usmallint value + param.AddUSmallint(84) + + expected = []driver.Value{taosTypes.TaosUSmallint(42), taosTypes.TaosUSmallint(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddUSmallint(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddUInt(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a uint value + param.AddUInt(42) + + expected := []driver.Value{taosTypes.TaosUInt(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another uint value + param.AddUInt(84) + + expected = []driver.Value{taosTypes.TaosUInt(42), taosTypes.TaosUInt(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddUInt(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddUBigint(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a ubigint value + param.AddUBigint(42) + + expected := []driver.Value{taosTypes.TaosUBigint(42), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another ubigint value + param.AddUBigint(84) + + expected = []driver.Value{taosTypes.TaosUBigint(42), taosTypes.TaosUBigint(84)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddUBigint(126) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddFloat(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a float value + param.AddFloat(3.14) + + expected := []driver.Value{taosTypes.TaosFloat(3.14), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another float value + param.AddFloat(6.28) + + expected = []driver.Value{taosTypes.TaosFloat(3.14), taosTypes.TaosFloat(6.28)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddFloat(9.42) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddDouble(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a double value + param.AddDouble(3.14) + + expected := []driver.Value{taosTypes.TaosDouble(3.14), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another double value + param.AddDouble(6.28) + + expected = []driver.Value{taosTypes.TaosDouble(3.14), taosTypes.TaosDouble(6.28)} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddDouble(9.42) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddBinary(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + binaryData := []byte{0x01, 0x02, 0x03} + + // Add a binary value + param.AddBinary(binaryData) + + expected := []driver.Value{taosTypes.TaosBinary(binaryData), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another binary value + param.AddBinary([]byte{0x04, 0x05, 0x06}) + + expected = []driver.Value{taosTypes.TaosBinary(binaryData), taosTypes.TaosBinary([]byte{0x04, 0x05, 0x06})} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddBinary([]byte{0x07, 0x08, 0x09}) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddVarBinary(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + binaryData := []byte{0x01, 0x02, 0x03} + + // Add a varbinary value + param.AddVarBinary(binaryData) + + expected := []driver.Value{taosTypes.TaosVarBinary(binaryData), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another varbinary value + param.AddVarBinary([]byte{0x04, 0x05, 0x06}) + + expected = []driver.Value{taosTypes.TaosVarBinary(binaryData), taosTypes.TaosVarBinary([]byte{0x04, 0x05, 0x06})} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddVarBinary([]byte{0x07, 0x08, 0x09}) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddNchar(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add an nchar value + param.AddNchar("hello") + + expected := []driver.Value{taosTypes.TaosNchar("hello"), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another nchar value + param.AddNchar("world") + + expected = []driver.Value{taosTypes.TaosNchar("hello"), taosTypes.TaosNchar("world")} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddNchar("test") // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddTimestamp(t *testing.T) { + timestamp := time.Date(2022, time.January, 1, 12, 0, 0, 0, time.UTC) + param := NewParam(2) // Initialize with size 2 + + // Add a timestamp value + param.AddTimestamp(timestamp, 6) + + expected := []driver.Value{taosTypes.TaosTimestamp{T: timestamp, Precision: 6}, nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another timestamp value + param.AddTimestamp(timestamp.Add(time.Hour), 9) + + expected = []driver.Value{ + taosTypes.TaosTimestamp{T: timestamp, Precision: 6}, + taosTypes.TaosTimestamp{T: timestamp.Add(time.Hour), Precision: 9}, + } + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddTimestamp(timestamp.Add(2*time.Hour), 6) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddJson(t *testing.T) { + jsonData := []byte(`{"key": "value"}`) + param := NewParam(2) // Initialize with size 2 + + // Add a JSON value + param.AddJson(jsonData) + + expected := []driver.Value{taosTypes.TaosJson(jsonData), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another JSON value + param.AddJson([]byte(`{"key2": "value2"}`)) + + expected = []driver.Value{ + taosTypes.TaosJson(jsonData), + taosTypes.TaosJson([]byte(`{"key2": "value2"}`)), + } + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddJson([]byte(`{"key3": "value3"}`)) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddGeometry(t *testing.T) { + geometryData := []byte{0x01, 0x02, 0x03} + param := NewParam(2) // Initialize with size 2 + + // Add a geometry value + param.AddGeometry(geometryData) + + expected := []driver.Value{taosTypes.TaosGeometry(geometryData), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add another geometry value + param.AddGeometry([]byte{0x04, 0x05, 0x06}) + + expected = []driver.Value{ + taosTypes.TaosGeometry(geometryData), + taosTypes.TaosGeometry([]byte{0x04, 0x05, 0x06}), + } + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddGeometry([]byte{0x07, 0x08, 0x09}) // Attempt to add at index 2 with size 2 + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestParam_AddValue(t *testing.T) { + param := NewParam(2) // Initialize with size 2 + + // Add a binary value + binaryData := []byte{0x01, 0x02, 0x03} + param.AddValue(taosTypes.TaosBinary(binaryData)) + + expected := []driver.Value{taosTypes.TaosBinary(binaryData), nil} + assert.Equal(t, expected, param.GetValues()) + + // Add a varchar value + param.AddValue(taosTypes.TaosVarBinary("hello")) + + expected = []driver.Value{taosTypes.TaosBinary(binaryData), taosTypes.TaosVarBinary("hello")} + assert.Equal(t, expected, param.GetValues()) + + // Test when offset is out of range + param.AddValue(taosTypes.TaosVarBinary("world")) + assert.Equal(t, expected, param.GetValues()) // Should not modify values +} + +func TestNewParamsWithRowValue(t *testing.T) { + rowValues := []driver.Value{taosTypes.TaosBool(true), taosTypes.TaosInt(42), taosTypes.TaosNchar("hello")} + + params := NewParamsWithRowValue(rowValues) + + expected := []*Param{ + { + size: 1, + value: []driver.Value{taosTypes.TaosBool(true)}, + offset: 1, + }, + { + size: 1, + value: []driver.Value{taosTypes.TaosInt(42)}, + offset: 1, + }, + { + size: 1, + value: []driver.Value{taosTypes.TaosNchar("hello")}, + offset: 1, + }, + } + + for i, param := range params { + assert.Equal(t, expected[i].size, param.size) + assert.Equal(t, expected[i].value, param.value) + assert.Equal(t, expected[i].offset, param.offset) + } +} diff --git a/common/pointer/unsafe_test.go b/common/pointer/unsafe_test.go new file mode 100644 index 0000000..05e0e8b --- /dev/null +++ b/common/pointer/unsafe_test.go @@ -0,0 +1,18 @@ +package pointer + +import ( + "testing" + "unsafe" + + "github.com/stretchr/testify/assert" +) + +func TestAddUintptr(t *testing.T) { + data := []byte{1, 2, 3, 4, 5} + p1 := unsafe.Pointer(&data[0]) + p2 := AddUintptr(p1, 1) + assert.Equal(t, unsafe.Pointer(&data[1]), p2) + v2 := *(*byte)(p2) + assert.Equal(t, byte(2), v2) + +} diff --git a/common/stmt/field_test.go b/common/stmt/field_test.go new file mode 100644 index 0000000..d50678b --- /dev/null +++ b/common/stmt/field_test.go @@ -0,0 +1,143 @@ +package stmt + +import ( + "testing" + + "github.com/taosdata/driver-go/v3/common" + "github.com/taosdata/driver-go/v3/types" +) + +func TestGetType(t *testing.T) { + tests := []struct { + name string + fieldType int8 + want *types.ColumnType + wantErr bool + }{ + { + name: "Test Bool Type", + fieldType: common.TSDB_DATA_TYPE_BOOL, + want: &types.ColumnType{Type: types.TaosBoolType}, + wantErr: false, + }, + { + name: "Test TinyInt Type", + fieldType: common.TSDB_DATA_TYPE_TINYINT, + want: &types.ColumnType{Type: types.TaosTinyintType}, + wantErr: false, + }, + { + name: "Test SmallInt Type", + fieldType: common.TSDB_DATA_TYPE_SMALLINT, + want: &types.ColumnType{Type: types.TaosSmallintType}, + wantErr: false, + }, + { + name: "Test Int Type", + fieldType: common.TSDB_DATA_TYPE_INT, + want: &types.ColumnType{Type: types.TaosIntType}, + wantErr: false, + }, + { + name: "Test BigInt Type", + fieldType: common.TSDB_DATA_TYPE_BIGINT, + want: &types.ColumnType{Type: types.TaosBigintType}, + wantErr: false, + }, + { + name: "Test UTinyInt Type", + fieldType: common.TSDB_DATA_TYPE_UTINYINT, + want: &types.ColumnType{Type: types.TaosUTinyintType}, + wantErr: false, + }, + { + name: "Test USmallInt Type", + fieldType: common.TSDB_DATA_TYPE_USMALLINT, + want: &types.ColumnType{Type: types.TaosUSmallintType}, + wantErr: false, + }, + { + name: "Test UInt Type", + fieldType: common.TSDB_DATA_TYPE_UINT, + want: &types.ColumnType{Type: types.TaosUIntType}, + wantErr: false, + }, + { + name: "Test UBigInt Type", + fieldType: common.TSDB_DATA_TYPE_UBIGINT, + want: &types.ColumnType{Type: types.TaosUBigintType}, + wantErr: false, + }, + { + name: "Test Float Type", + fieldType: common.TSDB_DATA_TYPE_FLOAT, + want: &types.ColumnType{Type: types.TaosFloatType}, + wantErr: false, + }, + { + name: "Test Double Type", + fieldType: common.TSDB_DATA_TYPE_DOUBLE, + want: &types.ColumnType{Type: types.TaosDoubleType}, + wantErr: false, + }, + { + name: "Test Binary Type", + fieldType: common.TSDB_DATA_TYPE_BINARY, + want: &types.ColumnType{Type: types.TaosBinaryType}, + wantErr: false, + }, + { + name: "Test VarBinary Type", + fieldType: common.TSDB_DATA_TYPE_VARBINARY, + want: &types.ColumnType{Type: types.TaosVarBinaryType}, + wantErr: false, + }, + { + name: "Test Nchar Type", + fieldType: common.TSDB_DATA_TYPE_NCHAR, + want: &types.ColumnType{Type: types.TaosNcharType}, + wantErr: false, + }, + { + name: "Test Timestamp Type", + fieldType: common.TSDB_DATA_TYPE_TIMESTAMP, + want: &types.ColumnType{Type: types.TaosTimestampType}, + wantErr: false, + }, + { + name: "Test Json Type", + fieldType: common.TSDB_DATA_TYPE_JSON, + want: &types.ColumnType{Type: types.TaosJsonType}, + wantErr: false, + }, + { + name: "Test Geometry Type", + fieldType: common.TSDB_DATA_TYPE_GEOMETRY, + want: &types.ColumnType{Type: types.TaosGeometryType}, + wantErr: false, + }, + { + name: "Test Unsupported Type", + fieldType: 0, // An undefined type + want: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &StmtField{ + FieldType: tt.fieldType, + } + + got, err := s.GetType() + if (err != nil) != tt.wantErr { + t.Errorf("GetType() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != nil && tt.want != nil && got.Type != tt.want.Type { + t.Errorf("GetType() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/common/tmq/config_test.go b/common/tmq/config_test.go new file mode 100644 index 0000000..c5a62d5 --- /dev/null +++ b/common/tmq/config_test.go @@ -0,0 +1,52 @@ +package tmq + +import ( + "fmt" + "reflect" + "testing" +) + +func TestConfigMap_Get(t *testing.T) { + t.Parallel() + + config := ConfigMap{ + "key1": "value1", + "key2": 123, + } + + t.Run("Existing Key", func(t *testing.T) { + want := "value1" + if got, err := config.Get("key1", nil); err != nil || got != want { + t.Errorf("Get() = %v, want %v (error: %v)", got, want, err) + } + }) + + t.Run("Type Mismatch", func(t *testing.T) { + wantErr := fmt.Errorf("key2 expects type string, not int") + if got, err := config.Get("key2", "default"); err == nil || got != nil || err.Error() != wantErr.Error() { + t.Errorf("Get() = %v, want error: %v", got, wantErr) + } + }) + + t.Run("Non-Existing Key with Default Value", func(t *testing.T) { + want := "default" + if got, err := config.Get("key3", "default"); err != nil || got != want { + t.Errorf("Get() = %v, want %v (error: %v)", got, want, err) + } + }) +} + +func TestConfigMap_Clone(t *testing.T) { + t.Parallel() + + config := ConfigMap{ + "key1": "value1", + "key2": 123, + } + + clone := config.Clone() + + if !reflect.DeepEqual(config, clone) { + t.Errorf("Clone() = %v, want %v", clone, config) + } +} diff --git a/common/tmq/event_test.go b/common/tmq/event_test.go new file mode 100644 index 0000000..5043a28 --- /dev/null +++ b/common/tmq/event_test.go @@ -0,0 +1,352 @@ +package tmq + +import ( + "database/sql/driver" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + taosError "github.com/taosdata/driver-go/v3/errors" +) + +func TestDataMessage_String(t *testing.T) { + t.Parallel() + + data := []*Data{ + {TableName: "table1", Data: [][]driver.Value{{1, "data1"}}}, + {TableName: "table2", Data: [][]driver.Value{{2, "data2"}}}, + } + message := &DataMessage{ + TopicPartition: TopicPartition{ + Topic: stringPtr("test-topic"), + Partition: 0, + Offset: 100, + }, + dbName: "test-db", + topic: "test-topic", + data: data, + offset: 100, + } + + want := `DataMessage: test-topic[test-db]:[{"TableName":"table1","Data":[[1,"data1"]]},{"TableName":"table2","Data":[[2,"data2"]]}]` + + if got := message.String(); got != want { + t.Errorf("DataMessage.String() = %v, want %v", got, want) + } +} + +func TestMetaMessage_String(t *testing.T) { + t.Parallel() + + meta := &Meta{ + Type: "type", + TableName: "table", + TableType: "tableType", + } + message := &MetaMessage{ + TopicPartition: TopicPartition{ + Topic: stringPtr("test-topic"), + Partition: 0, + Offset: 100, + }, + dbName: "test-db", + topic: "test-topic", + offset: 100, + meta: meta, + } + + want := `MetaMessage: test-topic[test-db]:{"type":"type","tableName":"table","tableType":"tableType","createList":null,"columns":null,"using":"","tagNum":0,"tags":null,"tableNameList":null,"alterType":0,"colName":"","colNewName":"","colType":0,"colLength":0,"colValue":"","colValueNull":false}` + + if got := message.String(); got != want { + t.Errorf("MetaMessage.String() = %v, want %v", got, want) + } +} + +func TestMetaDataMessage_String(t *testing.T) { + t.Parallel() + + meta := &Meta{ + Type: "type", + TableName: "table", + TableType: "tableType", + } + data := []*Data{ + {TableName: "table1", Data: [][]driver.Value{{1, "data1"}}}, + {TableName: "table2", Data: [][]driver.Value{{2, "data2"}}}, + } + metaData := &MetaData{ + Meta: meta, + Data: data, + } + message := &MetaDataMessage{ + TopicPartition: TopicPartition{ + Topic: stringPtr("test-topic"), + Partition: 0, + Offset: 100, + }, + dbName: "test-db", + topic: "test-topic", + offset: 100, + metaData: metaData, + } + + want := `MetaDataMessage: test-topic[test-db]:{"Meta":{"type":"type","tableName":"table","tableType":"tableType","createList":null,"columns":null,"using":"","tagNum":0,"tags":null,"tableNameList":null,"alterType":0,"colName":"","colNewName":"","colType":0,"colLength":0,"colValue":"","colValueNull":false},"Data":[{"TableName":"table1","Data":[[1,"data1"]]},{"TableName":"table2","Data":[[2,"data2"]]}]}` + if got := message.String(); got != want { + t.Errorf("MetaDataMessage.String() = %v, want %v", got, want) + } +} + +func TestNewTMQError(t *testing.T) { + t.Parallel() + + code := 123 + str := "test error" + err := NewTMQError(code, str) + + if err.code != code { + t.Errorf("NewTMQError() code = %v, want %v", err.code, code) + } + + if err.str != str { + t.Errorf("NewTMQError() str = %v, want %v", err.str, str) + } +} + +func TestNewTMQErrorWithErr(t *testing.T) { + t.Parallel() + + testCases := []struct { + name string + err error + code int + str string + }{ + { + name: "TaosError", + err: &taosError.TaosError{ + Code: 456, + ErrStr: "taos error", + }, + code: 456, + str: "taos error", + }, + { + name: "OtherError", + err: fmt.Errorf("other error"), + code: ErrorOther, + str: "other error", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := NewTMQErrorWithErr(tc.err) + + if err.code != tc.code { + t.Errorf("NewTMQErrorWithErr() code = %v, want %v", err.code, tc.code) + } + + if err.str != tc.str { + t.Errorf("NewTMQErrorWithErr() str = %v, want %v", err.str, tc.str) + } + }) + } +} + +func TestError_String(t *testing.T) { + t.Parallel() + + code := 789 + str := "test error" + err := Error{code: code, str: str} + want := fmt.Sprintf("[0x%x] %s", code, str) + + if got := err.String(); got != want { + t.Errorf("Error.String() = %v, want %v", got, want) + } +} + +func TestError_Error(t *testing.T) { + t.Parallel() + + code := 789 + str := "test error" + err := Error{code: code, str: str} + want := fmt.Sprintf("[0x%x] %s", code, str) + + if got := err.Error(); got != want { + t.Errorf("Error.Error() = %v, want %v", got, want) + } +} + +func TestError_Code(t *testing.T) { + t.Parallel() + + code := 789 + err := Error{code: code} + + if got := err.Code(); got != code { + t.Errorf("Error.Code() = %v, want %v", got, code) + } +} + +func TestMetaMessage_Offset(t *testing.T) { + t.Parallel() + + message := &MetaMessage{ + offset: 100, + } + + want := Offset(100) + if got := message.Offset(); got != want { + t.Errorf("Offset() = %v, want %v", got, want) + } +} + +func TestMetaMessage_SetDbName(t *testing.T) { + t.Parallel() + + message := &MetaMessage{} + message.SetDbName("test-db") + + want := "test-db" + if got := message.DBName(); got != want { + t.Errorf("DBName() = %v, want %v", got, want) + } +} + +func TestMetaMessage_SetTopic(t *testing.T) { + t.Parallel() + + message := &MetaMessage{} + message.SetTopic("test-topic") + + want := "test-topic" + if got := message.Topic(); got != want { + t.Errorf("Topic() = %v, want %v", got, want) + } +} + +func TestMetaMessage_SetOffset(t *testing.T) { + t.Parallel() + + message := &MetaMessage{} + message.SetOffset(200) + + want := Offset(200) + if got := message.Offset(); got != want { + t.Errorf("Offset() = %v, want %v", got, want) + } +} + +func TestMetaMessage_SetMeta(t *testing.T) { + t.Parallel() + + meta := &Meta{} + message := &MetaMessage{} + message.SetMeta(meta) + + want := meta + if got := message.Value(); got != want { + t.Errorf("Value() = %v, want %v", got, want) + } +} + +func TestDataMessage_SetDbName(t *testing.T) { + t.Parallel() + + message := &DataMessage{} + message.SetDbName("test-db") + + want := "test-db" + if got := message.DBName(); got != want { + t.Errorf("DBName() = %v, want %v", got, want) + } +} + +func TestDataMessage_SetTopic(t *testing.T) { + t.Parallel() + + message := &DataMessage{} + message.SetTopic("test-topic") + + want := "test-topic" + if got := message.Topic(); got != want { + t.Errorf("Topic() = %v, want %v", got, want) + } +} + +func TestDataMessage_SetData(t *testing.T) { + t.Parallel() + + data := []*Data{ + {TableName: "table1", Data: [][]driver.Value{{1, "data1"}}}, + {TableName: "table2", Data: [][]driver.Value{{2, "data2"}}}, + } + message := &DataMessage{} + message.SetData(data) + + want := data + assert.Equal(t, want, message.Value()) +} + +func TestDataMessage_SetOffset(t *testing.T) { + t.Parallel() + + message := &DataMessage{} + message.SetOffset(200) + + want := Offset(200) + if got := message.Offset(); got != want { + t.Errorf("Offset() = %v, want %v", got, want) + } +} + +func TestMetaDataMessage_SetDbName(t *testing.T) { + t.Parallel() + + message := &MetaDataMessage{} + message.SetDbName("test-db") + + want := "test-db" + if got := message.DBName(); got != want { + t.Errorf("DBName() = %v, want %v", got, want) + } +} + +func TestMetaDataMessage_SetTopic(t *testing.T) { + t.Parallel() + + message := &MetaDataMessage{} + message.SetTopic("test-topic") + + want := "test-topic" + if got := message.Topic(); got != want { + t.Errorf("Topic() = %v, want %v", got, want) + } +} + +func TestMetaDataMessage_SetOffset(t *testing.T) { + t.Parallel() + + message := &MetaDataMessage{} + message.SetOffset(200) + + want := Offset(200) + if got := message.Offset(); got != want { + t.Errorf("Offset() = %v, want %v", got, want) + } +} + +func TestMetaDataMessage_SetMetaData(t *testing.T) { + t.Parallel() + + metaData := &MetaData{} + message := &MetaDataMessage{} + message.SetMetaData(metaData) + + want := metaData + if got := message.Value(); got != want { + t.Errorf("Value() = %v, want %v", got, want) + } +} diff --git a/common/tmq/tmq_test.go b/common/tmq/tmq_test.go index 7279c1d..ae9fefb 100644 --- a/common/tmq/tmq_test.go +++ b/common/tmq/tmq_test.go @@ -2,6 +2,8 @@ package tmq import ( "encoding/json" + "errors" + "reflect" "testing" ) @@ -66,3 +68,130 @@ func TestDropJson(t *testing.T) { } t.Log(obj) } + +func TestOffset_String(t *testing.T) { + tests := []struct { + name string + o Offset + want string + }{ + { + name: "Valid Offset", + o: 100, + want: "100", + }, + { + name: "Invalid Offset", + o: OffsetInvalid, + want: "unset", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.o.String(); got != tt.want { + t.Errorf("Offset.String() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestOffset_Valid(t *testing.T) { + tests := []struct { + name string + o Offset + want bool + }{ + { + name: "Valid Offset", + o: 100, + want: true, + }, + { + name: "Invalid Offset", + o: OffsetInvalid, + want: true, + }, + { + name: "Negative Offset", + o: -100, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.o.Valid(); got != tt.want { + t.Errorf("Offset.Valid() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestTopicPartition_String(t *testing.T) { + tests := []struct { + name string + tp TopicPartition + want string + }{ + { + name: "With Error", + tp: TopicPartition{ + Topic: stringPtr("test-topic"), + Partition: 0, + Offset: 100, + Error: errors.New("error message"), + }, + want: "test-topic[0]@100(error message)", + }, + { + name: "Without Error", + tp: TopicPartition{ + Topic: stringPtr("test-topic"), + Partition: 0, + Offset: 100, + }, + want: "test-topic[0]@100", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.tp.String(); got != tt.want { + t.Errorf("TopicPartition.String() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestAssignment_MarshalJSON(t *testing.T) { + tests := []struct { + name string + a Assignment + want string + }{ + { + name: "Marshal Assignment", + a: Assignment{ + VGroupID: 1, + Offset: 100, + Begin: 50, + End: 150, + }, + want: `{"vgroup_id":1,"offset":100,"begin":50,"end":150}`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := json.Marshal(tt.a) + if err != nil { + t.Errorf("MarshalJSON error: %v", err) + return + } + if !reflect.DeepEqual(string(got), tt.want) { + t.Errorf("MarshalJSON = %v, want %v", string(got), tt.want) + } + }) + } +} + +func stringPtr(s string) *string { + return &s +} diff --git a/errors/errors_test.go b/errors/errors_test.go index e29f9ae..0064c90 100644 --- a/errors/errors_test.go +++ b/errors/errors_test.go @@ -1,6 +1,10 @@ package errors -import "testing" +import ( + "testing" + + "github.com/stretchr/testify/assert" +) // @author: xftan // @date: 2023/10/13 11:20 @@ -32,3 +36,13 @@ func TestNewError(t *testing.T) { }) } } + +func TestError(t *testing.T) { + invalidError := ErrTscInvalidConnection.Error() + assert.Equal(t, "[0x20b] Invalid connection", invalidError) + unknownError := &TaosError{ + Code: 0xffff, + ErrStr: "unknown error", + } + assert.Equal(t, "unknown error", unknownError.Error()) +} diff --git a/ws/client/conn_test.go b/ws/client/conn_test.go new file mode 100644 index 0000000..c7f9a7c --- /dev/null +++ b/ws/client/conn_test.go @@ -0,0 +1,98 @@ +package client + +import ( + "bytes" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" +) + +func TestEnvelopePool(t *testing.T) { + pool := &EnvelopePool{} + + // Test Get method + env := pool.Get() + assert.NotNil(t, env) + assert.NotNil(t, env.Msg) + + // Test Put method + env.Msg.WriteString("test") + pool.Put(env) + + // Test if the envelope is reset after put + env = pool.Get() + assert.Equal(t, 0, env.Msg.Len()) +} + +func TestEnvelope_Reset(t *testing.T) { + env := &Envelope{ + Type: 1, + Msg: bytes.NewBufferString("test"), + } + + env.Reset() + + assert.Equal(t, 0, env.Msg.Len()) +} + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + +func wsEchoServer(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + return + } + defer conn.Close() + + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + return + } + + if err := conn.WriteMessage(messageType, message); err != nil { + return + } + } +} + +func TestClient(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(wsEchoServer)) + defer s.Close() + t.Log(s.URL) + ep := "ws" + strings.TrimPrefix(s.URL, "http") + ws, _, err := websocket.DefaultDialer.Dial(ep, nil) + assert.NoError(t, err) + c := NewClient(ws, 1) + gotMessage := make(chan struct{}) + c.TextMessageHandler = func(message []byte) { + assert.Equal(t, "test", string(message)) + gotMessage <- struct{}{} + } + running := c.IsRunning() + assert.True(t, running) + defer c.Close() + go c.ReadPump() + go c.WritePump() + env := c.GetEnvelope() + env.Type = websocket.TextMessage + env.Msg.WriteString("test") + c.Send(env) + env = c.GetEnvelope() + c.PutEnvelope(env) + timeout := time.NewTimer(time.Second * 3) + select { + case <-gotMessage: + t.Log("got message") + case <-timeout.C: + t.Error("timeout") + } +} diff --git a/ws/schemaless/schemaless_test.go b/ws/schemaless/schemaless_test.go index d3754e2..83cf47d 100644 --- a/ws/schemaless/schemaless_test.go +++ b/ws/schemaless/schemaless_test.go @@ -70,13 +70,13 @@ func TestSchemaless_Insert(t *testing.T) { SetPassword("taosdata"), SetEnableCompression(true), SetErrorHandler(func(err error) { - t.Fatal(err) + t.Log(err) }), )) if err != nil { t.Fatal(err) } - //defer s.Close() + defer s.Close() for _, c := range cases { t.Run(c.name, func(t *testing.T) { diff --git a/ws/tmq/config.go b/ws/tmq/config.go index 36ec47e..99c96a3 100644 --- a/ws/tmq/config.go +++ b/ws/tmq/config.go @@ -2,10 +2,7 @@ package tmq import ( "errors" - "fmt" "time" - - "github.com/taosdata/driver-go/v3/common/tmq" ) type config struct { @@ -32,119 +29,58 @@ func newConfig(url string, chanLength uint) *config { } } -func (c *config) setConnectUser(user tmq.ConfigValue) error { - var ok bool - c.User, ok = user.(string) - if !ok { - return fmt.Errorf("td.connect.user requires string got %T", user) - } - return nil +func (c *config) setConnectUser(user string) { + c.User = user } -func (c *config) setConnectPass(pass tmq.ConfigValue) error { - var ok bool - c.Password, ok = pass.(string) - if !ok { - return fmt.Errorf("td.connect.pass requires string got %T", pass) - } - return nil +func (c *config) setConnectPass(pass string) { + c.Password = pass } -func (c *config) setGroupID(groupID tmq.ConfigValue) error { - var ok bool - c.GroupID, ok = groupID.(string) - if !ok { - return fmt.Errorf("group.id requires string got %T", groupID) - } - return nil +func (c *config) setGroupID(groupID string) { + c.GroupID = groupID } -func (c *config) setClientID(clientID tmq.ConfigValue) error { - var ok bool - c.ClientID, ok = clientID.(string) - if !ok { - return fmt.Errorf("client.id requires string got %T", clientID) - } - return nil +func (c *config) setClientID(clientID string) { + c.ClientID = clientID } -func (c *config) setAutoOffsetReset(offsetReset tmq.ConfigValue) error { - var ok bool - c.OffsetRest, ok = offsetReset.(string) - if !ok { - return fmt.Errorf("auto.offset.reset requires string got %T", offsetReset) - } - return nil +func (c *config) setAutoOffsetReset(offsetReset string) { + c.OffsetRest = offsetReset } -func (c *config) setMessageTimeout(timeout tmq.ConfigValue) error { - var ok bool - c.MessageTimeout, ok = timeout.(time.Duration) - if !ok { - return fmt.Errorf("ws.message.timeout requires time.Duration got %T", timeout) - } - if c.MessageTimeout < time.Second { +func (c *config) setMessageTimeout(timeout time.Duration) error { + if timeout < time.Second { return errors.New("ws.message.timeout cannot be less than 1 second") } + c.MessageTimeout = timeout return nil } -func (c *config) setWriteWait(writeWait tmq.ConfigValue) error { - var ok bool - c.WriteWait, ok = writeWait.(time.Duration) - if !ok { - return fmt.Errorf("ws.message.writeWait requires time.Duration got %T", writeWait) - } - if c.WriteWait < time.Second { +func (c *config) setWriteWait(writeWait time.Duration) error { + if writeWait < time.Second { return errors.New("ws.message.writeWait cannot be less than 1 second") } - if c.WriteWait < 0 { - return errors.New("ws.message.writeWait cannot be less than 0") - } + c.WriteWait = writeWait return nil } -func (c *config) setAutoCommit(enable tmq.ConfigValue) error { - var ok bool - c.AutoCommit, ok = enable.(string) - if !ok { - return fmt.Errorf("enable.auto.commit requires string got %T", enable) - } - return nil +func (c *config) setAutoCommit(enable string) { + c.AutoCommit = enable } -func (c *config) setAutoCommitIntervalMS(autoCommitIntervalMS tmq.ConfigValue) error { - var ok bool - c.AutoCommitIntervalMS, ok = autoCommitIntervalMS.(string) - if !ok { - return fmt.Errorf("auto.commit.interval.ms requires string got %T", autoCommitIntervalMS) - } - return nil +func (c *config) setAutoCommitIntervalMS(autoCommitIntervalMS string) { + c.AutoCommitIntervalMS = autoCommitIntervalMS } -func (c *config) setSnapshotEnable(enableSnapshot tmq.ConfigValue) error { - var ok bool - c.SnapshotEnable, ok = enableSnapshot.(string) - if !ok { - return fmt.Errorf("experimental.snapshot.enable requires string got %T", enableSnapshot) - } - return nil +func (c *config) setSnapshotEnable(enableSnapshot string) { + c.SnapshotEnable = enableSnapshot } -func (c *config) setWithTableName(withTableName tmq.ConfigValue) error { - var ok bool - c.WithTableName, ok = withTableName.(string) - if !ok { - return fmt.Errorf("msg.with.table.name requires string got %T", withTableName) - } - return nil +func (c *config) setWithTableName(withTableName string) { + c.WithTableName = withTableName } -func (c *config) setEnableCompression(enableCompression tmq.ConfigValue) error { - var ok bool - c.EnableCompression, ok = enableCompression.(bool) - if !ok { - return fmt.Errorf("ws.message.enableCompression requires bool got %T", enableCompression) - } - return nil +func (c *config) setEnableCompression(enableCompression bool) { + c.EnableCompression = enableCompression } diff --git a/ws/tmq/consumer.go b/ws/tmq/consumer.go index ec8e40e..b4f3e8b 100644 --- a/ws/tmq/consumer.go +++ b/ws/tmq/consumer.go @@ -192,46 +192,16 @@ func configMapToConfig(m *tmq.ConfigMap) (*config, error) { if err != nil { return nil, err } - err = config.setConnectUser(user) - if err != nil { - return nil, err - } - err = config.setConnectPass(pass) - if err != nil { - return nil, err - } - err = config.setGroupID(groupID) - if err != nil { - return nil, err - } - err = config.setClientID(clientID) - if err != nil { - return nil, err - } - err = config.setAutoOffsetReset(offsetReset) - if err != nil { - return nil, err - } - err = config.setAutoCommit(enableAutoCommit) - if err != nil { - return nil, err - } - err = config.setAutoCommitIntervalMS(autoCommitIntervalMS) - if err != nil { - return nil, err - } - err = config.setSnapshotEnable(enableSnapshot) - if err != nil { - return nil, err - } - err = config.setWithTableName(withTableName) - if err != nil { - return nil, err - } - err = config.setEnableCompression(enableCompression) - if err != nil { - return nil, err - } + config.setConnectUser(user.(string)) + config.setConnectPass(pass.(string)) + config.setGroupID(groupID.(string)) + config.setClientID(clientID.(string)) + config.setAutoOffsetReset(offsetReset.(string)) + config.setAutoCommit(enableAutoCommit.(string)) + config.setAutoCommitIntervalMS(autoCommitIntervalMS.(string)) + config.setSnapshotEnable(enableSnapshot.(string)) + config.setWithTableName(withTableName.(string)) + config.setEnableCompression(enableCompression.(bool)) return config, nil } diff --git a/ws/tmq/consumer_test.go b/ws/tmq/consumer_test.go index dc394e9..6407984 100644 --- a/ws/tmq/consumer_test.go +++ b/ws/tmq/consumer_test.go @@ -546,3 +546,276 @@ func TestMultiBlock(t *testing.T) { } } } + +func Test_configMapToConfigWrong(t *testing.T) { + type args struct { + m *tmq.ConfigMap + } + tests := []struct { + name string + args args + wantErr string + }{ + { + name: "url", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": 123, + }, + }, + wantErr: "ws.url expects type string, not int", + }, + { + name: "empty url", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "", + }, + }, + wantErr: "ws.url required", + }, + { + name: "channelLen", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "ws.message.channelLen": "not a uint", + }, + }, + wantErr: "ws.message.channelLen expects type uint, not string", + }, + { + name: "ws.message.timeout", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "ws.message.timeout": "xx", + }, + }, + wantErr: "ws.message.timeout expects type time.Duration, not string", + }, + { + name: "ws.message.writeWait", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "ws.message.writeWait": "xx", + }, + }, + wantErr: "ws.message.writeWait expects type time.Duration, not string", + }, + { + name: "td.connect.user", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "td.connect.user": 123, + }, + }, + wantErr: "td.connect.user expects type string, not int", + }, + { + name: "td.connect.pass", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "td.connect.pass": 123, + }, + }, + wantErr: "td.connect.pass expects type string, not int", + }, + { + name: "group.id", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "group.id": 123, + }, + }, + wantErr: "group.id expects type string, not int", + }, + { + name: "client.id", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "client.id": 123, + }, + }, + wantErr: "client.id expects type string, not int", + }, + { + name: "auto.offset.reset", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "auto.offset.reset": 123, + }, + }, + wantErr: "auto.offset.reset expects type string, not int", + }, + { + name: "enable.auto.commit", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "enable.auto.commit": 123, + }, + }, + wantErr: "enable.auto.commit expects type string, not int", + }, + { + name: "auto.commit.interval.ms", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "auto.commit.interval.ms": 123, + }, + }, + wantErr: "auto.commit.interval.ms expects type string, not int", + }, + { + name: "experimental.snapshot.enable", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "experimental.snapshot.enable": 123, + }, + }, + wantErr: "experimental.snapshot.enable expects type string, not int", + }, + { + name: "msg.with.table.name", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "msg.with.table.name": 123, + }, + }, + wantErr: "msg.with.table.name expects type string, not int", + }, + { + name: "ws.message.enableCompression", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "ws.message.enableCompression": 123, + }, + }, + wantErr: "ws.message.enableCompression expects type bool, not int", + }, + { + name: "ws.message.timeout < 1s", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "ws.message.timeout": time.Millisecond, + }, + }, + wantErr: "ws.message.timeout cannot be less than 1 second", + }, + { + name: "ws.message.writeWait < 1s", + args: args{ + m: &tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "ws.message.writeWait": time.Millisecond, + }, + }, + wantErr: "ws.message.writeWait cannot be less than 1 second", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := configMapToConfig(tt.args.m) + assert.Nil(t, got) + assert.Equal(t, tt.wantErr, err.Error()) + }) + } +} + +func prepareMetaEnv() error { + var err error + steps := []string{ + "drop topic if exists test_ws_tmq_meta_topic", + "drop database if exists test_ws_tmq_meta", + "create database test_ws_tmq_meta vgroups 1 WAL_RETENTION_PERIOD 86400", + "create topic test_ws_tmq_meta_topic with meta as database test_ws_tmq_meta", + } + for _, step := range steps { + err = doRequest(step) + if err != nil { + return err + } + } + return nil +} + +func cleanMetaEnv() error { + var err error + time.Sleep(2 * time.Second) + steps := []string{ + "drop topic if exists test_ws_tmq_meta_topic", + "drop database if exists test_ws_tmq_meta", + } + for _, step := range steps { + err = doRequest(step) + if err != nil { + return err + } + } + return nil +} + +func TestMeta(t *testing.T) { + err := prepareMetaEnv() + assert.NoError(t, err) + defer cleanMetaEnv() + consumer, err := NewConsumer(&tmq.ConfigMap{ + "ws.url": "ws://127.0.0.1:6041", + "ws.message.channelLen": uint(0), + "ws.message.timeout": common.DefaultMessageTimeout, + "ws.message.writeWait": common.DefaultWriteWait, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "group.id": "test", + "client.id": "test_consumer", + "auto.offset.reset": "earliest", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "1000", + "msg.with.table.name": "true", + }) + err = consumer.Subscribe("test_ws_tmq_meta_topic", nil) + assert.NoError(t, err) + defer func() { + consumer.Unsubscribe() + consumer.Close() + }() + go func() { + doRequest("create table test_ws_tmq_meta.st(ts timestamp,v int) tags (cn binary(20))") + doRequest("create table test_ws_tmq_meta.t1 using test_ws_tmq_meta.st tags ('t1')") + doRequest("insert into test_ws_tmq_meta.t1 values (now,1)") + doRequest("insert into test_ws_tmq_meta.t2 using test_ws_tmq_meta.st tags ('t1') values (now,2)") + time.Sleep(time.Second) + doRequest("insert into test_ws_tmq_meta.t1 values (now,1)") + doRequest("insert into test_ws_tmq_meta.t1 values (now,1)") + }() + for i := 0; i < 10; i++ { + event := consumer.Poll(500) + if event == nil { + continue + } + switch e := event.(type) { + case *tmq.DataMessage: + t.Log(e) + assert.Equal(t, "test_ws_tmq_meta", e.DBName()) + case *tmq.MetaDataMessage: + assert.Equal(t, "test_ws_tmq_meta", e.DBName()) + assert.Equal(t, "test_ws_tmq_meta_topic", e.Topic()) + t.Log(e) + case *tmq.MetaMessage: + assert.Equal(t, "test_ws_tmq_meta", e.DBName()) + t.Log(e) + } + } +}