Skip to content

Commit

Permalink
feat: initial group connect tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenrinzema committed Dec 2, 2019
1 parent cd949bb commit f33a739
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 0 deletions.
25 changes: 25 additions & 0 deletions dialects/kafka/consumer/group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package consumer

import (
"testing"

"github.com/Shopify/sarama"
)

func TestGroupHandleConnect(t *testing.T) {
client, group, topics, broker, _ := NewMockClient(t)
defer broker.Close()

handle := NewGroupHandle(client)
cluster, err := sarama.NewClient([]string{broker.Addr()}, NewMockConfig())
if err != nil {
t.Fatal(err)
}

go handle.Setup(nil)

err = handle.Connect(cluster, topics, group)
if err != nil {
t.Fatal(err)
}
}
51 changes: 51 additions & 0 deletions dialects/kafka/consumer/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package consumer

import (
"testing"

"github.com/Shopify/sarama"
)

// NewMockClient constructs a new mock client using the default configurations.
// The constructed client, topics, broker and responder are returned to be used inside the test case.
func NewMockClient(t *testing.T) (client *Client, group string, topics []string, broker *sarama.MockBroker, responder *sarama.MockFetchResponse) {
topic := "mock"
broker, responder = NewMockBroker(t, 0, topic)

topics = []string{topic}
brokers := []string{broker.Addr()}
group = "mock"

client = NewClient(brokers, group)

return client, group, topics, broker, responder
}

// NewMockBroker constructs a new sarama mock broker.
// The constructed mock broker and responder are returned which could be used for further configuration.
// A partition is created for the given topic. The broker is assigned as leader for the given topic + partition.
func NewMockBroker(t *testing.T, partition int32, topic string) (*sarama.MockBroker, *sarama.MockFetchResponse) {
broker := sarama.NewMockBroker(t, 0)
responder := sarama.NewMockFetchResponse(t, 1)

broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader(topic, partition, broker.BrokerID()),
"OffsetRequest": sarama.NewMockOffsetResponse(t).
SetOffset(topic, partition, sarama.OffsetOldest, 0).
SetOffset(topic, partition, sarama.OffsetNewest, 1000),
"FetchRequest": responder,
})

return broker, responder
}

// NewMockConfig constructs a new predefined Sarama mock configuration.
// This configuration contains predefined values such as the cluster version.
func NewMockConfig() *sarama.Config {
config := sarama.NewConfig()
config.Version = sarama.V0_10_2_0

return config
}

0 comments on commit f33a739

Please sign in to comment.