Skip to content

Commit

Permalink
Merge pull request #260 from taosdata/3.0
Browse files Browse the repository at this point in the history
merge 3.0 to main
  • Loading branch information
huskar-t authored Apr 23, 2024
2 parents 285cf3a + e38bef1 commit 575719f
Show file tree
Hide file tree
Showing 42 changed files with 6,076 additions and 1,204 deletions.
28 changes: 15 additions & 13 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ on:
required: true
type: string

env:
SCCACHE_GHA_ENABLED: "true"

jobs:
build:
Expand Down Expand Up @@ -44,8 +42,6 @@ jobs:
cd TDengine
echo "commit_id=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT
- name: Run sccache-cache
uses: mozilla-actions/[email protected]
- name: Cache server by pr
if: github.event_name == 'pull_request'
Expand Down Expand Up @@ -77,7 +73,7 @@ jobs:
cd TDengine
mkdir debug
cd debug
cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache
cmake .. -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9
make -j 4
- name: package
Expand Down Expand Up @@ -114,7 +110,7 @@ jobs:
needs: build
strategy:
matrix:
go: [ '1.14', '1.19' ]
go: [ '1.14', 'stable' ]
name: Go ${{ matrix.go }}
steps:
- name: get cache server by pr
Expand All @@ -137,11 +133,6 @@ jobs:
restore-keys: |
${{ runner.os }}-build-${{ inputs.tbBranch }}-
- name: checkout
uses: actions/checkout@v3
with:
path: 'driver-go'

- name: prepare install
run: sudo apt install -y libgeos-dev

Expand All @@ -150,6 +141,9 @@ jobs:
tar -zxvf server.tar.gz
cd release && sudo sh install.sh
- name: checkout
uses: actions/checkout@v3

- name: shell
run: |
cat >start.sh<<EOF
Expand All @@ -166,7 +160,15 @@ jobs:
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go }}
cache-dependency-path: driver-go/go.sum
cache-dependency-path: go.sum

- name: Test
run: cd ./driver-go && go test -v ./...
run: sudo go test -v --count=1 -coverprofile=coverage.txt -covermode=atomic ./...

- name: Upload coverage to Codecov
if: ${{ matrix.go }} == 'stable'
uses: codecov/codecov-action@v4-beta
with:
files: ./coverage.txt
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_ORG_TOKEN }}
30 changes: 15 additions & 15 deletions .github/workflows/push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ on:
- 'main'
- '3.0'

env:
SCCACHE_GHA_ENABLED: "true"

jobs:
build:
Expand All @@ -29,8 +27,6 @@ jobs:
cd TDengine
echo "commit_id=$(git rev-parse HEAD)" >> $GITHUB_OUTPUT
- name: Run sccache-cache
uses: mozilla-actions/[email protected]
- name: Cache server
id: cache-server
Expand All @@ -43,16 +39,14 @@ jobs:
if: steps.cache-server.outputs.cache-hit != 'true'
run: sudo apt install -y libgeos-dev

- name: Run sccache-cache
uses: mozilla-actions/[email protected]

- name: install TDengine
if: steps.cache-server.outputs.cache-hit != 'true'
run: |
cd TDengine
mkdir debug
cd debug
cmake .. -DBUILD_JDBC=false -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9 -DCMAKE_C_COMPILER_LAUNCHER=sccache -DCMAKE_CXX_COMPILER_LAUNCHER=sccache
cmake .. -DBUILD_JDBC=false -DBUILD_TEST=off -DBUILD_HTTP=false -DVERNUMBER=3.9.9.9
make -j 4
- name: package
Expand Down Expand Up @@ -87,7 +81,7 @@ jobs:
needs: build
strategy:
matrix:
go: [ '1.14', '1.19' ]
go: [ '1.14', 'stable' ]
name: Go ${{ matrix.go }}
steps:
- name: get cache server
Expand All @@ -99,11 +93,6 @@ jobs:
restore-keys: |
${{ runner.os }}-build-${{ github.ref_name }}-
- name: checkout
uses: actions/checkout@v3
with:
path: 'driver-go'

- name: prepare install
run: sudo apt install -y libgeos-dev

Expand All @@ -112,6 +101,9 @@ jobs:
tar -zxvf server.tar.gz
cd release && sudo sh install.sh
- name: checkout
uses: actions/checkout@v3

- name: shell
run: |
cat >start.sh<<EOF
Expand All @@ -128,7 +120,15 @@ jobs:
uses: actions/setup-go@v4
with:
go-version: ${{ matrix.go }}
cache-dependency-path: driver-go/go.sum
cache-dependency-path: go.sum

- name: Test
run: cd ./driver-go && go test -v ./...
run: sudo go test -v --count=1 -coverprofile=coverage.txt -covermode=atomic ./...

- name: Upload coverage to Codecov
if: ${{ matrix.go }} == 'stable'
uses: codecov/codecov-action@v4-beta
with:
files: ./coverage.txt
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_ORG_TOKEN }}
22 changes: 12 additions & 10 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@

v2 与 v3 版本不兼容,与 TDengine 版本对应如下:

