diff --git a/steps/rabbit/session.go b/steps/rabbit/session.go index e4e8ed96..a9f1c7d7 100644 --- a/steps/rabbit/session.go +++ b/steps/rabbit/session.go @@ -27,6 +27,7 @@ import ( "github.com/sirupsen/logrus" "github.com/streadway/amqp" "github.com/tidwall/sjson" + "golang.org/x/exp/slices" ) const ( @@ -39,6 +40,8 @@ type Session struct { Connection *amqp.Connection // Messages received from the publish/subscribe channel Messages []amqp.Delivery + // Messages consumed from the publish/subscribe channel + ConsumedMessages []amqp.Delivery // Correlator is used to correlate the messages for a specific session Correlator string // rabbit channel @@ -200,10 +203,17 @@ func (s *Session) WaitForTextMessage(ctx context.Context, timeout time.Duration, expectedMsg string, ) error { + s.ConsumedMessages = make([]amqp.Delivery, 0, len(s.Messages)) + return waitUpTo(timeout, func() error { for i := range s.Messages { if string(s.Messages[i].Body) == expectedMsg { s.msg = s.Messages[i] + + // Consume the processed messages + s.ConsumedMessages = append(s.ConsumedMessages, s.Messages[i]) + slices.Delete(s.Messages, i, i+1) + return nil } } @@ -224,11 +234,19 @@ func (s *Session) WaitForJSONMessageWithProperties(ctx context.Context, if err != nil { return fmt.Errorf(convertTableToMapMessage+"%w", err) } + + s.ConsumedMessages = make([]amqp.Delivery, 0, len(s.Messages)) + err = waitUpTo(timeout, func() error { for i := range s.Messages { logrus.Debugf("Checking message: %s", s.Messages[i].Body) if matchMessage(string(s.Messages[i].Body), props) { s.msg = s.Messages[i] + + // Consume the processed messages + s.ConsumedMessages = append(s.ConsumedMessages, s.Messages[i]) + slices.Delete(s.Messages, i, i+1) + return nil } } @@ -242,7 +260,7 @@ func (s *Session) WaitForJSONMessageWithProperties(ctx context.Context, func WaitForWithWantedErrorNormalizer(wantErr bool, err error, propertiesType string) error { if !wantErr { if err != nil { - return fmt.Errorf("no message(s) received match(es) the with %s properties", propertiesType) + return fmt.Errorf("no message(s) received match(es) with %s properties", propertiesType) } } else { if err == nil { @@ -266,12 +284,15 @@ func matchMessage(msg string, expectedProps map[string]interface{}) bool { // WaitForMessagesWithStandardProperties waits for 'count' messages with standard rabbit properties // that are equal to the expected values. +// If strictly is set to true, function loops through all received messages to check that +// the amount exactly matches the expected // When wantErr is set to true function returns error if message is found with the JSON properties // and returns no error when message is not found after timeout. func (s *Session) WaitForMessagesWithStandardProperties( ctx context.Context, timeout time.Duration, count int, + strictly bool, t *godog.Table, wantErr bool, ) error { @@ -279,34 +300,88 @@ func (s *Session) WaitForMessagesWithStandardProperties( if err := golium.ConvertTableWithoutHeaderToStruct(ctx, t, &props); err != nil { return fmt.Errorf("failed configuring rabbit endpoint: %w", err) } + + expectedMessagesCount := count + s.ConsumedMessages = make([]amqp.Delivery, 0, len(s.Messages)) + unconsumedMessages := make([]amqp.Delivery, 0, len(s.Messages)) + processedMessagesCount := 0 + // Consume the processed messages + defer func() { + if processedMessagesCount < len(s.Messages) { + s.Messages = append(unconsumedMessages, s.Messages[processedMessagesCount:]...) + } else { + s.Messages = unconsumedMessages + } + }() + err := waitUpTo(timeout, func() error { err := fmt.Errorf("no message(s) received match(es) the standard properties") if count < 0 { return err } - for i := range s.Messages { - logrus.Debugf("Checking message: %s", s.Messages[i].Body) - s.msg = s.Messages[i] - if err = s.ValidateMessageStandardProperties(ctx, props); err == nil { - count-- - if count == 0 { - return nil - } - } + count = s.processReceivedMessages(props, &unconsumedMessages, &processedMessagesCount, count) + if count == 0 { + return nil + } + + if count < expectedMessagesCount { + err = fmt.Errorf("not all messages matching the standard properties have been received") } return err }) + + // ensures that if there are messages to be processed, none of them match the standard properties + if err == nil && strictly { + count = s.processReceivedMessages(props, &unconsumedMessages, &processedMessagesCount, count) + if count < 0 { + err = fmt.Errorf("more than expected message(s) received match(es) the standard properties") + } + } + return WaitForWithWantedErrorNormalizer(wantErr, err, "standard") } +func (s *Session) processReceivedMessages( + props amqp.Delivery, + unconsumedMessages *[]amqp.Delivery, + processedMessagesCount *int, + count int, +) int { + // Only new messages are processed + for i := *processedMessagesCount; i < len(s.Messages); i++ { + logrus.Debugf("Checking message: %s", s.Messages[i].Body) + *processedMessagesCount++ + + errValidate := s.validateMessageStandardProperties(s.Messages[i], props) + if errValidate == nil { + s.msg = s.Messages[i] + s.ConsumedMessages = append(s.ConsumedMessages, s.Messages[i]) + count-- + if count == 0 { + break + } + } else { + *unconsumedMessages = append(*unconsumedMessages, s.Messages[i]) + } + } + + return count +} + // ValidateMessageStandardProperties checks if the message standard rabbit properties are equal // the expected values. func (s *Session) ValidateMessageStandardProperties( - ctx context.Context, props amqp.Delivery, ) error { - msg := reflect.ValueOf(s.msg) + return s.validateMessageStandardProperties(s.msg, props) +} + +func (s *Session) validateMessageStandardProperties( + msgToValidate amqp.Delivery, + props amqp.Delivery, +) error { + msg := reflect.ValueOf(msgToValidate) expectedMsg := reflect.ValueOf(props) t := expectedMsg.Type() for i := 0; i < expectedMsg.NumField(); i++ { @@ -370,13 +445,13 @@ func (s *Session) ValidateMessageJSONBody(ctx context.Context, } m := golium.NewMapFromJSONBytes(s.msg.Body) if pos != -1 { - nMessages := len(s.Messages) + nMessages := len(s.ConsumedMessages) if pos < 0 || pos >= nMessages { return fmt.Errorf( "trying to validate message in position: '%d', '%d' messages available", pos, nMessages) } - m = golium.NewMapFromJSONBytes(s.Messages[pos].Body) + m = golium.NewMapFromJSONBytes(s.ConsumedMessages[pos].Body) } for key, expectedValue := range props { value := m.Get(key) diff --git a/steps/rabbit/session_test.go b/steps/rabbit/session_test.go index 692b821b..7fc25f02 100644 --- a/steps/rabbit/session_test.go +++ b/steps/rabbit/session_test.go @@ -476,10 +476,12 @@ func TestWaitForJSONMessageWithProperties(t *testing.T) { func TestWaitForMessagesWithStandardProperties(t *testing.T) { type args struct { - timeout time.Duration - count int - table *godog.Table - wantErr bool + timeout time.Duration + count int + strictly bool + messages []amqp.Delivery + table *godog.Table + wantErr bool } tests := []struct { name string @@ -489,81 +491,144 @@ func TestWaitForMessagesWithStandardProperties(t *testing.T) { { name: "Convert table error", args: args{ - count: 0, - timeout: 1 * time.Second, - table: golium.NewTable([][]string{{"Priority", "5"}}), + count: 0, + strictly: false, + messages: []amqp.Delivery{{Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"Priority", "5"}}), }, wantErr: true, }, { name: "Message count < 0", args: args{ - count: -1, - timeout: 1 * time.Second, - table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), - wantErr: false, + count: -1, + strictly: false, + messages: []amqp.Delivery{{Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: false, }, wantErr: true, }, { name: "Message count < 0 expecting error", args: args{ - count: -1, - timeout: 1 * time.Second, - table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), - wantErr: true, + count: -1, + strictly: false, + messages: []amqp.Delivery{{Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: true, }, wantErr: false, }, { name: "Matching properties", args: args{ - count: 1, - timeout: 1 * time.Second, - table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), - wantErr: false, + count: 1, + strictly: false, + messages: []amqp.Delivery{{Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: false, }, wantErr: false, }, { name: "Matching properties expecting error", args: args{ - count: 1, - timeout: 1 * time.Second, - table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), - wantErr: true, + count: 1, + strictly: false, + messages: []amqp.Delivery{{Priority: 5}, {Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: true, }, wantErr: true, }, { name: "Not matching properties", args: args{ - count: 1, - timeout: 1 * time.Second, - table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}), - wantErr: false, + count: 1, + strictly: false, + messages: []amqp.Delivery{{Priority: 5}, {Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}), + wantErr: false, }, wantErr: true, }, { name: "Not matching properties expecting error", args: args{ - count: 1, - timeout: 1 * time.Second, - table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}), - wantErr: true, + count: 1, + strictly: false, + messages: []amqp.Delivery{{Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "10"}}), + wantErr: true, }, wantErr: false, }, + { + name: "Not matching number of received messages with standard properties", + args: args{ + count: 2, + strictly: false, + messages: []amqp.Delivery{{Priority: 10}, {Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: false, + }, + wantErr: true, + }, + { + name: "Not matching number of received messages with standard properties expecting error", + args: args{ + count: 2, + strictly: false, + messages: []amqp.Delivery{{Priority: 10}, {Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: true, + }, + wantErr: false, + }, + { + name: "Strictly matching number of received messages with standard properties", + args: args{ + count: 2, + strictly: true, + messages: []amqp.Delivery{{Priority: 5}, {Priority: 10}, {Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: false, + }, + wantErr: false, + }, + { + name: "Not strictly matching number of received messages with standard properties expecting", + args: args{ + count: 2, + strictly: true, + messages: []amqp.Delivery{{Priority: 5}, {Priority: 10}, {Priority: 5}, {Priority: 5}}, + timeout: 1 * time.Second, + table: golium.NewTable([][]string{{"param", "value"}, {"Priority", "5"}}), + wantErr: false, + }, + wantErr: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &Session{} - s.Messages = []amqp.Delivery{{Priority: 5}} + s.Messages = tt.args.messages if err := s.WaitForMessagesWithStandardProperties( context.Background(), tt.args.timeout, tt.args.count, + tt.args.strictly, tt.args.table, tt.args.wantErr); (err != nil) != tt.wantErr { t.Errorf( @@ -721,7 +786,7 @@ func TestValidateMessageJSONBody(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := &Session{} - s.Messages = []amqp.Delivery{ + s.ConsumedMessages = []amqp.Delivery{ { Body: []byte(`{"id": "1"}`), }, diff --git a/steps/rabbit/steps.go b/steps/rabbit/steps.go index c1e7cead..9e5612f6 100755 --- a/steps/rabbit/steps.go +++ b/steps/rabbit/steps.go @@ -66,15 +66,15 @@ func (cs Steps) InitializeSteps(ctx context.Context, scenCtx *godog.ScenarioCont }) scenCtx.Step(`^I wait up to "(\d+)" seconds? for a rabbit message with the standard properties$`, func(timeout int, t *godog.Table) error { timeoutDuration := time.Duration(timeout) * time.Second - return session.WaitForMessagesWithStandardProperties(ctx, timeoutDuration, 1, t, false) + return session.WaitForMessagesWithStandardProperties(ctx, timeoutDuration, 1, false, t, false) }) - scenCtx.Step(`^I wait up to "(\d+)" seconds? for "(\d+)" rabbit messages with the standard properties$`, func(timeout int, count int, t *godog.Table) error { + scenCtx.Step(`^I wait up to "(\d+)" seconds? for exactly "(\d+)" rabbit messages with the standard properties$`, func(timeout int, count int, t *godog.Table) error { timeoutDuration := time.Duration(timeout) * time.Second - return session.WaitForMessagesWithStandardProperties(ctx, timeoutDuration, count, t, false) + return session.WaitForMessagesWithStandardProperties(ctx, timeoutDuration, count, true, t, false) }) scenCtx.Step(`^I wait up to "(\d+)" seconds? without a rabbit message with the standard properties$`, func(timeout int, t *godog.Table) error { timeoutDuration := time.Duration(timeout) * time.Second - return session.WaitForMessagesWithStandardProperties(ctx, timeoutDuration, 1, t, true) + return session.WaitForMessagesWithStandardProperties(ctx, timeoutDuration, 1, false, t, true) }) scenCtx.Step(`^the rabbit message has the rabbit headers$`, func(t *godog.Table) error { return session.ValidateMessageHeaders(ctx, t) @@ -84,7 +84,7 @@ func (cs Steps) InitializeSteps(ctx context.Context, scenCtx *godog.ScenarioCont if err := golium.ConvertTableWithoutHeaderToStruct(ctx, t, &props); err != nil { return fmt.Errorf("failed configuring rabbit endpoint: %w", err) } - return session.ValidateMessageStandardProperties(ctx, props) + return session.ValidateMessageStandardProperties(props) }) scenCtx.Step(`^the rabbit message body has the text$`, func(m *godog.DocString) error { message := golium.ValueAsString(ctx, m.Content) diff --git a/test/acceptance/features/rabbit.feature b/test/acceptance/features/rabbit.feature index a8b0c76a..f70bf27d 100755 --- a/test/acceptance/features/rabbit.feature +++ b/test/acceptance/features/rabbit.feature @@ -73,7 +73,7 @@ Feature: Rabbit client | param | value | | id2 | abc2 | | name2 | Golium2 | - Then I wait up to "5" seconds for "3" rabbit messages with the standard properties + Then I wait up to "5" seconds for exactly "3" rabbit messages with the standard properties | param | value | | ContentType | application/json | | CorrelationId | [CTXT:CorrelationId] |