diff --git a/README.md b/README.md index 809be75..1adca92 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,25 @@ SDK support with custom configuration for kafka stream, that is : } ``` +#### Authentication +Supported authentication mode for kafka stream: + +**1. SASL SCRAM** + +Example configuration: + +```go + config := &eventstream.BrokerConfig{ + ... + SecurityConfig: &eventstream.SecurityConfig{ + AuthenticationType: "SASL-SCRAM", + SASLUsername: "your-username", + SASLPassword: "your-password", + }, + CACertFile: "path-to-cacert-file", + } +``` + ### Stdout Stream This stream is for testing purpose. This will print the event in stdout. It should not be used in production since this will print unnecessary log. diff --git a/eventstream.go b/eventstream.go index cf7585d..6ba332a 100644 --- a/eventstream.go +++ b/eventstream.go @@ -81,6 +81,14 @@ type BrokerConfig struct { ReadTimeout time.Duration WriteTimeout time.Duration Balancer kafka.Balancer + SecurityConfig *SecurityConfig +} + +// SecurityConfig contains security configuration for message broker +type SecurityConfig struct { + AuthenticationType string + SASLUsername string + SASLPassword string } // PublishBuilder defines the structure of message which is sent through message broker diff --git a/go.mod b/go.mod index 06c32d8..187cbe0 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,11 @@ require ( github.com/pariz/gountries v0.0.0-20171019111738-adb00f6513a3 // indirect github.com/pierrec/lz4 v2.6.0+incompatible // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect + github.com/xdg/stringprep v1.0.0 // indirect + golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284 // indirect golang.org/x/sys v0.0.0-20190412213103-97732733099d // indirect + golang.org/x/text v0.3.0 // indirect gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect gopkg.in/yaml.v2 v2.2.2 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect diff --git a/kafka.go b/kafka.go index 3b98da9..0e80980 100644 --- a/kafka.go +++ b/kafka.go @@ -28,6 +28,7 @@ import ( "github.com/cenkalti/backoff" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/scram" "github.com/sirupsen/logrus" ) @@ -35,6 +36,7 @@ const ( defaultReaderSize = 10e6 // 10MB maxBackOffCount = 3 kafkaMaxWait = time.Second + saslScramAuth = "SASL-SCRAM" ) var ( @@ -86,12 +88,27 @@ func setConfig(writerConfig *kafka.WriterConfig, readerConfig *kafka.ReaderConfi readerConfig.Dialer = dialer } + if config.SecurityConfig.AuthenticationType == saslScramAuth { + if config.CACertFile == "" { + err := errors.New(fmt.Sprintf("CA Cert File is required for %s authentication", saslScramAuth)) + logrus.Error("unable to initialize kafka scram authentication", err) + return err + } + + mechanism, err := scram.Mechanism(scram.SHA512, config.SecurityConfig.SASLUsername, config.SecurityConfig.SASLPassword) + if err != nil { + logrus.Error("unable to initialize kafka scram authentication", err) + return err + } + dialer.SASLMechanism = mechanism + } + if config.CACertFile != "" { logrus.Debug("set TLS certificate") cert, err := GetTLSCertFromFile(config.CACertFile) if err != nil { - logrus.Error(err, "unable to get TLS certificate") + logrus.Error("unable to get TLS certificate", err) return err }