| **driver-go 版本** | **TDengine 版本** | **主要功能** |
|------------------|-----------------|--------------------------------|
| v3.5.0 | 3.0.5.0+ | 获取消费进度及按照指定进度开始消费 |
| v3.3.1 | 3.0.4.1+ | 基于 websocket 的 schemaless 协议写入 |
| v3.1.0 | 3.0.2.2+ | 提供贴近 kafka 的订阅 api |
| v3.0.4 | 3.0.2.2+ | 新增 request id 相关接口 |
| v3.0.3 | 3.0.1.5+ | 基于 websocket 的 statement 写入 |
| v3.0.2 | 3.0.1.5+ | 基于 websocket 的数据查询和写入 |
| v3.0.1 | 3.0.0.0+ | 基于 websocket 的消息订阅 |
| v3.0.0 | 3.0.0.0+ | 适配 TDengine 3.0 查询和写入 |
| **driver-go 版本** | **TDengine 版本** | **主要功能** |
|------------------|----------------------|--------------------------------|
| v3.5.1 | 3.2.1.0+ / 3.1.1.13+ | 原生 stmt 查询和 geometry 类型支持 |
| v3.5.0 | 3.0.5.0+ | 获取消费进度及按照指定进度开始消费 |
| v3.3.1 | 3.0.4.1+ | 基于 websocket 的 schemaless 协议写入 |
| v3.1.0 | 3.0.2.2+ | 提供贴近 kafka 的订阅 api |
| v3.0.4 | 3.0.2.2+ | 新增 request id 相关接口 |
| v3.0.3 | 3.0.1.5+ | 基于 websocket 的 statement 写入 |
| v3.0.2 | 3.0.1.5+ | 基于 websocket 的数据查询和写入 |
| v3.0.1 | 3.0.0.0+ | 基于 websocket 的消息订阅 |
| v3.0.0 | 3.0.0.0+ | 适配 TDengine 3.0 查询和写入 |

## 安装

Expand Down Expand Up @@ -483,6 +484,7 @@ DSN 格式为:

- `writeTimeout` 通过 websocket 发送数据的超时时间。
- `readTimeout` 通过 websocket 接收响应数据的超时时间。
- `enableCompression` 是否压缩传输数据,默认为 `false` 不发送压缩数据。

## 通过 websocket 使用 tmq

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ v2 is not compatible with v3 version and corresponds to the TDengine version as

| **driver-go version** | **TDengine version** | **major features** |
|-----------------------|----------------------|----------------------------------------|
| v3.5.1 | 3.2.1.0+ / 3.1.1.13+ | native stmt query and geometry support |
| v3.5.0 | 3.0.5.0+ | tmq: get assignment and seek offset |
| v3.3.1 | 3.0.4.1+ | schemaless insert over websocket |
| v3.1.0 | 3.0.2.2+ | provide tmq apis close to kafka |
Expand Down Expand Up @@ -484,6 +485,7 @@ Parameters:

- `writeTimeout` The timeout to send data via websocket.
- `readTimeout` The timeout to receive response data via websocket.
- `enableCompression` Whether to compress the transmitted data, the default is `false` and no compressed data is sent.

## Using tmq over websocket

Expand Down
39 changes: 18 additions & 21 deletions af/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import (
)

type Consumer struct {
cConsumer unsafe.Pointer
cConsumer unsafe.Pointer
dataParser *parser.TMQRawDataParser
}

// NewConsumer Create new TMQ consumer with TMQ config
Expand All @@ -28,7 +29,8 @@ func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error) {
return nil, err
}
consumer := &Consumer{
cConsumer: cConsumer,
cConsumer: cConsumer,
dataParser: parser.NewTMQRawDataParser(),
}
return consumer, nil
}
Expand Down Expand Up @@ -176,27 +178,22 @@ func (c *Consumer) getMeta(message unsafe.Pointer) (*tmq.Meta, error) {
}

func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) {
errCode, raw := wrapper.TMQGetRaw(message)
if errCode != taosError.SUCCESS {
errStr := wrapper.TaosErrorStr(message)
err := taosError.NewError(int(errCode), errStr)
return nil, err
}
_, _, rawPtr := wrapper.ParseRawMeta(raw)
blockInfos, err := c.dataParser.Parse(rawPtr)
if err != nil {
return nil, err
}
var tmqData []*tmq.Data
for {
blockSize, errCode, block := wrapper.TaosFetchRawBlock(message)
if errCode != int(taosError.SUCCESS) {
errStr := wrapper.TaosErrorStr(message)
err := taosError.NewError(errCode, errStr)
return nil, err
}
if blockSize == 0 {
break
}
tableName := wrapper.TMQGetTableName(message)
fileCount := wrapper.TaosNumFields(message)
rh, err := wrapper.ReadColumn(message, fileCount)
if err != nil {
return nil, err
}
precision := wrapper.TaosResultPrecision(message)
for i := 0; i < len(blockInfos); i++ {
tmqData = append(tmqData, &tmq.Data{
TableName: tableName,
Data: parser.ReadBlock(block, blockSize, rh.ColTypes, precision),
TableName: blockInfos[i].TableName,
Data: parser.ReadBlockSimple(blockInfos[i].RawBlock, blockInfos[i].Precision),
})
}
return tmqData, nil
Expand Down
Loading

0 comments on commit 575719f

Please sign in to comment.