diff --git a/.circleci/config.yml b/.circleci/config.yml index 25fad54e3..a87c81259 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,7 +2,7 @@ version: 2 jobs: lint: docker: - - image: golangci/golangci-lint:v1.45-alpine + - image: golangci/golangci-lint:v1.53-alpine steps: - checkout - run: golangci-lint run @@ -10,11 +10,11 @@ jobs: # The kafka 0.10 tests are maintained as a separate configuration because # kafka only supported plain text SASL in this version. kafka-010: - working_directory: &working_directory /go/src/github.com/segmentio/kafka-go + working_directory: &working_directory /home/circleci/src/github.com/segmentio/kafka-go environment: KAFKA_VERSION: "0.10.1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -40,12 +40,12 @@ jobs: steps: &steps - checkout - restore_cache: - key: kafka-go-mod-{{ checksum "go.sum" }}-1 + key: kafka-go-mod-v3-{{ checksum "go.sum" }}-1 - run: name: Download dependencies command: go mod download - save_cache: - key: kafka-go-mod-{{ checksum "go.sum" }}-1 + key: kafka-go-mod-v3-{{ checksum "go.sum" }}-1 paths: - /go/pkg/mod - run: @@ -67,7 +67,7 @@ jobs: environment: KAFKA_VERSION: "0.11.0" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -99,7 +99,7 @@ jobs: environment: KAFKA_VERSION: "1.0.1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -115,7 +115,7 @@ jobs: environment: KAFKA_VERSION: "1.1.1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -131,7 +131,7 @@ jobs: environment: KAFKA_VERSION: "2.0.1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -150,7 +150,7 @@ jobs: environment: KAFKA_VERSION: "2.1.1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -176,7 +176,7 @@ jobs: environment: KAFKA_VERSION: "2.2.2" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -193,7 +193,7 @@ jobs: environment: KAFKA_VERSION: "2.3.1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -221,7 +221,7 @@ jobs: # TODO: Figure out why these are happening and fix them (they don't appear to be new). KAFKA_SKIP_NETTEST: "1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -249,7 +249,7 @@ jobs: # TODO: Figure out why these are happening and fix them (they don't appear to be new). KAFKA_SKIP_NETTEST: "1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 @@ -279,7 +279,7 @@ jobs: # TODO: Figure out why these are happening and fix them (they don't appear to be new). KAFKA_SKIP_NETTEST: "1" docker: - - image: circleci/golang + - image: cimg/go:1.18 - image: wurstmeister/zookeeper ports: - 2181:2181 diff --git a/compress/snappy/go-xerial-snappy/fuzz.go b/compress/snappy/go-xerial-snappy/fuzz.go index 6a46f4784..cd9307584 100644 --- a/compress/snappy/go-xerial-snappy/fuzz.go +++ b/compress/snappy/go-xerial-snappy/fuzz.go @@ -1,3 +1,4 @@ +//go:build gofuzz // +build gofuzz package snappy diff --git a/consumergroup.go b/consumergroup.go index b9d0a7e2e..fb8215ef3 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -925,12 +925,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // the leader. Otherwise, GroupMemberAssignments will be nil. // // Possible kafka error codes returned: -// * GroupLoadInProgress: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * InconsistentGroupProtocol: -// * InvalidSessionTimeout: -// * GroupAuthorizationFailed: +// - GroupLoadInProgress: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - InconsistentGroupProtocol: +// - InvalidSessionTimeout: +// - GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { @@ -1073,11 +1073,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember // Readers subscriptions topic => partitions // // Possible kafka error codes returned: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * IllegalGeneration: -// * RebalanceInProgress: -// * GroupAuthorizationFailed: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - IllegalGeneration: +// - RebalanceInProgress: +// - GroupAuthorizationFailed: func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) { request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) response, err := conn.syncGroup(request) diff --git a/example_groupbalancer_test.go b/example_groupbalancer_test.go index e81cb4943..be24d1eb5 100644 --- a/example_groupbalancer_test.go +++ b/example_groupbalancer_test.go @@ -31,9 +31,9 @@ func ExampleNewReader_rackAffinity() { } // findRack is the basic rack resolver strategy for use in AWS. It supports -// * ECS with the task metadata endpoint enabled (returns the container -// instance's availability zone) -// * Linux EC2 (returns the instance's availability zone) +// - ECS with the task metadata endpoint enabled (returns the container +// instance's availability zone) +// - Linux EC2 (returns the instance's availability zone) func findRack() string { switch whereAmI() { case "ecs": diff --git a/go.mod b/go.mod index d16e1ae78..712e97837 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,22 @@ module github.com/segmentio/kafka-go -go 1.15 +go 1.18 require ( github.com/klauspost/compress v1.15.9 - github.com/pierrec/lz4/v4 v4.1.15 - github.com/stretchr/testify v1.8.0 + github.com/pierrec/lz4/v4 v4.1.19 + github.com/stretchr/testify v1.8.4 github.com/xdg-go/scram v1.1.2 - golang.org/x/net v0.17.0 + golang.org/x/net v0.19.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + golang.org/x/text v0.14.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) retract [v0.4.36, v0.4.37] diff --git a/go.sum b/go.sum index 440b00f6d..7df18fca9 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,13 @@ -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= -github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4= +github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= @@ -21,47 +17,32 @@ github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gi github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= -golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.19.0 h1:zTwKpTd2XuCqf8huc7Fo2iSy+4RHPd10s4KzeTnVr1c= +golang.org/x/net v0.19.0/go.mod h1:CfAk/cbD4CthTvqiEl8NpboMuiuOYsAr/7NOjZJtv1U= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= -golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= -golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= -golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= -golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= -golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/groupbalancer.go b/groupbalancer.go index 9491bc501..61741b730 100644 --- a/groupbalancer.go +++ b/groupbalancer.go @@ -41,14 +41,15 @@ type GroupBalancer interface { // RangeGroupBalancer groups consumers by partition // // Example: 5 partitions, 2 consumers -// C0: [0, 1, 2] -// C1: [3, 4] +// +// C0: [0, 1, 2] +// C1: [3, 4] // // Example: 6 partitions, 3 consumers -// C0: [0, 1] -// C1: [2, 3] -// C2: [4, 5] // +// C0: [0, 1] +// C1: [2, 3] +// C2: [4, 5] type RangeGroupBalancer struct{} func (r RangeGroupBalancer) ProtocolName() string { @@ -92,14 +93,15 @@ func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions // RoundrobinGroupBalancer divides partitions evenly among consumers // // Example: 5 partitions, 2 consumers -// C0: [0, 2, 4] -// C1: [1, 3] +// +// C0: [0, 2, 4] +// C1: [1, 3] // // Example: 6 partitions, 3 consumers -// C0: [0, 3] -// C1: [1, 4] -// C2: [2, 5] // +// C0: [0, 3] +// C1: [1, 4] +// C2: [2, 5] type RoundRobinGroupBalancer struct{} func (r RoundRobinGroupBalancer) ProtocolName() string { diff --git a/logger.go b/logger.go index d359ab789..4f77fcf0c 100644 --- a/logger.go +++ b/logger.go @@ -7,11 +7,12 @@ type Logger interface { // LoggerFunc is a bridge between Logger and any third party logger // Usage: -// l := NewLogger() // some logger -// r := kafka.NewReader(kafka.ReaderConfig{ -// Logger: kafka.LoggerFunc(l.Infof), -// ErrorLogger: kafka.LoggerFunc(l.Errorf), -// }) +// +// l := NewLogger() // some logger +// r := kafka.NewReader(kafka.ReaderConfig{ +// Logger: kafka.LoggerFunc(l.Infof), +// ErrorLogger: kafka.LoggerFunc(l.Errorf), +// }) type LoggerFunc func(string, ...interface{}) func (f LoggerFunc) Printf(msg string, args ...interface{}) { f(msg, args...) } diff --git a/reader_test.go b/reader_test.go index f413d7429..9291d62e7 100644 --- a/reader_test.go +++ b/reader_test.go @@ -858,6 +858,7 @@ func TestReaderConsumerGroup(t *testing.T) { } for _, test := range tests { + test := test t.Run(test.scenario, func(t *testing.T) { // It appears that some of the tests depend on all these tests being // run concurrently to pass... this is brittle and should be fixed @@ -869,6 +870,7 @@ func TestReaderConsumerGroup(t *testing.T) { defer deleteTopic(t, topic) groupID := makeGroupID() + r := NewReader(ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: topic, @@ -932,6 +934,8 @@ func testReaderConsumerGroupVerifyOffsetCommitted(t *testing.T, ctx context.Cont t.Errorf("bad commit message: %v", err) } + time.Sleep(5 * time.Second) + offsets := getOffsets(t, r.config) if expected := map[int]int64{0: m.Offset + 1}; !reflect.DeepEqual(expected, offsets) { t.Errorf("expected %v; got %v", expected, offsets)