Skip to content

Commit

Permalink
fix(validation): fix event validation following kafka naming convention
Browse files Browse the repository at this point in the history
  • Loading branch information
fadhillahentino committed Nov 19, 2021
1 parent 34066fb commit 1a08a4a
Show file tree
Hide file tree
Showing 6 changed files with 511 additions and 18 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/google/uuid v1.1.1
github.com/mitchellh/mapstructure v1.1.2
github.com/pkg/errors v0.9.1
github.com/segmentio/kafka-go v0.4.23
github.com/sirupsen/logrus v1.4.1
github.com/stretchr/testify v1.6.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ github.com/pariz/gountries v0.0.0-20171019111738-adb00f6513a3 h1:lmQNznFSupyfCDE
github.com/pariz/gountries v0.0.0-20171019111738-adb00f6513a3/go.mod h1:U0ETmPPEsfd7CpUKNMYi68xIOL8Ww4jPZlaqNngcwqs=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.23 h1:jjacNjmn1fPvkVGFs6dej98fa7UT/bYF8wZBFMMIld4=
Expand Down
165 changes: 165 additions & 0 deletions kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,171 @@ func TestKafkaPubSubSuccess(t *testing.T) {
}
}

// nolint dupl
func TestKafkaNonConsumerGroupSuccess(t *testing.T) {
t.Parallel()
ctx, done := context.WithTimeout(context.Background(), time.Duration(timeoutTest)*time.Second)
defer done()

doneChan := make(chan bool, 1)

client := createKafkaClient(t)

topicName := constructTopicTest()

var mockPayload = make(map[string]interface{})
mockPayload[testPayload] = Payload{FriendID: "user456"}

mockAdditionalFields := map[string]interface{}{
"summary": "user:_failed",
}

mockEvent := &Event{
EventName: "testEvent",
Namespace: "event",
ClientID: "7d480ce0e8624b02901bd80d9ba9817c",
TraceID: "01c34ec3b07f4bfaa59ba0184a3de14d",
SpanContext: "test-span-id",
UserID: "e95b150043ff4a2c88427a6eb25e5bc8",
EventID: 3,
EventType: 301,
EventLevel: 3,
ServiceName: "test",
ClientIDs: []string{"7d480ce0e8624b02901bd80d9ba9817c"},
TargetUserIDs: []string{"1fe7f425a0e049d29d87ca3d32e45b5a"},
TargetNamespace: "publisher",
Privacy: true,
AdditionalFields: mockAdditionalFields,
Version: defaultVersion,
Key: testKey,
Payload: mockPayload,
}

err := client.Register(
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName).
Offset(0).
Context(ctx).
Callback(func(ctx context.Context, event *Event, err error) error {
if ctx.Err() != nil {
return ctx.Err()
}

var eventPayload Payload
if err != nil {
assert.Fail(t, "error when run callback")
}
if err = mapstructure.Decode(event.Payload[testPayload], &eventPayload); err != nil {
assert.Fail(t, "unable to decode payload")
}
assert.Equal(t, mockEvent.EventName, event.EventName, "event name should be equal")
assert.Equal(t, mockEvent.Namespace, event.Namespace, "namespace should be equal")
assert.Equal(t, mockEvent.ClientID, event.ClientID, "client ID should be equal")
assert.Equal(t, mockEvent.TraceID, event.TraceID, "trace ID should be equal")
assert.Equal(t, mockEvent.SpanContext, event.SpanContext, "span context should be equal")
assert.Equal(t, mockEvent.UserID, event.UserID, "user ID should be equal")
assert.Equal(t, mockEvent.SessionID, event.SessionID, "session ID should be equal")
assert.Equal(t, mockEvent.EventID, event.EventID, "EventID should be equal")
assert.Equal(t, mockEvent.EventType, event.EventType, "EventType should be equal")
assert.Equal(t, mockEvent.EventLevel, event.EventLevel, "EventLevel should be equal")
assert.Equal(t, mockEvent.ServiceName, event.ServiceName, "ServiceName should be equal")
assert.Equal(t, mockEvent.ClientIDs, event.ClientIDs, "ClientIDs should be equal")
assert.Equal(t, mockEvent.TargetUserIDs, event.TargetUserIDs, "TargetUserIDs should be equal")
assert.Equal(t, mockEvent.TargetNamespace, event.TargetNamespace, "TargetNamespace should be equal")
assert.Equal(t, mockEvent.Privacy, event.Privacy, "Privacy should be equal")
assert.Equal(t, mockEvent.AdditionalFields, event.AdditionalFields, "AdditionalFields should be equal")
assert.Equal(t, mockEvent.Version, event.Version, "version should be equal")
assert.Equal(t, mockEvent.Key, event.Key, "key should be equal")
if validPayload := reflect.DeepEqual(mockEvent.Payload[testPayload].(Payload), eventPayload); !validPayload {
assert.Fail(t, "payload should be equal")
}
doneChan <- true

return nil
}))
require.NoError(t, err)

