Merge pull request #23 from databendcloud/feat/support-replace-into
feat: support replace into mode
hantmac authored Jun 17, 2024
2 parents 45de1f9 + fcb137c commit e7d7ebf
Showing 10 changed files with 581 additions and 33 deletions.
40 changes: 22 additions & 18 deletions README.md
@@ -98,22 +98,26 @@ with `config.conf.json` and the table `default.kfk_test` will be created and the


## Parameter References
| Parameter             | Description                  | Default             | Example                         |
|-----------------------|------------------------------|---------------------|---------------------------------|
| kafkaBootstrapServers | kafka bootstrap servers      | "127.0.0.1:64103"   | "127.0.0.1:9092,127.0.0.2:9092" |
| kafkaTopic            | kafka topic                  | "test"              | "test"                          |
| KafkaConsumerGroup    | kafka consumer group         | "kafka-bend-ingest" | "test"                          |
| mockData              | mock data                    | ""                  | ""                              |
| isJsonTransform       | whether to transform JSON    | true                | true                            |
| databendDSN           | databend dsn                 | none (required)     | "http://localhost:8000"         |
| databendTable         | databend table               | none (required)     | "db1.tbl"                       |
| batchSize             | batch size                   | 1000                | 1000                            |
| batchMaxInterval      | batch max interval (seconds) | 30                  | 30                              |
| dataFormat            | data format                  | json                | "json"                          |
| workers               | worker thread number         | 1                   | 1                               |
| copyPurge             | copy purge                   | false               | false                           |
| copyForce             | copy force                   | false               | false                           |
| DisableVariantCheck   | disable variant check        | false               | false                           |
| MinBytes              | min bytes                    | 1024                | 1024                            |
| MaxBytes              | max bytes                    | 1048576             | 1048576                         |
| MaxWait               | max wait (seconds)           | 10                  | 10                              |
| useReplaceMode        | use replace mode             | false               | false                           |

**NOTE:**
- `useReplaceMode` replaces existing data in the table: if a record already exists, the newly ingested data overwrites it. It is only supported when `isJsonTransform` is false, because replace mode needs to add the `koffset` and `kpartition` fields to the target table.
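
For orientation, replace mode maps onto Databend's `REPLACE INTO ... ON (<conflict keys>)` statement, keyed on the Kafka coordinates so a re-delivered message overwrites the row it wrote before. Below is a minimal sketch of that statement shape; the stage name and column list are illustrative, not taken from the ingester's internals:

```go
package main

import (
	"fmt"
	"strings"
)

// buildReplaceSQL sketches the REPLACE INTO statement that replace mode
// implies. ON (koffset, kpartition) makes the Kafka offset/partition pair
// the conflict key: rows with a matching key are replaced, the rest are
// inserted.
func buildReplaceSQL(table string, columns []string) string {
	cols := strings.Join(columns, ", ")
	return fmt.Sprintf(
		"REPLACE INTO %s (%s) ON (koffset, kpartition) SELECT %s FROM @ingest_stage",
		table, cols, cols,
	)
}

func main() {
	fmt.Println(buildReplaceSQL("default.kfk_test",
		[]string{"raw_data", "koffset", "kpartition"}))
}
```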
5 changes: 3 additions & 2 deletions config/conf.json
@@ -3,7 +3,7 @@
"kafkaTopic": "ingest_test",
"KafkaConsumerGroup": "test",
"mockData": "",
"isJsonTransform": true,
"isJsonTransform": false,
"databendDSN": "https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443",
"databendTable": "default.kfk_test",
"batchSize": 10,
Expand All @@ -15,5 +15,6 @@
"disableVariantCheck": true,
"minBytes": 1024,
"maxBytes": 1048576,
"maxWait": 10
"maxWait": 10,
"useReplaceMode": false
}
4 changes: 4 additions & 0 deletions config/config.go
@@ -41,6 +41,10 @@ type Config struct {
	//
	// Default: 10s
	MaxWait int `json:"maxWait" default:"10"`

+	// UseReplaceMode determines whether to use the REPLACE INTO statement
+	// to insert data; REPLACE INTO upserts rows rather than appending them.
+	UseReplaceMode bool `json:"useReplaceMode" default:"false"`
}

func LoadConfig() (*Config, error) {
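A quick round-trip of the new flag through `encoding/json`, using a trimmed-down stand-in for the real `Config` struct (the repo's loader additionally applies the `default` tags):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// config is a stand-in with just the two fields that interact; not the full Config.
type config struct {
	IsJsonTransform bool `json:"isJsonTransform"`
	UseReplaceMode  bool `json:"useReplaceMode"`
}

func main() {
	raw := []byte(`{"isJsonTransform": false, "useReplaceMode": true}`)
	var cfg config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// Replace mode is only honored when JSON transform is off,
	// mirroring the check in consume_worker.go.
	fmt.Println(cfg.UseReplaceMode && !cfg.IsJsonTransform) // true
}
```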
21 changes: 15 additions & 6 deletions consume_worker.go
@@ -35,10 +35,11 @@ func (c *ConsumeWorker) Close() {

 func (c *ConsumeWorker) stepBatch() error {
 	l := logrus.WithFields(logrus.Fields{"consumer_worker": "stepBatch"})
-	logrus.Debug("read batch")
+	l.Debug("read batch")
 	batch, err := c.batchReader.ReadBatch()
 	if err != nil {
 		fmt.Fprintf(os.Stderr, "Failed to read batch from Kafka: %v\n", err)
+		l.Errorf("Failed to read batch from Kafka: %v", err)
 		return err
 	}
 	l.Debug("got batch")
@@ -48,13 +49,21 @@ func (c *ConsumeWorker) stepBatch() error {
 	}

 	l.Debug("DEBUG: ingest data")
-	if err := c.ig.IngestData(batch); err != nil {
-		fmt.Fprintf(os.Stderr, "Failed to ingest data between %d-%d into Databend: %v\n", batch.FirstMessageOffset, batch.LastMessageOffset, err)
-		return err
-	}
+	if c.cfg.UseReplaceMode && !c.cfg.IsJsonTransform {
+		if err := c.ig.IngestParquetData(batch); err != nil {
+			fmt.Fprintf(os.Stderr, "Failed to ingest data between %d-%d into Databend: %v\n", batch.FirstMessageOffset, batch.LastMessageOffset, err)
+			l.Errorf("Failed to ingest data between %d-%d into Databend: %v", batch.FirstMessageOffset, batch.LastMessageOffset, err)
+			return err
+		}
+	} else {
+		if err := c.ig.IngestData(batch); err != nil {
+			fmt.Fprintf(os.Stderr, "Failed to ingest data between %d-%d into Databend: %v\n", batch.FirstMessageOffset, batch.LastMessageOffset, err)
+			l.Errorf("Failed to ingest data between %d-%d into Databend: %v", batch.FirstMessageOffset, batch.LastMessageOffset, err)
+			return err
+		}
+	}

-	logrus.Debug("DEBUG: commit")
+	l.Debug("DEBUG: commit")
 	maxRetries := 5
 	retryInterval := time.Second
 	for i := 0; i < maxRetries; i++ {
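The `maxRetries`/`retryInterval` lines the hunk trails into implement a bounded commit retry. A hedged sketch of that pattern, with `commitFn` standing in for the real offset-commit call:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// commitWithRetry retries commitFn up to maxRetries times, sleeping a fixed
// interval between attempts, and returns the last error on exhaustion.
func commitWithRetry(commitFn func() error, maxRetries int, interval time.Duration) error {
	var err error
	for i := 0; i < maxRetries; i++ {
		if err = commitFn(); err == nil {
			return nil
		}
		time.Sleep(interval)
	}
	return fmt.Errorf("commit failed after %d attempts: %w", maxRetries, err)
}

func main() {
	attempts := 0
	err := commitWithRetry(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("broker unavailable")
		}
		return nil
	}, 5, 10*time.Millisecond)
	fmt.Println(err, attempts) // <nil> 3
}
```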
6 changes: 6 additions & 0 deletions go.mod
@@ -10,16 +10,21 @@ require (
 	github.com/segmentio/kafka-go v0.4.39
 	github.com/sirupsen/logrus v1.9.0
 	github.com/test-go/testify v1.1.4
+	github.com/xitongsys/parquet-go v1.6.2
+	github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
 )

 require (
 	github.com/BurntSushi/toml v1.2.1 // indirect
+	github.com/apache/arrow/go/arrow v0.0.0-20200730104253-651201b0f516 // indirect
+	github.com/apache/thrift v0.14.2 // indirect
 	github.com/avast/retry-go v3.0.0+incompatible // indirect
 	github.com/benbjohnson/clock v1.3.5 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/felixge/httpsnoop v1.0.4 // indirect
 	github.com/go-logr/logr v1.4.1 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
+	github.com/golang/snappy v0.0.3 // indirect
 	github.com/klauspost/compress v1.15.9 // indirect
 	github.com/pierrec/lz4/v4 v4.1.15 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
@@ -28,4 +33,5 @@
 	go.opentelemetry.io/otel/metric v1.24.0 // indirect
 	go.opentelemetry.io/otel/trace v1.24.0 // indirect
 	golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
+	golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
 )
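
The new `xitongsys/parquet-go` dependencies (with their transitive thrift, arrow, snappy, and xerrors requirements) back the `IngestParquetData` path. A minimal sketch of writing one record with that library; the `Record` schema here is hypothetical, carrying the `koffset`/`kpartition` columns replace mode relies on:

```go
package main

import (
	"log"

	"github.com/xitongsys/parquet-go-source/local"
	"github.com/xitongsys/parquet-go/writer"
)

// Record is an illustrative row shape, not the ingester's actual schema.
type Record struct {
	RawData    string `parquet:"name=raw_data, type=BYTE_ARRAY, convertedtype=UTF8"`
	Koffset    int64  `parquet:"name=koffset, type=INT64"`
	Kpartition int64  `parquet:"name=kpartition, type=INT64"`
}

func main() {
	fw, err := local.NewLocalFileWriter("batch.parquet")
	if err != nil {
		log.Fatal(err)
	}
	defer fw.Close()

	pw, err := writer.NewParquetWriter(fw, new(Record), 1) // 1 writer goroutine
	if err != nil {
		log.Fatal(err)
	}
	if err := pw.Write(Record{RawData: `{"k":"v"}`, Koffset: 42, Kpartition: 0}); err != nil {
		log.Fatal(err)
	}
	if err := pw.WriteStop(); err != nil { // finalizes the file footer
		log.Fatal(err)
	}
}
```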
