Skip to content

Commit

Permalink
Merge pull request #235 from taosdata/enh/xftan/TD-28118-3.1
Browse files Browse the repository at this point in the history
synchronous 3.0 branch function
  • Loading branch information
huskar-t authored Jan 26, 2024
2 parents 525e132 + 16ab09c commit 6f95553
Show file tree
Hide file tree
Showing 35 changed files with 4,690 additions and 240 deletions.
6 changes: 1 addition & 5 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ on:
required: true
type: string

env:
SCCACHE_GHA_ENABLED: "true"

jobs:
build:
Expand Down Expand Up @@ -45,8 +43,6 @@ jobs:
cd TDengine
echo "commit_id=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT
- name: Run sccache-cache
uses: mozilla-actions/[email protected]
- name: Cache server by pr
if: github.event_name == 'pull_request'
Expand Down Expand Up @@ -78,7 +74,7 @@ jobs:
cd TDengine
mkdir debug
cd debug
cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache
cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9
make -j 4
- name: package
Expand Down
8 changes: 1 addition & 7 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ on:
- '3.0'
- '3.1'

env:
SCCACHE_GHA_ENABLED: "true"

jobs:
build:
Expand All @@ -30,8 +28,6 @@ jobs:
cd TDengine
echo "commit_id=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT
- name: Run sccache-cache
uses: mozilla-actions/[email protected]
- name: Cache server
id: cache-server
Expand All @@ -44,16 +40,14 @@ jobs:
if: steps.cache-server.outputs.cache-hit != 'true'
run: sudo apt install -y libgeos-dev

- name: Run sccache-cache
uses: mozilla-actions/[email protected]

- name: install TDengine
if: steps.cache-server.outputs.cache-hit != 'true'
run: |
cd TDengine
mkdir debug
cd debug
cmake .. -DBUILD_JDBC=false -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache
cmake .. -DBUILD_JDBC=false -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9
make -j 4
- name: package
Expand Down
1 change: 1 addition & 0 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ DSN 格式为:

- `writeTimeout` 通过 websocket 发送数据的超时时间。
- `readTimeout` 通过 websocket 接收响应数据的超时时间。
- `enableCompression` 是否压缩传输数据,默认为 `false` 不发送压缩数据。

## 通过 websocket 使用 tmq

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ Parameters:

- `writeTimeout` The timeout to send data via websocket.
- `readTimeout` The timeout to receive response data via websocket.
- `enableCompression` Whether to compress the transmitted data, the default is `false` and no compressed data is sent.

## Using tmq over websocket

Expand Down
39 changes: 18 additions & 21 deletions af/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

type Consumer struct {
cConsumer unsafe.Pointer
cConsumer unsafe.Pointer
dataParser *parser.TMQRawDataParser
}

// NewConsumer Create new TMQ consumer with TMQ config
Expand All @@ -28,7 +29,8 @@ func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) {
return nil, err
}
consumer := &Consumer{
cConsumer: cConsumer,
cConsumer: cConsumer,
dataParser: parser.NewTMQRawDataParser(),
}
return consumer, nil
}
Expand Down Expand Up @@ -176,27 +178,22 @@ func (c *Consumer) getMeta(message unsafe.Pointer) (*tmq.Meta, error) {
}

