Skip to content

Commit

Permalink
use franz-go in kafka plugin (#646)
Browse files Browse the repository at this point in the history
* add some options for input kafka (heartbeat,session,AWS_MSK_IAM sasl mechanism,auto_commit_interval,balancer_plan,max_concurrent_fetches,fetch_max_bytes,fetch_min_bytes)
* add some options for output kafka (compression,ack)
* remove usage of sarama
* fix flaky split_join e2e test
  • Loading branch information
DmitryRomanov authored Jul 19, 2024
1 parent 6481ea8 commit b7f14f5
Show file tree
Hide file tree
Showing 18 changed files with 623 additions and 419 deletions.
113 changes: 113 additions & 0 deletions cfg/kafka_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package cfg

import (
"crypto/tls"
"os"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kzap"
"github.com/twmb/tlscfg"
"go.uber.org/zap"
)

// KafkaClientConfig is the set of settings GetKafkaClientOptions reads in
// order to build franz-go client options.
type KafkaClientConfig interface {
// GetBrokers returns the addresses used as seed brokers.
GetBrokers() []string
// GetClientID returns the client ID set on the Kafka client.
GetClientID() string

// IsSaslEnabled reports whether SASL options should be added.
IsSaslEnabled() bool
// GetSaslConfig returns the SASL settings; it is only consulted when
// IsSaslEnabled returns true.
GetSaslConfig() KafkaClientSaslConfig

// IsSslEnabled reports whether a TLS configuration should be built.
IsSslEnabled() bool
// GetSslConfig returns the TLS settings; it is only consulted when
// IsSslEnabled returns true.
GetSslConfig() KafkaClientSslConfig
}

// KafkaClientSaslConfig holds the SASL credentials and mechanism for a Kafka
// client.
type KafkaClientSaslConfig struct {
// SaslMechanism selects the mechanism. GetKafkaClientOptions recognizes
// "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512" and "AWS_MSK_IAM".
SaslMechanism string
// SaslUsername is the user name; for "AWS_MSK_IAM" it is used as the
// access key.
SaslUsername string
// SaslPassword is the password; for "AWS_MSK_IAM" it is used as the
// secret key.
SaslPassword string
}

// KafkaClientSslConfig holds the TLS settings for a Kafka client.
//
// Each certificate/key field is interpreted by GetKafkaClientOptions either
// as a path to a file on disk or, when the value cannot be stat'ed as a file,
// as the PEM content itself.
type KafkaClientSslConfig struct {
// CACert is the CA certificate used to verify the broker.
CACert string
// ClientCert is the client certificate.
ClientCert string
// ClientKey is the private key matching ClientCert.
ClientKey string
// SslSkipVerify is copied to tls.Config.InsecureSkipVerify.
SslSkipVerify bool
}

// GetKafkaClientOptions builds the franz-go client options described by c:
// seed brokers, client ID, a zap-backed logger and, when enabled, SASL and TLS
// settings.
//
// Certificate and key settings may be either a path to a PEM file or the PEM
// content itself: a value that cannot be stat'ed as a file is treated as
// inline PEM. The CA certificate and the client key pair are independent, so
// a CA-only configuration is valid.
//
// If the TLS configuration cannot be built, the error is reported through
// l.Fatal.
func GetKafkaClientOptions(c KafkaClientConfig, l *zap.Logger) []kgo.Opt {
	opts := []kgo.Opt{
		kgo.SeedBrokers(c.GetBrokers()...),
		kgo.ClientID(c.GetClientID()),
		kgo.WithLogger(kzap.New(l)),
	}

	if c.IsSaslEnabled() {
		saslConfig := c.GetSaslConfig()
		switch saslConfig.SaslMechanism {
		case "PLAIN":
			opts = append(opts, kgo.SASL(plain.Auth{
				User: saslConfig.SaslUsername,
				Pass: saslConfig.SaslPassword,
			}.AsMechanism()))
		case "SCRAM-SHA-256":
			opts = append(opts, kgo.SASL(scram.Auth{
				User: saslConfig.SaslUsername,
				Pass: saslConfig.SaslPassword,
			}.AsSha256Mechanism()))
		case "SCRAM-SHA-512":
			opts = append(opts, kgo.SASL(scram.Auth{
				User: saslConfig.SaslUsername,
				Pass: saslConfig.SaslPassword,
			}.AsSha512Mechanism()))
		case "AWS_MSK_IAM":
			opts = append(opts, kgo.SASL(aws.Auth{
				AccessKey: saslConfig.SaslUsername,
				SecretKey: saslConfig.SaslPassword,
			}.AsManagedStreamingIAMMechanism()))
		default:
			// Do not silently continue without authentication: make the
			// misconfiguration visible in the logs.
			l.Warn("unsupported sasl mechanism, sasl authentication is not configured",
				zap.String("mechanism", saslConfig.SaslMechanism))
		}
		// With SASL enabled the client always dials over TLS; this default
		// config is replaced below when SSL is explicitly enabled.
		// NOTE(review): this rules out SASL over plaintext listeners — confirm
		// that is intended.
		opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))
	}

	if c.IsSslEnabled() {
		sslConfig := c.GetSslConfig()
		var tlsOpts []tlscfg.Opt

		if sslConfig.CACert != "" {
			if _, err := os.Stat(sslConfig.CACert); err != nil {
				// Not a readable file: treat the value as inline PEM.
				tlsOpts = append(tlsOpts,
					tlscfg.WithCA([]byte(sslConfig.CACert), tlscfg.ForClient),
				)
			} else {
				tlsOpts = append(tlsOpts,
					tlscfg.MaybeWithDiskCA(sslConfig.CACert, tlscfg.ForClient),
				)
			}
		}

		// Only configure a client key pair when one was actually provided.
		// Without this guard a CA-only configuration would try to parse an
		// empty certificate and abort with a fatal error.
		if sslConfig.ClientCert != "" || sslConfig.ClientKey != "" {
			if _, err := os.Stat(sslConfig.ClientCert); err != nil {
				// Not a readable file: treat both values as inline PEM.
				tlsOpts = append(tlsOpts,
					tlscfg.WithKeyPair(
						[]byte(sslConfig.ClientCert), []byte(sslConfig.ClientKey),
					),
				)
			} else {
				tlsOpts = append(tlsOpts,
					tlscfg.MaybeWithDiskKeyPair(sslConfig.ClientCert, sslConfig.ClientKey),
				)
			}
		}

		tc, err := tlscfg.New(tlsOpts...)
		if err != nil {
			l.Fatal("unable to create tls config", zap.Error(err))
		}
		tc.InsecureSkipVerify = sslConfig.SslSkipVerify
		opts = append(opts, kgo.DialTLSConfig(tc))
	}

	return opts
}
29 changes: 15 additions & 14 deletions e2e/kafka_auth/kafka_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,23 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_out.NewProducer(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
kafka_out.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
)
},
func() {
config := &kafka_in.Config{
Brokers: c.Brokers,
Topics: []string{inTopic},
ConsumerGroup: "test-auth",
ClientID: "test-auth-in",
ChannelBufferSize: 256,
Offset_: kafka_in.OffsetTypeNewest,
ConsumerMaxProcessingTime_: 200 * time.Millisecond,
ConsumerMaxWaitTime_: 250 * time.Millisecond,
SslEnabled: true,
SslSkipVerify: true,
Brokers: c.Brokers,
Topics: []string{inTopic},
ConsumerGroup: "test-auth",
ClientID: "test-auth-in",
ChannelBufferSize: 256,
Offset_: kafka_in.OffsetTypeNewest,
ConsumerMaxWaitTime_: 250 * time.Millisecond,
SslEnabled: true,
SslSkipVerify: true,
SessionTimeout_: 10 * time.Second,
AutoCommitInterval_: 1 * time.Second,
}
if tt.sasl.Enabled {
config.SaslEnabled = true
Expand All @@ -140,8 +141,8 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_in.NewConsumerGroup(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
kafka_in.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
)
},
}
Expand Down
35 changes: 23 additions & 12 deletions e2e/kafka_file/kafka_file.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package kafka_file

