Skip to content

Commit

Permalink
Merge pull request #46 from marselsampe/feat/support-sasl-scram-auth
Browse files Browse the repository at this point in the history
feat: Support Kafka SASL Scram authentication
  • Loading branch information
angelo-chan authored Sep 20, 2022
2 parents e5849cd + 4939b83 commit c112661
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 1 deletion.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,25 @@ SDK support with custom configuration for kafka stream, that is :
}
```

#### Authentication
Supported authentication mode for kafka stream:

**1. SASL SCRAM**

Example configuration:

```go
config := &eventstream.BrokerConfig{
...
SecurityConfig: &eventstream.SecurityConfig{
AuthenticationType: "SASL-SCRAM",
SASLUsername: "your-username",
SASLPassword: "your-password",
},
CACertFile: "path-to-cacert-file",
}
```

### Stdout Stream
This stream is for testing purpose. This will print the event in stdout. It should not be used in production since this
will print unnecessary log.
Expand Down
8 changes: 8 additions & 0 deletions eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ type BrokerConfig struct {
ReadTimeout time.Duration
WriteTimeout time.Duration
Balancer kafka.Balancer
SecurityConfig *SecurityConfig
}

// SecurityConfig contains security configuration for message broker
type SecurityConfig struct {
AuthenticationType string
SASLUsername string
SASLPassword string
}

// PublishBuilder defines the structure of message which is sent through message broker
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ require (
github.com/pariz/gountries v0.0.0-20171019111738-adb00f6513a3 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect
golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect
golang.org/x/text v0.3.0 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
Expand Down
19 changes: 18 additions & 1 deletion kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ import (

"github.com/cenkalti/backoff"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/scram"
"github.com/sirupsen/logrus"
)

const (
defaultReaderSize = 10e6 // 10MB
maxBackOffCount = 3
kafkaMaxWait = time.Second
saslScramAuth = "SASL-SCRAM"
)

var (
Expand Down Expand Up @@ -86,12 +88,27 @@ func setConfig(writerConfig *kafka.WriterConfig, readerConfig *kafka.ReaderConfi
readerConfig.Dialer = dialer
}

if config.SecurityConfig.AuthenticationType == saslScramAuth {
if config.CACertFile == "" {
err := errors.New(fmt.Sprintf("CA Cert File is required for %s authentication", saslScramAuth))
logrus.Error("unable to initialize kafka scram authentication", err)
return err
}

mechanism, err := scram.Mechanism(scram.SHA512, config.SecurityConfig.SASLUsername, config.SecurityConfig.SASLPassword)
if err != nil {
logrus.Error("unable to initialize kafka scram authentication", err)
return err
}
dialer.SASLMechanism = mechanism
}

if config.CACertFile != "" {
logrus.Debug("set TLS certificate")

cert, err := GetTLSCertFromFile(config.CACertFile)
if err != nil {
logrus.Error(err, "unable to get TLS certificate")
logrus.Error("unable to get TLS certificate", err)
return err
}

Expand Down

0 comments on commit c112661

Please sign in to comment.