err = client.Register(
NewSubscribe().
Topic(topicName).
EventName(mockEvent.EventName + "Other").
Offset(0).
Context(ctx).
Callback(func(ctx context.Context, event *Event, err error) error {
if ctx.Err() != nil {
return ctx.Err()
}

var eventPayload Payload
if err != nil {
assert.Fail(t, "error when run callback")
}
if err = mapstructure.Decode(event.Payload[testPayload], &eventPayload); err != nil {
assert.Fail(t, "unable to decode payload")
}
assert.Equal(t, mockEvent.EventName, event.EventName, "event name should be equal")
assert.Equal(t, mockEvent.Namespace, event.Namespace, "namespace should be equal")
assert.Equal(t, mockEvent.ClientID, event.ClientID, "client ID should be equal")
assert.Equal(t, mockEvent.TraceID, event.TraceID, "trace ID should be equal")
assert.Equal(t, mockEvent.SpanContext, event.SpanContext, "span context should be equal")
assert.Equal(t, mockEvent.UserID, event.UserID, "user ID should be equal")
assert.Equal(t, mockEvent.SessionID, event.SessionID, "session ID should be equal")
assert.Equal(t, mockEvent.EventID, event.EventID, "EventID should be equal")
assert.Equal(t, mockEvent.EventType, event.EventType, "EventType should be equal")
assert.Equal(t, mockEvent.EventLevel, event.EventLevel, "EventLevel should be equal")
assert.Equal(t, mockEvent.ServiceName, event.ServiceName, "ServiceName should be equal")
assert.Equal(t, mockEvent.ClientIDs, event.ClientIDs, "ClientIDs should be equal")
assert.Equal(t, mockEvent.TargetUserIDs, event.TargetUserIDs, "TargetUserIDs should be equal")
assert.Equal(t, mockEvent.TargetNamespace, event.TargetNamespace, "TargetNamespace should be equal")
assert.Equal(t, mockEvent.Privacy, event.Privacy, "Privacy should be equal")
assert.Equal(t, mockEvent.AdditionalFields, event.AdditionalFields, "AdditionalFields should be equal")
assert.Equal(t, mockEvent.Version, event.Version, "version should be equal")
assert.Equal(t, mockEvent.Key, event.Key, "key should be equal")
if validPayload := reflect.DeepEqual(mockEvent.Payload[testPayload].(Payload), eventPayload); !validPayload {
assert.Fail(t, "payload should be equal")
}
doneChan <- true

return nil
}))
require.NoError(t, err)

err = client.Publish(
NewPublish().
Topic(topicName).
EventName(mockEvent.EventName).
Namespace(mockEvent.Namespace).
ClientID(mockEvent.ClientID).
UserID(mockEvent.UserID).
SessionID(mockEvent.SessionID).
TraceID(mockEvent.TraceID).
SpanContext(mockEvent.SpanContext).
Context(context.Background()).
EventID(mockEvent.EventID).
EventType(mockEvent.EventType).
EventLevel(mockEvent.EventLevel).
ServiceName(mockEvent.ServiceName).
ClientIDs(mockEvent.ClientIDs).
TargetUserIDs(mockEvent.TargetUserIDs).
TargetNamespace(mockEvent.TargetNamespace).
Privacy(mockEvent.Privacy).
AdditionalFields(mockEvent.AdditionalFields).
Key(testKey).
Payload(mockPayload))
if err != nil {
assert.FailNow(t, errorPublish, err)
return
}

select {
case <-doneChan:
return
case <-ctx.Done():
assert.FailNow(t, errorTimeout)
}
}

// nolint dupl
func TestKafkaPubFailed(t *testing.T) {
t.Parallel()
Expand Down
6 changes: 3 additions & 3 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestKafkaPubInvalidEventStruct(t *testing.T) {
Context(context.Background()).
Payload(mockPayload))

assert.Equal(t, errInvalidPubStruct, err, "error should be equal")
assert.Error(t, err, "error should be equal")
}

// nolint dupl
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestKafkaSubWithEmptyTopic(t *testing.T) {
return nil
}))

assert.Equal(t, errInvalidSubStruct, err, "error should be equal")
assert.Error(t, err, "error should be not nil")
}

// nolint dupl
Expand All @@ -307,7 +307,7 @@ func TestKafkaSubInvalidEventStruct(t *testing.T) {
return nil
}))

assert.Equal(t, errInvalidSubStruct, err, "error should be equal")
assert.Error(t, err, "error should be not nil")
}

// nolint dupl
Expand Down
84 changes: 69 additions & 15 deletions validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,50 @@ package eventstream