import (
"context"
"log"
"path"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/ozontech/file.d/cfg"
kafka_out "github.com/ozontech/file.d/plugin/output/kafka"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// In this test, a message sender is created that generates one message for each partition. These messages are sent Count times.
Expand Down Expand Up @@ -41,27 +46,33 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)

// Send builds c.Partition messages (one for each partition) and sends them c.Count times to Kafka
func (c *Config) Send(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Flush.Frequency = time.Millisecond
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config := &kafka_out.Config{
Brokers: c.Brokers,
MaxMessageBytes_: 512,
BatchSize_: c.Count,
}

producer, err := sarama.NewSyncProducer(c.Brokers, config)
client := kafka_out.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
)
adminClient := kadm.NewClient(client)
_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.Topics[0])
if err != nil {
log.Fatalf("failed to create async producer: %s", err.Error())
t.Logf("cannot create topic: %s %s", c.Topics[0], err.Error())
}
msgs := make([]*sarama.ProducerMessage, c.Partition)
message := sarama.StringEncoder(`{"key":"value"}`)

msgs := make([]*kgo.Record, c.Partition)
for i := range msgs {
msgs[i] = &sarama.ProducerMessage{}
msgs[i] = &kgo.Record{}
msgs[i].Value = []byte(`{"key":"value"}`)
msgs[i].Topic = c.Topics[0]
msgs[i].Value = message
msgs[i].Partition = int32(i)
}

