Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to go 1.18 #1253

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
30 changes: 15 additions & 15 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ version: 2
jobs:
lint:
docker:
- image: golangci/golangci-lint:v1.45-alpine
- image: golangci/golangci-lint:v1.53-alpine
steps:
- checkout
- run: golangci-lint run

# The kafka 0.10 tests are maintained as a separate configuration because
# kafka only supported plain text SASL in this version.
kafka-010:
working_directory: &working_directory /go/src/github.com/segmentio/kafka-go
working_directory: &working_directory /home/circleci/src/github.com/segmentio/kafka-go
environment:
KAFKA_VERSION: "0.10.1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand All @@ -40,12 +40,12 @@ jobs:
steps: &steps
- checkout
- restore_cache:
key: kafka-go-mod-{{ checksum "go.sum" }}-1
key: kafka-go-mod-v3-{{ checksum "go.sum" }}-1
- run:
name: Download dependencies
command: go mod download
- save_cache:
key: kafka-go-mod-{{ checksum "go.sum" }}-1
key: kafka-go-mod-v3-{{ checksum "go.sum" }}-1
paths:
- /go/pkg/mod
- run:
Expand All @@ -67,7 +67,7 @@ jobs:
environment:
KAFKA_VERSION: "0.11.0"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand Down Expand Up @@ -99,7 +99,7 @@ jobs:
environment:
KAFKA_VERSION: "1.0.1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand All @@ -115,7 +115,7 @@ jobs:
environment:
KAFKA_VERSION: "1.1.1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand All @@ -131,7 +131,7 @@ jobs:
environment:
KAFKA_VERSION: "2.0.1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand All @@ -150,7 +150,7 @@ jobs:
environment:
KAFKA_VERSION: "2.1.1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand All @@ -176,7 +176,7 @@ jobs:
environment:
KAFKA_VERSION: "2.2.2"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand All @@ -193,7 +193,7 @@ jobs:
environment:
KAFKA_VERSION: "2.3.1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand Down Expand Up @@ -221,7 +221,7 @@ jobs:
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand Down Expand Up @@ -249,7 +249,7 @@ jobs:
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand Down Expand Up @@ -279,7 +279,7 @@ jobs:
# TODO: Figure out why these are happening and fix them (they don't appear to be new).
KAFKA_SKIP_NETTEST: "1"
docker:
- image: circleci/golang
- image: cimg/go:1.18
- image: wurstmeister/zookeeper
ports:
- 2181:2181
Expand Down
1 change: 1 addition & 0 deletions compress/snappy/go-xerial-snappy/fuzz.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:build gofuzz
// +build gofuzz

package snappy
Expand Down
22 changes: 11 additions & 11 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,12 +925,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
// - GroupLoadInProgress:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - InconsistentGroupProtocol:
// - InvalidSessionTimeout:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
Expand Down Expand Up @@ -1073,11 +1073,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
// - GroupCoordinatorNotAvailable:
// - NotCoordinatorForGroup:
// - IllegalGeneration:
// - RebalanceInProgress:
// - GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
Expand Down
6 changes: 3 additions & 3 deletions example_groupbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func ExampleNewReader_rackAffinity() {
}

// findRack is the basic rack resolver strategy for use in AWS. It supports
// * ECS with the task metadata endpoint enabled (returns the container
// instance's availability zone)
// * Linux EC2 (returns the instance's availability zone)
// - ECS with the task metadata endpoint enabled (returns the container
// instance's availability zone)
// - Linux EC2 (returns the instance's availability zone)
func findRack() string {
switch whereAmI() {
case "ecs":
Expand Down
17 changes: 13 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
module github.com/segmentio/kafka-go

go 1.15
go 1.18

require (
github.com/klauspost/compress v1.15.9
github.com/pierrec/lz4/v4 v4.1.15
github.com/stretchr/testify v1.8.0
github.com/pierrec/lz4/v4 v4.1.19
github.com/stretchr/testify v1.8.4
github.com/xdg-go/scram v1.1.2
golang.org/x/net v0.17.0
golang.org/x/net v0.19.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

retract [v0.4.36, v0.4.37]
35 changes: 8 additions & 27 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
Expand All @@ -21,47 +17,32 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c=
golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
22 changes: 12 additions & 10 deletions groupbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ type GroupBalancer interface {
// RangeGroupBalancer groups consumers by partition
//
// Example: 5 partitions, 2 consumers
// C0: [0, 1, 2]
// C1: [3, 4]
//
// C0: [0, 1, 2]
// C1: [3, 4]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 1]
// C1: [2, 3]
// C2: [4, 5]
//
// C0: [0, 1]
// C1: [2, 3]
// C2: [4, 5]
type RangeGroupBalancer struct{}

func (r RangeGroupBalancer) ProtocolName() string {
Expand Down Expand Up @@ -92,14 +93,15 @@ func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions
// RoundrobinGroupBalancer divides partitions evenly among consumers
//
// Example: 5 partitions, 2 consumers
// C0: [0, 2, 4]
// C1: [1, 3]
//
// C0: [0, 2, 4]
// C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
// C0: [0, 3]
// C1: [1, 4]
// C2: [2, 5]
//
// C0: [0, 3]
// C1: [1, 4]
// C2: [2, 5]
type RoundRobinGroupBalancer struct{}

func (r RoundRobinGroupBalancer) ProtocolName() string {
Expand Down
11 changes: 6 additions & 5 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ type Logger interface {

// LoggerFunc is a bridge between Logger and any third party logger
// Usage:
// l := NewLogger() // some logger
// r := kafka.NewReader(kafka.ReaderConfig{
// Logger: kafka.LoggerFunc(l.Infof),
// ErrorLogger: kafka.LoggerFunc(l.Errorf),
// })
//
// l := NewLogger() // some logger
// r := kafka.NewReader(kafka.ReaderConfig{
// Logger: kafka.LoggerFunc(l.Infof),
// ErrorLogger: kafka.LoggerFunc(l.Errorf),
// })
type LoggerFunc func(string, ...interface{})

func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) }
4 changes: 4 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,7 @@ func TestReaderConsumerGroup(t *testing.T) {
}

for _, test := range tests {
test := test
t.Run(test.scenario, func(t *testing.T) {
// It appears that some of the tests depend on all these tests being
// run concurrently to pass... this is brittle and should be fixed
Expand All @@ -869,6 +870,7 @@ func TestReaderConsumerGroup(t *testing.T) {
defer deleteTopic(t, topic)

groupID := makeGroupID()

r := NewReader(ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: topic,
Expand Down Expand Up @@ -932,6 +934,8 @@ func testReaderConsumerGroupVerifyOffsetCommitted(t *testing.T, ctx context.Cont
t.Errorf("bad commit message: %v", err)
}

time.Sleep(5 * time.Second)

offsets := getOffsets(t, r.config)
if expected := map[int]int64{0: m.Offset + 1}; !reflect.DeepEqual(expected, offsets) {
t.Errorf("expected %v; got %v", expected, offsets)
Expand Down