Fix franz offset out (#665)
* Reapply "use franz-go in kafka plugin (#646)" (#664)
* goroutine per kafka partition consuming
* fix timestamp in producer
DmitryRomanov authored Sep 16, 2024
1 parent 8c767d0 commit 201f324
Showing 20 changed files with 802 additions and 445 deletions.
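On the "fix timestamp in producer" bullet: a franz-go record carries its own Timestamp field, and the client stamps it with time.Now() only when it is left zero. A hedged sketch of setting it explicitly — the helper and its parameters are illustrative, not the commit's actual change:

package example

import (
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// buildRecord is purely illustrative: it stamps the record with the event's
// original time instead of leaving Timestamp zero (in which case franz-go
// fills it in at produce time).
func buildRecord(topic string, value []byte, eventTime time.Time) *kgo.Record {
	return &kgo.Record{
		Topic:     topic,
		Value:     value,
		Timestamp: eventTime,
	}
}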
113 changes: 113 additions & 0 deletions cfg/kafka_client.go
@@ -0,0 +1,113 @@
package cfg

import (
	"crypto/tls"
	"os"

	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/sasl/aws"
	"github.com/twmb/franz-go/pkg/sasl/plain"
	"github.com/twmb/franz-go/pkg/sasl/scram"
	"github.com/twmb/franz-go/plugin/kzap"
	"github.com/twmb/tlscfg"
	"go.uber.org/zap"
)

type KafkaClientConfig interface {
	GetBrokers() []string
	GetClientID() string

	IsSaslEnabled() bool
	GetSaslConfig() KafkaClientSaslConfig

	IsSslEnabled() bool
	GetSslConfig() KafkaClientSslConfig
}

type KafkaClientSaslConfig struct {
	SaslMechanism string
	SaslUsername  string
	SaslPassword  string
}

type KafkaClientSslConfig struct {
	CACert        string
	ClientCert    string
	ClientKey     string
	SslSkipVerify bool
}

func GetKafkaClientOptions(c KafkaClientConfig, l *zap.Logger) []kgo.Opt {
	opts := []kgo.Opt{
		kgo.SeedBrokers(c.GetBrokers()...),
		kgo.ClientID(c.GetClientID()),
		kgo.WithLogger(kzap.New(l)),
	}

	if c.IsSaslEnabled() {
		saslConfig := c.GetSaslConfig()
		switch saslConfig.SaslMechanism {
		case "PLAIN":
			opts = append(opts, kgo.SASL(plain.Auth{
				User: saslConfig.SaslUsername,
				Pass: saslConfig.SaslPassword,
			}.AsMechanism()))
		case "SCRAM-SHA-256":
			opts = append(opts, kgo.SASL(scram.Auth{
				User: saslConfig.SaslUsername,
				Pass: saslConfig.SaslPassword,
			}.AsSha256Mechanism()))
		case "SCRAM-SHA-512":
			opts = append(opts, kgo.SASL(scram.Auth{
				User: saslConfig.SaslUsername,
				Pass: saslConfig.SaslPassword,
			}.AsSha512Mechanism()))
		case "AWS_MSK_IAM":
			opts = append(opts, kgo.SASL(aws.Auth{
				AccessKey: saslConfig.SaslUsername,
				SecretKey: saslConfig.SaslPassword,
			}.AsManagedStreamingIAMMechanism()))
		}
		// SASL connections dial over TLS with a default config; the SSL
		// section below may replace this with a fuller one.
		opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))
	}

	if c.IsSslEnabled() {
		sslConfig := c.GetSslConfig()
		tlsOpts := []tlscfg.Opt{}
		if sslConfig.CACert != "" || sslConfig.ClientCert != "" || sslConfig.ClientKey != "" {
			if sslConfig.CACert != "" {
				// If the value is not a path to an existing file, treat it as inline PEM.
				if _, err := os.Stat(sslConfig.CACert); err != nil {
					tlsOpts = append(tlsOpts,
						tlscfg.WithCA(
							[]byte(sslConfig.CACert), tlscfg.ForClient,
						),
					)
				} else {
					tlsOpts = append(tlsOpts,
						tlscfg.MaybeWithDiskCA(sslConfig.CACert, tlscfg.ForClient),
					)
				}
			}

			// Same convention for the client key pair: inline PEM or file paths.
			if _, err := os.Stat(sslConfig.ClientCert); err != nil {
				tlsOpts = append(tlsOpts,
					tlscfg.WithKeyPair(
						[]byte(sslConfig.ClientCert), []byte(sslConfig.ClientKey),
					),
				)
			} else {
				tlsOpts = append(tlsOpts,
					tlscfg.MaybeWithDiskKeyPair(sslConfig.ClientCert, sslConfig.ClientKey),
				)
			}
		}
		tc, err := tlscfg.New(tlsOpts...)
		if err != nil {
			l.Fatal("unable to create tls config", zap.Error(err))
		}
		tc.InsecureSkipVerify = sslConfig.SslSkipVerify
		opts = append(opts, kgo.DialTLSConfig(tc))
	}

	return opts
}
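A sketch of how a plugin might consume these shared options — assuming a config type implementing KafkaClientConfig; the helper newClient and the topic name are illustrative, not part of this commit:

package example

import (
	"github.com/ozontech/file.d/cfg"
	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/zap"
)

// newClient layers plugin-specific options on top of the shared ones
// returned by cfg.GetKafkaClientOptions.
func newClient(c cfg.KafkaClientConfig, l *zap.Logger) *kgo.Client {
	opts := cfg.GetKafkaClientOptions(c, l)
	opts = append(opts, kgo.ConsumeTopics("logs")) // "logs" is a placeholder topic
	client, err := kgo.NewClient(opts...)
	if err != nil {
		l.Fatal("cannot create kafka client", zap.Error(err))
	}
	return client
}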
38 changes: 24 additions & 14 deletions e2e/kafka_auth/kafka_auth.go
@@ -1,13 +1,15 @@
 package kafka_auth
 
 import (
+	"context"
 	"testing"
 	"time"
 
 	"github.com/ozontech/file.d/cfg"
 	kafka_in "github.com/ozontech/file.d/plugin/input/kafka"
 	kafka_out "github.com/ozontech/file.d/plugin/output/kafka"
 	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kgo"
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 )
@@ -111,22 +113,23 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
 				config.ClientCert = "./kafka_auth/certs/client_cert.pem"
 			}
 
-			kafka_out.NewProducer(config,
-				zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
+			kafka_out.NewClient(config,
+				zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
 			)
 		},
 		func() {
 			config := &kafka_in.Config{
-				Brokers:                    c.Brokers,
-				Topics:                     []string{inTopic},
-				ConsumerGroup:              "test-auth",
-				ClientID:                   "test-auth-in",
-				ChannelBufferSize:          256,
-				Offset_:                    kafka_in.OffsetTypeNewest,
-				ConsumerMaxProcessingTime_: 200 * time.Millisecond,
-				ConsumerMaxWaitTime_:       250 * time.Millisecond,
-				SslEnabled:                 true,
-				SslSkipVerify:              true,
+				Brokers:              c.Brokers,
+				Topics:               []string{inTopic},
+				ConsumerGroup:        "test-auth",
+				ClientID:             "test-auth-in",
+				ChannelBufferSize:    256,
+				Offset_:              kafka_in.OffsetTypeNewest,
+				ConsumerMaxWaitTime_: 250 * time.Millisecond,
+				SslEnabled:           true,
+				SslSkipVerify:        true,
+				SessionTimeout_:      10 * time.Second,
+				AutoCommitInterval_:  1 * time.Second,
 			}
 			if tt.sasl.Enabled {
 				config.SaslEnabled = true
@@ -140,8 +143,9 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
 				config.ClientCert = "./kafka_auth/certs/client_cert.pem"
 			}
 
-			kafka_in.NewConsumerGroup(config,
-				zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
+			kafka_in.NewClient(config,
+				zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
+				Consumer{},
 			)
 		},
 	}
@@ -159,3 +163,9 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
 func (c *Config) Send(_ *testing.T) {}
 
 func (c *Config) Validate(_ *testing.T) {}
+
+type Consumer struct{}
+
+func (c Consumer) Assigned(_ context.Context, _ *kgo.Client, assigned map[string][]int32) {}
+
+func (c Consumer) Lost(_ context.Context, _ *kgo.Client, lost map[string][]int32) {}
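The no-op Assigned and Lost methods above have exactly the shape franz-go expects for its rebalance hooks. A minimal sketch of wiring such a value into a consumer-group client — broker, group, and topic are placeholders, and this helper is not part of the test:

// Hypothetical wiring, in the same package as the Consumer type above.
func newGroupClient(c Consumer) (*kgo.Client, error) {
	return kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // placeholder broker
		kgo.ConsumerGroup("test-auth"),
		kgo.ConsumeTopics("test_in"), // placeholder topic
		kgo.OnPartitionsAssigned(c.Assigned),
		kgo.OnPartitionsLost(c.Lost),
	)
}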
35 changes: 23 additions & 12 deletions e2e/kafka_file/kafka_file.go
@@ -1,16 +1,21 @@
 package kafka_file
 
 import (
+	"context"
 	"log"
 	"path"
 	"testing"
 	"time"
 
-	"github.com/Shopify/sarama"
 	"github.com/ozontech/file.d/cfg"
+	kafka_out "github.com/ozontech/file.d/plugin/output/kafka"
 	"github.com/ozontech/file.d/test"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/twmb/franz-go/pkg/kadm"
+	"github.com/twmb/franz-go/pkg/kgo"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 )
 
 // In this test, a message sender is created that generates one message for each partition. These messages are sent Count times.
@@ -41,27 +46,33 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
 
 // Send creates a Partition of messages (one for each partition) and sends them Count times to kafka
 func (c *Config) Send(t *testing.T) {
-	config := sarama.NewConfig()
-	config.Producer.Flush.Frequency = time.Millisecond
-	config.Producer.Return.Errors = true
-	config.Producer.Return.Successes = true
+	config := &kafka_out.Config{
+		Brokers:          c.Brokers,
+		MaxMessageBytes_: 512,
+		BatchSize_:       c.Count,
+	}
 
-	producer, err := sarama.NewSyncProducer(c.Brokers, config)
+	client := kafka_out.NewClient(config,
+		zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
+	)
+	adminClient := kadm.NewClient(client)
+	_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.Topics[0])
 	if err != nil {
-		log.Fatalf("failed to create async producer: %s", err.Error())
+		t.Logf("cannot create topic: %s %s", c.Topics[0], err.Error())
 	}
-	msgs := make([]*sarama.ProducerMessage, c.Partition)
-	message := sarama.StringEncoder(`{"key":"value"}`)
 
+	msgs := make([]*kgo.Record, c.Partition)
 	for i := range msgs {
-		msgs[i] = &sarama.ProducerMessage{}
+		msgs[i] = &kgo.Record{}
+		msgs[i].Value = []byte(`{"key":"value"}`)
 		msgs[i].Topic = c.Topics[0]
-		msgs[i].Value = message
 		msgs[i].Partition = int32(i)
 	}
 
 	for i := 0; i < c.Count; i++ {
-		if err = producer.SendMessages(msgs); err != nil {
+		result := client.ProduceSync(context.TODO(), msgs...)
+		err := result.FirstErr()
+		if err != nil {
 			log.Fatalf("failed to send messages: %s", err.Error())
 		}
 	}
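The "goroutine per kafka partition consuming" bullet in the commit message refers to a common franz-go pattern: keep a single poll loop and fan records out to one worker goroutine per assigned topic/partition. A hedged sketch, with all names illustrative (the chans map would be maintained from OnPartitionsAssigned/OnPartitionsLost hooks like those in kafka_auth.go above):

package example

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

type tp struct {
	topic     string
	partition int32
}

// pollLoop stays single-threaded; records for each topic/partition are
// handed to a dedicated worker goroutine via that partition's channel.
func pollLoop(ctx context.Context, client *kgo.Client, chans map[tp]chan []*kgo.Record) {
	for {
		fetches := client.PollFetches(ctx)
		if fetches.IsClientClosed() || ctx.Err() != nil {
			return
		}
		fetches.EachPartition(func(p kgo.FetchTopicPartition) {
			if len(p.Records) == 0 {
				return
			}
			chans[tp{p.Topic, p.Partition}] <- p.Records
		})
	}
}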
19 changes: 0 additions & 19 deletions e2e/split_join/handler.go

This file was deleted.

