Skip to content

Commit

Permalink
Merge pull request #183 from taosdata/enh/xftan/TD-25339
Browse files Browse the repository at this point in the history
enh: support tmq_position tmq_committed tmq_commit_offset_sync tmq_commit_offset_async
  • Loading branch information
huskar-t authored Aug 4, 2023
2 parents e4f8e72 + 09e8a3e commit e792740
Show file tree
Hide file tree
Showing 7 changed files with 615 additions and 42 deletions.
71 changes: 70 additions & 1 deletion af/tmq/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
db := wrapper.TMQGetDBName(message)
resultType := wrapper.TMQGetResType(message)
offset := tmq.Offset(wrapper.TMQGetVgroupOffset(message))
vgID := wrapper.TMQGetVgroupID(message)
switch resultType {
case common.TMQ_RES_DATA:
result := &tmq.DataMessage{}
Expand All @@ -106,6 +107,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
}
result.SetData(data)
result.SetOffset(offset)
result.TopicPartition = tmq.TopicPartition{
Topic: &topic,
Partition: vgID,
Offset: offset,
}
wrapper.TaosFreeResult(message)
return result
case common.TMQ_RES_TABLE_META:
Expand All @@ -118,6 +124,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
}
result.SetMeta(meta)
result.SetOffset(offset)
result.TopicPartition = tmq.TopicPartition{
Topic: &topic,
Partition: vgID,
Offset: offset,
}
wrapper.TaosFreeResult(message)
return result
case common.TMQ_RES_METADATA:
Expand All @@ -137,6 +148,11 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event {
Meta: meta,
Data: data,
})
result.TopicPartition = tmq.TopicPartition{
Topic: &topic,
Partition: vgID,
Offset: offset,
}
wrapper.TaosFreeResult(message)
return result
default:
Expand Down Expand Up @@ -187,7 +203,16 @@ func (c *Consumer) getData(message unsafe.Pointer) ([]*tmq.Data, error) {
}

func (c *Consumer) Commit() ([]tmq.TopicPartition, error) {
return c.doCommit(nil)
errCode := wrapper.TMQCommitSync(c.cConsumer, nil)
if errCode != taosError.SUCCESS {
errStr := wrapper.TMQErr2Str(errCode)
return nil, taosError.NewError(int(errCode), errStr)
}
partitions, err := c.Assignment()
if err != nil {
return nil, err
}
return c.Committed(partitions, 0)
}

func (c *Consumer) doCommit(message unsafe.Pointer) ([]tmq.TopicPartition, error) {
Expand Down Expand Up @@ -235,6 +260,50 @@ func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) erro
return nil
}

func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error) {
offsets = make([]tmq.TopicPartition, len(partitions))
for i := 0; i < len(partitions); i++ {
cOffset := wrapper.TMQCommitted(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
offset := tmq.Offset(cOffset)
if !offset.Valid() {
return nil, taosError.NewError(int(offset), wrapper.TMQErr2Str(int32(offset)))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: offset,
}
}
return
}

func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error) {
for i := 0; i < len(offsets); i++ {
errCode := wrapper.TMQCommitOffsetSync(c.cConsumer, *offsets[i].Topic, offsets[i].Partition, int64(offsets[i].Offset))
if errCode != taosError.SUCCESS {
errStr := wrapper.TMQErr2Str(errCode)
return nil, taosError.NewError(int(errCode), errStr)
}
}
return c.Committed(offsets, 0)
}

func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error) {
offsets = make([]tmq.TopicPartition, len(partitions))
for i := 0; i < len(partitions); i++ {
position := wrapper.TMQPosition(c.cConsumer, *partitions[i].Topic, partitions[i].Partition)
if position < 0 {
return nil, taosError.NewError(int(position), wrapper.TMQErr2Str(int32(position)))
}
offsets[i] = tmq.TopicPartition{
Topic: partitions[i].Topic,
Partition: partitions[i].Partition,
Offset: tmq.Offset(position),
}
}
return
}

