Skip to content

Commit

Permalink
Merge pull request #684 from linkedin/kafka-2.7.0
Browse files Browse the repository at this point in the history
Add support for Kafka 2.7.0
  • Loading branch information
bai authored Feb 15, 2021
2 parents 605e219 + 14cb76a commit 8819218
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 112 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go-version: [1.14.x, 1.15.x]
go-version: [1.15.x]
platform: [ubuntu-latest]

steps:
Expand All @@ -28,7 +28,7 @@ jobs:
${{ runner.os }}-go-
- name: Install dependencies
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.26.0
run: curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.36.0

- name: Run test suite
run: make test
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# stage 1: builder
FROM golang:1.15.2-alpine as builder
FROM golang:1.15.8-alpine as builder

ENV BURROW_SRC /usr/src/Burrow/

Expand Down
10 changes: 5 additions & 5 deletions core/internal/consumer/kafka_zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ func TestKafkaZkClient_watchGroupList(t *testing.T) {

func TestKafkaZkClient_resetOffsetWatchAndSend_BadPath(t *testing.T) {
mockZookeeper := helpers.MockZookeeperClient{}
mockZookeeper.On("GetW", "/consumers/testgroup/offsets/testtopic/0").Return([]byte("81234"), (*zk.Stat)(nil), (<-chan zk.Event)(nil), errors.New("badpath"))
mockZookeeper.On("GetW", "/consumers/testgroup/owners/testtopic/0").Return([]byte("testowner"), (*zk.Stat)(nil), (<-chan zk.Event)(nil), nil)
mockZookeeper.On("GetW", "/consumers/testgroup/offsets/testtopic/0").Return([]byte("81234"), (*zk.Stat)(nil), (<-chan zk.Event)(nil), errors.New("badpath")) // nolint:gocritic
mockZookeeper.On("GetW", "/consumers/testgroup/owners/testtopic/0").Return([]byte("testowner"), (*zk.Stat)(nil), (<-chan zk.Event)(nil), nil) // nolint:gocritic

module := fixtureKafkaZkModule()
module.Configure("test", "consumer.test")
Expand All @@ -200,7 +200,7 @@ func TestKafkaZkClient_resetOffsetWatchAndSend_BadOffset(t *testing.T) {
offsetStat := &zk.Stat{Mtime: 894859}
newWatchEventChan := make(chan zk.Event)
mockZookeeper.On("GetW", "/consumers/testgroup/offsets/testtopic/0").Return([]byte("notanumber"), offsetStat, func() <-chan zk.Event { return newWatchEventChan }(), nil)
mockZookeeper.On("GetW", "/consumers/testgroup/owners/testtopic/0").Return([]byte("testowner"), (*zk.Stat)(nil), (<-chan zk.Event)(nil), nil)
mockZookeeper.On("GetW", "/consumers/testgroup/owners/testtopic/0").Return([]byte("testowner"), (*zk.Stat)(nil), (<-chan zk.Event)(nil), nil) // nolint:gocritic

// This will block if a storage request is sent, as nothing is watching that channel
module.running.Add(1)
Expand All @@ -218,7 +218,7 @@ func TestKafkaZkClient_resetOffsetWatchAndSend_BadOffset(t *testing.T) {

func TestKafkaZkClient_resetPartitionListWatchAndAdd_BadPath(t *testing.T) {
mockZookeeper := helpers.MockZookeeperClient{}
mockZookeeper.On("ChildrenW", "/consumers/testgroup/offsets/testtopic").Return([]string{}, (*zk.Stat)(nil), (<-chan zk.Event)(nil), errors.New("badpath"))
mockZookeeper.On("ChildrenW", "/consumers/testgroup/offsets/testtopic").Return([]string{}, (*zk.Stat)(nil), (<-chan zk.Event)(nil), errors.New("badpath")) // nolint:gocritic

module := fixtureKafkaZkModule()
module.Configure("test", "consumer.test")
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestKafkaZkClient_resetTopicListWatchAndAdd_BadPath(t *testing.T) {

func TestKafkaZkClient_resetGroupListWatchAndAdd_BadPath(t *testing.T) {
mockZookeeper := helpers.MockZookeeperClient{}
mockZookeeper.On("ChildrenW", "/consumers").Return([]string{}, (*zk.Stat)(nil), (<-chan zk.Event)(nil), errors.New("badpath"))
mockZookeeper.On("ChildrenW", "/consumers").Return([]string{}, (*zk.Stat)(nil), (<-chan zk.Event)(nil), errors.New("badpath")) // nolint:gocritic

module := fixtureKafkaZkModule()
module.Configure("test", "consumer.test")
Expand Down
1 change: 1 addition & 0 deletions core/internal/helpers/sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var kafkaVersions = map[string]sarama.KafkaVersion{
"2.4.0": sarama.V2_4_0_0,
"2.5.0": sarama.V2_5_0_0,
"2.6.0": sarama.V2_6_0_0,
"2.7.0": sarama.V2_7_0_0,
}

func parseKafkaVersion(kafkaVersion string) sarama.KafkaVersion {
Expand Down
45 changes: 20 additions & 25 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,45 +2,40 @@ module github.com/linkedin/Burrow

require (
github.com/OneOfOne/xxhash v1.2.8
github.com/Shopify/sarama v1.27.0
github.com/frankban/quicktest v1.10.1 // indirect
github.com/Shopify/sarama v1.28.0
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/google/go-cmp v0.5.1 // indirect
github.com/google/uuid v1.1.1 // indirect
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00 // indirect
github.com/google/uuid v1.2.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20210202160940-bed99a852dfe // indirect
github.com/julienschmidt/httprouter v1.3.0
github.com/karrick/goswarm v1.10.0
github.com/klauspost/compress v1.10.11 // indirect
github.com/mitchellh/mapstructure v1.3.3 // indirect
github.com/pborman/uuid v1.2.0
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/magiconair/properties v1.8.4 // indirect
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/pborman/uuid v1.2.1
github.com/pelletier/go-toml v1.8.1 // indirect
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/common v0.11.1 // indirect
github.com/samuel/go-zookeeper v0.0.0-20200724154423-2164a8ac840e
github.com/smartystreets/assertions v1.1.1 // indirect
github.com/spf13/afero v1.3.4 // indirect
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/procfs v0.6.0 // indirect
github.com/samuel/go-zookeeper v0.0.0-20201211165307-7117e9ea2414
github.com/smartystreets/assertions v1.2.0 // indirect
github.com/spf13/afero v1.5.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.7.1
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.uber.org/zap v1.15.0
golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc // indirect
golang.org/x/sys v0.0.0-20200812155832-6a926be9bd1d // indirect
golang.org/x/tools v0.0.0-20200813231717-0a73ddcff9b8 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.16.0
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
gopkg.in/ini.v1 v1.58.0 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
honnef.co/go/tools v0.0.1-2020.1.5 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
honnef.co/go/tools v0.1.1 // indirect
)

go 1.15
Loading

0 comments on commit 8819218

Please sign in to comment.