-
Notifications
You must be signed in to change notification settings - Fork 163
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FAQ - JetStream and consumer queue groups #446
Comments
nats-server version - 2.5.0 I have a publisher for subject1 While publishing a msg to the subject, all the two subscriber instance were consuming the same msg. Can you point me correct direction? |
On the consumer configuration, you need to specify |
Yes I have done it already, then only raised a question here
|
Can you confirm your server version |
I wrote the following test, works for me under 2.5.0. test("jetstream - qsub ackall", async (t) => {
const ns = await NatsServer.start(jetstreamServerConf());
let nc = await connect({ port: ns.port });
const jsm = await nc.jetstreamManager();
const stream = nuid.next();
const subj = nuid.next();
await jsm.streams.add({ name: stream, subjects: [subj] });
const js = nc.jetstream();
const opts = consumerOpts();
opts.queue("q");
opts.durable("n");
opts.deliverTo("here");
opts.ackAll();
opts.callback((_err, m) => {});
const sub = await js.subscribe(subj, opts);
const sub2 = await js.subscribe(subj, opts);
for (let i = 0; i < 100; i++) {
await js.publish(subj, Empty);
}
await nc.flush();
await sub.drain();
await sub2.drain();
t.true(sub.getProcessed() > 0);
t.true(sub2.getProcessed() > 0);
t.is(sub.getProcessed() + sub2.getProcessed(), 100);
const ci = await jsm.consumers.info(stream, "n");
t.is(ci.num_pending, 0);
t.is(ci.num_ack_pending, 0);
await nc.close();
await ns.stop();
}); Please create a different issue if you need additional follow-up. This issue was intended as documentation. |
This is an FAQ for the changes related to server 2.4.0 and nats.js v2.2.0.
If you have a JetStream consumer that was created with a nats.js version prior to v2.2.0 that uses queue groups:
deliver_group
specified.To create a queue subscriber with the magic jetstream
subscribe()
, specify the consumer builder optionqueue(n)
:If using JetStreamManager to create your consumer, the consumer configuration exposes
deliver_group
:For more information please read the release notes:
https://github.com/nats-io/nats.js/releases/tag/v2.2.0
The text was updated successfully, but these errors were encountered: