diff --git a/README.md b/README.md index c8e6ef2..0b750c6 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ go get -u github.com/AccelByte/eventstream-go-sdk ### Importing ```go -eventstream "github.com/AccelByte/eventstream-go-sdk" +eventstream "github.com/AccelByte/eventstream-go-sdk/v2" ``` To create a new event stream client, use this function: diff --git a/example/main.go b/example/main.go index 822c7a6..dfccabc 100644 --- a/example/main.go +++ b/example/main.go @@ -20,7 +20,7 @@ import ( "context" "fmt" - "github.com/AccelByte/eventstream-go-sdk" + "github.com/AccelByte/eventstream-go-sdk/v2" "github.com/sirupsen/logrus" ) diff --git a/go.mod b/go.mod index 3479866..5c98d6a 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ -module github.com/AccelByte/eventstream-go-sdk +module github.com/AccelByte/eventstream-go-sdk/v2 -go 1.12 +go 1.13 require ( github.com/AccelByte/justice-input-validation-go v0.0.3 diff --git a/kafka.go b/kafka.go index f83a97e..ed87706 100644 --- a/kafka.go +++ b/kafka.go @@ -263,7 +263,7 @@ func (client *KafkaClient) Register(subscribeBuilder *SubscribeBuilder) error { } topic := constructTopic(client.prefix, subscribeBuilder.topic) - groupID := constructGroupID(subscribeBuilder.groupID) + groupID := constructGroupID(client.prefix, subscribeBuilder.groupID) isRegistered, err := client.registerSubscriber(subscribeBuilder) if err != nil { diff --git a/kafka_integration_test.go b/kafka_integration_test.go index 957bc04..fa9423b 100644 --- a/kafka_integration_test.go +++ b/kafka_integration_test.go @@ -105,6 +105,7 @@ func TestKafkaPubSubSuccess(t *testing.T) { NewSubscribe(). Topic(topicName). EventName(mockEvent.EventName). + GroupID(generateID()). Context(ctx). Callback(func(ctx context.Context, event *Event, err error) { if ctx.Err() != nil { @@ -314,6 +315,7 @@ func TestKafkaPubSubMultipleTopicSuccess(t *testing.T) { NewSubscribe(). Topic(topicName1). EventName(mockEvent.EventName). + GroupID(generateID()). Context(ctx). Callback(func(ctx context.Context, event *Event, err error) { if ctx.Err() != nil { @@ -358,6 +360,7 @@ func TestKafkaPubSubMultipleTopicSuccess(t *testing.T) { NewSubscribe(). Topic(topicName2). EventName(mockEvent.EventName). + GroupID(generateID()). Context(ctx). Callback(func(ctx context.Context, event *Event, err error) { if ctx.Err() != nil { @@ -610,7 +613,7 @@ func TestKafkaPubSubSameGroupID(t *testing.T) { ctx, done := context.WithTimeout(context.Background(), time.Duration(timeoutTest)*time.Second) defer done() - doneChan := make(chan bool, 1) + doneChan := make(chan bool, 2) client := createKafkaClient(t) @@ -761,12 +764,27 @@ func TestKafkaPubSubSameGroupID(t *testing.T) { return } - select { - case <-doneChan: - return - case <-ctx.Done(): - assert.FailNow(t, errorTimeout) + awaitDurationTimer := time.NewTimer(time.Duration(timeoutTest) * time.Second / 2) + defer awaitDurationTimer.Stop() + + doneItr := 0 +L: + for { + select { + case <-doneChan: + doneItr++ + if doneItr > 1 { + // expected to receive only one message + break L + } + case <-awaitDurationTimer.C: + // enough, let's go to check the number of triggered callbacks + break L + } } + + // only one subscriber should receive the event because there is the same subscription-group + require.Equal(t, 1, doneItr) } // nolint:funlen @@ -807,10 +825,12 @@ func TestKafkaRegisterMultipleSubscriberCallbackSuccess(t *testing.T) { Payload: mockPayload, } + groupID := generateID() err := client.Register( NewSubscribe(). Topic(topicName). EventName(mockEvent.EventName). + GroupID(groupID). Context(ctx). Callback(func(ctx context.Context, _ *Event, err error) { if ctx.Err() != nil { @@ -820,15 +840,13 @@ func TestKafkaRegisterMultipleSubscriberCallbackSuccess(t *testing.T) { assert.NoError(t, err, "there's error right before event consumed: %v", err) doneChan <- true })) - if err != nil { - assert.FailNow(t, errorSubscribe, err) - return - } + require.NoError(t, err) err = client.Register( NewSubscribe(). Topic("anotherevent"). EventName(mockEvent.EventName). + GroupID(groupID). Context(ctx). Callback(func(ctx context.Context, _ *Event, err error) { if ctx.Err() != nil { @@ -907,6 +925,7 @@ func TestKafkaUnregisterTopicSuccess(t *testing.T) { NewSubscribe(). Topic(topicName). EventName(mockEvent.EventName). + GroupID(generateID()). Context(ctx). Callback(func(ctx context.Context, _ *Event, err error) { require.NoError(t, err) @@ -926,6 +945,7 @@ func TestKafkaUnregisterTopicSuccess(t *testing.T) { NewSubscribe(). Topic("anotherevent"). EventName(mockEvent.EventName). + GroupID(generateID()). Context(subscribeCtx). Callback(func(ctx context.Context, event *Event, err error) { if ctx.Err() != nil { diff --git a/kafka_test.go b/kafka_test.go index f315a64..11b2f6b 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -303,6 +303,7 @@ func TestKafkaSubNilCallback(t *testing.T) { NewSubscribe(). Topic(topicName). EventName(mockEvent.EventName). + GroupID(generateID()). Callback(nil)) assert.Equal(t, errInvalidCallback, err, "error should be equal") diff --git a/utils.go b/utils.go index 7c850a2..374e263 100644 --- a/utils.go +++ b/utils.go @@ -34,12 +34,12 @@ func constructTopic(prefix, topic string) string { } // constructGroupID construct groupID or queue group name -func constructGroupID(groupID string) string { +func constructGroupID(prefix, groupID string) string { if groupID == "" { groupID = defaultGroupID } - return groupID + return prefix + separator + groupID } // generateID returns UUID without dash diff --git a/validation.go b/validation.go index fa41c1f..3ed7710 100644 --- a/validation.go +++ b/validation.go @@ -79,7 +79,7 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error { subscribeEvent := struct { Topic string `valid:"required"` EventName string `valid:"alphanum,stringlength(1|256),required"` - GroupID string + GroupID string `valid:"alphanum,stringlength(1|256),required"` Callback func(ctx context.Context, event *Event, err error) }{ Topic: subscribeBuilder.topic,