From 1a08a4a9973cf1fb682f31e9e71cabbdcb03998d Mon Sep 17 00:00:00 2001 From: fadhil Date: Fri, 19 Nov 2021 17:01:45 +0700 Subject: [PATCH] fix(validation): fix event validation following kafka naming convention --- go.mod | 1 + go.sum | 2 + kafka_integration_test.go | 165 +++++++++++++++++++++++ kafka_test.go | 6 +- validation.go | 84 +++++++++--- validation_test.go | 271 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 511 insertions(+), 18 deletions(-) create mode 100644 validation_test.go diff --git a/go.mod b/go.mod index 8409419..640d061 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/google/uuid v1.1.1 github.com/mitchellh/mapstructure v1.1.2 + github.com/pkg/errors v0.9.1 github.com/segmentio/kafka-go v0.4.23 github.com/sirupsen/logrus v1.4.1 github.com/stretchr/testify v1.6.1 diff --git a/go.sum b/go.sum index 4476cf9..de44069 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ github.com/pariz/gountries v0.0.0-20171019111738-adb00f6513a3 h1:lmQNznFSupyfCDE github.com/pariz/gountries v0.0.0-20171019111738-adb00f6513a3/go.mod h1:U0ETmPPEsfd7CpUKNMYi68xIOL8Ww4jPZlaqNngcwqs= github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/segmentio/kafka-go v0.4.23 h1:jjacNjmn1fPvkVGFs6dej98fa7UT/bYF8wZBFMMIld4= diff --git a/kafka_integration_test.go b/kafka_integration_test.go index 9904a4f..61f9e41 100644 --- a/kafka_integration_test.go +++ b/kafka_integration_test.go @@ -206,6 +206,171 @@ func TestKafkaPubSubSuccess(t *testing.T) { } } +// nolint dupl +func TestKafkaNonConsumerGroupSuccess(t *testing.T) { + t.Parallel() + ctx, done := context.WithTimeout(context.Background(), time.Duration(timeoutTest)*time.Second) + defer done() + + doneChan := make(chan bool, 1) + + client := createKafkaClient(t) + + topicName := constructTopicTest() + + var mockPayload = make(map[string]interface{}) + mockPayload[testPayload] = Payload{FriendID: "user456"} + + mockAdditionalFields := map[string]interface{}{ + "summary": "user:_failed", + } + + mockEvent := &Event{ + EventName: "testEvent", + Namespace: "event", + ClientID: "7d480ce0e8624b02901bd80d9ba9817c", + TraceID: "01c34ec3b07f4bfaa59ba0184a3de14d", + SpanContext: "test-span-id", + UserID: "e95b150043ff4a2c88427a6eb25e5bc8", + EventID: 3, + EventType: 301, + EventLevel: 3, + ServiceName: "test", + ClientIDs: []string{"7d480ce0e8624b02901bd80d9ba9817c"}, + TargetUserIDs: []string{"1fe7f425a0e049d29d87ca3d32e45b5a"}, + TargetNamespace: "publisher", + Privacy: true, + AdditionalFields: mockAdditionalFields, + Version: defaultVersion, + Key: testKey, + Payload: mockPayload, + } + + err := client.Register( + NewSubscribe(). + Topic(topicName). + EventName(mockEvent.EventName). + Offset(0). + Context(ctx). + Callback(func(ctx context.Context, event *Event, err error) error { + if ctx.Err() != nil { + return ctx.Err() + } + + var eventPayload Payload + if err != nil { + assert.Fail(t, "error when run callback") + } + if err = mapstructure.Decode(event.Payload[testPayload], &eventPayload); err != nil { + assert.Fail(t, "unable to decode payload") + } + assert.Equal(t, mockEvent.EventName, event.EventName, "event name should be equal") + assert.Equal(t, mockEvent.Namespace, event.Namespace, "namespace should be equal") + assert.Equal(t, mockEvent.ClientID, event.ClientID, "client ID should be equal") + assert.Equal(t, mockEvent.TraceID, event.TraceID, "trace ID should be equal") + assert.Equal(t, mockEvent.SpanContext, event.SpanContext, "span context should be equal") + assert.Equal(t, mockEvent.UserID, event.UserID, "user ID should be equal") + assert.Equal(t, mockEvent.SessionID, event.SessionID, "session ID should be equal") + assert.Equal(t, mockEvent.EventID, event.EventID, "EventID should be equal") + assert.Equal(t, mockEvent.EventType, event.EventType, "EventType should be equal") + assert.Equal(t, mockEvent.EventLevel, event.EventLevel, "EventLevel should be equal") + assert.Equal(t, mockEvent.ServiceName, event.ServiceName, "ServiceName should be equal") + assert.Equal(t, mockEvent.ClientIDs, event.ClientIDs, "ClientIDs should be equal") + assert.Equal(t, mockEvent.TargetUserIDs, event.TargetUserIDs, "TargetUserIDs should be equal") + assert.Equal(t, mockEvent.TargetNamespace, event.TargetNamespace, "TargetNamespace should be equal") + assert.Equal(t, mockEvent.Privacy, event.Privacy, "Privacy should be equal") + assert.Equal(t, mockEvent.AdditionalFields, event.AdditionalFields, "AdditionalFields should be equal") + assert.Equal(t, mockEvent.Version, event.Version, "version should be equal") + assert.Equal(t, mockEvent.Key, event.Key, "key should be equal") + if validPayload := reflect.DeepEqual(mockEvent.Payload[testPayload].(Payload), eventPayload); !validPayload { + assert.Fail(t, "payload should be equal") + } + doneChan <- true + + return nil + })) + require.NoError(t, err) + + err = client.Register( + NewSubscribe(). + Topic(topicName). + EventName(mockEvent.EventName + "Other"). + Offset(0). + Context(ctx). + Callback(func(ctx context.Context, event *Event, err error) error { + if ctx.Err() != nil { + return ctx.Err() + } + + var eventPayload Payload + if err != nil { + assert.Fail(t, "error when run callback") + } + if err = mapstructure.Decode(event.Payload[testPayload], &eventPayload); err != nil { + assert.Fail(t, "unable to decode payload") + } + assert.Equal(t, mockEvent.EventName, event.EventName, "event name should be equal") + assert.Equal(t, mockEvent.Namespace, event.Namespace, "namespace should be equal") + assert.Equal(t, mockEvent.ClientID, event.ClientID, "client ID should be equal") + assert.Equal(t, mockEvent.TraceID, event.TraceID, "trace ID should be equal") + assert.Equal(t, mockEvent.SpanContext, event.SpanContext, "span context should be equal") + assert.Equal(t, mockEvent.UserID, event.UserID, "user ID should be equal") + assert.Equal(t, mockEvent.SessionID, event.SessionID, "session ID should be equal") + assert.Equal(t, mockEvent.EventID, event.EventID, "EventID should be equal") + assert.Equal(t, mockEvent.EventType, event.EventType, "EventType should be equal") + assert.Equal(t, mockEvent.EventLevel, event.EventLevel, "EventLevel should be equal") + assert.Equal(t, mockEvent.ServiceName, event.ServiceName, "ServiceName should be equal") + assert.Equal(t, mockEvent.ClientIDs, event.ClientIDs, "ClientIDs should be equal") + assert.Equal(t, mockEvent.TargetUserIDs, event.TargetUserIDs, "TargetUserIDs should be equal") + assert.Equal(t, mockEvent.TargetNamespace, event.TargetNamespace, "TargetNamespace should be equal") + assert.Equal(t, mockEvent.Privacy, event.Privacy, "Privacy should be equal") + assert.Equal(t, mockEvent.AdditionalFields, event.AdditionalFields, "AdditionalFields should be equal") + assert.Equal(t, mockEvent.Version, event.Version, "version should be equal") + assert.Equal(t, mockEvent.Key, event.Key, "key should be equal") + if validPayload := reflect.DeepEqual(mockEvent.Payload[testPayload].(Payload), eventPayload); !validPayload { + assert.Fail(t, "payload should be equal") + } + doneChan <- true + + return nil + })) + require.NoError(t, err) + + err = client.Publish( + NewPublish(). + Topic(topicName). + EventName(mockEvent.EventName). + Namespace(mockEvent.Namespace). + ClientID(mockEvent.ClientID). + UserID(mockEvent.UserID). + SessionID(mockEvent.SessionID). + TraceID(mockEvent.TraceID). + SpanContext(mockEvent.SpanContext). + Context(context.Background()). + EventID(mockEvent.EventID). + EventType(mockEvent.EventType). + EventLevel(mockEvent.EventLevel). + ServiceName(mockEvent.ServiceName). + ClientIDs(mockEvent.ClientIDs). + TargetUserIDs(mockEvent.TargetUserIDs). + TargetNamespace(mockEvent.TargetNamespace). + Privacy(mockEvent.Privacy). + AdditionalFields(mockEvent.AdditionalFields). + Key(testKey). + Payload(mockPayload)) + if err != nil { + assert.FailNow(t, errorPublish, err) + return + } + + select { + case <-doneChan: + return + case <-ctx.Done(): + assert.FailNow(t, errorTimeout) + } +} + // nolint dupl func TestKafkaPubFailed(t *testing.T) { t.Parallel() diff --git a/kafka_test.go b/kafka_test.go index 8e193e1..8a17ebf 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -149,7 +149,7 @@ func TestKafkaPubInvalidEventStruct(t *testing.T) { Context(context.Background()). Payload(mockPayload)) - assert.Equal(t, errInvalidPubStruct, err, "error should be equal") + assert.Error(t, err, "error should be equal") } // nolint dupl @@ -286,7 +286,7 @@ func TestKafkaSubWithEmptyTopic(t *testing.T) { return nil })) - assert.Equal(t, errInvalidSubStruct, err, "error should be equal") + assert.Error(t, err, "error should be not nil") } // nolint dupl @@ -307,7 +307,7 @@ func TestKafkaSubInvalidEventStruct(t *testing.T) { return nil })) - assert.Equal(t, errInvalidSubStruct, err, "error should be equal") + assert.Error(t, err, "error should be not nil") } // nolint dupl diff --git a/validation.go b/validation.go index 9930bfa..5b27823 100644 --- a/validation.go +++ b/validation.go @@ -18,27 +18,50 @@ package eventstream import ( "context" - "errors" + "regexp" validator "github.com/AccelByte/justice-input-validation-go" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) +const TopicEventPattern = "^[a-zA-Z0-9]+((['_.-][a-zA-Z0-9])?[a-zA-Z0-9]*)*$" + var ( - errInvalidPubStruct = errors.New("publish struct isn't valid") - errInvalidSubStruct = errors.New("subscribe struct isn't valid") - errInvalidUserID = errors.New("userID isn't valid") - errInvalidClientID = errors.New("clientID isn't valid") - errInvalidSessionID = errors.New("sessionID isn't valid") - errInvalidTraceID = errors.New("traceID isn't valid") - errInvalidCallback = errors.New("callback should not be nil") + errInvalidPubStruct = errors.New("publish struct isn't valid") + errInvalidTopicFormat = errors.New("topic format isn't valid") + errInvalidEventNameFormat = errors.New("eventname format isn't valid") + errInvalidUserID = errors.New("userID isn't valid") + errInvalidClientID = errors.New("clientID isn't valid") + errInvalidSessionID = errors.New("sessionID isn't valid") + errInvalidTraceID = errors.New("traceID isn't valid") + errInvalidCallback = errors.New("callback should not be nil") ) // validatePublishEvent validate published event func validatePublishEvent(publishBuilder *PublishBuilder, strictValidation bool) error { + + for _, topic := range publishBuilder.topic { + if isTopicValid := validateTopicEvent(topic); !isTopicValid { + logrus. + WithField("Topic Name", topic). + WithField("Event Name", publishBuilder.eventName). + Errorf("unable to validate publisher event. error: invalid topic format") + return errInvalidTopicFormat + } + } + + if isEventNameValid := validateTopicEvent(publishBuilder.eventName); !isEventNameValid { + logrus. + WithField("Topic Name", publishBuilder.topic). + WithField("Event Name", publishBuilder.eventName). + Errorf("unable to validate publisher event. error: invalid event name format") + return errInvalidEventNameFormat + } + publishEvent := struct { Topic []string `valid:"required"` - EventName string `valid:"alphanum,stringlength(1|256),required"` + EventName string `valid:"required"` Namespace string ClientID string UserID string @@ -56,8 +79,11 @@ func validatePublishEvent(publishBuilder *PublishBuilder, strictValidation bool) valid, err := validator.ValidateStruct(publishEvent) if err != nil { - logrus.Errorf("unable to validate publish event. error : %v", err) - return errInvalidPubStruct + logrus. + WithField("Topic Name", publishEvent.Topic). + WithField("Event Name", publishEvent.EventName). + Errorf("unable to validate publisher event. error : %v", err) + return err } if !valid { @@ -88,10 +114,27 @@ func validatePublishEvent(publishBuilder *PublishBuilder, strictValidation bool) // validateSubscribeEvent validate subscribe event func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error { + + if isTopicValid := validateTopicEvent(subscribeBuilder.topic); !isTopicValid { + logrus. + WithField("Topic Name", subscribeBuilder.topic). + WithField("Event Name", subscribeBuilder.eventName). + Errorf("unable to validate subscribe event. error: invalid topic format") + return errInvalidTopicFormat + } + + if isEventNameValid := validateTopicEvent(subscribeBuilder.eventName); !isEventNameValid { + logrus. + WithField("Topic Name", subscribeBuilder.topic). + WithField("Event Name", subscribeBuilder.eventName). + Errorf("unable to validate subscribe event. error: invalid event name format") + return errInvalidEventNameFormat + } + subscribeEvent := struct { Topic string `valid:"required"` - EventName string `valid:"alphanum,stringlength(1|256),required"` - GroupID string `valid:"alphanum,stringlength(1|256),required"` + EventName string `valid:"required"` + GroupID string Callback func(ctx context.Context, event *Event, err error) error }{ Topic: subscribeBuilder.topic, @@ -102,8 +145,11 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error { _, err := validator.ValidateStruct(subscribeEvent) if err != nil { - logrus.Errorf("unable to validate subscribe event. error : %v", err) - return errInvalidSubStruct + logrus. + WithField("Topic Name", subscribeBuilder.topic). + WithField("Event Name", subscribeBuilder.eventName). + Errorf("unable to validate subscribe event. error : %v", err) + return err } if subscribeEvent.Callback == nil { @@ -112,3 +158,11 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error { return nil } + +func validateTopicEvent(value string) bool { + validRegex, err := regexp.Compile(TopicEventPattern) + if err != nil { + return false + } + return validRegex.MatchString(value) +} diff --git a/validation_test.go b/validation_test.go new file mode 100644 index 0000000..c9da6bc --- /dev/null +++ b/validation_test.go @@ -0,0 +1,271 @@ +/* + * Copyright (c) 2021 AccelByte Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and limitations under the License. + * + */ + +package eventstream + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +// nolint dupl +func TestValidatePublishEvent(t *testing.T) { + testCases := []struct { + input *PublishBuilder + expected bool + }{ + { + input: &PublishBuilder{ + topic: []string{"topic1"}, + eventName: "event1", + }, + expected: true, + }, + { + input: &PublishBuilder{ + topic: []string{"Topic-1"}, + eventName: "Event-1", + }, + expected: true, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte.dev.topic-123"}, + eventName: "event-123", + }, + expected: true, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte.dev.topic-123", "accelbyte.dev.topic-456"}, + eventName: "event.456", + }, + expected: true, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte-dev-topic-123", "accelbyte-dev-topic-456"}, + eventName: "event-456", + }, + expected: true, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte_dev_topic_123", "accelbyte_dev_topic_456"}, + eventName: "event_456", + }, + expected: true, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte.dev.topic-123", "accelbyte.dev.topic-"}, + eventName: "event.456", + }, + expected: false, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte.dev.topic-123", "accelbyte.dev.topic-!@#"}, + eventName: "event.456", + }, + expected: false, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte.dev.topic-123", "accelbyte.dev.topic-456"}, + eventName: "event.!@#", + }, + expected: false, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte.dev.topic-123", "accelbyte.dev.topic-456"}, + eventName: "event-", + }, + expected: false, + }, + { + input: &PublishBuilder{ + topic: []string{"accelbyte.dev.topic-!@#", "accelbyte.dev.topic-456"}, + eventName: "event%", + }, + expected: false, + }, + } + + for _, testCase := range testCases { + if testCase.expected { + assert.Nil(t, validatePublishEvent(testCase.input, false), + fmt.Sprintf("publisher event isn't correct. topic: %s, event: %s", testCase.input.topic, + testCase.input.eventName)) + } else { + assert.Error(t, validatePublishEvent(testCase.input, false), + fmt.Sprintf("publisher event isn't correct. topic: %s, event: %s", testCase.input.topic, + testCase.input.eventName)) + } + } +} + +// nolint dupl +func TestValidateSubscriberEvent(t *testing.T) { + + callbackFunc := func(ctx context.Context, event *Event, err error) error { + return nil + } + + testCases := []struct { + input *SubscribeBuilder + expected bool + }{ + { + input: &SubscribeBuilder{ + topic: "topic1", + eventName: "event1", + callback: callbackFunc, + }, + expected: true, + }, + { + input: &SubscribeBuilder{ + topic: "Topic-1", + eventName: "Event-1", + callback: callbackFunc, + }, + expected: true, + }, + { + input: &SubscribeBuilder{ + topic: "Topic_1", + eventName: "Event_1", + callback: callbackFunc, + }, + expected: true, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-123", + eventName: "event-123", + callback: callbackFunc, + }, + expected: true, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-123", + eventName: "event.456", + callback: callbackFunc, + }, + expected: true, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-", + eventName: "event.456", + callback: callbackFunc, + }, + expected: false, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-!@#", + eventName: "event.456", + callback: callbackFunc, + }, + expected: false, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-123", + eventName: "event.!@#", + callback: callbackFunc, + }, + expected: false, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-123", + eventName: "event-", + callback: callbackFunc, + }, + expected: false, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-!@#", + eventName: "event%", + callback: callbackFunc, + }, + expected: false, + }, + { + input: &SubscribeBuilder{ + topic: "accelbyte.dev.topic-123", + eventName: "event-123", + callback: nil, + }, + expected: false, + }, + } + + for _, testCase := range testCases { + if testCase.expected { + assert.Nil(t, validateSubscribeEvent(testCase.input), + fmt.Sprintf("publisher event isn't correct. topic: %s, event: %s", testCase.input.topic, + testCase.input.eventName)) + } else { + assert.Error(t, validateSubscribeEvent(testCase.input), + fmt.Sprintf("publisher event isn't correct. topic: %s, event: %s", testCase.input.topic, + testCase.input.eventName)) + } + } +} + +func TestValidateTopicEvent(t *testing.T) { + testCases := []struct { + input string + expected bool + }{ + {input: "topic", expected: true}, + {input: "topic123", expected: true}, + {input: "topic_123", expected: true}, + {input: "accelbyte.dev.topic", expected: true}, + {input: "3671efef-876b-4133-897d-79134e936cd9", expected: true}, + {input: "3671efef876b4133897d79134e936cd9", expected: true}, + {input: "accelbyte.dev.topic-123", expected: true}, + {input: "accelbyte.dev.topic-abc", expected: true}, + {input: "accelbyte.dev.topic_123", expected: true}, + {input: "accelbyte.dev.topic_abx", expected: true}, + {input: "accelbyte.dev.topic.123", expected: true}, + {input: "accelbyte.dev.topic.abc", expected: true}, + {input: "topic-", expected: false}, + {input: "topic!@#", expected: false}, + {input: "topic!@#", expected: false}, + {input: "accelbyte.dev.topic.", expected: false}, + {input: "accelbyte.dev.topic-", expected: false}, + {input: "accelbyte.dev.topic_", expected: false}, + {input: "accelbyte.dev.topic-!", expected: false}, + {input: "accelbyte.dev.topic-#", expected: false}, + {input: "accelbyte.dev.topic-1,", expected: false}, + } + + for _, testCase := range testCases { + assert.Equal(t, testCase.expected, validateTopicEvent(testCase.input), + "topic or event name validation isn't correct") + } +}