Skip to content

Commit

Permalink
add info about the number of messages to process + counters of how ma…
Browse files Browse the repository at this point in the history
…ny are processed
  • Loading branch information
tttp committed Jul 30, 2023
1 parent 5f59a3e commit f7a78ac
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import os from 'os';

let connection: any = null;
let consumer: any = null;
export const count = {queued : undefined, ack: 0, nack: 0};

async function exitHandler(evtOrExitCodeOrError: number | string | Error) {
try {
if (connection) {
console.log('closing, waiting for all the messages being processed');
console.log('closing after processing', count.ack,"and rejecting",count.nack);
await consumer.close();
await connection.close();
}
Expand All @@ -37,6 +38,7 @@ export const connect = (queueUrl: string) => {
});
rabbit.on('connection', () => {
console.log('Connection successfully (re)established');
//await ch.close()
});

process.once('SIGINT', exitHandler),
Expand Down Expand Up @@ -114,6 +116,7 @@ export const syncQueue = async (
throw new Error ("the syncer must return a boolean");
}
if (!processed) {
count.nack++;
if (msg.redelivered) {
console.error('already requeued, push to dead-letter', action?.actionId ? 'Action Id:' + action.actionId : '!');
return ConsumerStatus.DROP;
Expand All @@ -122,6 +125,7 @@ export const syncQueue = async (
console.error('we need to nack and requeue', action?.actionId ? 'Action Id:' + action.actionId : '!');
return ConsumerStatus.REQUEUE; // nack + requeue
}
count.ack++;
return ConsumerStatus.ACK;
} catch (e) {
// if the syncer throw an error it's a permanent problem, we need to close
Expand All @@ -134,6 +138,20 @@ export const syncQueue = async (
throw new Error ("message not properly processed #" +action?.actionId);
}
);

const messageCount = async () => {
const {messageCount} = await sub._ch.queueDeclare({
queue: sub._queue,
passive: true
})
console.log("messages in the queue",messageCount);
count.queued= messageCount;
}

sub.on('ready', async () => {
await messageCount();
});

sub.on('error', (err: any) => {
// Maybe the consumer was cancelled, or the connection was reset before a
// message could be acknowledged.
Expand Down

0 comments on commit f7a78ac

Please sign in to comment.