func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) {
errCode, raw := wrapper.TMQGetRaw(message)
if errCode != taosError.SUCCESS {
errStr := wrapper.TaosErrorStr(message)
err := taosError.NewError(int(errCode), errStr)
return nil, err
}
_, _, rawPtr := wrapper.ParseRawMeta(raw)
blockInfos, err := c.dataParser.Parse(rawPtr)
if err != nil {
return nil, err
}
var tmqData []*tmq.Data
for {
blockSize, errCode, block := wrapper.TaosFetchRawBlock(message)
if errCode != int(taosError.SUCCESS) {
errStr := wrapper.TaosErrorStr(message)
err := taosError.NewError(errCode, errStr)
return nil, err
}
if blockSize == 0 {
break
}
tableName := wrapper.TMQGetTableName(message)
fileCount := wrapper.TaosNumFields(message)
rh, err := wrapper.ReadColumn(message, fileCount)
if err != nil {
return nil, err
}
precision := wrapper.TaosResultPrecision(message)
for i := 0; i < len(blockInfos); i++ {
tmqData = append(tmqData, &tmq.Data{
TableName: tableName,
Data: parser.ReadBlock(block, blockSize, rh.ColTypes, precision),
TableName: blockInfos[i].TableName,
Data: parser.ReadBlockSimple(blockInfos[i].RawBlock, blockInfos[i].Precision),
})
}
return tmqData, nil
Expand Down
118 changes: 108 additions & 10 deletions af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,14 @@ func TestTmq(t *testing.T) {
assert.NoError(t, err)

consumer, err := NewConsumer(&tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
//"experimental.snapshot.enable": "true",
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
if err != nil {
Expand Down Expand Up @@ -181,7 +180,7 @@ func TestSeek(t *testing.T) {
}

defer func() {
//execWithoutResult(conn, "drop database if exists "+db)
execWithoutResult(conn, "drop database if exists "+db)
}()
for _, sql := range sqls {
err = execWithoutResult(conn, sql)
Expand Down Expand Up @@ -309,3 +308,102 @@ func execWithoutResult(conn unsafe.Pointer, sql string) error {
}
return nil
}

func prepareMultiBlockEnv(conn unsafe.Pointer) error {
var err error
steps := []string{
"drop topic if exists test_tmq_multi_block_topic",
"drop database if exists test_tmq_multi_block",
"create database test_tmq_multi_block vgroups 1 WAL_RETENTION_PERIOD 86400",
"create topic test_tmq_multi_block_topic as database test_tmq_multi_block",
"create table test_tmq_multi_block.t1(ts timestamp,v int)",
"create table test_tmq_multi_block.t2(ts timestamp,v int)",
"create table test_tmq_multi_block.t3(ts timestamp,v int)",
"create table test_tmq_multi_block.t4(ts timestamp,v int)",
"create table test_tmq_multi_block.t5(ts timestamp,v int)",
"create table test_tmq_multi_block.t6(ts timestamp,v int)",
"create table test_tmq_multi_block.t7(ts timestamp,v int)",
"create table test_tmq_multi_block.t8(ts timestamp,v int)",
"create table test_tmq_multi_block.t9(ts timestamp,v int)",
"create table test_tmq_multi_block.t10(ts timestamp,v int)",
"insert into test_tmq_multi_block.t1 values (now,1) test_tmq_multi_block.t2 values (now,2) " +
"test_tmq_multi_block.t3 values (now,3) test_tmq_multi_block.t4 values (now,4)" +
"test_tmq_multi_block.t5 values (now,5) test_tmq_multi_block.t6 values (now,6)" +
"test_tmq_multi_block.t7 values (now,7) test_tmq_multi_block.t8 values (now,8)" +
"test_tmq_multi_block.t9 values (now,9) test_tmq_multi_block.t10 values (now,10)",
}
for _, step := range steps {
err = execWithoutResult(conn, step)
if err != nil {
return err
}
}
return nil
}

func cleanMultiBlockEnv(conn unsafe.Pointer) error {
var err error
time.Sleep(2 * time.Second)
steps := []string{
"drop topic if exists test_tmq_multi_block_topic",
"drop database if exists test_tmq_multi_block",
}
for _, step := range steps {
err = execWithoutResult(conn, step)
if err != nil {
return err
}
}
return nil
}

func TestMultiBlock(t *testing.T) {
conn, err := wrapper.TaosConnect("", "root", "taosdata", "", 0)
if err != nil {
t.Error(err)
return
}
defer wrapper.TaosClose(conn)
err = prepareMultiBlockEnv(conn)
assert.NoError(t, err)
defer cleanMultiBlockEnv(conn)
consumer, err := NewConsumer(&tmq.ConfigMap{
"group.id": "test",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"auto.offset.reset": "earliest",
"client.id": "test_tmq_multi_block_topic",
"enable.auto.commit": "false",
"msg.with.table.name": "true",
})
assert.NoError(t, err)
if err != nil {
t.Error(err)
return
}
defer func() {
consumer.Unsubscribe()
consumer.Close()
}()
topic := []string{"test_tmq_multi_block_topic"}
err = consumer.SubscribeTopics(topic, nil)
if err != nil {
t.Error(err)
return
}
for i := 0; i < 10; i++ {
event := consumer.Poll(500)
if event == nil {
continue
}
switch e := event.(type) {
case *tmq.DataMessage:
data := e.Value().([]*tmq.Data)
assert.Equal(t, "test_tmq_multi_block", e.DBName())
assert.Equal(t, 10, len(data))
return
}
}
}
4 changes: 4 additions & 0 deletions common/param/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ func NewColumnType(size int) *ColumnType {
return &ColumnType{size: size, value: make([]*types.ColumnType, size)}
}

func NewColumnTypeWithValue(value []*types.ColumnType) *ColumnType {
return &ColumnType{size: len(value), value: value, column: len(value)}
}

func (c *ColumnType) AddBool() *ColumnType {
if c.column >= c.size {
return c
Expand Down
9 changes: 9 additions & 0 deletions common/param/param.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ func NewParam(size int) *Param {
}
}

func NewParamsWithRowValue(value []driver.Value) []*Param {
params := make([]*Param, len(value))
for i, d := range value {
params[i] = NewParam(1)
params[i].AddValue(d)
}
return params
}

func (p *Param) SetBool(offset int, value bool) {
if offset >= p.size {
return
Expand Down
12 changes: 12 additions & 0 deletions common/parser/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,18 @@ func rawConvertJson(pHeader, pStart unsafe.Pointer, row int) driver.Value {
return binaryVal[:]
}

func ReadBlockSimple(block unsafe.Pointer, precision int) [][]driver.Value {
blockSize := RawBlockGetNumOfRows(block)
colCount := RawBlockGetNumOfCols(block)
colInfo := make([]RawBlockColInfo, colCount)
RawBlockGetColInfo(block, colInfo)
colTypes := make([]uint8, colCount)
for i := int32(0); i < colCount; i++ {
colTypes[i] = uint8(colInfo[i].ColType)
}
return ReadBlock(block, int(blockSize), colTypes, precision)
}

// ReadBlock in-place
func ReadBlock(block unsafe.Pointer, blockSize int, colTypes []uint8, precision int) [][]driver.Value {
r := make([][]driver.Value, blockSize)
Expand Down
10 changes: 2 additions & 8 deletions common/parser/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,12 +663,6 @@ func TestParseBlock(t *testing.T) {
t.Error(errors.NewError(code, errStr))
return
}
fileCount := wrapper.TaosNumFields(res)
rh, err := wrapper.ReadColumn(res, fileCount)
if err != nil {
t.Error(err)
return
}
precision := wrapper.TaosResultPrecision(res)
var data [][]driver.Value
for {
Expand All @@ -684,7 +678,7 @@ func TestParseBlock(t *testing.T) {
break
}
version := RawBlockGetVersion(block)
assert.Equal(t, int32(1), version)
t.Log(version)
length := RawBlockGetLength(block)
assert.Equal(t, int32(447), length)
rows := RawBlockGetNumOfRows(block)
Expand Down Expand Up @@ -771,7 +765,7 @@ func TestParseBlock(t *testing.T) {
},
infos,
)
d := ReadBlock(block, blockSize, rh.ColTypes, precision)
d := ReadBlockSimple(block, precision)
data = append(data, d...)
}
wrapper.TaosFreeResult(res)
Expand Down
Loading

0 comments on commit 6f95553

Please sign in to comment.