for i := 0; i < c.Count; i++ {
if err = producer.SendMessages(msgs); err != nil {
result := client.ProduceSync(context.TODO(), msgs...)
err := result.FirstErr()
if err != nil {
log.Fatalf("failed to send messages: %s", err.Error())
}
}
Expand Down
19 changes: 0 additions & 19 deletions e2e/split_join/handler.go

This file was deleted.

76 changes: 42 additions & 34 deletions e2e/split_join/split_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ import (
"os"
"path"
"path/filepath"
"strings"
"reflect"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/ozontech/file.d/cfg"
kafka_in "github.com/ozontech/file.d/plugin/input/kafka"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
Expand All @@ -27,7 +31,7 @@ const (

type Config struct {
inputDir string
consumer sarama.ConsumerGroup
client *kgo.Client
topic string
}

Expand All @@ -48,18 +52,23 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
output.Set("brokers", []string{brokerHost})
output.Set("default_topic", c.topic)

addrs := []string{brokerHost}
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config := &kafka_in.Config{
Brokers: []string{brokerHost},
Topics: []string{c.topic},
ConsumerGroup: group,
Offset_: kafka_in.OffsetTypeOldest,
SessionTimeout_: 10 * time.Second,
AutoCommitInterval_: 1 * time.Second,
ConsumerMaxWaitTime_: 1 * time.Second,
HeartbeatInterval_: 10 * time.Second,
}

admin, err := sarama.NewClusterAdmin(addrs, config)
r.NoError(err)
r.NoError(admin.CreateTopic(c.topic, &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false))
c.client = kafka_in.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
)

c.consumer, err = sarama.NewConsumerGroup(addrs, group, config)
adminClient := kadm.NewClient(c.client)
_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.topic)
r.NoError(err)
}

Expand All @@ -83,38 +92,37 @@ func (c *Config) Validate(t *testing.T) {
defer cancel()

expectedEventsCount := messages * arrayLen

strBuilder := strings.Builder{}
result := make(map[string]int, arrayLen)
gotEvents := 0
done := make(chan struct{})

go func() {
r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
strBuilder.Write(msg.Value)
strBuilder.WriteString("\n")
gotEvents++
if gotEvents == expectedEventsCount {
close(done)
}
})))
for {
fetches := c.client.PollFetches(ctx)
fetches.EachError(func(topic string, p int32, err error) {})
fetches.EachRecord(func(r *kgo.Record) {
result[string(r.Value)]++
gotEvents++
if gotEvents == expectedEventsCount {
close(done)
}
})
}
}()

select {
case <-done:
case <-ctx.Done():
r.Failf("test timed out", "got: %v, expected: %v, consumed: %s", gotEvents, expectedEventsCount, strBuilder.String())
r.Failf("test timed out", "got: %v, expected: %v", gotEvents, expectedEventsCount)
}

got := strBuilder.String()

expected := strings.Repeat(`{"first":"1"}
{"message":"start continue"}
{"second":"2"}
{"third":"3"}
`,
messages)
expected := map[string]int{
"{\"first\":\"1\"}": messages,
"{\"message\":\"start continue\"}": messages,
"{\"second\":\"2\"}": messages,
"{\"third\":\"3\"}": messages,
}

r.Equal(len(expected), len(got))
r.Equal(expected, got)
r.True(reflect.DeepEqual(expected, result))
r.Equal(expectedEventsCount, gotEvents)
}
Loading

0 comments on commit b7f14f5

Please sign in to comment.