Skip to content

Commit

Permalink
Merge pull request #189 from taosdata/enh/xftan/TD-25345
Browse files Browse the repository at this point in the history
enh: add commit_offset committed position over ws
  • Loading branch information
huskar-t authored Aug 24, 2023
2 parents e792740 + 69a35ed commit cab38e7
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 156 deletions.
4 changes: 1 addition & 3 deletions af/tmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestTmq(t *testing.T) {
t.Log(position)
offsets, err := consumer.Position([]tmq.TopicPartition{e.TopicPartition})
assert.NoError(t, err)
consumer.CommitOffsets(offsets)
_, err = consumer.CommitOffsets(offsets)
assert.NoError(t, err)
ass, err = consumer.Assignment()
t.Log(ass)
Expand Down Expand Up @@ -238,7 +238,6 @@ func TestSeek(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, vgroups, len(assignment))
for i := 0; i < len(assignment); i++ {
assert.Greater(t, assignment[i].Offset, tmq.Offset(0))
assert.Equal(t, topic, *assignment[i].Topic)
}

Expand Down Expand Up @@ -287,7 +286,6 @@ func TestSeek(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, vgroups, len(assignment))
for i := 0; i < len(assignment); i++ {
assert.Greater(t, assignment[i].Offset, tmq.Offset(0))
assert.Equal(t, topic, *assignment[i].Topic)
}
consumer.Close()
Expand Down
12 changes: 6 additions & 6 deletions taosRestful/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,33 +245,33 @@ func TestChinese(t *testing.T) {
}
defer db.Close()
defer func() {
_, err = db.Exec("drop database if exists test_chinese")
_, err = db.Exec("drop database if exists test_chinese_rs")
if err != nil {
t.Error(err)
return
}
}()
_, err = db.Exec("create database if not exists test_chinese")
_, err = db.Exec("create database if not exists test_chinese_rs")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec("drop table if exists test_chinese.chinese")
_, err = db.Exec("drop table if exists test_chinese_rs.chinese")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec("create table if not exists test_chinese.chinese(ts timestamp,v nchar(32))")
_, err = db.Exec("create table if not exists test_chinese_rs.chinese(ts timestamp,v nchar(32))")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec(`INSERT INTO test_chinese.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'")
_, err = db.Exec(`INSERT INTO test_chinese_rs.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'")
if err != nil {
t.Error(err)
return
}
rows, err := db.Query("select * from test_chinese.chinese")
rows, err := db.Query("select * from test_chinese_rs.chinese")
if err != nil {
t.Error(err)
return
Expand Down
12 changes: 6 additions & 6 deletions taosSql/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,33 +516,33 @@ func TestChinese(t *testing.T) {
}
defer db.Close()
defer func() {
_, err = db.Exec("drop database if exists test_chinese")
_, err = db.Exec("drop database if exists test_chinese_native")
if err != nil {
t.Error(err)
return
}
}()
_, err = db.Exec("create database if not exists test_chinese")
_, err = db.Exec("create database if not exists test_chinese_native")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec("drop table if exists test_chinese.chinese")
_, err = db.Exec("drop table if exists test_chinese_native.chinese")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec("create table if not exists test_chinese.chinese(ts timestamp,v nchar(32))")
_, err = db.Exec("create table if not exists test_chinese_native.chinese(ts timestamp,v nchar(32))")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec(`INSERT INTO test_chinese.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'")
_, err = db.Exec(`INSERT INTO test_chinese_native.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'")
if err != nil {
t.Error(err)
return
}
rows, err := db.Query("select * from test_chinese.chinese")
rows, err := db.Query("select * from test_chinese_native.chinese")
if err != nil {
t.Error(err)
return
Expand Down
12 changes: 6 additions & 6 deletions taosWS/driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,33 +223,33 @@ func TestChinese(t *testing.T) {
}
defer db.Close()
defer func() {
_, err = db.Exec("drop database if exists test_chinese")
_, err = db.Exec("drop database if exists test_chinese_ws")
if err != nil {
t.Error(err)
return
}
}()
_, err = db.Exec("create database if not exists test_chinese")
_, err = db.Exec("create database if not exists test_chinese_ws")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec("drop table if exists test_chinese.chinese")
_, err = db.Exec("drop table if exists test_chinese_ws.chinese")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec("create table if not exists test_chinese.chinese(ts timestamp,v nchar(32))")
_, err = db.Exec("create table if not exists test_chinese_ws.chinese(ts timestamp,v nchar(32))")
if err != nil {
t.Error(err)
return
}
_, err = db.Exec(`INSERT INTO test_chinese.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'")
_, err = db.Exec(`INSERT INTO test_chinese_ws.chinese (ts, v) VALUES (?, ?)`, "1641010332000", "'阴天'")
if err != nil {
t.Error(err)
return
}
rows, err := db.Query("select * from test_chinese.chinese")
rows, err := db.Query("select * from test_chinese_ws.chinese")
if err != nil {
t.Error(err)
return
Expand Down
100 changes: 67 additions & 33 deletions wrapper/tmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1147,36 +1147,70 @@ func TestTMQModify(t *testing.T) {
}
d, err := query(targetConn, "describe stb")
assert.NoError(t, err)
assert.Equal(t, [][]driver.Value{
{"ts", "TIMESTAMP", int32(8), ""},
{"c1", "BOOL", int32(1), ""},
{"c2", "TINYINT", int32(1), ""},
{"c3", "SMALLINT", int32(2), ""},
{"c4", "INT", int32(4), ""},
{"c5", "BIGINT", int32(8), ""},
{"c6", "TINYINT UNSIGNED", int32(1), ""},
{"c7", "SMALLINT UNSIGNED", int32(2), ""},
{"c8", "INT UNSIGNED", int32(4), ""},
{"c9", "BIGINT UNSIGNED", int32(8), ""},
{"c10", "FLOAT", int32(4), ""},
{"c11", "DOUBLE", int32(8), ""},
{"c12", "VARCHAR", int32(20), ""},
{"c13", "NCHAR", int32(20), ""},
{"tts", "TIMESTAMP", int32(8), "TAG"},
{"tc1", "BOOL", int32(1), "TAG"},
{"tc2", "TINYINT", int32(1), "TAG"},
{"tc3", "SMALLINT", int32(2), "TAG"},
{"tc4", "INT", int32(4), "TAG"},
{"tc5", "BIGINT", int32(8), "TAG"},
{"tc6", "TINYINT UNSIGNED", int32(1), "TAG"},
{"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"},
{"tc8", "INT UNSIGNED", int32(4), "TAG"},
{"tc9", "BIGINT UNSIGNED", int32(8), "TAG"},
{"tc10", "FLOAT", int32(4), "TAG"},
{"tc11", "DOUBLE", int32(8), "TAG"},
{"tc12", "VARCHAR", int32(20), "TAG"},
{"tc13", "NCHAR", int32(20), "TAG"},
}, d)
if len(d[0]) == 4 {
assert.Equal(t, [][]driver.Value{
{"ts", "TIMESTAMP", int32(8), ""},
{"c1", "BOOL", int32(1), ""},
{"c2", "TINYINT", int32(1), ""},
{"c3", "SMALLINT", int32(2), ""},
{"c4", "INT", int32(4), ""},
{"c5", "BIGINT", int32(8), ""},
{"c6", "TINYINT UNSIGNED", int32(1), ""},
{"c7", "SMALLINT UNSIGNED", int32(2), ""},
{"c8", "INT UNSIGNED", int32(4), ""},
{"c9", "BIGINT UNSIGNED", int32(8), ""},
{"c10", "FLOAT", int32(4), ""},
{"c11", "DOUBLE", int32(8), ""},
{"c12", "VARCHAR", int32(20), ""},
{"c13", "NCHAR", int32(20), ""},
{"tts", "TIMESTAMP", int32(8), "TAG"},
{"tc1", "BOOL", int32(1), "TAG"},
{"tc2", "TINYINT", int32(1), "TAG"},
{"tc3", "SMALLINT", int32(2), "TAG"},
{"tc4", "INT", int32(4), "TAG"},
{"tc5", "BIGINT", int32(8), "TAG"},
{"tc6", "TINYINT UNSIGNED", int32(1), "TAG"},
{"tc7", "SMALLINT UNSIGNED", int32(2), "TAG"},
{"tc8", "INT UNSIGNED", int32(4), "TAG"},
{"tc9", "BIGINT UNSIGNED", int32(8), "TAG"},
{"tc10", "FLOAT", int32(4), "TAG"},
{"tc11", "DOUBLE", int32(8), "TAG"},
{"tc12", "VARCHAR", int32(20), "TAG"},
{"tc13", "NCHAR", int32(20), "TAG"},
}, d)
} else {
assert.Equal(t, [][]driver.Value{
{"ts", "TIMESTAMP", int32(8), "", ""},
{"c1", "BOOL", int32(1), "", ""},
{"c2", "TINYINT", int32(1), "", ""},
{"c3", "SMALLINT", int32(2), "", ""},
{"c4", "INT", int32(4), "", ""},
{"c5", "BIGINT", int32(8), "", ""},
{"c6", "TINYINT UNSIGNED", int32(1), "", ""},
{"c7", "SMALLINT UNSIGNED", int32(2), "", ""},
{"c8", "INT UNSIGNED", int32(4), "", ""},
{"c9", "BIGINT UNSIGNED", int32(8), "", ""},
{"c10", "FLOAT", int32(4), "", ""},
{"c11", "DOUBLE", int32(8), "", ""},
{"c12", "VARCHAR", int32(20), "", ""},
{"c13", "NCHAR", int32(20), "", ""},
{"tts", "TIMESTAMP", int32(8), "TAG", ""},
{"tc1", "BOOL", int32(1), "TAG", ""},
{"tc2", "TINYINT", int32(1), "TAG", ""},
{"tc3", "SMALLINT", int32(2), "TAG", ""},
{"tc4", "INT", int32(4), "TAG", ""},
{"tc5", "BIGINT", int32(8), "TAG", ""},
{"tc6", "TINYINT UNSIGNED", int32(1), "TAG", ""},
{"tc7", "SMALLINT UNSIGNED", int32(2), "TAG", ""},
{"tc8", "INT UNSIGNED", int32(4), "TAG", ""},
{"tc9", "BIGINT UNSIGNED", int32(8), "TAG", ""},
{"tc10", "FLOAT", int32(4), "TAG", ""},
{"tc11", "DOUBLE", int32(8), "TAG", ""},
{"tc12", "VARCHAR", int32(20), "TAG", ""},
{"tc13", "NCHAR", int32(20), "TAG", ""},
}, d)
}

})

TMQUnsubscribe(tmq)
Expand Down Expand Up @@ -1620,7 +1654,7 @@ func TestTMQPosition(t *testing.T) {
}
vgID := assignment[0].VGroupID
position := TMQPosition(tmq, "test_tmq_position_topic", vgID)
assert.Less(t, position, int64(0))
assert.Equal(t, position, int64(0))
haveMessage := false
for i := 0; i < 3; i++ {
message := TMQConsumerPoll(tmq, 500)
Expand Down Expand Up @@ -1726,7 +1760,7 @@ func TestTMQCommitOffset(t *testing.T) {
t.Fatal(errors.NewError(int(code), TMQErr2Str(code)))
}
committed = TMQCommitted(tmq, "test_tmq_commit_offset_topic", vgID)
assert.Equal(t, int64(1), committed)
assert.Equal(t, int64(offset), committed)
TaosFreeResult(message)
break
}
Expand Down Expand Up @@ -1833,7 +1867,7 @@ func TestTMQCommitOffsetAsync(t *testing.T) {
timer.Stop()
}
committed = TMQCommitted(tmq, topic, vgID)
assert.Equal(t, int64(1), committed)
assert.Equal(t, int64(offset), committed)
TaosFreeResult(message)
break
}
Expand Down
6 changes: 3 additions & 3 deletions ws/client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewClient(conn *websocket.Conn, sendChanLength uint) *Client {
}

func (c *Client) ReadPump() {
c.conn.SetReadLimit(common.BufferSize4M)
c.conn.SetReadLimit(0)
c.conn.SetReadDeadline(time.Now().Add(c.PongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(c.PongWait))
Expand All @@ -105,9 +105,9 @@ func (c *Client) ReadPump() {
}
switch messageType {
case websocket.TextMessage:
c.TextMessageHandler(message)
go c.TextMessageHandler(message)
case websocket.BinaryMessage:
c.BinaryMessageHandler(message)
go c.BinaryMessageHandler(message)
}
}
}
Expand Down
Loading

0 comments on commit cab38e7

Please sign in to comment.