Skip to content

Commit

Permalink
Merge pull request #18 from IAD/JUS-1604-uniq-consumer-group-per-service
Browse files Browse the repository at this point in the history
JUS-1604 auto-add prefix for consumer-groups to avoid collision between services
  • Loading branch information
fadhillahentino authored Apr 3, 2020
2 parents b20b103 + 8e25b52 commit b7e2902
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ go get -u github.com/AccelByte/eventstream-go-sdk
### Importing

```go
eventstream "github.com/AccelByte/eventstream-go-sdk"
eventstream "github.com/AccelByte/eventstream-go-sdk/v2"
```

To create a new event stream client, use this function:
Expand Down
2 changes: 1 addition & 1 deletion example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"context"
"fmt"

"github.com/AccelByte/eventstream-go-sdk"
"github.com/AccelByte/eventstream-go-sdk/v2"
"github.com/sirupsen/logrus"
)

Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/AccelByte/eventstream-go-sdk
module github.com/AccelByte/eventstream-go-sdk/v2

go 1.12
go 1.13

require (
github.com/AccelByte/justice-input-validation-go v0.0.3
Expand Down
2 changes: 1 addition & 1 deletion kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error {
}

topic := constructTopic(client.prefix, subscribeBuilder.topic)
groupID := constructGroupID(subscribeBuilder.groupID)
groupID := constructGroupID(client.prefix, subscribeBuilder.groupID)

isRegistered, err := client.registerSubscriber(subscribeBuilder)
if err != nil {
Expand Down
40 changes: 30 additions & 10 deletions kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func TestKafkaPubSubSuccess(t *testing.T) {
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName).
GroupID(generateID()).
Context(ctx).
Callback(func(ctx context.Context, event *Event, err error) {
if ctx.Err() != nil {
Expand Down Expand Up @@ -314,6 +315,7 @@ func TestKafkaPubSubMultipleTopicSuccess(t *testing.T) {
NewSubscribe().
Topic(topicName1).
EventName(mockEvent.EventName).
GroupID(generateID()).
Context(ctx).
Callback(func(ctx context.Context, event *Event, err error) {
if ctx.Err() != nil {
Expand Down Expand Up @@ -358,6 +360,7 @@ func TestKafkaPubSubMultipleTopicSuccess(t *testing.T) {
NewSubscribe().
Topic(topicName2).
EventName(mockEvent.EventName).
GroupID(generateID()).
Context(ctx).
Callback(func(ctx context.Context, event *Event, err error) {
if ctx.Err() != nil {
Expand Down Expand Up @@ -610,7 +613,7 @@ func TestKafkaPubSubSameGroupID(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), time.Duration(timeoutTest)*time.Second)
defer done()

doneChan := make(chan bool, 1)
doneChan := make(chan bool, 2)

client := createKafkaClient(t)

Expand Down Expand Up @@ -761,12 +764,27 @@ func TestKafkaPubSubSameGroupID(t *testing.T) {
return
}

select {
case <-doneChan:
return
case <-ctx.Done():
assert.FailNow(t, errorTimeout)
awaitDurationTimer := time.NewTimer(time.Duration(timeoutTest) * time.Second / 2)
defer awaitDurationTimer.Stop()

doneItr := 0
L:
for {
select {
case <-doneChan:
doneItr++
if doneItr > 1 {
// expected to receive only one message
break L
}
case <-awaitDurationTimer.C:
// enough, let's go to check the number of triggered callbacks
break L
}
}

// only one subscriber should receive the event because there is the same subscription-group
require.Equal(t, 1, doneItr)
}

// nolint:funlen
Expand Down Expand Up @@ -807,10 +825,12 @@ func TestKafkaRegisterMultipleSubscriberCallbackSuccess(t *testing.T) {
Payload: mockPayload,
}

groupID := generateID()
err := client.Register(
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName).
GroupID(groupID).
Context(ctx).
Callback(func(ctx context.Context, _ *Event, err error) {
if ctx.Err() != nil {
Expand All @@ -820,15 +840,13 @@ func TestKafkaRegisterMultipleSubscriberCallbackSuccess(t *testing.T) {
assert.NoError(t, err, "there's error right before event consumed: %v", err)
doneChan <- true
}))
if err != nil {
assert.FailNow(t, errorSubscribe, err)
return
}
require.NoError(t, err)

err = client.Register(
NewSubscribe().
Topic("anotherevent").
EventName(mockEvent.EventName).
GroupID(groupID).
Context(ctx).
Callback(func(ctx context.Context, _ *Event, err error) {
if ctx.Err() != nil {
Expand Down Expand Up @@ -907,6 +925,7 @@ func TestKafkaUnregisterTopicSuccess(t *testing.T) {
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName).
GroupID(generateID()).
Context(ctx).
Callback(func(ctx context.Context, _ *Event, err error) {
require.NoError(t, err)
Expand All @@ -926,6 +945,7 @@ func TestKafkaUnregisterTopicSuccess(t *testing.T) {
NewSubscribe().
Topic("anotherevent").
EventName(mockEvent.EventName).
GroupID(generateID()).
Context(subscribeCtx).
Callback(func(ctx context.Context, event *Event, err error) {
if ctx.Err() != nil {
Expand Down
1 change: 1 addition & 0 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func TestKafkaSubNilCallback(t *testing.T) {
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName).
GroupID(generateID()).
Callback(nil))

assert.Equal(t, errInvalidCallback, err, "error should be equal")
Expand Down
4 changes: 2 additions & 2 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ func constructTopic(prefix, topic string) string {
}

// constructGroupID construct groupID or queue group name
func constructGroupID(groupID string) string {
func constructGroupID(prefix, groupID string) string {
if groupID == "" {
groupID = defaultGroupID
}

return groupID
return prefix + separator + groupID
}

// generateID returns UUID without dash
Expand Down
2 changes: 1 addition & 1 deletion validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error {
subscribeEvent := struct {
Topic string `valid:"required"`
EventName string `valid:"alphanum,stringlength(1|256),required"`
GroupID string
GroupID string `valid:"alphanum,stringlength(1|256),required"`
Callback func(ctx context.Context, event *Event, err error)
}{
Topic: subscribeBuilder.topic,
Expand Down

0 comments on commit b7e2902

Please sign in to comment.