// Close release consumer
func (c *Consumer) Close() error {
errCode := wrapper.TMQConsumerClose(c.cConsumer)
Expand Down
55 changes: 40 additions & 15 deletions af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestTmq(t *testing.T) {
return
}
sqls := []string{
"drop topic if exists test_tmq_common",
"drop database if exists af_test_tmq",
"create database if not exists af_test_tmq vgroups 2 WAL_RETENTION_PERIOD 86400",
"use af_test_tmq",
Expand All @@ -40,8 +41,8 @@ func TestTmq(t *testing.T) {
") tags(t1 int)",
"create table if not exists ct0 using all_type tags(1000)",
"create table if not exists ct1 using all_type tags(2000)",
"create table if not exists ct3 using all_type tags(3000)",
"create topic if not exists test_tmq_common as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from ct1",
"create table if not exists ct2 using all_type tags(3000)",
"create topic if not exists test_tmq_common as select ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 from all_type",
}

defer func() {
Expand All @@ -56,20 +57,24 @@ func TestTmq(t *testing.T) {
assert.NoError(t, err)
}()
now := time.Now()
err = execWithoutResult(conn, fmt.Sprintf("insert into ct0 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano)))
assert.NoError(t, err)
err = execWithoutResult(conn, fmt.Sprintf("insert into ct1 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano)))
assert.NoError(t, err)
err = execWithoutResult(conn, fmt.Sprintf("insert into ct2 values('%s',true,2,3,4,5,6,7,8,9,10,11,'1','2')", now.Format(time.RFC3339Nano)))
assert.NoError(t, err)

consumer, err := NewConsumer(&tmq.ConfigMap{
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
"group.id": "test",
"auto.offset.reset": "earliest",
"td.connect.ip": "127.0.0.1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
//"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
})
if err != nil {
t.Error(err)
Expand All @@ -80,6 +85,10 @@ func TestTmq(t *testing.T) {
t.Error(err)
return
}
ass, err := consumer.Assignment()
t.Log(ass)
position, _ := consumer.Position(ass)
t.Log(position)
haveMessage := false
for i := 0; i < 5; i++ {
ev := consumer.Poll(500)
Expand All @@ -105,8 +114,23 @@ func TestTmq(t *testing.T) {
assert.Equal(t, float64(11), row1[11].(float64))
assert.Equal(t, "1", row1[12].(string))
assert.Equal(t, "2", row1[13].(string))
_, err = consumer.Commit()
t.Log(e.Offset())
ass, err := consumer.Assignment()
t.Log(ass)
committed, err := consumer.Committed(ass, 0)
t.Log(committed)
position, _ := consumer.Position(ass)
t.Log(position)
offsets, err := consumer.Position([]tmq.TopicPartition{e.TopicPartition})
assert.NoError(t, err)
consumer.CommitOffsets(offsets)
assert.NoError(t, err)
ass, err = consumer.Assignment()
t.Log(ass)
committed, err = consumer.Committed(ass, 0)
t.Log(committed)
position, _ = consumer.Position(ass)
t.Log(position)
err = consumer.Unsubscribe()
assert.NoError(t, err)
err = consumer.Close()
Expand Down Expand Up @@ -201,9 +225,10 @@ func TestSeek(t *testing.T) {
for _, datum := range data {
dataCount += len(datum.Data)
}
time.Sleep(time.Second * 2)
_, err = consumer.Commit()
assert.NoError(t, err)
}
_, err = consumer.Commit()
assert.NoError(t, err)
}
assert.Equal(t, record, dataCount)

Expand Down
27 changes: 15 additions & 12 deletions common/tmq/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,11 @@ type Message interface {
}

type DataMessage struct {
dbName string
topic string
data []*Data
offset Offset
TopicPartition TopicPartition
dbName string
topic string
data []*Data
offset Offset
}

func (m *DataMessage) String() string {
Expand Down Expand Up @@ -109,10 +110,11 @@ func (m *DataMessage) Offset() Offset {
}

type MetaMessage struct {
dbName string
topic string
offset Offset
meta *Meta
TopicPartition TopicPartition
dbName string
topic string
offset Offset
meta *Meta
}

func (m *MetaMessage) Offset() Offset {
Expand Down Expand Up @@ -153,10 +155,11 @@ func (m *MetaMessage) Value() interface{} {
}

type MetaDataMessage struct {
dbName string
topic string
offset Offset
metaData *MetaData
TopicPartition TopicPartition
dbName string
topic string
offset Offset
metaData *MetaData
}

func (m *MetaDataMessage) Offset() Offset {
Expand Down
12 changes: 12 additions & 0 deletions common/tmq/tmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,22 @@ type CreateItem struct {

type Offset int64

const OffsetInvalid = Offset(-2147467247)

func (o Offset) String() string {
if o == OffsetInvalid {
return "unset"
}
return fmt.Sprintf("%d", int64(o))
}

func (o Offset) Valid() bool {
if o < 0 && o != OffsetInvalid {
return false
}
return true
}

type TopicPartition struct {
Topic *string
Partition int32
Expand Down
32 changes: 31 additions & 1 deletion wrapper/tmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package wrapper
#include <string.h>
#include <taos.h>
extern void TMQCommitCB(tmq_t *, int32_t, void *param);
extern void TMQAutoCommitCB(tmq_t *, int32_t, void *param);
extern void TMQCommitOffsetCB(tmq_t *, int32_t, void *param);
*/
import "C"
import (
Expand Down Expand Up @@ -68,7 +70,7 @@ func TMQConfDestroy(conf unsafe.Pointer) {

// TMQConfSetAutoCommitCB DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
func TMQConfSetAutoCommitCB(conf unsafe.Pointer, h cgo.Handle) {
C.tmq_conf_set_auto_commit_cb((*C.struct_tmq_conf_t)(conf), (*C.tmq_commit_cb)(C.TMQCommitCB), h.Pointer())
C.tmq_conf_set_auto_commit_cb((*C.struct_tmq_conf_t)(conf), (*C.tmq_commit_cb)(C.TMQAutoCommitCB), h.Pointer())
}

// TMQCommitAsync DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
Expand Down Expand Up @@ -297,3 +299,31 @@ func TMQFreeAssignment(assignment unsafe.Pointer) {
}
C.tmq_free_assignment((*C.tmq_topic_assignment)(assignment))
}

// TMQPosition DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
func TMQPosition(consumer unsafe.Pointer, topic string, vGroupID int32) int64 {
topicName := C.CString(topic)
defer C.free(unsafe.Pointer(topicName))
return int64(C.tmq_position((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID)))
}

// TMQCommitted DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
func TMQCommitted(consumer unsafe.Pointer, topic string, vGroupID int32) int64 {
topicName := C.CString(topic)
defer C.free(unsafe.Pointer(topicName))
return int64(C.tmq_committed((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID)))
}

// TMQCommitOffsetSync DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
func TMQCommitOffsetSync(consumer unsafe.Pointer, topic string, vGroupID int32, offset int64) int32 {
topicName := C.CString(topic)
defer C.free(unsafe.Pointer(topicName))
return int32(C.tmq_commit_offset_sync((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID), (C.int64_t)(offset)))
}

// TMQCommitOffsetAsync DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
func TMQCommitOffsetAsync(consumer unsafe.Pointer, topic string, vGroupID int32, offset int64, h cgo.Handle) {
topicName := C.CString(topic)
defer C.free(unsafe.Pointer(topicName))
C.tmq_commit_offset_async((*C.tmq_t)(consumer), topicName, (C.int32_t)(vGroupID), (C.int64_t)(offset), (*C.tmq_commit_cb)(C.TMQCommitOffsetCB), h.Pointer())
}
Loading

0 comments on commit e792740

Please sign in to comment.