Skip to content

Commit

Permalink
4224 fix: [GOLIUM][TECHDEBT] Check rabbitmq step to filter messages b…
Browse files Browse the repository at this point in the history
…y standard properties (#117)

* 4224 fix: [GOLIUM][TECHDEBT] Check rabbitmq step to filter messages by standard properties

* refactoring to reduce complexity and characters per line limit

* remove unused parameter

* remove unused parameter

* change the behavior of the step to strictly expected messages

* fix pass by reference
  • Loading branch information
ruben-garciad authored Jun 7, 2023
1 parent 518633d commit 77eea8a
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 53 deletions.
103 changes: 89 additions & 14 deletions steps/rabbit/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/streadway/amqp"
"github.com/tidwall/sjson"
"golang.org/x/exp/slices"
)

const (
Expand All @@ -39,6 +40,8 @@ type Session struct {
Connection *amqp.Connection
// Messages received from the publish/subscribe channel
Messages []amqp.Delivery
// Messages consumed from the publish/subscribe channel
ConsumedMessages []amqp.Delivery
// Correlator is used to correlate the messages for a specific session
Correlator string
// rabbit channel
Expand Down Expand Up @@ -200,10 +203,17 @@ func (s *Session) WaitForTextMessage(ctx context.Context,
timeout time.Duration,
expectedMsg string,
) error {
s.ConsumedMessages = make([]amqp.Delivery, 0, len(s.Messages))

return waitUpTo(timeout, func() error {
for i := range s.Messages {
if string(s.Messages[i].Body) == expectedMsg {
s.msg = s.Messages[i]

// Consume the processed messages
s.ConsumedMessages = append(s.ConsumedMessages, s.Messages[i])
slices.Delete(s.Messages, i, i+1)

return nil
}
}
Expand All @@ -224,11 +234,19 @@ func (s *Session) WaitForJSONMessageWithProperties(ctx context.Context,
if err != nil {
return fmt.Errorf(convertTableToMapMessage+"%w", err)
}

s.ConsumedMessages = make([]amqp.Delivery, 0, len(s.Messages))

err = waitUpTo(timeout, func() error {
for i := range s.Messages {
logrus.Debugf("Checking message: %s", s.Messages[i].Body)
if matchMessage(string(s.Messages[i].Body), props) {
s.msg = s.Messages[i]

// Consume the processed messages
s.ConsumedMessages = append(s.ConsumedMessages, s.Messages[i])
slices.Delete(s.Messages, i, i+1)

return nil
}
}
Expand All @@ -242,7 +260,7 @@ func (s *Session) WaitForJSONMessageWithProperties(ctx context.Context,
func WaitForWithWantedErrorNormalizer(wantErr bool, err error, propertiesType string) error {
if !wantErr {
if err != nil {
return fmt.Errorf("no message(s) received match(es) the with %s properties", propertiesType)
return fmt.Errorf("no message(s) received match(es) with %s properties", propertiesType)
}
} else {
if err == nil {
Expand All @@ -266,47 +284,104 @@ func matchMessage(msg string, expectedProps map[string]interface{}) bool {

// WaitForMessagesWithStandardProperties waits for 'count' messages with standard rabbit properties
// that are equal to the expected values.
// If strictly is set to true, function loops through all received messages to check that
// the amount exactly matches the expected
// When wantErr is set to true function returns error if message is found with the JSON properties
// and returns no error when message is not found after timeout.
func (s *Session) WaitForMessagesWithStandardProperties(
ctx context.Context,
timeout time.Duration,
count int,
strictly bool,
t *godog.Table,
wantErr bool,
) error {
var props amqp.Delivery
if err := golium.ConvertTableWithoutHeaderToStruct(ctx, t, &props); err != nil {
return fmt.Errorf("failed configuring rabbit endpoint: %w", err)
}

expectedMessagesCount := count
s.ConsumedMessages = make([]amqp.Delivery, 0, len(s.Messages))
unconsumedMessages := make([]amqp.Delivery, 0, len(s.Messages))
processedMessagesCount := 0
// Consume the processed messages
defer func() {
if processedMessagesCount < len(s.Messages) {
s.Messages = append(unconsumedMessages, s.Messages[processedMessagesCount:]...)
} else {
s.Messages = unconsumedMessages
}
}()

err := waitUpTo(timeout, func() error {
err := fmt.Errorf("no message(s) received match(es) the standard properties")
if count < 0 {
return err
}

for i := range s.Messages {
logrus.Debugf("Checking message: %s", s.Messages[i].Body)
s.msg = s.Messages[i]
if err = s.ValidateMessageStandardProperties(ctx, props); err == nil {
count--
if count == 0 {
return nil
}
}
count = s.processReceivedMessages(props, &unconsumedMessages, &processedMessagesCount, count)
if count == 0 {
return nil
}

if count < expectedMessagesCount {
err = fmt.Errorf("not all messages matching the standard properties have been received")
}
return err
})

// ensures that if there are messages to be processed, none of them match the standard properties
if err == nil && strictly {
count = s.processReceivedMessages(props, &unconsumedMessages, &processedMessagesCount, count)
if count < 0 {
err = fmt.Errorf("more than expected message(s) received match(es) the standard properties")
}
}

return WaitForWithWantedErrorNormalizer(wantErr, err, "standard")
}

func (s *Session) processReceivedMessages(
props amqp.Delivery,
unconsumedMessages *[]amqp.Delivery,
processedMessagesCount *int,
count int,
) int {
// Only new messages are processed
for i := *processedMessagesCount; i < len(s.Messages); i++ {
logrus.Debugf("Checking message: %s", s.Messages[i].Body)
*processedMessagesCount++

errValidate := s.validateMessageStandardProperties(s.Messages[i], props)
if errValidate == nil {
s.msg = s.Messages[i]
s.ConsumedMessages = append(s.ConsumedMessages, s.Messages[i])
count--
if count == 0 {
break
}
} else {
*unconsumedMessages = append(*unconsumedMessages, s.Messages[i])
}
}

return count
}

// ValidateMessageStandardProperties checks if the message standard rabbit properties are equal
// the expected values.
func (s *Session) ValidateMessageStandardProperties(
ctx context.Context,
props amqp.Delivery,
) error {
msg := reflect.ValueOf(s.msg)
return s.validateMessageStandardProperties(s.msg, props)
}

func (s *Session) validateMessageStandardProperties(
msgToValidate amqp.Delivery,
props amqp.Delivery,
) error {
msg := reflect.ValueOf(msgToValidate)
expectedMsg := reflect.ValueOf(props)
t := expectedMsg.Type()
for i := 0; i < expectedMsg.NumField(); i++ {
Expand Down Expand Up @@ -370,13 +445,13 @@ func (s *Session) ValidateMessageJSONBody(ctx context.Context,
}
m := golium.NewMapFromJSONBytes(s.msg.Body)
if pos != -1 {
nMessages := len(s.Messages)
nMessages := len(s.ConsumedMessages)
if pos < 0 || pos >= nMessages {
return fmt.Errorf(
"trying to validate message in position: '%d', '%d' messages available",
pos, nMessages)
}
m = golium.NewMapFromJSONBytes(s.Messages[pos].Body)
m = golium.NewMapFromJSONBytes(s.ConsumedMessages[pos].Body)
}
for key, expectedValue := range props {
value := m.Get(key)
Expand Down
131 changes: 98 additions & 33 deletions steps/rabbit/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,12 @@ func TestWaitForJSONMessageWithProperties(t *testing.T) {

func TestWaitForMessagesWithStandardProperties(t *testing.T) {
type args struct {
timeout time.Duration
count int
table *godog.Table
wantErr bool
timeout time.Duration
count int
strictly bool
messages []amqp.Delivery
table *godog.Table
wantErr bool
}
tests := []struct {
name string
Expand All @@ -489,81 +491,144 @@ func TestWaitForMessagesWithStandardProperties(t *testing.T) {
{
name: "Convert table error",
args: args{
count: 0,
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"Priority", "5"}}),
count: 0,
strictly: false,
messages: []amqp.Delivery{{Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"Priority", "5"}}),
},
wantErr: true,
},
{
name: "Message count < 0",
args: args{
count: -1,
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: false,
count: -1,
strictly: false,
messages: []amqp.Delivery{{Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: false,
},
wantErr: true,
},
{
name: "Message count < 0 expecting error",
args: args{
count: -1,
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: true,
count: -1,
strictly: false,
messages: []amqp.Delivery{{Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: true,
},
wantErr: false,
},
{
name: "Matching properties",
args: args{
count: 1,
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: false,
count: 1,
strictly: false,
messages: []amqp.Delivery{{Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: false,
},
wantErr: false,
},
{
name: "Matching properties expecting error",
args: args{
count: 1,
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: true,
count: 1,
strictly: false,
messages: []amqp.Delivery{{Priority: 5}, {Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: true,
},
wantErr: true,
},
{
name: "Not matching properties",
args: args{
count: 1,
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}),
wantErr: false,
count: 1,
strictly: false,
messages: []amqp.Delivery{{Priority: 5}, {Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}),
wantErr: false,
},
wantErr: true,
},
{
name: "Not matching properties expecting error",
args: args{
count: 1,
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}),
wantErr: true,
count: 1,
strictly: false,
messages: []amqp.Delivery{{Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}),
wantErr: true,
},
wantErr: false,
},
{
name: "Not matching number of received messages with standard properties",
args: args{
count: 2,
strictly: false,
messages: []amqp.Delivery{{Priority: 10}, {Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: false,
},
wantErr: true,
},
{
name: "Not matching number of received messages with standard properties expecting error",
args: args{
count: 2,
strictly: false,
messages: []amqp.Delivery{{Priority: 10}, {Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: true,
},
wantErr: false,
},
{
name: "Strictly matching number of received messages with standard properties",
args: args{
count: 2,
strictly: true,
messages: []amqp.Delivery{{Priority: 5}, {Priority: 10}, {Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: false,
},
wantErr: false,
},
{
name: "Not strictly matching number of received messages with standard properties expecting",
args: args{
count: 2,
strictly: true,
messages: []amqp.Delivery{{Priority: 5}, {Priority: 10}, {Priority: 5}, {Priority: 5}},
timeout: 1 * time.Second,
table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}),
wantErr: false,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Session{}
s.Messages = []amqp.Delivery{{Priority: 5}}
s.Messages = tt.args.messages
if err := s.WaitForMessagesWithStandardProperties(
context.Background(),
tt.args.timeout,
tt.args.count,
tt.args.strictly,
tt.args.table,
tt.args.wantErr); (err != nil) != tt.wantErr {
t.Errorf(
Expand Down Expand Up @@ -721,7 +786,7 @@ func TestValidateMessageJSONBody(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := &Session{}
s.Messages = []amqp.Delivery{
s.ConsumedMessages = []amqp.Delivery{
{
Body: []byte(`{"id": "1"}`),
},
Expand Down
Loading

0 comments on commit 77eea8a

Please sign in to comment.