diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 7d6ed0f..609505c 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -77,7 +77,7 @@ jobs: cd TDengine mkdir debug cd debug - cmake .. -DBUILD_JDBC=false -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache + cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache make -j 4 - name: package diff --git a/af/tmq/consumer.go b/af/tmq/consumer.go index b0854e7..ef2338e 100644 --- a/af/tmq/consumer.go +++ b/af/tmq/consumer.go @@ -95,6 +95,7 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { db := wrapper.TMQGetDBName(message) resultType := wrapper.TMQGetResType(message) offset := tmq.Offset(wrapper.TMQGetVgroupOffset(message)) + vgID := wrapper.TMQGetVgroupID(message) switch resultType { case common.TMQ_RES_DATA: result := &tmq.DataMessage{} @@ -106,6 +107,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { } result.SetData(data) result.SetOffset(offset) + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: vgID, + Offset: offset, + } wrapper.TaosFreeResult(message) return result case common.TMQ_RES_TABLE_META: @@ -118,6 +124,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { } result.SetMeta(meta) result.SetOffset(offset) + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: vgID, + Offset: offset, + } wrapper.TaosFreeResult(message) return result case common.TMQ_RES_METADATA: @@ -137,6 +148,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { Meta: meta, Data: data, }) + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: vgID, + Offset: offset, + } wrapper.TaosFreeResult(message) return result default: @@ -187,7 +203,16 @@ func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) { } func (c *Consumer) Commit() ([]tmq.TopicPartition, error) { - return c.doCommit(nil) + errCode := wrapper.TMQCommitSync(c.cConsumer, nil) + if errCode != taosError.SUCCESS { + errStr := wrapper.TMQErr2Str(errCode) + return nil, taosError.NewError(int(errCode), errStr) + } + partitions, err := c.Assignment() + if err != nil { + return nil, err + } + return c.Committed(partitions, 0) } func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error) { @@ -235,6 +260,50 @@ func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) erro return nil } +func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error) { + offsets = make([]tmq.TopicPartition, len(partitions)) + for i := 0; i < len(partitions); i++ { + cOffset := wrapper.TMQCommitted(c.cConsumer, *partitions[i].Topic, partitions[i].Partition) + offset := tmq.Offset(cOffset) + if !offset.Valid() { + return nil, taosError.NewError(int(offset), wrapper.TMQErr2Str(int32(offset))) + } + offsets[i] = tmq.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: offset, + } + } + return +} + +func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error) { + for i := 0; i < len(offsets); i++ { + errCode := wrapper.TMQCommitOffsetSync(c.cConsumer, *offsets[i].Topic, offsets[i].Partition, int64(offsets[i].Offset)) + if errCode != taosError.SUCCESS { + errStr := wrapper.TMQErr2Str(errCode) + return nil, taosError.NewError(int(errCode), errStr) + } + } + return c.Committed(offsets, 0) +} + +func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error) { + offsets = make([]tmq.TopicPartition, len(partitions)) + for i := 0; i < len(partitions); i++ { + position := wrapper.TMQPosition(c.cConsumer, *partitions[i].Topic, partitions[i].Partition) + if position < 0 { + return nil, taosError.NewError(int(position), wrapper.TMQErr2Str(int32(position))) + } + offsets[i] = tmq.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: tmq.Offset(position), + } + } + return +} + // Close release consumer func (c *Consumer) Close() error { errCode := wrapper.TMQConsumerClose(c.cConsumer) diff --git a/af/tmq/consumer_test.go b/af/tmq/consumer_test.go index db5851c..4dd024c 100644 --- a/af/tmq/consumer_test.go +++ b/af/tmq/consumer_test.go @@ -23,6 +23,7 @@ func TestTmq(t *testing.T) { return } sqls := []string{ + "drop topic if exists test_tmq_common", "drop database if exists af_test_tmq", "create database if not exists af_test_tmq vgroups 2 WAL_RETENTION_PERIOD 86400", "use af_test_tmq", @@ -43,8 +44,8 @@ func TestTmq(t *testing.T) { ") tags(t1 int)", "create table if not exists ct0 using all_type tags(1000)", "create table if not exists ct1 using all_type tags(2000)", - "create table if not exists ct3 using all_type tags(3000)", - "create topic if not exists test_tmq_common as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from ct1", + "create table if not exists ct2 using all_type tags(3000)", + "create topic if not exists test_tmq_common as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from all_type", } defer func() { @@ -59,20 +60,24 @@ func TestTmq(t *testing.T) { assert.NoError(t, err) }() now := time.Now() + err = execWithoutResult(conn, fmt.Sprintf("insert into ct0 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano))) + assert.NoError(t, err) err = execWithoutResult(conn, fmt.Sprintf("insert into ct1 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano))) assert.NoError(t, err) + err = execWithoutResult(conn, fmt.Sprintf("insert into ct2 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano))) + assert.NoError(t, err) consumer, err := NewConsumer(&tmq.ConfigMap{ - "group.id": "test", - "auto.offset.reset": "earliest", - "td.connect.ip": "127.0.0.1", - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "td.connect.port": "6030", - "client.id": "test_tmq_c", - "enable.auto.commit": "false", - "experimental.snapshot.enable": "true", - "msg.with.table.name": "true", + "group.id": "test", + "auto.offset.reset": "earliest", + "td.connect.ip": "127.0.0.1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "td.connect.port": "6030", + "client.id": "test_tmq_c", + "enable.auto.commit": "false", + //"experimental.snapshot.enable": "true", + "msg.with.table.name": "true", }) if err != nil { t.Error(err) @@ -83,6 +88,10 @@ func TestTmq(t *testing.T) { t.Error(err) return } + ass, err := consumer.Assignment() + t.Log(ass) + position, _ := consumer.Position(ass) + t.Log(position) haveMessage := false for i := 0; i < 5; i++ { ev := consumer.Poll(500) @@ -108,8 +117,23 @@ func TestTmq(t *testing.T) { assert.Equal(t, float64(11), row1[11].(float64)) assert.Equal(t, "1", row1[12].(string)) assert.Equal(t, "2", row1[13].(string)) - _, err = consumer.Commit() + t.Log(e.Offset()) + ass, err := consumer.Assignment() + t.Log(ass) + committed, err := consumer.Committed(ass, 0) + t.Log(committed) + position, _ := consumer.Position(ass) + t.Log(position) + offsets, err := consumer.Position([]tmq.TopicPartition{e.TopicPartition}) + assert.NoError(t, err) + _, err = consumer.CommitOffsets(offsets) assert.NoError(t, err) + ass, err = consumer.Assignment() + t.Log(ass) + committed, err = consumer.Committed(ass, 0) + t.Log(committed) + position, _ = consumer.Position(ass) + t.Log(position) err = consumer.Unsubscribe() assert.NoError(t, err) err = consumer.Close() @@ -208,9 +232,10 @@ func TestSeek(t *testing.T) { for _, datum := range data { dataCount += len(datum.Data) } + time.Sleep(time.Second * 2) + _, err = consumer.Commit() + assert.NoError(t, err) } - _, err = consumer.Commit() - assert.NoError(t, err) } assert.Equal(t, record, dataCount) diff --git a/common/column.go b/common/column.go index de5b840..0408b77 100644 --- a/common/column.go +++ b/common/column.go @@ -20,6 +20,7 @@ var ( NullTime = reflect.TypeOf(types.NullTime{}) NullBool = reflect.TypeOf(types.NullBool{}) NullString = reflect.TypeOf(types.NullString{}) + Bytes = reflect.TypeOf([]byte{}) NullJson = reflect.TypeOf(types.NullJson{}) UnknownType = reflect.TypeOf(new(interface{})).Elem() ) @@ -40,4 +41,5 @@ var ColumnTypeMap = map[int]reflect.Type{ TSDB_DATA_TYPE_NCHAR: NullString, TSDB_DATA_TYPE_TIMESTAMP: NullTime, TSDB_DATA_TYPE_JSON: NullJson, + TSDB_DATA_TYPE_VARBINARY: Bytes, } diff --git a/common/const.go b/common/const.go index 9fa6b05..93d7e0d 100644 --- a/common/const.go +++ b/common/const.go @@ -64,6 +64,7 @@ const ( TSDB_DATA_TYPE_UINT_Str = "INT UNSIGNED" TSDB_DATA_TYPE_UBIGINT_Str = "BIGINT UNSIGNED" TSDB_DATA_TYPE_JSON_Str = "JSON" + TSDB_DATA_TYPE_VARBINARY_Str = "VARBINARY" ) var TypeNameMap = map[int]string{ @@ -83,6 +84,7 @@ var TypeNameMap = map[int]string{ TSDB_DATA_TYPE_UINT: TSDB_DATA_TYPE_UINT_Str, TSDB_DATA_TYPE_UBIGINT: TSDB_DATA_TYPE_UBIGINT_Str, TSDB_DATA_TYPE_JSON: TSDB_DATA_TYPE_JSON_Str, + TSDB_DATA_TYPE_VARBINARY: TSDB_DATA_TYPE_VARBINARY_Str, } var NameTypeMap = map[string]int{ @@ -102,6 +104,7 @@ var NameTypeMap = map[string]int{ TSDB_DATA_TYPE_UINT_Str: TSDB_DATA_TYPE_UINT, TSDB_DATA_TYPE_UBIGINT_Str: TSDB_DATA_TYPE_UBIGINT, TSDB_DATA_TYPE_JSON_Str: TSDB_DATA_TYPE_JSON, + TSDB_DATA_TYPE_VARBINARY_Str: TSDB_DATA_TYPE_VARBINARY, } const ( @@ -142,4 +145,12 @@ const ( const ReqIDKey = "taos_req_id" -const TAOS_NOTIFY_PASSVER = 0 +const ( + TAOS_NOTIFY_PASSVER = 0 + TAOS_NOTIFY_WHITELIST_VER = 1 + TAOS_NOTIFY_USER_DROPPED = 2 +) + +const ( + TAOS_CONN_MODE_BI = 0 +) diff --git a/common/param/column.go b/common/param/column.go index 8e5b68f..a11839c 100644 --- a/common/param/column.go +++ b/common/param/column.go @@ -149,6 +149,18 @@ func (c *ColumnType) AddBinary(strMaxLen int) *ColumnType { return c } +func (c *ColumnType) AddVarBinary(strMaxLen int) *ColumnType { + if c.column >= c.size { + return c + } + c.value[c.column] = &types.ColumnType{ + Type: types.TaosVarBinaryType, + MaxLen: strMaxLen, + } + c.column += 1 + return c +} + func (c *ColumnType) AddNchar(strMaxLen int) *ColumnType { if c.column >= c.size { return c diff --git a/common/param/param.go b/common/param/param.go index a9ec02d..680d5f1 100644 --- a/common/param/param.go +++ b/common/param/param.go @@ -111,6 +111,13 @@ func (p *Param) SetBinary(offset int, value []byte) { p.value[offset] = taosTypes.TaosBinary(value) } +func (p *Param) SetVarBinary(offset int, value []byte) { + if offset >= p.size { + return + } + p.value[offset] = taosTypes.TaosVarBinary(value) +} + func (p *Param) SetNchar(offset int, value string) { if offset >= p.size { return @@ -252,6 +259,15 @@ func (p *Param) AddBinary(value []byte) *Param { return p } +func (p *Param) AddVarBinary(value []byte) *Param { + if p.offset >= p.size { + return p + } + p.value[p.offset] = taosTypes.TaosVarBinary(value) + p.offset += 1 + return p +} + func (p *Param) AddNchar(value string) *Param { if p.offset >= p.size { return p diff --git a/common/parser/block.go b/common/parser/block.go index f573804..29522a1 100644 --- a/common/parser/block.go +++ b/common/parser/block.go @@ -6,6 +6,7 @@ import ( "unsafe" "github.com/taosdata/driver-go/v3/common" + "github.com/taosdata/driver-go/v3/common/pointer" ) const ( @@ -33,27 +34,27 @@ const ( ) func RawBlockGetVersion(rawBlock unsafe.Pointer) int32 { - return *((*int32)(unsafe.Pointer(uintptr(rawBlock) + RawBlockVersionOffset))) + return *((*int32)(pointer.AddUintptr(rawBlock, RawBlockVersionOffset))) } func RawBlockGetLength(rawBlock unsafe.Pointer) int32 { - return *((*int32)(unsafe.Pointer(uintptr(rawBlock) + RawBlockLengthOffset))) + return *((*int32)(pointer.AddUintptr(rawBlock, RawBlockLengthOffset))) } func RawBlockGetNumOfRows(rawBlock unsafe.Pointer) int32 { - return *((*int32)(unsafe.Pointer(uintptr(rawBlock) + NumOfRowsOffset))) + return *((*int32)(pointer.AddUintptr(rawBlock, NumOfRowsOffset))) } func RawBlockGetNumOfCols(rawBlock unsafe.Pointer) int32 { - return *((*int32)(unsafe.Pointer(uintptr(rawBlock) + NumOfColsOffset))) + return *((*int32)(pointer.AddUintptr(rawBlock, NumOfColsOffset))) } func RawBlockGetHasColumnSegment(rawBlock unsafe.Pointer) int32 { - return *((*int32)(unsafe.Pointer(uintptr(rawBlock) + HasColumnSegmentOffset))) + return *((*int32)(pointer.AddUintptr(rawBlock, HasColumnSegmentOffset))) } func RawBlockGetGroupID(rawBlock unsafe.Pointer) uint64 { - return *((*uint64)(unsafe.Pointer(uintptr(rawBlock) + GroupIDOffset))) + return *((*uint64)(pointer.AddUintptr(rawBlock, GroupIDOffset))) } type RawBlockColInfo struct { @@ -63,9 +64,9 @@ type RawBlockColInfo struct { func RawBlockGetColInfo(rawBlock unsafe.Pointer, infos []RawBlockColInfo) { for i := 0; i < len(infos); i++ { - offset := uintptr(rawBlock) + ColInfoOffset + ColInfoSize*uintptr(i) - infos[i].ColType = *((*int8)(unsafe.Pointer(offset))) - infos[i].Bytes = *((*int32)(unsafe.Pointer(offset + Int8Size))) + offset := ColInfoOffset + ColInfoSize*uintptr(i) + infos[i].ColType = *((*int8)(pointer.AddUintptr(rawBlock, offset))) + infos[i].Bytes = *((*int32)(pointer.AddUintptr(rawBlock, offset+Int8Size))) } } @@ -80,7 +81,7 @@ func RawBlockGetColDataOffset(colCount int) uintptr { type FormatTimeFunc func(ts int64, precision int) driver.Value func IsVarDataType(colType uint8) bool { - return colType == common.TSDB_DATA_TYPE_BINARY || colType == common.TSDB_DATA_TYPE_NCHAR || colType == common.TSDB_DATA_TYPE_JSON + return colType == common.TSDB_DATA_TYPE_BINARY || colType == common.TSDB_DATA_TYPE_NCHAR || colType == common.TSDB_DATA_TYPE_JSON || colType == common.TSDB_DATA_TYPE_VARBINARY } func BitmapLen(n int) int { @@ -99,9 +100,9 @@ func BMIsNull(c byte, n int) bool { return c&(1<<(7-BitPos(n))) == (1 << (7 - BitPos(n))) } -type rawConvertFunc func(pStart uintptr, row int, arg ...interface{}) driver.Value +type rawConvertFunc func(pStart unsafe.Pointer, row int, arg ...interface{}) driver.Value -type rawConvertVarDataFunc func(pHeader, pStart uintptr, row int) driver.Value +type rawConvertVarDataFunc func(pHeader, pStart unsafe.Pointer, row int) driver.Value var rawConvertFuncMap = map[uint8]rawConvertFunc{ uint8(common.TSDB_DATA_TYPE_BOOL): rawConvertBool, @@ -119,81 +120,86 @@ var rawConvertFuncMap = map[uint8]rawConvertFunc{ } var rawConvertVarDataMap = map[uint8]rawConvertVarDataFunc{ - uint8(common.TSDB_DATA_TYPE_BINARY): rawConvertBinary, - uint8(common.TSDB_DATA_TYPE_NCHAR): rawConvertNchar, - uint8(common.TSDB_DATA_TYPE_JSON): rawConvertJson, + uint8(common.TSDB_DATA_TYPE_BINARY): rawConvertBinary, + uint8(common.TSDB_DATA_TYPE_NCHAR): rawConvertNchar, + uint8(common.TSDB_DATA_TYPE_JSON): rawConvertJson, + uint8(common.TSDB_DATA_TYPE_VARBINARY): rawConvertVarBinary, } -func ItemIsNull(pHeader uintptr, row int) bool { +func ItemIsNull(pHeader unsafe.Pointer, row int) bool { offset := CharOffset(row) - c := *((*byte)(unsafe.Pointer(pHeader + uintptr(offset)))) + c := *((*byte)(pointer.AddUintptr(pHeader, uintptr(offset)))) return BMIsNull(c, row) } -func rawConvertBool(pStart uintptr, row int, _ ...interface{}) driver.Value { - if (*((*byte)(unsafe.Pointer(pStart + uintptr(row)*1)))) != 0 { +func rawConvertBool(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + if (*((*byte)(pointer.AddUintptr(pStart, uintptr(row)*1)))) != 0 { return true } else { return false } } -func rawConvertTinyint(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*int8)(unsafe.Pointer(pStart + uintptr(row)*Int8Size))) +func rawConvertTinyint(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*int8)(pointer.AddUintptr(pStart, uintptr(row)*Int8Size))) } -func rawConvertSmallint(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*int16)(unsafe.Pointer(pStart + uintptr(row)*Int16Size))) +func rawConvertSmallint(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*int16)(pointer.AddUintptr(pStart, uintptr(row)*Int16Size))) } -func rawConvertInt(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*int32)(unsafe.Pointer(pStart + uintptr(row)*Int32Size))) +func rawConvertInt(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*int32)(pointer.AddUintptr(pStart, uintptr(row)*Int32Size))) } -func rawConvertBigint(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*int64)(unsafe.Pointer(pStart + uintptr(row)*Int64Size))) +func rawConvertBigint(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*int64)(pointer.AddUintptr(pStart, uintptr(row)*Int64Size))) } -func rawConvertUTinyint(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*uint8)(unsafe.Pointer(pStart + uintptr(row)*UInt8Size))) +func rawConvertUTinyint(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*uint8)(pointer.AddUintptr(pStart, uintptr(row)*UInt8Size))) } -func rawConvertUSmallint(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*uint16)(unsafe.Pointer(pStart + uintptr(row)*UInt16Size))) +func rawConvertUSmallint(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*uint16)(pointer.AddUintptr(pStart, uintptr(row)*UInt16Size))) } -func rawConvertUInt(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*uint32)(unsafe.Pointer(pStart + uintptr(row)*UInt32Size))) +func rawConvertUInt(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*uint32)(pointer.AddUintptr(pStart, uintptr(row)*UInt32Size))) } -func rawConvertUBigint(pStart uintptr, row int, _ ...interface{}) driver.Value { - return *((*uint64)(unsafe.Pointer(pStart + uintptr(row)*UInt64Size))) +func rawConvertUBigint(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return *((*uint64)(pointer.AddUintptr(pStart, uintptr(row)*UInt64Size))) } -func rawConvertFloat(pStart uintptr, row int, _ ...interface{}) driver.Value { - return math.Float32frombits(*((*uint32)(unsafe.Pointer(pStart + uintptr(row)*Float32Size)))) +func rawConvertFloat(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return math.Float32frombits(*((*uint32)(pointer.AddUintptr(pStart, uintptr(row)*Float32Size)))) } -func rawConvertDouble(pStart uintptr, row int, _ ...interface{}) driver.Value { - return math.Float64frombits(*((*uint64)(unsafe.Pointer(pStart + uintptr(row)*Float64Size)))) +func rawConvertDouble(pStart unsafe.Pointer, row int, _ ...interface{}) driver.Value { + return math.Float64frombits(*((*uint64)(pointer.AddUintptr(pStart, uintptr(row)*Float64Size)))) } -func rawConvertTime(pStart uintptr, row int, arg ...interface{}) driver.Value { +func rawConvertTime(pStart unsafe.Pointer, row int, arg ...interface{}) driver.Value { if len(arg) == 1 { - return common.TimestampConvertToTime(*((*int64)(unsafe.Pointer(pStart + uintptr(row)*Int64Size))), arg[0].(int)) + return common.TimestampConvertToTime(*((*int64)(pointer.AddUintptr(pStart, uintptr(row)*Int64Size))), arg[0].(int)) } else if len(arg) == 2 { - return arg[1].(FormatTimeFunc)(*((*int64)(unsafe.Pointer(pStart + uintptr(row)*Int64Size))), arg[0].(int)) + return arg[1].(FormatTimeFunc)(*((*int64)(pointer.AddUintptr(pStart, uintptr(row)*Int64Size))), arg[0].(int)) } else { panic("convertTime error") } } -func rawConvertBinary(pHeader, pStart uintptr, row int) driver.Value { - offset := *((*int32)(unsafe.Pointer(pHeader + uintptr(row*4)))) +func rawConvertVarBinary(pHeader, pStart unsafe.Pointer, row int) driver.Value { + return rawConvertBinary(pHeader, pStart, row) +} + +func rawConvertBinary(pHeader, pStart unsafe.Pointer, row int) driver.Value { + offset := *((*int32)(pointer.AddUintptr(pHeader, uintptr(row*4)))) if offset == -1 { return nil } - currentRow := unsafe.Pointer(pStart + uintptr(offset)) + currentRow := pointer.AddUintptr(pStart, uintptr(offset)) clen := *((*int16)(currentRow)) currentRow = unsafe.Pointer(uintptr(currentRow) + 2) @@ -205,12 +211,12 @@ func rawConvertBinary(pHeader, pStart uintptr, row int) driver.Value { return string(binaryVal[:]) } -func rawConvertNchar(pHeader, pStart uintptr, row int) driver.Value { - offset := *((*int32)(unsafe.Pointer(pHeader + uintptr(row*4)))) +func rawConvertNchar(pHeader, pStart unsafe.Pointer, row int) driver.Value { + offset := *((*int32)(pointer.AddUintptr(pHeader, uintptr(row*4)))) if offset == -1 { return nil } - currentRow := unsafe.Pointer(pStart + uintptr(offset)) + currentRow := pointer.AddUintptr(pStart, uintptr(offset)) clen := *((*int16)(currentRow)) / 4 currentRow = unsafe.Pointer(uintptr(currentRow) + 2) @@ -222,19 +228,19 @@ func rawConvertNchar(pHeader, pStart uintptr, row int) driver.Value { return string(binaryVal) } -func rawConvertJson(pHeader, pStart uintptr, row int) driver.Value { - offset := *((*int32)(unsafe.Pointer(pHeader + uintptr(row*4)))) +func rawConvertJson(pHeader, pStart unsafe.Pointer, row int) driver.Value { + offset := *((*int32)(pointer.AddUintptr(pHeader, uintptr(row*4)))) if offset == -1 { return nil } - currentRow := unsafe.Pointer(pStart + uintptr(offset)) + currentRow := pointer.AddUintptr(pStart, uintptr(offset)) clen := *((*int16)(currentRow)) - currentRow = unsafe.Pointer(uintptr(currentRow) + 2) + currentRow = pointer.AddUintptr(currentRow, 2) binaryVal := make([]byte, clen) for index := int16(0); index < clen; index++ { - binaryVal[index] = *((*byte)(unsafe.Pointer(uintptr(currentRow) + uintptr(index)))) + binaryVal[index] = *((*byte)(pointer.AddUintptr(currentRow, uintptr(index)))) } return binaryVal[:] } @@ -245,13 +251,13 @@ func ReadBlock(block unsafe.Pointer, blockSize int, colTypes []uint8, precision colCount := len(colTypes) nullBitMapOffset := uintptr(BitmapLen(blockSize)) lengthOffset := RawBlockGetColumnLengthOffset(colCount) - pHeader := uintptr(block) + RawBlockGetColDataOffset(colCount) - var pStart uintptr + pHeader := pointer.AddUintptr(block, RawBlockGetColDataOffset(colCount)) + var pStart unsafe.Pointer for column := 0; column < colCount; column++ { - colLength := *((*int32)(unsafe.Pointer(uintptr(block) + lengthOffset + uintptr(column)*Int32Size))) + colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(colTypes[column]) { convertF := rawConvertVarDataMap[colTypes[column]] - pStart = pHeader + Int32Size*uintptr(blockSize) + pStart = pointer.AddUintptr(pHeader, Int32Size*uintptr(blockSize)) for row := 0; row < blockSize; row++ { if column == 0 { r[row] = make([]driver.Value, colCount) @@ -260,7 +266,7 @@ func ReadBlock(block unsafe.Pointer, blockSize int, colTypes []uint8, precision } } else { convertF := rawConvertFuncMap[colTypes[column]] - pStart = pHeader + nullBitMapOffset + pStart = pointer.AddUintptr(pHeader, nullBitMapOffset) for row := 0; row < blockSize; row++ { if column == 0 { r[row] = make([]driver.Value, colCount) @@ -272,7 +278,7 @@ func ReadBlock(block unsafe.Pointer, blockSize int, colTypes []uint8, precision } } } - pHeader = pStart + uintptr(colLength) + pHeader = pointer.AddUintptr(pStart, uintptr(colLength)) } return r } @@ -281,24 +287,24 @@ func ReadRow(dest []driver.Value, block unsafe.Pointer, blockSize int, row int, colCount := len(colTypes) nullBitMapOffset := uintptr(BitmapLen(blockSize)) lengthOffset := RawBlockGetColumnLengthOffset(colCount) - pHeader := uintptr(block) + RawBlockGetColDataOffset(colCount) - var pStart uintptr + pHeader := pointer.AddUintptr(block, RawBlockGetColDataOffset(colCount)) + var pStart unsafe.Pointer for column := 0; column < colCount; column++ { - colLength := *((*int32)(unsafe.Pointer(uintptr(block) + lengthOffset + uintptr(column)*Int32Size))) + colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(colTypes[column]) { convertF := rawConvertVarDataMap[colTypes[column]] - pStart = pHeader + Int32Size*uintptr(blockSize) + pStart = pointer.AddUintptr(pHeader, Int32Size*uintptr(blockSize)) dest[column] = convertF(pHeader, pStart, row) } else { convertF := rawConvertFuncMap[colTypes[column]] - pStart = pHeader + nullBitMapOffset + pStart = pointer.AddUintptr(pHeader, nullBitMapOffset) if ItemIsNull(pHeader, row) { dest[column] = nil } else { dest[column] = convertF(pStart, row, precision) } } - pHeader = pStart + uintptr(colLength) + pHeader = pointer.AddUintptr(pStart, uintptr(colLength)) } } @@ -307,13 +313,13 @@ func ReadBlockWithTimeFormat(block unsafe.Pointer, blockSize int, colTypes []uin colCount := len(colTypes) nullBitMapOffset := uintptr(BitmapLen(blockSize)) lengthOffset := RawBlockGetColumnLengthOffset(colCount) - pHeader := uintptr(block) + RawBlockGetColDataOffset(colCount) - var pStart uintptr + pHeader := pointer.AddUintptr(block, RawBlockGetColDataOffset(colCount)) + var pStart unsafe.Pointer for column := 0; column < colCount; column++ { - colLength := *((*int32)(unsafe.Pointer(uintptr(block) + lengthOffset + uintptr(column)*Int32Size))) + colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(colTypes[column]) { convertF := rawConvertVarDataMap[colTypes[column]] - pStart = pHeader + uintptr(4*blockSize) + pStart = pointer.AddUintptr(pHeader, uintptr(4*blockSize)) for row := 0; row < blockSize; row++ { if column == 0 { r[row] = make([]driver.Value, colCount) @@ -322,7 +328,7 @@ func ReadBlockWithTimeFormat(block unsafe.Pointer, blockSize int, colTypes []uin } } else { convertF := rawConvertFuncMap[colTypes[column]] - pStart = pHeader + nullBitMapOffset + pStart = pointer.AddUintptr(pHeader, nullBitMapOffset) for row := 0; row < blockSize; row++ { if column == 0 { r[row] = make([]driver.Value, colCount) @@ -334,52 +340,19 @@ func ReadBlockWithTimeFormat(block unsafe.Pointer, blockSize int, colTypes []uin } } } - pHeader = pStart + uintptr(colLength) + pHeader = pointer.AddUintptr(pStart, uintptr(colLength)) } return r } -func ItemRawBlock(colType uint8, pHeader, pStart uintptr, row int, precision int, timeFormat FormatTimeFunc) driver.Value { +func ItemRawBlock(colType uint8, pHeader, pStart unsafe.Pointer, row int, precision int, timeFormat FormatTimeFunc) driver.Value { if IsVarDataType(colType) { - switch colType { - case uint8(common.TSDB_DATA_TYPE_BINARY): - return rawConvertBinary(pHeader, pStart, row) - case uint8(common.TSDB_DATA_TYPE_NCHAR): - return rawConvertNchar(pHeader, pStart, row) - case uint8(common.TSDB_DATA_TYPE_JSON): - return rawConvertJson(pHeader, pStart, row) - } + return rawConvertVarDataMap[colType](pHeader, pStart, row) } else { if ItemIsNull(pHeader, row) { return nil } else { - switch colType { - case uint8(common.TSDB_DATA_TYPE_BOOL): - return rawConvertBool(pStart, row) - case uint8(common.TSDB_DATA_TYPE_TINYINT): - return rawConvertTinyint(pStart, row) - case uint8(common.TSDB_DATA_TYPE_SMALLINT): - return rawConvertSmallint(pStart, row) - case uint8(common.TSDB_DATA_TYPE_INT): - return rawConvertInt(pStart, row) - case uint8(common.TSDB_DATA_TYPE_BIGINT): - return rawConvertBigint(pStart, row) - case uint8(common.TSDB_DATA_TYPE_UTINYINT): - return rawConvertUTinyint(pStart, row) - case uint8(common.TSDB_DATA_TYPE_USMALLINT): - return rawConvertUSmallint(pStart, row) - case uint8(common.TSDB_DATA_TYPE_UINT): - return rawConvertUInt(pStart, row) - case uint8(common.TSDB_DATA_TYPE_UBIGINT): - return rawConvertUBigint(pStart, row) - case uint8(common.TSDB_DATA_TYPE_FLOAT): - return rawConvertFloat(pStart, row) - case uint8(common.TSDB_DATA_TYPE_DOUBLE): - return rawConvertDouble(pStart, row) - case uint8(common.TSDB_DATA_TYPE_TIMESTAMP): - return rawConvertTime(pStart, row, precision, timeFormat) - } + return rawConvertFuncMap[colType](pStart, row, precision, timeFormat) } } - return nil } diff --git a/common/parser/block_test.go b/common/parser/block_test.go index 2220237..f62d7b7 100644 --- a/common/parser/block_test.go +++ b/common/parser/block_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/taosdata/driver-go/v3/common" + "github.com/taosdata/driver-go/v3/common/pointer" "github.com/taosdata/driver-go/v3/errors" "github.com/taosdata/driver-go/v3/wrapper" ) @@ -105,8 +106,8 @@ func TestReadBlock(t *testing.T) { return } precision := wrapper.TaosResultPrecision(res) - pHeaderList := make([]uintptr, fileCount) - pStartList := make([]uintptr, fileCount) + pHeaderList := make([]unsafe.Pointer, fileCount) + pStartList := make([]unsafe.Pointer, fileCount) var data [][]driver.Value for { blockSize, errCode, block := wrapper.TaosFetchRawBlock(res) @@ -122,20 +123,20 @@ func TestReadBlock(t *testing.T) { } nullBitMapOffset := uintptr(BitmapLen(blockSize)) lengthOffset := RawBlockGetColumnLengthOffset(fileCount) - tmpPHeader := uintptr(block) + RawBlockGetColDataOffset(fileCount) - var tmpPStart uintptr + tmpPHeader := pointer.AddUintptr(block, RawBlockGetColDataOffset(fileCount)) + var tmpPStart unsafe.Pointer for column := 0; column < fileCount; column++ { - colLength := *((*int32)(unsafe.Pointer(uintptr(block) + lengthOffset + uintptr(column)*Int32Size))) + colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(rh.ColTypes[column]) { pHeaderList[column] = tmpPHeader - tmpPStart = tmpPHeader + Int32Size*uintptr(blockSize) + tmpPStart = pointer.AddUintptr(tmpPHeader, Int32Size*uintptr(blockSize)) pStartList[column] = tmpPStart } else { pHeaderList[column] = tmpPHeader - tmpPStart = tmpPHeader + nullBitMapOffset + tmpPStart = pointer.AddUintptr(tmpPHeader, nullBitMapOffset) pStartList[column] = tmpPStart } - tmpPHeader = tmpPStart + uintptr(colLength) + tmpPHeader = pointer.AddUintptr(tmpPStart, uintptr(colLength)) } for row := 0; row < blockSize; row++ { rowV := make([]driver.Value, fileCount) @@ -250,8 +251,8 @@ func TestBlockTag(t *testing.T) { return } precision := wrapper.TaosResultPrecision(res) - pHeaderList := make([]uintptr, fileCount) - pStartList := make([]uintptr, fileCount) + pHeaderList := make([]unsafe.Pointer, fileCount) + pStartList := make([]unsafe.Pointer, fileCount) var data [][]driver.Value for { blockSize, errCode, block := wrapper.TaosFetchRawBlock(res) @@ -267,20 +268,20 @@ func TestBlockTag(t *testing.T) { } nullBitMapOffset := uintptr(BitmapLen(blockSize)) lengthOffset := RawBlockGetColumnLengthOffset(fileCount) - tmpPHeader := uintptr(block) + RawBlockGetColDataOffset(fileCount) // length i32, group u64 - var tmpPStart uintptr + tmpPHeader := pointer.AddUintptr(block, RawBlockGetColDataOffset(fileCount)) // length i32, group u64 + var tmpPStart unsafe.Pointer for column := 0; column < fileCount; column++ { - colLength := *((*int32)(unsafe.Pointer(uintptr(block) + lengthOffset + uintptr(column)*Int32Size))) + colLength := *((*int32)(pointer.AddUintptr(block, lengthOffset+uintptr(column)*Int32Size))) if IsVarDataType(rh.ColTypes[column]) { pHeaderList[column] = tmpPHeader - tmpPStart = tmpPHeader + Int32Size*uintptr(blockSize) + tmpPStart = pointer.AddUintptr(tmpPHeader, Int32Size*uintptr(blockSize)) pStartList[column] = tmpPStart } else { pHeaderList[column] = tmpPHeader - tmpPStart = tmpPHeader + nullBitMapOffset + tmpPStart = pointer.AddUintptr(tmpPHeader, nullBitMapOffset) pStartList[column] = tmpPStart } - tmpPHeader = tmpPStart + uintptr(colLength) + tmpPHeader = pointer.AddUintptr(tmpPStart, uintptr(colLength)) } for row := 0; row < blockSize; row++ { rowV := make([]driver.Value, fileCount) diff --git a/common/pointer/unsafe.go b/common/pointer/unsafe.go new file mode 100644 index 0000000..7e5e36f --- /dev/null +++ b/common/pointer/unsafe.go @@ -0,0 +1,7 @@ +package pointer + +import "unsafe" + +func AddUintptr(ptr unsafe.Pointer, len uintptr) unsafe.Pointer { + return unsafe.Pointer(uintptr(ptr) + len) +} diff --git a/common/reqid.go b/common/reqid.go index f5d711f..02f1c72 100644 --- a/common/reqid.go +++ b/common/reqid.go @@ -8,6 +8,7 @@ import ( "unsafe" "github.com/google/uuid" + "github.com/taosdata/driver-go/v3/common/pointer" ) var tUUIDHashId int64 @@ -36,10 +37,9 @@ func murmurHash32(data []byte, seed uint32) uint32 { h1 := seed nBlocks := len(data) / 4 - p := uintptr(unsafe.Pointer(&data[0])) - p1 := p + uintptr(4*nBlocks) - for ; p < p1; p += 4 { - k1 := *(*uint32)(unsafe.Pointer(p)) + p := unsafe.Pointer(&data[0]) + for i := 0; i < nBlocks; i++ { + k1 := *(*uint32)(pointer.AddUintptr(p, uintptr(i*4))) k1 *= c1 k1 = bits.RotateLeft32(k1, 15) diff --git a/common/serializer/block.go b/common/serializer/block.go index 4a23b53..830708e 100644 --- a/common/serializer/block.go +++ b/common/serializer/block.go @@ -366,6 +366,34 @@ func SerializeRawBlock(params []*param.Param, colType *param.ColumnType) ([]byte } lengthData = appendUint32(lengthData, uint32(length)) data = append(data, dataTmp...) + case taosTypes.TaosVarBinaryType: + colInfoData = append(colInfoData, common.TSDB_DATA_TYPE_VARBINARY) + colInfoData = appendUint32(colInfoData, uint32(0)) + length := 0 + dataTmp := make([]byte, Int32Size*rows) + rowData := params[colIndex].GetValues() + for rowIndex := 0; rowIndex < rows; rowIndex++ { + offset := Int32Size * rowIndex + if rowData[rowIndex] == nil { + for i := 0; i < Int32Size; i++ { + // -1 + dataTmp[offset+i] = byte(255) + } + } else { + v, is := rowData[rowIndex].(taosTypes.TaosVarBinary) + if !is { + return nil, DataTypeWrong + } + for i := 0; i < Int32Size; i++ { + dataTmp[offset+i] = byte(length >> (8 * i)) + } + dataTmp = appendUint16(dataTmp, uint16(len(v))) + dataTmp = append(dataTmp, v...) + length += len(v) + Int16Size + } + } + lengthData = appendUint32(lengthData, uint32(length)) + data = append(data, dataTmp...) case taosTypes.TaosNcharType: colInfoData = append(colInfoData, common.TSDB_DATA_TYPE_NCHAR) colInfoData = appendUint32(colInfoData, uint32(0)) diff --git a/common/stmt/field.go b/common/stmt/field.go index ee7e441..c8a46d4 100644 --- a/common/stmt/field.go +++ b/common/stmt/field.go @@ -41,6 +41,8 @@ func (s *StmtField) GetType() (*types.ColumnType, error) { return &types.ColumnType{Type: types.TaosDoubleType}, nil case common.TSDB_DATA_TYPE_BINARY: return &types.ColumnType{Type: types.TaosBinaryType}, nil + case common.TSDB_DATA_TYPE_VARBINARY: + return &types.ColumnType{Type: types.TaosVarBinaryType}, nil case common.TSDB_DATA_TYPE_NCHAR: return &types.ColumnType{Type: types.TaosNcharType}, nil case common.TSDB_DATA_TYPE_TIMESTAMP: diff --git a/common/tmq/event.go b/common/tmq/event.go index 2b5979b..c62cc93 100644 --- a/common/tmq/event.go +++ b/common/tmq/event.go @@ -65,10 +65,11 @@ type Message interface { } type DataMessage struct { - dbName string - topic string - data []*Data - offset Offset + TopicPartition TopicPartition + dbName string + topic string + data []*Data + offset Offset } func (m *DataMessage) String() string { @@ -109,10 +110,11 @@ func (m *DataMessage) Offset() Offset { } type MetaMessage struct { - dbName string - topic string - offset Offset - meta *Meta + TopicPartition TopicPartition + dbName string + topic string + offset Offset + meta *Meta } func (m *MetaMessage) Offset() Offset { @@ -153,10 +155,11 @@ func (m *MetaMessage) Value() interface{} { } type MetaDataMessage struct { - dbName string - topic string - offset Offset - metaData *MetaData + TopicPartition TopicPartition + dbName string + topic string + offset Offset + metaData *MetaData } func (m *MetaDataMessage) Offset() Offset { diff --git a/common/tmq/tmq.go b/common/tmq/tmq.go index d830400..a6e5617 100644 --- a/common/tmq/tmq.go +++ b/common/tmq/tmq.go @@ -42,10 +42,22 @@ type CreateItem struct { type Offset int64 +const OffsetInvalid = Offset(-2147467247) + func (o Offset) String() string { + if o == OffsetInvalid { + return "unset" + } return fmt.Sprintf("%d", int64(o)) } +func (o Offset) Valid() bool { + if o < 0 && o != OffsetInvalid { + return false + } + return true +} + type TopicPartition struct { Topic *string Partition int32 diff --git a/taosSql/driver_test.go b/taosSql/driver_test.go index 9dfc33c..51a76be 100644 --- a/taosSql/driver_test.go +++ b/taosSql/driver_test.go @@ -309,43 +309,43 @@ func TestJson(t *testing.T) { } defer db.Close() defer func() { - _, err = db.Exec("drop database if exists test_json") + _, err = db.Exec("drop database if exists test_json_native") if err != nil { t.Error(err) return } }() - _, err = db.Exec("create database if not exists test_json") + _, err = db.Exec("create database if not exists test_json_native") if err != nil { t.Error(err) return } - _, err = db.Exec("drop table if exists test_json.tjson") + _, err = db.Exec("drop table if exists test_json_native.tjson") if err != nil { t.Error(err) return } - _, err = db.Exec("create stable if not exists test_json.tjson(ts timestamp,v int )tags(t json)") + _, err = db.Exec("create stable if not exists test_json_native.tjson(ts timestamp,v int )tags(t json)") if err != nil { t.Error(err) return } - _, err = db.Exec(`insert into test_json.tj_1 using test_json.tjson tags('{"a":1,"b":"b"}')values (now,1)`) + _, err = db.Exec(`insert into test_json_native.tj_1 using test_json_native.tjson tags('{"a":1,"b":"b"}')values (now,1)`) if err != nil { t.Error(err) return } - _, err = db.Exec(`insert into test_json.tj_2 using test_json.tjson tags('{"a":1,"c":"c"}')values (now,1)`) + _, err = db.Exec(`insert into test_json_native.tj_2 using test_json_native.tjson tags('{"a":1,"c":"c"}')values (now,1)`) if err != nil { t.Error(err) return } - _, err = db.Exec(`insert into test_json.tj_3 using test_json.tjson tags('null')values (now,1)`) + _, err = db.Exec(`insert into test_json_native.tj_3 using test_json_native.tjson tags('null')values (now,1)`) if err != nil { t.Error(err) return } - rows, err := db.Query("select * from test_json.tjson") + rows, err := db.Query("select * from test_json_native.tjson") if err != nil { t.Error(err) return @@ -385,38 +385,38 @@ func TestJsonSearch(t *testing.T) { } defer db.Close() defer func() { - _, err = db.Exec("drop database if exists test_json") + _, err = db.Exec("drop database if exists test_json_native_search") if err != nil { t.Error(err) return } }() - _, err = db.Exec("create database if not exists test_json") + _, err = db.Exec("create database if not exists test_json_native_search") if err != nil { t.Error(err) return } - _, err = db.Exec("drop table if exists test_json.tjson_search") + _, err = db.Exec("drop table if exists test_json_native_search.tjson_search") if err != nil { t.Error(err) return } - _, err = db.Exec("create stable if not exists test_json.tjson_search(ts timestamp,v int )tags(t json)") + _, err = db.Exec("create stable if not exists test_json_native_search.tjson_search(ts timestamp,v int )tags(t json)") if err != nil { t.Error(err) return } - _, err = db.Exec(`insert into test_json.tjs_1 using test_json.tjson_search tags('{"a":1,"b":"b"}')values (now,1)`) + _, err = db.Exec(`insert into test_json_native_search.tjs_1 using test_json_native_search.tjson_search tags('{"a":1,"b":"b"}')values (now,1)`) if err != nil { t.Error(err) return } - _, err = db.Exec(`insert into test_json.tjs_2 using test_json.tjson_search tags('{"a":1,"c":"c"}')values (now,2)`) + _, err = db.Exec(`insert into test_json_native_search.tjs_2 using test_json_native_search.tjson_search tags('{"a":1,"c":"c"}')values (now,2)`) if err != nil { t.Error(err) return } - rows, err := db.Query("select * from test_json.tjson_search where t contains 'a' and t->'b'='b' and v = 1") + rows, err := db.Query("select * from test_json_native_search.tjson_search where t contains 'a' and t->'b'='b' and v = 1") if err != nil { t.Error(err) return @@ -451,38 +451,38 @@ func TestJsonMatch(t *testing.T) { } defer db.Close() defer func() { - _, err = db.Exec("drop database if exists test_json") + _, err = db.Exec("drop database if exists test_json_native_match") if err != nil { t.Error(err) return } }() - _, err = db.Exec("create database if not exists test_json") + _, err = db.Exec("create database if not exists test_json_native_match") if err != nil { t.Error(err) return } - _, err = db.Exec("drop table if exists test_json.tjson_match") + _, err = db.Exec("drop table if exists test_json_native_match.tjson_match") if err != nil { t.Error(err) return } - _, err = db.Exec("create stable if not exists test_json.tjson_match(ts timestamp,v int )tags(t json)") + _, err = db.Exec("create stable if not exists test_json_native_match.tjson_match(ts timestamp,v int )tags(t json)") if err != nil { t.Error(err) return } - _, err = db.Exec(`insert into test_json.tjm_1 using test_json.tjson_match tags('{"a":1,"b":"b"}')values (now,1)`) + _, err = db.Exec(`insert into test_json_native_match.tjm_1 using test_json_native_match.tjson_match tags('{"a":1,"b":"b"}')values (now,1)`) if err != nil { t.Error(err) return } - _, err = db.Exec(`insert into test_json.tjm_2 using test_json.tjson_match tags('{"a":1,"c":"c"}')values (now,2)`) + _, err = db.Exec(`insert into test_json_native_match.tjm_2 using test_json_native_match.tjson_match tags('{"a":1,"c":"c"}')values (now,2)`) if err != nil { t.Error(err) return } - rows, err := db.Query("select * from test_json.tjson_match where t contains 'a' and t->'b' match '.*b.*|.*e.*' and v = 1") + rows, err := db.Query("select * from test_json_native_match.tjson_match where t contains 'a' and t->'b' match '.*b.*|.*e.*' and v = 1") if err != nil { t.Error(err) return @@ -516,33 +516,33 @@ func TestChinese(t *testing.T) { } defer db.Close() defer func() { - _, err = db.Exec("drop database if exists test_chinese") + _, err = db.Exec("drop database if exists test_chinese_native") if err != nil { t.Error(err) return } }() - _, err = db.Exec("create database if not exists test_chinese") + _, err = db.Exec("create database if not exists test_chinese_native") if err != nil { t.Error(err) return } - _, err = db.Exec("drop table if exists test_chinese.chinese") + _, err = db.Exec("drop table if exists test_chinese_native.chinese") if err != nil { t.Error(err) return } - _, err = db.Exec("create table if not exists test_chinese.chinese(ts timestamp,v nchar(32))") + _, err = db.Exec("create table if not exists test_chinese_native.chinese(ts timestamp,v nchar(32))") if err != nil { t.Error(err) return } - _, err = db.Exec(`INSERT INTO test_chinese.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") + _, err = db.Exec(`INSERT INTO test_chinese_native.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") if err != nil { t.Error(err) return } - rows, err := db.Query("select * from test_chinese.chinese") + rows, err := db.Query("select * from test_chinese_native.chinese") if err != nil { t.Error(err) return diff --git a/taosSql/statement.go b/taosSql/statement.go index 399c9a6..c2219ad 100644 --- a/taosSql/statement.go +++ b/taosSql/statement.go @@ -307,6 +307,15 @@ func (stmt *Stmt) CheckNamedValue(v *driver.NamedValue) error { default: return fmt.Errorf("CheckNamedValue:%v can not convert to binary", v) } + case common.TSDB_DATA_TYPE_VARBINARY: + switch v.Value.(type) { + case string: + v.Value = types.TaosVarBinary(v.Value.(string)) + case []byte: + v.Value = types.TaosVarBinary(v.Value.([]byte)) + default: + return fmt.Errorf("CheckNamedValue:%v can not convert to varbinary", v) + } case common.TSDB_DATA_TYPE_TIMESTAMP: t, is := v.Value.(time.Time) if is { diff --git a/taosWS/rows.go b/taosWS/rows.go index b75f4e2..b462f4e 100644 --- a/taosWS/rows.go +++ b/taosWS/rows.go @@ -10,6 +10,7 @@ import ( "github.com/taosdata/driver-go/v3/common" "github.com/taosdata/driver-go/v3/common/parser" + "github.com/taosdata/driver-go/v3/common/pointer" taosErrors "github.com/taosdata/driver-go/v3/errors" ) @@ -151,7 +152,7 @@ func (rs *rows) fetchBlock() error { return err } rs.block = respBytes - rs.blockPtr = unsafe.Pointer(*(*uintptr)(unsafe.Pointer(&rs.block)) + uintptr(16)) + rs.blockPtr = pointer.AddUintptr(unsafe.Pointer(&rs.block[0]), 16) rs.blockOffset = 0 return nil } diff --git a/types/taostype.go b/types/taostype.go index 50a5da8..e6c7947 100644 --- a/types/taostype.go +++ b/types/taostype.go @@ -18,6 +18,7 @@ type ( TaosFloat float32 TaosDouble float64 TaosBinary []byte + TaosVarBinary []byte TaosNchar string TaosTimestamp struct { T time.Time @@ -39,6 +40,7 @@ var ( TaosFloatType = reflect.TypeOf(TaosFloat(0)) TaosDoubleType = reflect.TypeOf(TaosDouble(0)) TaosBinaryType = reflect.TypeOf(TaosBinary(nil)) + TaosVarBinaryType = reflect.TypeOf(TaosVarBinary(nil)) TaosNcharType = reflect.TypeOf(TaosNchar("")) TaosTimestampType = reflect.TypeOf(TaosTimestamp{}) TaosJsonType = reflect.TypeOf(TaosJson("")) diff --git a/wrapper/notify_test.go b/wrapper/notify_test.go index 6ed6fe0..fa8cd06 100644 --- a/wrapper/notify_test.go +++ b/wrapper/notify_test.go @@ -52,4 +52,46 @@ func TestNotify(t *testing.T) { case <-timeout.Done(): t.Error("wait for notify callback timeout") } + { + notify := make(chan int64, 1) + handler := cgo.NewHandle(notify) + errCode := TaosSetNotifyCB(conn2, handler, common.TAOS_NOTIFY_WHITELIST_VER) + if errCode != 0 { + errStr := TaosErrorStr(nil) + t.Error(errCode, errStr) + } + err = exec(conn, "ALTER USER t_notify ADD HOST '192.168.1.98/0','192.168.1.98/32'") + assert.NoError(t, err) + timeout, cancel = context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + now := time.Now() + select { + case version := <-notify: + fmt.Println(time.Now().Sub(now)) + t.Log(version) + case <-timeout.Done(): + t.Error("wait for notify callback timeout") + } + } + { + notify := make(chan struct{}, 1) + handler := cgo.NewHandle(notify) + errCode := TaosSetNotifyCB(conn2, handler, common.TAOS_NOTIFY_USER_DROPPED) + if errCode != 0 { + errStr := TaosErrorStr(nil) + t.Error(errCode, errStr) + } + err = exec(conn, "drop USER t_notify") + assert.NoError(t, err) + timeout, cancel = context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + now := time.Now() + select { + case _ = <-notify: + fmt.Println(time.Now().Sub(now)) + t.Log("user dropped") + case <-timeout.Done(): + t.Error("wait for notify callback timeout") + } + } } diff --git a/wrapper/notifycb.go b/wrapper/notifycb.go index 7063496..d933345 100644 --- a/wrapper/notifycb.go +++ b/wrapper/notifycb.go @@ -17,11 +17,20 @@ import ( //export NotifyCallback func NotifyCallback(p unsafe.Pointer, ext unsafe.Pointer, notifyType C.int) { defer func() { + // channel may be closed recover() }() - if int(notifyType) == common.TAOS_NOTIFY_PASSVER { + switch int(notifyType) { + case common.TAOS_NOTIFY_PASSVER: version := int32(*(*C.int32_t)(ext)) c := (*(*cgo.Handle)(p)).Value().(chan int32) c <- version + case common.TAOS_NOTIFY_WHITELIST_VER: + version := int64(*(*C.int64_t)(ext)) + c := (*(*cgo.Handle)(p)).Value().(chan int64) + c <- version + case common.TAOS_NOTIFY_USER_DROPPED: + c := (*(*cgo.Handle)(p)).Value().(chan struct{}) + c <- struct{}{} } } diff --git a/wrapper/row.go b/wrapper/row.go index 6ee6508..042d4b3 100644 --- a/wrapper/row.go +++ b/wrapper/row.go @@ -9,6 +9,7 @@ import ( "unsafe" "github.com/taosdata/driver-go/v3/common" + "github.com/taosdata/driver-go/v3/common/pointer" ) const ( @@ -18,7 +19,8 @@ const ( type FormatTimeFunc func(ts int64, precision int) driver.Value func FetchRow(row unsafe.Pointer, offset int, colType uint8, length int, arg ...interface{}) driver.Value { - p := unsafe.Pointer(*(*uintptr)(unsafe.Pointer(uintptr(row) + uintptr(offset)*PointerSize))) + base := *(**C.void)(pointer.AddUintptr(row, uintptr(offset)*PointerSize)) + p := unsafe.Pointer(base) if p == nil { return nil } @@ -49,10 +51,10 @@ func FetchRow(row unsafe.Pointer, offset int, colType uint8, length int, arg ... return *((*float32)(p)) case C.TSDB_DATA_TYPE_DOUBLE: return *((*float64)(p)) - case C.TSDB_DATA_TYPE_BINARY, C.TSDB_DATA_TYPE_NCHAR: + case C.TSDB_DATA_TYPE_BINARY, C.TSDB_DATA_TYPE_NCHAR, C.TSDB_DATA_TYPE_VARBINARY: data := make([]byte, length) for i := 0; i < length; i++ { - data[i] = *((*byte)(unsafe.Pointer(uintptr(p) + uintptr(i)))) + data[i] = *((*byte)(pointer.AddUintptr(p, uintptr(i)))) } return string(data) case C.TSDB_DATA_TYPE_TIMESTAMP: @@ -66,7 +68,7 @@ func FetchRow(row unsafe.Pointer, offset int, colType uint8, length int, arg ... case C.TSDB_DATA_TYPE_JSON: data := make([]byte, length) for i := 0; i < length; i++ { - data[i] = *((*byte)(unsafe.Pointer(uintptr(p) + uintptr(i)))) + data[i] = *((*byte)(pointer.AddUintptr(p, uintptr(i)))) } return data default: diff --git a/wrapper/stmt.go b/wrapper/stmt.go index fe3239b..5bba031 100644 --- a/wrapper/stmt.go +++ b/wrapper/stmt.go @@ -15,6 +15,7 @@ import ( "github.com/taosdata/driver-go/v3/common" "github.com/taosdata/driver-go/v3/common/stmt" + taosError "github.com/taosdata/driver-go/v3/errors" taosTypes "github.com/taosdata/driver-go/v3/types" ) @@ -234,6 +235,17 @@ func generateTaosBindList(params []driver.Value) ([]C.TAOS_MULTI_BIND, []unsafe. *(bind.length) = C.int32_t(clen) needFreePointer = append(needFreePointer, p) bind.buffer_length = C.uintptr_t(clen) + case taosTypes.TaosVarBinary: + bind.buffer_type = C.TSDB_DATA_TYPE_VARBINARY + cbuf := C.CString(string(value)) + needFreePointer = append(needFreePointer, unsafe.Pointer(cbuf)) + bind.buffer = unsafe.Pointer(cbuf) + clen := int32(len(value)) + p := C.malloc(C.size_t(unsafe.Sizeof(clen))) + bind.length = (*C.int32_t)(p) + *(bind.length) = C.int32_t(clen) + needFreePointer = append(needFreePointer, p) + bind.buffer_length = C.uintptr_t(clen) case taosTypes.TaosNchar: bind.buffer_type = C.TSDB_DATA_TYPE_NCHAR p := unsafe.Pointer(C.CString(string(value))) @@ -338,6 +350,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin } else { *(*C.int8_t)(current) = C.int8_t(0) } + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(1) } } case taosTypes.TaosTinyintType: @@ -354,6 +369,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosTinyint) current := unsafe.Pointer(uintptr(p) + uintptr(i)) *(*C.int8_t)(current) = C.int8_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(1) } } case taosTypes.TaosSmallintType: @@ -370,6 +388,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosSmallint) current := unsafe.Pointer(uintptr(p) + uintptr(2*i)) *(*C.int16_t)(current) = C.int16_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(2) } } case taosTypes.TaosIntType: @@ -386,6 +407,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosInt) current := unsafe.Pointer(uintptr(p) + uintptr(4*i)) *(*C.int32_t)(current) = C.int32_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(4) } } case taosTypes.TaosBigintType: @@ -402,6 +426,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosBigint) current := unsafe.Pointer(uintptr(p) + uintptr(8*i)) *(*C.int64_t)(current) = C.int64_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(8) } } case taosTypes.TaosUTinyintType: @@ -418,6 +445,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosUTinyint) current := unsafe.Pointer(uintptr(p) + uintptr(i)) *(*C.uint8_t)(current) = C.uint8_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(1) } } case taosTypes.TaosUSmallintType: @@ -434,6 +464,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosUSmallint) current := unsafe.Pointer(uintptr(p) + uintptr(2*i)) *(*C.uint16_t)(current) = C.uint16_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(2) } } case taosTypes.TaosUIntType: @@ -450,6 +483,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosUInt) current := unsafe.Pointer(uintptr(p) + uintptr(4*i)) *(*C.uint32_t)(current) = C.uint32_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(4) } } case taosTypes.TaosUBigintType: @@ -466,6 +502,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosUBigint) current := unsafe.Pointer(uintptr(p) + uintptr(8*i)) *(*C.uint64_t)(current) = C.uint64_t(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(8) } } case taosTypes.TaosFloatType: @@ -482,6 +521,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosFloat) current := unsafe.Pointer(uintptr(p) + uintptr(4*i)) *(*C.float)(current) = C.float(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(4) } } case taosTypes.TaosDoubleType: @@ -498,6 +540,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin value := rowData.(taosTypes.TaosDouble) current := unsafe.Pointer(uintptr(p) + uintptr(8*i)) *(*C.double)(current) = C.double(value) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(8) } } case taosTypes.TaosBinaryType: @@ -518,6 +563,24 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin *(*C.int32_t)(l) = C.int32_t(len(value)) } } + case taosTypes.TaosVarBinaryType: + p = unsafe.Pointer(C.malloc(C.size_t(C.uint(columnType.MaxLen * rowLen)))) + bind.buffer_type = C.TSDB_DATA_TYPE_VARBINARY + bind.buffer_length = C.uintptr_t(columnType.MaxLen) + for i, rowData := range columnData { + currentNull := unsafe.Pointer(uintptr(nullList) + uintptr(i)) + if rowData == nil { + *(*C.char)(currentNull) = C.char(1) + } else { + *(*C.char)(currentNull) = C.char(0) + value := rowData.(taosTypes.TaosVarBinary) + for j := 0; j < len(value); j++ { + *(*C.char)(unsafe.Pointer(uintptr(p) + uintptr(columnType.MaxLen*i+j))) = (C.char)(value[j]) + } + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(len(value)) + } + } case taosTypes.TaosNcharType: p = unsafe.Pointer(C.malloc(C.size_t(C.uint(columnType.MaxLen * rowLen)))) bind.buffer_type = C.TSDB_DATA_TYPE_NCHAR @@ -551,6 +614,9 @@ func TaosStmtBindParamBatch(stmt unsafe.Pointer, multiBind [][]driver.Value, bin ts := common.TimeToTimestamp(value.T, value.Precision) current := unsafe.Pointer(uintptr(p) + uintptr(8*i)) *(*C.int64_t)(current) = C.int64_t(ts) + + l := unsafe.Pointer(uintptr(lengthList) + uintptr(4*i)) + *(*C.int32_t)(l) = C.int32_t(8) } } } @@ -644,3 +710,15 @@ func StmtParseFields(num int, fields unsafe.Pointer) []*stmt.StmtField { func TaosStmtReclaimFields(stmt unsafe.Pointer, fields unsafe.Pointer) { C.taos_stmt_reclaim_fields(stmt, (*C.TAOS_FIELD_E)(fields)) } + +// TaosStmtGetParam DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) +func TaosStmtGetParam(stmt unsafe.Pointer, idx int) (dataType int, dataLength int, err error) { + code := C.taos_stmt_get_param(stmt, C.int(idx), (*C.int)(unsafe.Pointer(&dataType)), (*C.int)(unsafe.Pointer(&dataLength))) + if code != 0 { + err = &taosError.TaosError{ + Code: int32(code), + ErrStr: TaosStmtErrStr(stmt), + } + } + return +} diff --git a/wrapper/stmt_test.go b/wrapper/stmt_test.go index 71283bc..a3af59f 100644 --- a/wrapper/stmt_test.go +++ b/wrapper/stmt_test.go @@ -138,6 +138,16 @@ func TestStmt(t *testing.T) { }}, expectValue: "yes", }, //3 + { + tbType: "ts timestamp, v varbinary(8)", + pos: "?, ?", + params: [][]driver.Value{{taosTypes.TaosTimestamp{T: now, Precision: common.PrecisionMilliSecond}}, {taosTypes.TaosVarBinary("yes")}}, + bindType: []*taosTypes.ColumnType{{Type: taosTypes.TaosTimestampType}, { + Type: taosTypes.TaosVarBinaryType, + MaxLen: 3, + }}, + expectValue: "yes", + }, //3 { tbType: "ts timestamp, v nchar(8)", pos: "?, ?", @@ -336,6 +346,12 @@ func TestStmtExec(t *testing.T) { params: []driver.Value{taosTypes.TaosTimestamp{T: now, Precision: common.PrecisionMilliSecond}, taosTypes.TaosBinary("yes")}, expectValue: "yes", }, //3 + { + tbType: "ts timestamp, v varbinary(8)", + pos: "?, ?", + params: []driver.Value{taosTypes.TaosTimestamp{T: now, Precision: common.PrecisionMilliSecond}, taosTypes.TaosVarBinary("yes")}, + expectValue: "yes", + }, //3 { tbType: "ts timestamp, v nchar(8)", pos: "?, ?", @@ -1141,3 +1157,46 @@ func TestTaosStmtSetTags(t *testing.T) { assert.Equal(t, int32(102), data[0][2].(int32)) assert.Equal(t, []byte(`{"a":"b"}`), data[0][3].([]byte)) } + +func TestTaosStmtGetParam(t *testing.T) { + conn, err := TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + defer TaosClose(conn) + + err = exec(conn, "drop database if exists test_stmt_get_param") + assert.NoError(t, err) + err = exec(conn, "create database if not exists test_stmt_get_param") + assert.NoError(t, err) + defer exec(conn, "drop database if exists test_stmt_get_param") + + err = exec(conn, + "create table if not exists test_stmt_get_param.stb(ts TIMESTAMP,current float,voltage int,phase float) TAGS (groupid int,location varchar(24))") + assert.NoError(t, err) + + stmt := TaosStmtInit(conn) + assert.NotNilf(t, stmt, "failed to init stmt") + defer TaosStmtClose(stmt) + + code := TaosStmtPrepare(stmt, "insert into test_stmt_get_param.tb_0 using test_stmt_get_param.stb tags(?,?) values (?,?,?,?)") + assert.Equal(t, 0, code, TaosStmtErrStr(stmt)) + + dt, dl, err := TaosStmtGetParam(stmt, 0) // ts + assert.NoError(t, err) + assert.Equal(t, 9, dt) + assert.Equal(t, 8, dl) + + dt, dl, err = TaosStmtGetParam(stmt, 1) // current + assert.NoError(t, err) + assert.Equal(t, 6, dt) + assert.Equal(t, 4, dl) + + dt, dl, err = TaosStmtGetParam(stmt, 2) // voltage + assert.NoError(t, err) + assert.Equal(t, 4, dt) + assert.Equal(t, 4, dl) + + dt, dl, err = TaosStmtGetParam(stmt, 3) // phase + assert.NoError(t, err) + assert.Equal(t, 6, dt) + assert.Equal(t, 4, dl) +} diff --git a/wrapper/taosc.go b/wrapper/taosc.go index 803ed16..2e952a2 100644 --- a/wrapper/taosc.go +++ b/wrapper/taosc.go @@ -33,6 +33,7 @@ import ( "strings" "unsafe" + "github.com/taosdata/driver-go/v3/common/pointer" "github.com/taosdata/driver-go/v3/errors" "github.com/taosdata/driver-go/v3/wrapper/cgo" ) @@ -251,7 +252,38 @@ func TaosGetTablesVgID(conn unsafe.Pointer, db string, tables []string) (vgIDs [ } vgIDs = make([]int, numTables) for i := 0; i < numTables; i++ { - vgIDs[i] = int(*(*C.int)(unsafe.Pointer(uintptr(p) + uintptr(C.sizeof_int*C.int(i))))) + vgIDs[i] = int(*(*C.int)(pointer.AddUintptr(p, uintptr(C.sizeof_int*C.int(i))))) } return } + +//typedef enum { +//TAOS_CONN_MODE_BI = 0, +//} TAOS_CONN_MODE; +// +//DLL_EXPORT int taos_set_conn_mode(TAOS* taos, int mode, int value); + +func TaosSetConnMode(conn unsafe.Pointer, mode int, value int) int { + return int(C.taos_set_conn_mode(conn, C.int(mode), C.int(value))) +} + +// TaosGetCurrentDB DLL_EXPORT int taos_get_current_db(TAOS *taos, char *database, int len, int *required) +func TaosGetCurrentDB(conn unsafe.Pointer) (db string, err error) { + cDb := C.CString(db) + defer C.free(unsafe.Pointer(cDb)) + var required int + + code := C.taos_get_current_db(conn, cDb, C.int(193), (*C.int)(unsafe.Pointer(&required))) + if code != 0 { + err = errors.NewError(int(code), TaosErrorStr(nil)) + } + db = C.GoString(cDb) + + return +} + +// TaosGetServerInfo DLL_EXPORT const char *taos_get_server_info(TAOS *taos) +func TaosGetServerInfo(conn unsafe.Pointer) string { + info := C.taos_get_server_info(conn) + return C.GoString(info) +} diff --git a/wrapper/taosc_test.go b/wrapper/taosc_test.go index de36be7..18ff226 100644 --- a/wrapper/taosc_test.go +++ b/wrapper/taosc_test.go @@ -570,3 +570,38 @@ func TestTaosGetTablesVgID(t *testing.T) { assert.Equal(t, 2, len(vgs2)) assert.Equal(t, vgs2, vgs1) } + +func TestTaosSetConnMode(t *testing.T) { + conn, err := TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + defer TaosClose(conn) + code := TaosSetConnMode(conn, 0, 1) + if code != 0 { + t.Errorf("TaosSetConnMode() error code= %d, msg: %s", code, TaosErrorStr(nil)) + } +} + +func TestTaosGetCurrentDB(t *testing.T) { + conn, err := TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + defer TaosClose(conn) + dbName := "current_db_test" + _ = exec(conn, fmt.Sprintf("drop database if exists %s", dbName)) + err = exec(conn, fmt.Sprintf("create database %s", dbName)) + assert.NoError(t, err) + defer func() { + _ = exec(conn, fmt.Sprintf("drop database if exists %s", dbName)) + }() + _ = exec(conn, fmt.Sprintf("use %s", dbName)) + db, err := TaosGetCurrentDB(conn) + assert.NoError(t, err) + assert.Equal(t, dbName, db) +} + +func TestTaosGetServerInfo(t *testing.T) { + conn, err := TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + defer TaosClose(conn) + info := TaosGetServerInfo(conn) + assert.NotEmpty(t, info) +} diff --git a/wrapper/tmq.go b/wrapper/tmq.go index f0d004c..949bf10 100644 --- a/wrapper/tmq.go +++ b/wrapper/tmq.go @@ -6,12 +6,15 @@ package wrapper #include #include extern void TMQCommitCB(tmq_t *, int32_t, void *param); +extern void TMQAutoCommitCB(tmq_t *, int32_t, void *param); +extern void TMQCommitOffsetCB(tmq_t *, int32_t, void *param); */ import "C" import ( "sync" "unsafe" + "github.com/taosdata/driver-go/v3/common/pointer" "github.com/taosdata/driver-go/v3/common/tmq" "github.com/taosdata/driver-go/v3/errors" "github.com/taosdata/driver-go/v3/wrapper/cgo" @@ -68,7 +71,7 @@ func TMQConfDestroy(conf unsafe.Pointer) { // TMQConfSetAutoCommitCB DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); func TMQConfSetAutoCommitCB(conf unsafe.Pointer, h cgo.Handle) { - C.tmq_conf_set_auto_commit_cb((*C.struct_tmq_conf_t)(conf), (*C.tmq_commit_cb)(C.TMQCommitCB), h.Pointer()) + C.tmq_conf_set_auto_commit_cb((*C.struct_tmq_conf_t)(conf), (*C.tmq_commit_cb)(C.TMQAutoCommitCB), h.Pointer()) } // TMQCommitAsync DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); @@ -105,10 +108,10 @@ func TMQListGetSize(list unsafe.Pointer) int32 { // TMQListToCArray char **tmq_list_to_c_array(const tmq_list_t *); func TMQListToCArray(list unsafe.Pointer, size int) []string { - head := uintptr(unsafe.Pointer(C.tmq_list_to_c_array((*C.tmq_list_t)(list)))) + head := unsafe.Pointer(C.tmq_list_to_c_array((*C.tmq_list_t)(list))) result := make([]string, size) for i := 0; i < size; i++ { - result[i] = C.GoString(*(**C.char)(unsafe.Pointer(head + PointerSize*uintptr(i)))) + result[i] = C.GoString(*(**C.char)(pointer.AddUintptr(head, PointerSize*uintptr(i)))) } return result } @@ -297,3 +300,36 @@ func TMQFreeAssignment(assignment unsafe.Pointer) { } C.tmq_free_assignment((*C.tmq_topic_assignment)(assignment)) } + +// TMQPosition DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); +func TMQPosition(consumer unsafe.Pointer, topic string, vGroupID int32) int64 { + topicName := C.CString(topic) + defer C.free(unsafe.Pointer(topicName)) + return int64(C.tmq_position((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID))) +} + +// TMQCommitted DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); +func TMQCommitted(consumer unsafe.Pointer, topic string, vGroupID int32) int64 { + topicName := C.CString(topic) + defer C.free(unsafe.Pointer(topicName)) + return int64(C.tmq_committed((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID))) +} + +// TMQCommitOffsetSync DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); +func TMQCommitOffsetSync(consumer unsafe.Pointer, topic string, vGroupID int32, offset int64) int32 { + topicName := C.CString(topic) + defer C.free(unsafe.Pointer(topicName)) + return int32(C.tmq_commit_offset_sync((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID), (C.int64_t)(offset))) +} + +// TMQCommitOffsetAsync DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param); +func TMQCommitOffsetAsync(consumer unsafe.Pointer, topic string, vGroupID int32, offset int64, h cgo.Handle) { + topicName := C.CString(topic) + defer C.free(unsafe.Pointer(topicName)) + C.tmq_commit_offset_async((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID), (C.int64_t)(offset), (*C.tmq_commit_cb)(C.TMQCommitOffsetCB), h.Pointer()) +} + +// TMQGetConnect TAOS *tmq_get_connect(tmq_t *tmq) +func TMQGetConnect(consumer unsafe.Pointer) unsafe.Pointer { + return unsafe.Pointer(C.tmq_get_connect((*C.tmq_t)(consumer))) +} diff --git a/wrapper/tmq_test.go b/wrapper/tmq_test.go index 97c6d82..b4992cf 100644 --- a/wrapper/tmq_test.go +++ b/wrapper/tmq_test.go @@ -1170,36 +1170,70 @@ func TestTMQModify(t *testing.T) { } d, err := query(targetConn, "describe stb") assert.NoError(t, err) - assert.Equal(t, [][]driver.Value{ - {"ts", "TIMESTAMP", int32(8), ""}, - {"c1", "BOOL", int32(1), ""}, - {"c2", "TINYINT", int32(1), ""}, - {"c3", "SMALLINT", int32(2), ""}, - {"c4", "INT", int32(4), ""}, - {"c5", "BIGINT", int32(8), ""}, - {"c6", "TINYINT UNSIGNED", int32(1), ""}, - {"c7", "SMALLINT UNSIGNED", int32(2), ""}, - {"c8", "INT UNSIGNED", int32(4), ""}, - {"c9", "BIGINT UNSIGNED", int32(8), ""}, - {"c10", "FLOAT", int32(4), ""}, - {"c11", "DOUBLE", int32(8), ""}, - {"c12", "VARCHAR", int32(20), ""}, - {"c13", "NCHAR", int32(20), ""}, - {"tts", "TIMESTAMP", int32(8), "TAG"}, - {"tc1", "BOOL", int32(1), "TAG"}, - {"tc2", "TINYINT", int32(1), "TAG"}, - {"tc3", "SMALLINT", int32(2), "TAG"}, - {"tc4", "INT", int32(4), "TAG"}, - {"tc5", "BIGINT", int32(8), "TAG"}, - {"tc6", "TINYINT UNSIGNED", int32(1), "TAG"}, - {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"}, - {"tc8", "INT UNSIGNED", int32(4), "TAG"}, - {"tc9", "BIGINT UNSIGNED", int32(8), "TAG"}, - {"tc10", "FLOAT", int32(4), "TAG"}, - {"tc11", "DOUBLE", int32(8), "TAG"}, - {"tc12", "VARCHAR", int32(20), "TAG"}, - {"tc13", "NCHAR", int32(20), "TAG"}, - }, d) + if len(d[0]) == 4 { + assert.Equal(t, [][]driver.Value{ + {"ts", "TIMESTAMP", int32(8), ""}, + {"c1", "BOOL", int32(1), ""}, + {"c2", "TINYINT", int32(1), ""}, + {"c3", "SMALLINT", int32(2), ""}, + {"c4", "INT", int32(4), ""}, + {"c5", "BIGINT", int32(8), ""}, + {"c6", "TINYINT UNSIGNED", int32(1), ""}, + {"c7", "SMALLINT UNSIGNED", int32(2), ""}, + {"c8", "INT UNSIGNED", int32(4), ""}, + {"c9", "BIGINT UNSIGNED", int32(8), ""}, + {"c10", "FLOAT", int32(4), ""}, + {"c11", "DOUBLE", int32(8), ""}, + {"c12", "VARCHAR", int32(20), ""}, + {"c13", "NCHAR", int32(20), ""}, + {"tts", "TIMESTAMP", int32(8), "TAG"}, + {"tc1", "BOOL", int32(1), "TAG"}, + {"tc2", "TINYINT", int32(1), "TAG"}, + {"tc3", "SMALLINT", int32(2), "TAG"}, + {"tc4", "INT", int32(4), "TAG"}, + {"tc5", "BIGINT", int32(8), "TAG"}, + {"tc6", "TINYINT UNSIGNED", int32(1), "TAG"}, + {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"}, + {"tc8", "INT UNSIGNED", int32(4), "TAG"}, + {"tc9", "BIGINT UNSIGNED", int32(8), "TAG"}, + {"tc10", "FLOAT", int32(4), "TAG"}, + {"tc11", "DOUBLE", int32(8), "TAG"}, + {"tc12", "VARCHAR", int32(20), "TAG"}, + {"tc13", "NCHAR", int32(20), "TAG"}, + }, d) + } else { + assert.Equal(t, [][]driver.Value{ + {"ts", "TIMESTAMP", int32(8), "", ""}, + {"c1", "BOOL", int32(1), "", ""}, + {"c2", "TINYINT", int32(1), "", ""}, + {"c3", "SMALLINT", int32(2), "", ""}, + {"c4", "INT", int32(4), "", ""}, + {"c5", "BIGINT", int32(8), "", ""}, + {"c6", "TINYINT UNSIGNED", int32(1), "", ""}, + {"c7", "SMALLINT UNSIGNED", int32(2), "", ""}, + {"c8", "INT UNSIGNED", int32(4), "", ""}, + {"c9", "BIGINT UNSIGNED", int32(8), "", ""}, + {"c10", "FLOAT", int32(4), "", ""}, + {"c11", "DOUBLE", int32(8), "", ""}, + {"c12", "VARCHAR", int32(20), "", ""}, + {"c13", "NCHAR", int32(20), "", ""}, + {"tts", "TIMESTAMP", int32(8), "TAG", ""}, + {"tc1", "BOOL", int32(1), "TAG", ""}, + {"tc2", "TINYINT", int32(1), "TAG", ""}, + {"tc3", "SMALLINT", int32(2), "TAG", ""}, + {"tc4", "INT", int32(4), "TAG", ""}, + {"tc5", "BIGINT", int32(8), "TAG", ""}, + {"tc6", "TINYINT UNSIGNED", int32(1), "TAG", ""}, + {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG", ""}, + {"tc8", "INT UNSIGNED", int32(4), "TAG", ""}, + {"tc9", "BIGINT UNSIGNED", int32(8), "TAG", ""}, + {"tc10", "FLOAT", int32(4), "TAG", ""}, + {"tc11", "DOUBLE", int32(8), "TAG", ""}, + {"tc12", "VARCHAR", int32(20), "TAG", ""}, + {"tc13", "NCHAR", int32(20), "TAG", ""}, + }, d) + } + }) TMQUnsubscribe(tmq) @@ -1470,7 +1504,7 @@ func TestTMQGetTopicAssignment(t *testing.T) { code, assignment := TMQGetTopicAssignment(tmq, "test_tmq_assignment") if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } assert.Equal(t, 1, len(assignment)) assert.Equal(t, int64(0), assignment[0].Begin) @@ -1479,7 +1513,7 @@ func TestTMQGetTopicAssignment(t *testing.T) { end := assignment[0].End vgID, vgCode := TaosGetTableVgID(conn, "test_tmq_get_topic_assignment", "t") if vgCode != 0 { - t.Fatal(errors.NewError(int(vgCode), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(vgCode), TMQErr2Str(code))) } assert.Equal(t, int32(vgID), assignment[0].VGroupID) @@ -1497,7 +1531,7 @@ func TestTMQGetTopicAssignment(t *testing.T) { assert.True(t, haveMessage, "expect have message") code, assignment = TMQGetTopicAssignment(tmq, "test_tmq_assignment") if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } assert.Equal(t, 1, len(assignment)) assert.Equal(t, int64(0), assignment[0].Begin) @@ -1508,11 +1542,11 @@ func TestTMQGetTopicAssignment(t *testing.T) { //seek code = TMQOffsetSeek(tmq, "test_tmq_assignment", int32(vgID), 0) if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } code, assignment = TMQGetTopicAssignment(tmq, "test_tmq_assignment") if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } assert.Equal(t, 1, len(assignment)) assert.Equal(t, int64(0), assignment[0].Begin) @@ -1534,7 +1568,7 @@ func TestTMQGetTopicAssignment(t *testing.T) { assert.True(t, haveMessage, "expect have message") code, assignment = TMQGetTopicAssignment(tmq, "test_tmq_assignment") if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } assert.Equal(t, 1, len(assignment)) assert.Equal(t, int64(0), assignment[0].Begin) @@ -1545,11 +1579,11 @@ func TestTMQGetTopicAssignment(t *testing.T) { // seek twice code = TMQOffsetSeek(tmq, "test_tmq_assignment", int32(vgID), 1) if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } code, assignment = TMQGetTopicAssignment(tmq, "test_tmq_assignment") if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } assert.Equal(t, 1, len(assignment)) assert.Equal(t, int64(0), assignment[0].Begin) @@ -1572,7 +1606,7 @@ func TestTMQGetTopicAssignment(t *testing.T) { assert.True(t, haveMessage, "expect have message") code, assignment = TMQGetTopicAssignment(tmq, "test_tmq_assignment") if code != 0 { - t.Fatal(errors.NewError(int(code), TaosErrorStr(nil))) + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } assert.Equal(t, 1, len(assignment)) assert.Equal(t, int64(0), assignment[0].Begin) @@ -1581,6 +1615,422 @@ func TestTMQGetTopicAssignment(t *testing.T) { assert.Equal(t, int32(vgID), assignment[0].VGroupID) } +func TestTMQPosition(t *testing.T) { + conn, err := TaosConnect("", "root", "taosdata", "", 0) + if err != nil { + t.Fatal(err) + return + } + defer TaosClose(conn) + + defer func() { + if err = taosOperation(conn, "drop database if exists test_tmq_position"); err != nil { + t.Error(err) + } + }() + + if err = taosOperation(conn, "create database if not exists test_tmq_position vgroups 1 WAL_RETENTION_PERIOD 86400"); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "use test_tmq_position"); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "create table if not exists t (ts timestamp,v int)"); err != nil { + t.Fatal(err) + return + } + + // create topic + if err = taosOperation(conn, "create topic if not exists test_tmq_position_topic as select * from t"); err != nil { + t.Fatal(err) + return + } + + defer func() { + if err = taosOperation(conn, "drop topic if exists test_tmq_position_topic"); err != nil { + t.Error(err) + } + }() + + conf := TMQConfNew() + defer TMQConfDestroy(conf) + TMQConfSet(conf, "group.id", "position") + TMQConfSet(conf, "auto.offset.reset", "earliest") + + tmq, err := TMQConsumerNew(conf) + if err != nil { + t.Fatal(err) + } + defer TMQConsumerClose(tmq) + + topicList := TMQListNew() + TMQListAppend(topicList, "test_tmq_position_topic") + + errCode := TMQSubscribe(tmq, topicList) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Fatal(errors.NewError(int(errCode), errStr)) + return + } + _ = taosOperation(conn, "insert into t values(now,1)") + code, assignment := TMQGetTopicAssignment(tmq, "test_tmq_position_topic") + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + vgID := assignment[0].VGroupID + position := TMQPosition(tmq, "test_tmq_position_topic", vgID) + assert.Equal(t, position, int64(0)) + haveMessage := false + for i := 0; i < 3; i++ { + message := TMQConsumerPoll(tmq, 500) + if message != nil { + haveMessage = true + position := TMQPosition(tmq, "test_tmq_position_topic", vgID) + assert.Greater(t, position, int64(0)) + committed := TMQCommitted(tmq, "test_tmq_position_topic", vgID) + assert.Less(t, committed, int64(0)) + TMQCommitSync(tmq, message) + position = TMQPosition(tmq, "test_tmq_position_topic", vgID) + committed = TMQCommitted(tmq, "test_tmq_position_topic", vgID) + assert.Equal(t, position, committed) + TaosFreeResult(message) + break + } + } + assert.True(t, haveMessage, "expect have message") + errCode = TMQUnsubscribe(tmq) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Error(errors.NewError(int(errCode), errStr)) + return + } +} + +func TestTMQCommitOffset(t *testing.T) { + conn, err := TaosConnect("", "root", "taosdata", "", 0) + if err != nil { + t.Fatal(err) + return + } + defer TaosClose(conn) + + defer func() { + if err = taosOperation(conn, "drop database if exists test_tmq_commit_offset"); err != nil { + t.Error(err) + } + }() + + if err = taosOperation(conn, "create database if not exists test_tmq_commit_offset vgroups 1 WAL_RETENTION_PERIOD 86400"); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "use test_tmq_commit_offset"); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "create table if not exists t (ts timestamp,v int)"); err != nil { + t.Fatal(err) + return + } + + // create topic + if err = taosOperation(conn, "create topic if not exists test_tmq_commit_offset_topic as select * from t"); err != nil { + t.Fatal(err) + return + } + + defer func() { + if err = taosOperation(conn, "drop topic if exists test_tmq_commit_offset_topic"); err != nil { + t.Error(err) + } + }() + + conf := TMQConfNew() + defer TMQConfDestroy(conf) + TMQConfSet(conf, "group.id", "commit") + TMQConfSet(conf, "auto.offset.reset", "earliest") + + tmq, err := TMQConsumerNew(conf) + if err != nil { + t.Fatal(err) + } + defer TMQConsumerClose(tmq) + + topicList := TMQListNew() + TMQListAppend(topicList, "test_tmq_commit_offset_topic") + + errCode := TMQSubscribe(tmq, topicList) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Fatal(errors.NewError(int(errCode), errStr)) + return + } + _ = taosOperation(conn, "insert into t values(now,1)") + code, assignment := TMQGetTopicAssignment(tmq, "test_tmq_commit_offset_topic") + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + vgID := assignment[0].VGroupID + haveMessage := false + for i := 0; i < 3; i++ { + message := TMQConsumerPoll(tmq, 500) + if message != nil { + haveMessage = true + position := TMQPosition(tmq, "test_tmq_commit_offset_topic", vgID) + assert.Greater(t, position, int64(0)) + committed := TMQCommitted(tmq, "test_tmq_commit_offset_topic", vgID) + assert.Less(t, committed, int64(0)) + offset := TMQGetVgroupOffset(message) + code = TMQCommitOffsetSync(tmq, "test_tmq_commit_offset_topic", vgID, offset) + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + committed = TMQCommitted(tmq, "test_tmq_commit_offset_topic", vgID) + assert.Equal(t, int64(offset), committed) + TaosFreeResult(message) + break + } + } + assert.True(t, haveMessage, "expect have message") + errCode = TMQUnsubscribe(tmq) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Error(errors.NewError(int(errCode), errStr)) + return + } +} + +func TestTMQCommitOffsetAsync(t *testing.T) { + topic := "test_tmq_commit_offset_a_topic" + tableName := "test_tmq_commit_offset_a" + conn, err := TaosConnect("", "root", "taosdata", "", 0) + if err != nil { + t.Fatal(err) + return + } + defer TaosClose(conn) + + defer func() { + if err = taosOperation(conn, "drop database if exists "+tableName); err != nil { + t.Error(err) + } + }() + + if err = taosOperation(conn, "create database if not exists "+tableName+" vgroups 1 WAL_RETENTION_PERIOD 86400"); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "use "+tableName); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "create table if not exists t (ts timestamp,v int)"); err != nil { + t.Fatal(err) + return + } + + // create topic + + if err = taosOperation(conn, "create topic if not exists "+topic+" as select * from t"); err != nil { + t.Fatal(err) + return + } + + defer func() { + if err = taosOperation(conn, "drop topic if exists "+topic); err != nil { + t.Error(err) + } + }() + + conf := TMQConfNew() + defer TMQConfDestroy(conf) + TMQConfSet(conf, "group.id", "commit_a") + TMQConfSet(conf, "auto.offset.reset", "earliest") + + tmq, err := TMQConsumerNew(conf) + if err != nil { + t.Fatal(err) + } + defer TMQConsumerClose(tmq) + + topicList := TMQListNew() + TMQListAppend(topicList, topic) + + errCode := TMQSubscribe(tmq, topicList) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Fatal(errors.NewError(int(errCode), errStr)) + return + } + _ = taosOperation(conn, "insert into t values(now,1)") + code, assignment := TMQGetTopicAssignment(tmq, topic) + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + vgID := assignment[0].VGroupID + haveMessage := false + for i := 0; i < 3; i++ { + message := TMQConsumerPoll(tmq, 500) + if message != nil { + haveMessage = true + position := TMQPosition(tmq, topic, vgID) + assert.Greater(t, position, int64(0)) + committed := TMQCommitted(tmq, topic, vgID) + assert.Less(t, committed, int64(0)) + offset := TMQGetVgroupOffset(message) + c := make(chan *TMQCommitCallbackResult, 1) + handler := cgo.NewHandle(c) + TMQCommitOffsetAsync(tmq, topic, vgID, offset, handler) + timer := time.NewTimer(time.Second * 5) + select { + case r := <-c: + code = r.ErrCode + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + timer.Stop() + case <-timer.C: + t.Fatal("commit async timeout") + timer.Stop() + } + committed = TMQCommitted(tmq, topic, vgID) + assert.Equal(t, int64(offset), committed) + TaosFreeResult(message) + break + } + } + assert.True(t, haveMessage, "expect have message") + errCode = TMQUnsubscribe(tmq) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Error(errors.NewError(int(errCode), errStr)) + return + } +} + +func TestTMQCommitAsyncCallback(t *testing.T) { + topic := "test_tmq_commit_a_cb_topic" + tableName := "test_tmq_commit_a_cb" + conn, err := TaosConnect("", "root", "taosdata", "", 0) + if err != nil { + t.Fatal(err) + return + } + defer TaosClose(conn) + + defer func() { + if err = taosOperation(conn, "drop database if exists "+tableName); err != nil { + t.Error(err) + } + }() + + if err = taosOperation(conn, "create database if not exists "+tableName+" vgroups 1 WAL_RETENTION_PERIOD 86400"); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "use "+tableName); err != nil { + t.Fatal(err) + return + } + if err = taosOperation(conn, "create table if not exists t (ts timestamp,v int)"); err != nil { + t.Fatal(err) + return + } + + // create topic + + if err = taosOperation(conn, "create topic if not exists "+topic+" as select * from t"); err != nil { + t.Fatal(err) + return + } + + defer func() { + if err = taosOperation(conn, "drop topic if exists "+topic); err != nil { + t.Error(err) + } + }() + + conf := TMQConfNew() + defer TMQConfDestroy(conf) + TMQConfSet(conf, "group.id", "commit_a") + TMQConfSet(conf, "enable.auto.commit", "false") + TMQConfSet(conf, "auto.offset.reset", "earliest") + TMQConfSet(conf, "auto.commit.interval.ms", "100") + c := make(chan *TMQCommitCallbackResult, 1) + h := cgo.NewHandle(c) + TMQConfSetAutoCommitCB(conf, h) + go func() { + for r := range c { + t.Log("auto commit", r) + PutTMQCommitCallbackResult(r) + } + }() + + tmq, err := TMQConsumerNew(conf) + if err != nil { + t.Fatal(err) + } + defer TMQConsumerClose(tmq) + + topicList := TMQListNew() + TMQListAppend(topicList, topic) + + errCode := TMQSubscribe(tmq, topicList) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Fatal(errors.NewError(int(errCode), errStr)) + return + } + _ = taosOperation(conn, "insert into t values(now,1)") + code, assignment := TMQGetTopicAssignment(tmq, topic) + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + vgID := assignment[0].VGroupID + haveMessage := false + for i := 0; i < 3; i++ { + message := TMQConsumerPoll(tmq, 500) + if message != nil { + haveMessage = true + position := TMQPosition(tmq, topic, vgID) + assert.Greater(t, position, int64(0)) + committed := TMQCommitted(tmq, topic, vgID) + assert.Less(t, committed, int64(0)) + offset := TMQGetVgroupOffset(message) + TMQCommitOffsetSync(tmq, topic, vgID, offset) + committed = TMQCommitted(tmq, topic, vgID) + assert.Equal(t, offset, committed) + TaosFreeResult(message) + } + } + assert.True(t, haveMessage, "expect have message") + committed := TMQCommitted(tmq, topic, vgID) + t.Log(committed) + code, assignment = TMQGetTopicAssignment(tmq, topic) + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + t.Log(assignment[0].Offset) + TMQCommitOffsetSync(tmq, topic, vgID, 1) + committed = TMQCommitted(tmq, topic, vgID) + assert.Equal(t, int64(1), committed) + code, assignment = TMQGetTopicAssignment(tmq, topic) + if code != 0 { + t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) + } + t.Log(assignment[0].Offset) + position := TMQPosition(tmq, topic, vgID) + t.Log(position) + errCode = TMQUnsubscribe(tmq) + if errCode != 0 { + errStr := TMQErr2Str(errCode) + t.Error(errors.NewError(int(errCode), errStr)) + return + } +} + func taosOperation(conn unsafe.Pointer, sql string) (err error) { res := TaosQuery(conn, sql) defer TaosFreeResult(res) diff --git a/wrapper/tmqcb.go b/wrapper/tmqcb.go index 82ae1ee..3893944 100644 --- a/wrapper/tmqcb.go +++ b/wrapper/tmqcb.go @@ -25,3 +25,25 @@ func TMQCommitCB(consumer unsafe.Pointer, resp C.int32_t, param unsafe.Pointer) }() c <- r } + +//export TMQAutoCommitCB +func TMQAutoCommitCB(consumer unsafe.Pointer, resp C.int32_t, param unsafe.Pointer) { + c := (*(*cgo.Handle)(param)).Value().(chan *TMQCommitCallbackResult) + r := GetTMQCommitCallbackResult(int32(resp), consumer) + defer func() { + // Avoid panic due to channel closed + recover() + }() + c <- r +} + +//export TMQCommitOffsetCB +func TMQCommitOffsetCB(consumer unsafe.Pointer, resp C.int32_t, param unsafe.Pointer) { + c := (*(*cgo.Handle)(param)).Value().(chan *TMQCommitCallbackResult) + r := GetTMQCommitCallbackResult(int32(resp), consumer) + defer func() { + // Avoid panic due to channel closed + recover() + }() + c <- r +} diff --git a/wrapper/whitelist.go b/wrapper/whitelist.go new file mode 100644 index 0000000..32da258 --- /dev/null +++ b/wrapper/whitelist.go @@ -0,0 +1,29 @@ +package wrapper + +/* +#cgo CFLAGS: -IC:/TDengine/include -I/usr/include +#cgo linux LDFLAGS: -L/usr/lib -ltaos +#cgo windows LDFLAGS: -LC:/TDengine/driver -ltaos +#cgo darwin LDFLAGS: -L/usr/local/lib -ltaos +#include +#include +#include +#include +extern void WhitelistCallback(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteLists); +void taos_fetch_whitelist_a_wrapper(TAOS *taos, void *param){ + return taos_fetch_whitelist_a(taos, WhitelistCallback, param); +}; +*/ +import "C" +import ( + "unsafe" + + "github.com/taosdata/driver-go/v3/wrapper/cgo" +) + +// typedef void (*__taos_async_whitelist_fn_t)(void *param, int code, TAOS *taos, int numOfWhiteLists, uint64_t* pWhiteLists); + +// TaosFetchWhitelistA DLL_EXPORT void taos_fetch_whitelist_a(TAOS *taos, __taos_async_whitelist_fn_t fp, void *param); +func TaosFetchWhitelistA(taosConnect unsafe.Pointer, caller cgo.Handle) { + C.taos_fetch_whitelist_a_wrapper(taosConnect, caller.Pointer()) +} diff --git a/wrapper/whitelist_test.go b/wrapper/whitelist_test.go new file mode 100644 index 0000000..26a3a6e --- /dev/null +++ b/wrapper/whitelist_test.go @@ -0,0 +1,21 @@ +package wrapper + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/taosdata/driver-go/v3/wrapper/cgo" +) + +func TestGetWhiteList(t *testing.T) { + conn, err := TaosConnect("", "root", "taosdata", "", 0) + assert.NoError(t, err) + defer TaosClose(conn) + c := make(chan *WhitelistResult, 1) + handler := cgo.NewHandle(c) + TaosFetchWhitelistA(conn, handler) + data := <-c + assert.Equal(t, int32(0), data.ErrCode) + assert.Equal(t, 1, len(data.IPNets)) + assert.Equal(t, "0.0.0.0/0", data.IPNets[0].String()) +} diff --git a/wrapper/whitelistcb.go b/wrapper/whitelistcb.go new file mode 100644 index 0000000..34710c8 --- /dev/null +++ b/wrapper/whitelistcb.go @@ -0,0 +1,35 @@ +package wrapper + +import "C" +import ( + "net" + "unsafe" + + "github.com/taosdata/driver-go/v3/wrapper/cgo" +) + +type WhitelistResult struct { + ErrCode int32 + IPNets []*net.IPNet +} + +//export WhitelistCallback +func WhitelistCallback(param unsafe.Pointer, code int, taosConnect unsafe.Pointer, numOfWhiteLists int, pWhiteLists unsafe.Pointer) { + c := (*(*cgo.Handle)(param)).Value().(chan *WhitelistResult) + if code != 0 { + c <- &WhitelistResult{ErrCode: int32(code)} + return + } + ips := make([]*net.IPNet, 0, numOfWhiteLists) + for i := 0; i < numOfWhiteLists; i++ { + ipNet := make([]byte, 8) + for j := 0; j < 8; j++ { + ipNet[j] = *(*byte)(unsafe.Pointer(uintptr(pWhiteLists) + uintptr(i*8) + uintptr(j))) + } + ip := net.IP{ipNet[0], ipNet[1], ipNet[2], ipNet[3]} + ones := int(ipNet[4]) + ipMask := net.CIDRMask(ones, 32) + ips = append(ips, &net.IPNet{IP: ip, Mask: ipMask}) + } + c <- &WhitelistResult{IPNets: ips} +} diff --git a/ws/client/conn.go b/ws/client/conn.go index 493a7f1..37a5dd2 100644 --- a/ws/client/conn.go +++ b/ws/client/conn.go @@ -87,7 +87,7 @@ func NewClient(conn *websocket.Conn, sendChanLength uint) *Client { } func (c *Client) ReadPump() { - c.conn.SetReadLimit(common.BufferSize4M) + c.conn.SetReadLimit(0) c.conn.SetReadDeadline(time.Now().Add(c.PongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(c.PongWait)) @@ -105,9 +105,9 @@ func (c *Client) ReadPump() { } switch messageType { case websocket.TextMessage: - c.TextMessageHandler(message) + go c.TextMessageHandler(message) case websocket.BinaryMessage: - c.BinaryMessageHandler(message) + go c.BinaryMessageHandler(message) } } } diff --git a/ws/tmq/consumer.go b/ws/tmq/consumer.go index b3ebced..df1889b 100644 --- a/ws/tmq/consumer.go +++ b/ws/tmq/consumer.go @@ -291,6 +291,10 @@ const ( TMQUnsubscribe = "unsubscribe" TMQGetTopicAssignment = "assignment" TMQSeek = "seek" + TMQCommitOffset = "commit_offset" + TMQCommitted = "committed" + TMQPosition = "position" + TMQListTopics = "list_topics" ) var ClosedErr = errors.New("connection closed") @@ -426,6 +430,12 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { return tmq.NewTMQErrorWithErr(err) } result.SetData(data) + topic := resp.Topic + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: resp.VgroupID, + Offset: tmq.Offset(resp.Offset), + } return result case common.TMQ_RES_TABLE_META: result := &tmq.MetaMessage{} @@ -436,6 +446,12 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { if err != nil { return tmq.NewTMQErrorWithErr(err) } + topic := resp.Topic + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: resp.VgroupID, + Offset: tmq.Offset(resp.Offset), + } result.SetMeta(meta) return result case common.TMQ_RES_METADATA: @@ -455,6 +471,12 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { Meta: meta, Data: data, }) + topic := resp.Topic + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: resp.VgroupID, + Offset: tmq.Offset(resp.Offset), + } return result default: return tmq.NewTMQErrorWithErr(err) @@ -566,7 +588,8 @@ func (c *Consumer) fetch(messageID uint64) ([]*tmq.Data, error) { return nil, err } block := respBytes[24:] - data := parser.ReadBlock(unsafe.Pointer(*(*uintptr)(unsafe.Pointer(&block))), resp.Rows, resp.FieldsTypes, resp.Precision) + p := unsafe.Pointer(&block[0]) + data := parser.ReadBlock(p, resp.Rows, resp.FieldsTypes, resp.Precision) tmqData = append(tmqData, &tmq.Data{ TableName: resp.TableName, Data: data, @@ -615,7 +638,11 @@ func (c *Consumer) doCommit(messageID uint64) ([]tmq.TopicPartition, error) { if resp.Code != 0 { return nil, taosErrors.NewError(resp.Code, resp.Message) } - return nil, nil + partitions, err := c.Assignment() + if err != nil { + return nil, err + } + return c.Committed(partitions, 0) } func (c *Consumer) Unsubscribe() error { @@ -743,3 +770,143 @@ func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) erro } return nil } + +func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error) { + offsets = make([]tmq.TopicPartition, len(partitions)) + reqID := c.generateReqID() + req := &CommittedReq{ + ReqID: reqID, + TopicVgroupIDs: make([]TopicVgroupID, len(partitions)), + } + for i := 0; i < len(partitions); i++ { + req.TopicVgroupIDs[i] = TopicVgroupID{ + Topic: *partitions[i].Topic, + VgroupID: partitions[i].Partition, + } + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return nil, err + } + action := &client.WSAction{ + Action: TMQCommitted, + Args: args, + } + envelope := c.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + c.client.PutEnvelope(envelope) + return nil, err + } + respBytes, err := c.sendText(reqID, envelope) + if err != nil { + return nil, err + } + var resp CommittedResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + for i := 0; i < len(resp.Committed); i++ { + offsets[i] = tmq.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: tmq.Offset(resp.Committed[i]), + } + } + return offsets, nil +} + +func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error) { + if c.err != nil { + return nil, c.err + } + for i := 0; i < len(offsets); i++ { + reqID := c.generateReqID() + req := &CommitOffsetReq{ + ReqID: reqID, + Topic: *offsets[i].Topic, + VgroupID: offsets[i].Partition, + Offset: int64(offsets[i].Offset), + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return nil, err + } + action := &client.WSAction{ + Action: TMQCommitOffset, + Args: args, + } + envelope := c.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + c.client.PutEnvelope(envelope) + return nil, err + } + respBytes, err := c.sendText(reqID, envelope) + if err != nil { + return nil, err + } + var resp CommitOffsetResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + } + return c.Committed(offsets, 0) +} + +func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error) { + offsets = make([]tmq.TopicPartition, len(partitions)) + reqID := c.generateReqID() + req := &PositionReq{ + ReqID: reqID, + TopicVgroupIDs: make([]TopicVgroupID, len(partitions)), + } + for i := 0; i < len(partitions); i++ { + req.TopicVgroupIDs[i] = TopicVgroupID{ + Topic: *partitions[i].Topic, + VgroupID: partitions[i].Partition, + } + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return nil, err + } + action := &client.WSAction{ + Action: TMQPosition, + Args: args, + } + envelope := c.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + c.client.PutEnvelope(envelope) + return nil, err + } + respBytes, err := c.sendText(reqID, envelope) + if err != nil { + return nil, err + } + var resp PositionResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + for i := 0; i < len(resp.Position); i++ { + offsets[i] = tmq.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: tmq.Offset(resp.Position[i]), + } + } + return offsets, nil +} diff --git a/ws/tmq/consumer_test.go b/ws/tmq/consumer_test.go index d44623d..a7bf95e 100644 --- a/ws/tmq/consumer_test.go +++ b/ws/tmq/consumer_test.go @@ -22,7 +22,7 @@ func prepareEnv() error { "drop topic if exists test_ws_tmq_topic", "drop database if exists test_ws_tmq", "create database test_ws_tmq WAL_RETENTION_PERIOD 86400", - "create topic test_ws_tmq_topic with meta as database test_ws_tmq", + "create topic test_ws_tmq_topic as database test_ws_tmq", } for _, step := range steps { err = doRequest(step) @@ -124,19 +124,18 @@ func TestConsumer(t *testing.T) { } }() consumer, err := NewConsumer(&tmq.ConfigMap{ - "ws.url": "ws://127.0.0.1:6041/rest/tmq", - "ws.message.channelLen": uint(0), - "ws.message.timeout": common.DefaultMessageTimeout, - "ws.message.writeWait": common.DefaultWriteWait, - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "group.id": "test", - "client.id": "test_consumer", - "auto.offset.reset": "earliest", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "5000", - "experimental.snapshot.enable": "true", - "msg.with.table.name": "true", + "ws.url": "ws://127.0.0.1:6041/rest/tmq", + "ws.message.channelLen": uint(0), + "ws.message.timeout": common.DefaultMessageTimeout, + "ws.message.writeWait": common.DefaultWriteWait, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "group.id": "test", + "client.id": "test_consumer", + "auto.offset.reset": "earliest", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "5000", + "msg.with.table.name": "true", }) if err != nil { t.Error(err) @@ -149,10 +148,9 @@ func TestConsumer(t *testing.T) { t.Error(err) return } - gotMeta := false gotData := false for i := 0; i < 5; i++ { - if gotData && gotMeta { + if gotData { return } ev := consumer.Poll(0) @@ -180,84 +178,23 @@ func TestConsumer(t *testing.T) { assert.Equal(t, float64(11.123), v[11].(float64)) assert.Equal(t, "binary", v[12].(string)) assert.Equal(t, "nchar", v[13].(string)) - case *tmq.MetaMessage: - gotMeta = true - meta := e.Value().(*tmq.Meta) - assert.Equal(t, "test_ws_tmq", e.DBName()) - assert.Equal(t, "create", meta.Type) - assert.Equal(t, "t_all", meta.TableName) - assert.Equal(t, "normal", meta.TableType) - assert.Equal(t, []*tmq.Column{ - { - Name: "ts", - Type: 9, - Length: 0, - }, - { - Name: "c1", - Type: 1, - Length: 0, - }, - { - Name: "c2", - Type: 2, - Length: 0, - }, - { - Name: "c3", - Type: 3, - Length: 0, - }, - { - Name: "c4", - Type: 4, - Length: 0, - }, - { - Name: "c5", - Type: 5, - Length: 0, - }, - { - Name: "c6", - Type: 11, - Length: 0, - }, - { - Name: "c7", - Type: 12, - Length: 0, - }, - { - Name: "c8", - Type: 13, - Length: 0, - }, - { - Name: "c9", - Type: 14, - Length: 0, - }, - { - Name: "c10", - Type: 6, - Length: 0, - }, - { - Name: "c11", - Type: 7, - Length: 0, - }, - { - Name: "c12", - Type: 8, - Length: 20, - }, - { - Name: "c13", - Type: 10, - Length: 20, - }}, meta.Columns) + t.Log(e.Offset()) + ass, err := consumer.Assignment() + t.Log(ass) + committed, err := consumer.Committed(ass, 0) + t.Log(committed) + position, _ := consumer.Position(ass) + t.Log(position) + offsets, err := consumer.Position([]tmq.TopicPartition{e.TopicPartition}) + assert.NoError(t, err) + _, err = consumer.CommitOffsets(offsets) + assert.NoError(t, err) + ass, err = consumer.Assignment() + t.Log(ass) + committed, err = consumer.Committed(ass, 0) + t.Log(committed) + position, _ = consumer.Position(ass) + t.Log(position) case tmq.Error: t.Error(e) return @@ -265,7 +202,7 @@ func TestConsumer(t *testing.T) { t.Error("unexpected", e) return } - _, err = consumer.Commit() + } if err != nil { @@ -273,9 +210,6 @@ func TestConsumer(t *testing.T) { return } } - if !gotMeta { - t.Error("no meta got") - } if !gotData { t.Error("no data got") } diff --git a/ws/tmq/proto.go b/ws/tmq/proto.go index a067d63..d9b8c1d 100644 --- a/ws/tmq/proto.go +++ b/ws/tmq/proto.go @@ -145,3 +145,54 @@ type OffsetSeekResp struct { ReqID uint64 `json:"req_id"` Timing int64 `json:"timing"` } + +type CommittedReq struct { + ReqID uint64 `json:"req_id"` + TopicVgroupIDs []TopicVgroupID `json:"topic_vgroup_ids"` +} + +type CommittedResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + Committed []int64 `json:"committed"` +} + +type TopicVgroupID struct { + Topic string `json:"topic"` + VgroupID int32 `json:"vgroup_id"` +} + +type CommitOffsetReq struct { + ReqID uint64 `json:"req_id"` + Topic string `json:"topic"` + VgroupID int32 `json:"vgroup_id"` + Offset int64 `json:"offset"` +} + +type CommitOffsetResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + Topic string `json:"topic"` + VgroupID int32 `json:"vgroup_id"` + Offset int64 `json:"offset"` +} + +type PositionReq struct { + ReqID uint64 `json:"req_id"` + TopicVgroupIDs []TopicVgroupID `json:"topic_vgroup_ids"` +} + +type PositionResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + Position []int64 `json:"position"` +}