diff --git a/af/tmq/consumer_test.go b/af/tmq/consumer_test.go index c41e0a7..7b5570c 100644 --- a/af/tmq/consumer_test.go +++ b/af/tmq/consumer_test.go @@ -123,7 +123,7 @@ func TestTmq(t *testing.T) { t.Log(position) offsets, err := consumer.Position([]tmq.TopicPartition{e.TopicPartition}) assert.NoError(t, err) - consumer.CommitOffsets(offsets) + _, err = consumer.CommitOffsets(offsets) assert.NoError(t, err) ass, err = consumer.Assignment() t.Log(ass) @@ -238,7 +238,6 @@ func TestSeek(t *testing.T) { assert.NoError(t, err) assert.Equal(t, vgroups, len(assignment)) for i := 0; i < len(assignment); i++ { - assert.Greater(t, assignment[i].Offset, tmq.Offset(0)) assert.Equal(t, topic, *assignment[i].Topic) } @@ -287,7 +286,6 @@ func TestSeek(t *testing.T) { assert.NoError(t, err) assert.Equal(t, vgroups, len(assignment)) for i := 0; i < len(assignment); i++ { - assert.Greater(t, assignment[i].Offset, tmq.Offset(0)) assert.Equal(t, topic, *assignment[i].Topic) } consumer.Close() diff --git a/taosRestful/driver_test.go b/taosRestful/driver_test.go index 77d6afa..9b474c6 100644 --- a/taosRestful/driver_test.go +++ b/taosRestful/driver_test.go @@ -245,33 +245,33 @@ func TestChinese(t *testing.T) { } defer db.Close() defer func() { - _, err = db.Exec("drop database if exists test_chinese") + _, err = db.Exec("drop database if exists test_chinese_rs") if err != nil { t.Error(err) return } }() - _, err = db.Exec("create database if not exists test_chinese") + _, err = db.Exec("create database if not exists test_chinese_rs") if err != nil { t.Error(err) return } - _, err = db.Exec("drop table if exists test_chinese.chinese") + _, err = db.Exec("drop table if exists test_chinese_rs.chinese") if err != nil { t.Error(err) return } - _, err = db.Exec("create table if not exists test_chinese.chinese(ts timestamp,v nchar(32))") + _, err = db.Exec("create table if not exists test_chinese_rs.chinese(ts timestamp,v nchar(32))") if err != nil { t.Error(err) return } - _, err = db.Exec(`INSERT INTO test_chinese.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") + _, err = db.Exec(`INSERT INTO test_chinese_rs.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") if err != nil { t.Error(err) return } - rows, err := db.Query("select * from test_chinese.chinese") + rows, err := db.Query("select * from test_chinese_rs.chinese") if err != nil { t.Error(err) return diff --git a/taosSql/driver_test.go b/taosSql/driver_test.go index 9dfc33c..d12cd18 100644 --- a/taosSql/driver_test.go +++ b/taosSql/driver_test.go @@ -516,33 +516,33 @@ func TestChinese(t *testing.T) { } defer db.Close() defer func() { - _, err = db.Exec("drop database if exists test_chinese") + _, err = db.Exec("drop database if exists test_chinese_native") if err != nil { t.Error(err) return } }() - _, err = db.Exec("create database if not exists test_chinese") + _, err = db.Exec("create database if not exists test_chinese_native") if err != nil { t.Error(err) return } - _, err = db.Exec("drop table if exists test_chinese.chinese") + _, err = db.Exec("drop table if exists test_chinese_native.chinese") if err != nil { t.Error(err) return } - _, err = db.Exec("create table if not exists test_chinese.chinese(ts timestamp,v nchar(32))") + _, err = db.Exec("create table if not exists test_chinese_native.chinese(ts timestamp,v nchar(32))") if err != nil { t.Error(err) return } - _, err = db.Exec(`INSERT INTO test_chinese.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") + _, err = db.Exec(`INSERT INTO test_chinese_native.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") if err != nil { t.Error(err) return } - rows, err := db.Query("select * from test_chinese.chinese") + rows, err := db.Query("select * from test_chinese_native.chinese") if err != nil { t.Error(err) return diff --git a/taosWS/driver_test.go b/taosWS/driver_test.go index da3543d..389be64 100644 --- a/taosWS/driver_test.go +++ b/taosWS/driver_test.go @@ -223,33 +223,33 @@ func TestChinese(t *testing.T) { } defer db.Close() defer func() { - _, err = db.Exec("drop database if exists test_chinese") + _, err = db.Exec("drop database if exists test_chinese_ws") if err != nil { t.Error(err) return } }() - _, err = db.Exec("create database if not exists test_chinese") + _, err = db.Exec("create database if not exists test_chinese_ws") if err != nil { t.Error(err) return } - _, err = db.Exec("drop table if exists test_chinese.chinese") + _, err = db.Exec("drop table if exists test_chinese_ws.chinese") if err != nil { t.Error(err) return } - _, err = db.Exec("create table if not exists test_chinese.chinese(ts timestamp,v nchar(32))") + _, err = db.Exec("create table if not exists test_chinese_ws.chinese(ts timestamp,v nchar(32))") if err != nil { t.Error(err) return } - _, err = db.Exec(`INSERT INTO test_chinese.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") + _, err = db.Exec(`INSERT INTO test_chinese_ws.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'") if err != nil { t.Error(err) return } - rows, err := db.Query("select * from test_chinese.chinese") + rows, err := db.Query("select * from test_chinese_ws.chinese") if err != nil { t.Error(err) return diff --git a/wrapper/tmq_test.go b/wrapper/tmq_test.go index 3498d41..c264aa5 100644 --- a/wrapper/tmq_test.go +++ b/wrapper/tmq_test.go @@ -1147,36 +1147,70 @@ func TestTMQModify(t *testing.T) { } d, err := query(targetConn, "describe stb") assert.NoError(t, err) - assert.Equal(t, [][]driver.Value{ - {"ts", "TIMESTAMP", int32(8), ""}, - {"c1", "BOOL", int32(1), ""}, - {"c2", "TINYINT", int32(1), ""}, - {"c3", "SMALLINT", int32(2), ""}, - {"c4", "INT", int32(4), ""}, - {"c5", "BIGINT", int32(8), ""}, - {"c6", "TINYINT UNSIGNED", int32(1), ""}, - {"c7", "SMALLINT UNSIGNED", int32(2), ""}, - {"c8", "INT UNSIGNED", int32(4), ""}, - {"c9", "BIGINT UNSIGNED", int32(8), ""}, - {"c10", "FLOAT", int32(4), ""}, - {"c11", "DOUBLE", int32(8), ""}, - {"c12", "VARCHAR", int32(20), ""}, - {"c13", "NCHAR", int32(20), ""}, - {"tts", "TIMESTAMP", int32(8), "TAG"}, - {"tc1", "BOOL", int32(1), "TAG"}, - {"tc2", "TINYINT", int32(1), "TAG"}, - {"tc3", "SMALLINT", int32(2), "TAG"}, - {"tc4", "INT", int32(4), "TAG"}, - {"tc5", "BIGINT", int32(8), "TAG"}, - {"tc6", "TINYINT UNSIGNED", int32(1), "TAG"}, - {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"}, - {"tc8", "INT UNSIGNED", int32(4), "TAG"}, - {"tc9", "BIGINT UNSIGNED", int32(8), "TAG"}, - {"tc10", "FLOAT", int32(4), "TAG"}, - {"tc11", "DOUBLE", int32(8), "TAG"}, - {"tc12", "VARCHAR", int32(20), "TAG"}, - {"tc13", "NCHAR", int32(20), "TAG"}, - }, d) + if len(d[0]) == 4 { + assert.Equal(t, [][]driver.Value{ + {"ts", "TIMESTAMP", int32(8), ""}, + {"c1", "BOOL", int32(1), ""}, + {"c2", "TINYINT", int32(1), ""}, + {"c3", "SMALLINT", int32(2), ""}, + {"c4", "INT", int32(4), ""}, + {"c5", "BIGINT", int32(8), ""}, + {"c6", "TINYINT UNSIGNED", int32(1), ""}, + {"c7", "SMALLINT UNSIGNED", int32(2), ""}, + {"c8", "INT UNSIGNED", int32(4), ""}, + {"c9", "BIGINT UNSIGNED", int32(8), ""}, + {"c10", "FLOAT", int32(4), ""}, + {"c11", "DOUBLE", int32(8), ""}, + {"c12", "VARCHAR", int32(20), ""}, + {"c13", "NCHAR", int32(20), ""}, + {"tts", "TIMESTAMP", int32(8), "TAG"}, + {"tc1", "BOOL", int32(1), "TAG"}, + {"tc2", "TINYINT", int32(1), "TAG"}, + {"tc3", "SMALLINT", int32(2), "TAG"}, + {"tc4", "INT", int32(4), "TAG"}, + {"tc5", "BIGINT", int32(8), "TAG"}, + {"tc6", "TINYINT UNSIGNED", int32(1), "TAG"}, + {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"}, + {"tc8", "INT UNSIGNED", int32(4), "TAG"}, + {"tc9", "BIGINT UNSIGNED", int32(8), "TAG"}, + {"tc10", "FLOAT", int32(4), "TAG"}, + {"tc11", "DOUBLE", int32(8), "TAG"}, + {"tc12", "VARCHAR", int32(20), "TAG"}, + {"tc13", "NCHAR", int32(20), "TAG"}, + }, d) + } else { + assert.Equal(t, [][]driver.Value{ + {"ts", "TIMESTAMP", int32(8), "", ""}, + {"c1", "BOOL", int32(1), "", ""}, + {"c2", "TINYINT", int32(1), "", ""}, + {"c3", "SMALLINT", int32(2), "", ""}, + {"c4", "INT", int32(4), "", ""}, + {"c5", "BIGINT", int32(8), "", ""}, + {"c6", "TINYINT UNSIGNED", int32(1), "", ""}, + {"c7", "SMALLINT UNSIGNED", int32(2), "", ""}, + {"c8", "INT UNSIGNED", int32(4), "", ""}, + {"c9", "BIGINT UNSIGNED", int32(8), "", ""}, + {"c10", "FLOAT", int32(4), "", ""}, + {"c11", "DOUBLE", int32(8), "", ""}, + {"c12", "VARCHAR", int32(20), "", ""}, + {"c13", "NCHAR", int32(20), "", ""}, + {"tts", "TIMESTAMP", int32(8), "TAG", ""}, + {"tc1", "BOOL", int32(1), "TAG", ""}, + {"tc2", "TINYINT", int32(1), "TAG", ""}, + {"tc3", "SMALLINT", int32(2), "TAG", ""}, + {"tc4", "INT", int32(4), "TAG", ""}, + {"tc5", "BIGINT", int32(8), "TAG", ""}, + {"tc6", "TINYINT UNSIGNED", int32(1), "TAG", ""}, + {"tc7", "SMALLINT UNSIGNED", int32(2), "TAG", ""}, + {"tc8", "INT UNSIGNED", int32(4), "TAG", ""}, + {"tc9", "BIGINT UNSIGNED", int32(8), "TAG", ""}, + {"tc10", "FLOAT", int32(4), "TAG", ""}, + {"tc11", "DOUBLE", int32(8), "TAG", ""}, + {"tc12", "VARCHAR", int32(20), "TAG", ""}, + {"tc13", "NCHAR", int32(20), "TAG", ""}, + }, d) + } + }) TMQUnsubscribe(tmq) @@ -1620,7 +1654,7 @@ func TestTMQPosition(t *testing.T) { } vgID := assignment[0].VGroupID position := TMQPosition(tmq, "test_tmq_position_topic", vgID) - assert.Less(t, position, int64(0)) + assert.Equal(t, position, int64(0)) haveMessage := false for i := 0; i < 3; i++ { message := TMQConsumerPoll(tmq, 500) @@ -1726,7 +1760,7 @@ func TestTMQCommitOffset(t *testing.T) { t.Fatal(errors.NewError(int(code), TMQErr2Str(code))) } committed = TMQCommitted(tmq, "test_tmq_commit_offset_topic", vgID) - assert.Equal(t, int64(1), committed) + assert.Equal(t, int64(offset), committed) TaosFreeResult(message) break } @@ -1833,7 +1867,7 @@ func TestTMQCommitOffsetAsync(t *testing.T) { timer.Stop() } committed = TMQCommitted(tmq, topic, vgID) - assert.Equal(t, int64(1), committed) + assert.Equal(t, int64(offset), committed) TaosFreeResult(message) break } diff --git a/ws/client/conn.go b/ws/client/conn.go index 493a7f1..37a5dd2 100644 --- a/ws/client/conn.go +++ b/ws/client/conn.go @@ -87,7 +87,7 @@ func NewClient(conn *websocket.Conn, sendChanLength uint) *Client { } func (c *Client) ReadPump() { - c.conn.SetReadLimit(common.BufferSize4M) + c.conn.SetReadLimit(0) c.conn.SetReadDeadline(time.Now().Add(c.PongWait)) c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(c.PongWait)) @@ -105,9 +105,9 @@ func (c *Client) ReadPump() { } switch messageType { case websocket.TextMessage: - c.TextMessageHandler(message) + go c.TextMessageHandler(message) case websocket.BinaryMessage: - c.BinaryMessageHandler(message) + go c.BinaryMessageHandler(message) } } } diff --git a/ws/tmq/consumer.go b/ws/tmq/consumer.go index b3ebced..dac2fdf 100644 --- a/ws/tmq/consumer.go +++ b/ws/tmq/consumer.go @@ -291,6 +291,10 @@ const ( TMQUnsubscribe = "unsubscribe" TMQGetTopicAssignment = "assignment" TMQSeek = "seek" + TMQCommitOffset = "commit_offset" + TMQCommitted = "committed" + TMQPosition = "position" + TMQListTopics = "list_topics" ) var ClosedErr = errors.New("connection closed") @@ -426,6 +430,12 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { return tmq.NewTMQErrorWithErr(err) } result.SetData(data) + topic := resp.Topic + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: resp.VgroupID, + Offset: tmq.Offset(resp.Offset), + } return result case common.TMQ_RES_TABLE_META: result := &tmq.MetaMessage{} @@ -436,6 +446,12 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { if err != nil { return tmq.NewTMQErrorWithErr(err) } + topic := resp.Topic + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: resp.VgroupID, + Offset: tmq.Offset(resp.Offset), + } result.SetMeta(meta) return result case common.TMQ_RES_METADATA: @@ -455,6 +471,12 @@ func (c *Consumer) Poll(timeoutMs int) tmq.Event { Meta: meta, Data: data, }) + topic := resp.Topic + result.TopicPartition = tmq.TopicPartition{ + Topic: &topic, + Partition: resp.VgroupID, + Offset: tmq.Offset(resp.Offset), + } return result default: return tmq.NewTMQErrorWithErr(err) @@ -615,7 +637,11 @@ func (c *Consumer) doCommit(messageID uint64) ([]tmq.TopicPartition, error) { if resp.Code != 0 { return nil, taosErrors.NewError(resp.Code, resp.Message) } - return nil, nil + partitions, err := c.Assignment() + if err != nil { + return nil, err + } + return c.Committed(partitions, 0) } func (c *Consumer) Unsubscribe() error { @@ -743,3 +769,143 @@ func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) erro } return nil } + +func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error) { + offsets = make([]tmq.TopicPartition, len(partitions)) + reqID := c.generateReqID() + req := &CommittedReq{ + ReqID: reqID, + TopicVgroupIDs: make([]TopicVgroupID, len(partitions)), + } + for i := 0; i < len(partitions); i++ { + req.TopicVgroupIDs[i] = TopicVgroupID{ + Topic: *partitions[i].Topic, + VgroupID: partitions[i].Partition, + } + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return nil, err + } + action := &client.WSAction{ + Action: TMQCommitted, + Args: args, + } + envelope := c.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + c.client.PutEnvelope(envelope) + return nil, err + } + respBytes, err := c.sendText(reqID, envelope) + if err != nil { + return nil, err + } + var resp CommittedResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + for i := 0; i < len(resp.Committed); i++ { + offsets[i] = tmq.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: tmq.Offset(resp.Committed[i]), + } + } + return offsets, nil +} + +func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error) { + if c.err != nil { + return nil, c.err + } + for i := 0; i < len(offsets); i++ { + reqID := c.generateReqID() + req := &CommitOffsetReq{ + ReqID: reqID, + Topic: *offsets[i].Topic, + VgroupID: offsets[i].Partition, + Offset: int64(offsets[i].Offset), + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return nil, err + } + action := &client.WSAction{ + Action: TMQCommitOffset, + Args: args, + } + envelope := c.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + c.client.PutEnvelope(envelope) + return nil, err + } + respBytes, err := c.sendText(reqID, envelope) + if err != nil { + return nil, err + } + var resp CommitOffsetResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + } + return c.Committed(offsets, 0) +} + +func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error) { + offsets = make([]tmq.TopicPartition, len(partitions)) + reqID := c.generateReqID() + req := &PositionReq{ + ReqID: reqID, + TopicVgroupIDs: make([]TopicVgroupID, len(partitions)), + } + for i := 0; i < len(partitions); i++ { + req.TopicVgroupIDs[i] = TopicVgroupID{ + Topic: *partitions[i].Topic, + VgroupID: partitions[i].Partition, + } + } + args, err := client.JsonI.Marshal(req) + if err != nil { + return nil, err + } + action := &client.WSAction{ + Action: TMQPosition, + Args: args, + } + envelope := c.client.GetEnvelope() + err = client.JsonI.NewEncoder(envelope.Msg).Encode(action) + if err != nil { + c.client.PutEnvelope(envelope) + return nil, err + } + respBytes, err := c.sendText(reqID, envelope) + if err != nil { + return nil, err + } + var resp PositionResp + err = client.JsonI.Unmarshal(respBytes, &resp) + if err != nil { + return nil, err + } + if resp.Code != 0 { + return nil, taosErrors.NewError(resp.Code, resp.Message) + } + for i := 0; i < len(resp.Position); i++ { + offsets[i] = tmq.TopicPartition{ + Topic: partitions[i].Topic, + Partition: partitions[i].Partition, + Offset: tmq.Offset(resp.Position[i]), + } + } + return offsets, nil +} diff --git a/ws/tmq/consumer_test.go b/ws/tmq/consumer_test.go index 493c341..bd7833b 100644 --- a/ws/tmq/consumer_test.go +++ b/ws/tmq/consumer_test.go @@ -22,7 +22,7 @@ func prepareEnv() error { "drop topic if exists test_ws_tmq_topic", "drop database if exists test_ws_tmq", "create database test_ws_tmq WAL_RETENTION_PERIOD 86400", - "create topic test_ws_tmq_topic with meta as database test_ws_tmq", + "create topic test_ws_tmq_topic as database test_ws_tmq", } for _, step := range steps { err = doRequest(step) @@ -121,19 +121,18 @@ func TestConsumer(t *testing.T) { } }() consumer, err := NewConsumer(&tmq.ConfigMap{ - "ws.url": "ws://127.0.0.1:6041/rest/tmq", - "ws.message.channelLen": uint(0), - "ws.message.timeout": common.DefaultMessageTimeout, - "ws.message.writeWait": common.DefaultWriteWait, - "td.connect.user": "root", - "td.connect.pass": "taosdata", - "group.id": "test", - "client.id": "test_consumer", - "auto.offset.reset": "earliest", - "enable.auto.commit": "true", - "auto.commit.interval.ms": "5000", - "experimental.snapshot.enable": "true", - "msg.with.table.name": "true", + "ws.url": "ws://127.0.0.1:6041/rest/tmq", + "ws.message.channelLen": uint(0), + "ws.message.timeout": common.DefaultMessageTimeout, + "ws.message.writeWait": common.DefaultWriteWait, + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "group.id": "test", + "client.id": "test_consumer", + "auto.offset.reset": "earliest", + "enable.auto.commit": "true", + "auto.commit.interval.ms": "5000", + "msg.with.table.name": "true", }) if err != nil { t.Error(err) @@ -146,10 +145,9 @@ func TestConsumer(t *testing.T) { t.Error(err) return } - gotMeta := false gotData := false for i := 0; i < 5; i++ { - if gotData && gotMeta { + if gotData { return } ev := consumer.Poll(0) @@ -177,84 +175,23 @@ func TestConsumer(t *testing.T) { assert.Equal(t, float64(11.123), v[11].(float64)) assert.Equal(t, "binary", v[12].(string)) assert.Equal(t, "nchar", v[13].(string)) - case *tmq.MetaMessage: - gotMeta = true - meta := e.Value().(*tmq.Meta) - assert.Equal(t, "test_ws_tmq", e.DBName()) - assert.Equal(t, "create", meta.Type) - assert.Equal(t, "t_all", meta.TableName) - assert.Equal(t, "normal", meta.TableType) - assert.Equal(t, []*tmq.Column{ - { - Name: "ts", - Type: 9, - Length: 0, - }, - { - Name: "c1", - Type: 1, - Length: 0, - }, - { - Name: "c2", - Type: 2, - Length: 0, - }, - { - Name: "c3", - Type: 3, - Length: 0, - }, - { - Name: "c4", - Type: 4, - Length: 0, - }, - { - Name: "c5", - Type: 5, - Length: 0, - }, - { - Name: "c6", - Type: 11, - Length: 0, - }, - { - Name: "c7", - Type: 12, - Length: 0, - }, - { - Name: "c8", - Type: 13, - Length: 0, - }, - { - Name: "c9", - Type: 14, - Length: 0, - }, - { - Name: "c10", - Type: 6, - Length: 0, - }, - { - Name: "c11", - Type: 7, - Length: 0, - }, - { - Name: "c12", - Type: 8, - Length: 20, - }, - { - Name: "c13", - Type: 10, - Length: 20, - }}, meta.Columns) + t.Log(e.Offset()) + ass, err := consumer.Assignment() + t.Log(ass) + committed, err := consumer.Committed(ass, 0) + t.Log(committed) + position, _ := consumer.Position(ass) + t.Log(position) + offsets, err := consumer.Position([]tmq.TopicPartition{e.TopicPartition}) + assert.NoError(t, err) + _, err = consumer.CommitOffsets(offsets) + assert.NoError(t, err) + ass, err = consumer.Assignment() + t.Log(ass) + committed, err = consumer.Committed(ass, 0) + t.Log(committed) + position, _ = consumer.Position(ass) + t.Log(position) case tmq.Error: t.Error(e) return @@ -262,7 +199,7 @@ func TestConsumer(t *testing.T) { t.Error("unexpected", e) return } - _, err = consumer.Commit() + } if err != nil { @@ -270,9 +207,6 @@ func TestConsumer(t *testing.T) { return } } - if !gotMeta { - t.Error("no meta got") - } if !gotData { t.Error("no data got") } diff --git a/ws/tmq/proto.go b/ws/tmq/proto.go index a067d63..d9b8c1d 100644 --- a/ws/tmq/proto.go +++ b/ws/tmq/proto.go @@ -145,3 +145,54 @@ type OffsetSeekResp struct { ReqID uint64 `json:"req_id"` Timing int64 `json:"timing"` } + +type CommittedReq struct { + ReqID uint64 `json:"req_id"` + TopicVgroupIDs []TopicVgroupID `json:"topic_vgroup_ids"` +} + +type CommittedResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + Committed []int64 `json:"committed"` +} + +type TopicVgroupID struct { + Topic string `json:"topic"` + VgroupID int32 `json:"vgroup_id"` +} + +type CommitOffsetReq struct { + ReqID uint64 `json:"req_id"` + Topic string `json:"topic"` + VgroupID int32 `json:"vgroup_id"` + Offset int64 `json:"offset"` +} + +type CommitOffsetResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + Topic string `json:"topic"` + VgroupID int32 `json:"vgroup_id"` + Offset int64 `json:"offset"` +} + +type PositionReq struct { + ReqID uint64 `json:"req_id"` + TopicVgroupIDs []TopicVgroupID `json:"topic_vgroup_ids"` +} + +type PositionResp struct { + Code int `json:"code"` + Message string `json:"message"` + Action string `json:"action"` + ReqID uint64 `json:"req_id"` + Timing int64 `json:"timing"` + Position []int64 `json:"position"` +}