diff --git a/README.md b/README.md index ac49e93..95d48f1 100644 --- a/README.md +++ b/README.md @@ -49,7 +49,7 @@ bend-ingest-kafka --databend-table="db1.tbl" \ --data-format="json" \ --batch-size=100000 \ - --batch-max-interval=300s + --batch-max-interval=300 ``` #### config file mode @@ -108,7 +108,7 @@ with `config.conf.json` and the table `default.kfk_test` will be created and the | databendDSN | databend dsn | no | "http://localhost:8000" | | databendTable | databend table | no | "db1.tbl" | | batchSize | batch size | 1000 | 1000 | -| batchMaxInterval | batch max interval | 30 | 30s | +| batchMaxInterval | batch max interval | 30 | 30 | | dataFormat | data format | json | "json" | | workers | workers thread number | 1 | 1 | | copyPurge | copy purge | false | false | diff --git a/batch_reader.go b/batch_reader.go index b7ff877..37897d5 100644 --- a/batch_reader.go +++ b/batch_reader.go @@ -74,7 +74,7 @@ func NewKafkaBatchReader(cfg *config.Config) *KafkaBatchReader { }) return &KafkaBatchReader{ batchSize: cfg.BatchSize, - maxBatchInterval: time.Duration(cfg.BatchMaxInterval), + maxBatchInterval: time.Duration(cfg.BatchMaxInterval) * time.Second, kafkaReader: kafkaReader, } } @@ -84,7 +84,7 @@ func (br *KafkaBatchReader) Close() error { } func (br *KafkaBatchReader) fetchMessageWithTimeout(ctx context.Context, timeout time.Duration) (*kafka.Message, error) { - ctx, cancel := context.WithTimeout(ctx, timeout) + ctx, cancel := context.WithTimeout(ctx, 2*timeout) defer cancel() m, err := br.kafkaReader.FetchMessage(ctx) @@ -148,7 +148,6 @@ _loop: partition = lastMessage.Partition key = string(lastMessage.Key) createTime = lastMessage.Time - } return &message.MessagesBatch{ diff --git a/consume_worker_test.go b/consume_worker_test.go index 604adb1..309fb74 100644 --- a/consume_worker_test.go +++ b/consume_worker_test.go @@ -190,24 +190,14 @@ func TestConsumerWithoutTransform(t *testing.T) { } w := NewConsumeWorker(cfg, "worker1", ig) log.Printf("start consume") - w.stepBatch(context.TODO()) + err = w.stepBatch(context.TODO()) + assert.NoError(t, err) result, err := db.Query("select * from test_ingest_raw") assert.NoError(t, err) count := 0 for result.Next() { count += 1 - var i64 int64 - var u64 uint64 - var f64 float64 - var s string - var s2 string - var a16 []int16 - var a8 []uint8 - var d time.Time - var t time.Time - err = result.Scan(&i64, &u64, &f64, &s, &s2, &a16, &a8, &d, &t) - fmt.Println(i64, u64, f64, s, s2, a16, a8, d, t) } assert.NotEqual(t, 0, count)