Skip to content

Commit

Permalink
fix: correct the issue with jetstream consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
1995parham committed Oct 2, 2024
1 parent edbd1f9 commit 5411f7d
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions internal/client/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (client *Client) StartBlackboxTest(ctx context.Context) {

if client.config.IsJetstream {
for _, stream := range client.config.Streams {
messageChannel := client.createSubscribe(ctx, stream.Subject)
messageChannel := client.createSubscribe(ctx, stream.Subject, stream.Name)
go client.jetstreamPublish(ctx, stream.Subject, stream.Name)
go client.jetstreamSubscribe(messageChannel, stream.Name)
}
Expand All @@ -164,17 +164,18 @@ func (client *Client) StartBlackboxTest(ctx context.Context) {
}

// Subscribe subscribes to a list of subjects and returns a channel with incoming messages.
func (client *Client) createSubscribe(ctx context.Context, subject string) <-chan *Message {
func (client *Client) createSubscribe(ctx context.Context, subject string, stream string) <-chan *Message {
messageHandler, h := client.messageHandlerJetstreamFactory()

con, err := client.jetstream.CreateOrUpdateConsumer(
ctx,
subject,
stream,
jetstream.ConsumerConfig{ // nolint: exhaustruct
DeliverPolicy: jetstream.DeliverNewPolicy,
ReplayPolicy: jetstream.ReplayInstantPolicy,
AckPolicy: jetstream.AckExplicitPolicy,
MaxAckPending: client.config.MaxPubAcksInflight,
FilterSubject: subject,
},
)
if err != nil {
Expand Down

0 comments on commit 5411f7d

Please sign in to comment.