import (
"context"
"errors"
"regexp"

validator "github.com/AccelByte/justice-input-validation-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

const TopicEventPattern = "^[a-zA-Z0-9]+((['_.-][a-zA-Z0-9])?[a-zA-Z0-9]*)*$"

var (
errInvalidPubStruct = errors.New("publish struct isn't valid")
errInvalidSubStruct = errors.New("subscribe struct isn't valid")
errInvalidUserID = errors.New("userID isn't valid")
errInvalidClientID = errors.New("clientID isn't valid")
errInvalidSessionID = errors.New("sessionID isn't valid")
errInvalidTraceID = errors.New("traceID isn't valid")
errInvalidCallback = errors.New("callback should not be nil")
errInvalidPubStruct = errors.New("publish struct isn't valid")
errInvalidTopicFormat = errors.New("topic format isn't valid")
errInvalidEventNameFormat = errors.New("eventname format isn't valid")
errInvalidUserID = errors.New("userID isn't valid")
errInvalidClientID = errors.New("clientID isn't valid")
errInvalidSessionID = errors.New("sessionID isn't valid")
errInvalidTraceID = errors.New("traceID isn't valid")
errInvalidCallback = errors.New("callback should not be nil")
)

// validatePublishEvent validate published event
func validatePublishEvent(publishBuilder *PublishBuilder, strictValidation bool) error {

for _, topic := range publishBuilder.topic {
if isTopicValid := validateTopicEvent(topic); !isTopicValid {
logrus.
WithField("Topic Name", topic).
WithField("Event Name", publishBuilder.eventName).
Errorf("unable to validate publisher event. error: invalid topic format")
return errInvalidTopicFormat
}
}

if isEventNameValid := validateTopicEvent(publishBuilder.eventName); !isEventNameValid {
logrus.
WithField("Topic Name", publishBuilder.topic).
WithField("Event Name", publishBuilder.eventName).
Errorf("unable to validate publisher event. error: invalid event name format")
return errInvalidEventNameFormat
}

publishEvent := struct {
Topic []string `valid:"required"`
EventName string `valid:"alphanum,stringlength(1|256),required"`
EventName string `valid:"required"`
Namespace string
ClientID string
UserID string
Expand All @@ -56,8 +79,11 @@ func validatePublishEvent(publishBuilder *PublishBuilder, strictValidation bool)

valid, err := validator.ValidateStruct(publishEvent)
if err != nil {
logrus.Errorf("unable to validate publish event. error : %v", err)
return errInvalidPubStruct
logrus.
WithField("Topic Name", publishEvent.Topic).
WithField("Event Name", publishEvent.EventName).
Errorf("unable to validate publisher event. error : %v", err)
return err
}

if !valid {
Expand Down Expand Up @@ -88,10 +114,27 @@ func validatePublishEvent(publishBuilder *PublishBuilder, strictValidation bool)

// validateSubscribeEvent validate subscribe event
func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error {

if isTopicValid := validateTopicEvent(subscribeBuilder.topic); !isTopicValid {
logrus.
WithField("Topic Name", subscribeBuilder.topic).
WithField("Event Name", subscribeBuilder.eventName).
Errorf("unable to validate subscribe event. error: invalid topic format")
return errInvalidTopicFormat
}

if isEventNameValid := validateTopicEvent(subscribeBuilder.eventName); !isEventNameValid {
logrus.
WithField("Topic Name", subscribeBuilder.topic).
WithField("Event Name", subscribeBuilder.eventName).
Errorf("unable to validate subscribe event. error: invalid event name format")
return errInvalidEventNameFormat
}

subscribeEvent := struct {
Topic string `valid:"required"`
EventName string `valid:"alphanum,stringlength(1|256),required"`
GroupID string `valid:"alphanum,stringlength(1|256),required"`
EventName string `valid:"required"`
GroupID string
Callback func(ctx context.Context, event *Event, err error) error
}{
Topic: subscribeBuilder.topic,
Expand All @@ -102,8 +145,11 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error {

_, err := validator.ValidateStruct(subscribeEvent)
if err != nil {
logrus.Errorf("unable to validate subscribe event. error : %v", err)
return errInvalidSubStruct
logrus.
WithField("Topic Name", subscribeBuilder.topic).
WithField("Event Name", subscribeBuilder.eventName).
Errorf("unable to validate subscribe event. error : %v", err)
return err
}

if subscribeEvent.Callback == nil {
Expand All @@ -112,3 +158,11 @@ func validateSubscribeEvent(subscribeBuilder *SubscribeBuilder) error {

return nil
}

func validateTopicEvent(value string) bool {
validRegex, err := regexp.Compile(TopicEventPattern)
if err != nil {
return false
}
return validRegex.MatchString(value)
}
Loading

0 comments on commit 1a08a4a

Please sign in to comment.