Skip to content

Commit

Permalink
Merge pull request #13 from databendcloud/fix/increace-ctx-timeout
Browse files Browse the repository at this point in the history
fix: increase the FetchMessage timeout
  • Loading branch information
hantmac authored Jun 3, 2024
2 parents 37190ec + 2cf3978 commit f6b1126
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 17 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ bend-ingest-kafka
--databend-table="db1.tbl" \
--data-format="json" \
--batch-size=100000 \
--batch-max-interval=300s
--batch-max-interval=300
```

#### config file mode
Expand Down Expand Up @@ -108,7 +108,7 @@ with `config.conf.json` and the table `default.kfk_test` will be created and the
| databendDSN | databend dsn | no | "http://localhost:8000" |
| databendTable | databend table | no | "db1.tbl" |
| batchSize | batch size | 1000 | 1000 |
| batchMaxInterval | batch max interval | 30 | 30s |
| batchMaxInterval | batch max interval | 30 | 30 |
| dataFormat | data format | json | "json" |
| workers | workers thread number | 1 | 1 |
| copyPurge | copy purge | false | false |
Expand Down
5 changes: 2 additions & 3 deletions batch_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewKafkaBatchReader(cfg *config.Config) *KafkaBatchReader {
})
return &KafkaBatchReader{
batchSize: cfg.BatchSize,
maxBatchInterval: time.Duration(cfg.BatchMaxInterval),
maxBatchInterval: time.Duration(cfg.BatchMaxInterval) * time.Second,
kafkaReader: kafkaReader,
}
}
Expand All @@ -84,7 +84,7 @@ func (br *KafkaBatchReader) Close() error {
}

func (br *KafkaBatchReader) fetchMessageWithTimeout(ctx context.Context, timeout time.Duration) (*kafka.Message, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
ctx, cancel := context.WithTimeout(ctx, 2*timeout)
defer cancel()

m, err := br.kafkaReader.FetchMessage(ctx)
Expand Down Expand Up @@ -148,7 +148,6 @@ _loop:
partition = lastMessage.Partition
key = string(lastMessage.Key)
createTime = lastMessage.Time

}

return &message.MessagesBatch{
Expand Down
14 changes: 2 additions & 12 deletions consume_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,24 +190,14 @@ func TestConsumerWithoutTransform(t *testing.T) {
}
w := NewConsumeWorker(cfg, "worker1", ig)
log.Printf("start consume")
w.stepBatch(context.TODO())
err = w.stepBatch(context.TODO())
assert.NoError(t, err)

result, err := db.Query("select * from test_ingest_raw")
assert.NoError(t, err)
count := 0
for result.Next() {
count += 1
var i64 int64
var u64 uint64
var f64 float64
var s string
var s2 string
var a16 []int16
var a8 []uint8
var d time.Time
var t time.Time
err = result.Scan(&i64, &u64, &f64, &s, &s2, &a16, &a8, &d, &t)
fmt.Println(i64, u64, f64, s, s2, a16, a8, d, t)
}

assert.NotEqual(t, 0, count)
Expand Down

0 comments on commit f6b1126

Please sign in to comment.