From 5411f7d21089a5ddb30c954f5fc98b443292c097 Mon Sep 17 00:00:00 2001 From: Parham Alvani Date: Wed, 2 Oct 2024 20:34:15 +0000 Subject: [PATCH] fix: correct the issue with jetstream consumer --- internal/client/jetstream.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/client/jetstream.go b/internal/client/jetstream.go index 4b40520..bea4aad 100644 --- a/internal/client/jetstream.go +++ b/internal/client/jetstream.go @@ -151,7 +151,7 @@ func (client *Client) StartBlackboxTest(ctx context.Context) { if client.config.IsJetstream { for _, stream := range client.config.Streams { - messageChannel := client.createSubscribe(ctx, stream.Subject) + messageChannel := client.createSubscribe(ctx, stream.Subject, stream.Name) go client.jetstreamPublish(ctx, stream.Subject, stream.Name) go client.jetstreamSubscribe(messageChannel, stream.Name) } @@ -164,17 +164,18 @@ func (client *Client) StartBlackboxTest(ctx context.Context) { } // Subscribe subscribes to a list of subjects and returns a channel with incoming messages. -func (client *Client) createSubscribe(ctx context.Context, subject string) <-chan *Message { +func (client *Client) createSubscribe(ctx context.Context, subject string, stream string) <-chan *Message { messageHandler, h := client.messageHandlerJetstreamFactory() con, err := client.jetstream.CreateOrUpdateConsumer( ctx, - subject, + stream, jetstream.ConsumerConfig{ // nolint: exhaustruct DeliverPolicy: jetstream.DeliverNewPolicy, ReplayPolicy: jetstream.ReplayInstantPolicy, AckPolicy: jetstream.AckExplicitPolicy, MaxAckPending: client.config.MaxPubAcksInflight, + FilterSubject: subject, }, ) if err != nil {