diff --git a/src/queue.ts b/src/queue.ts index 6ff6010..e6531ee 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -12,11 +12,12 @@ import os from 'os'; let connection: any = null; let consumer: any = null; +export const count = {queued : undefined, ack: 0, nack: 0}; async function exitHandler(evtOrExitCodeOrError: number | string | Error) { try { if (connection) { - console.log('closing, waiting for all the messages being processed'); + console.log('closing after processing', count.ack,"and rejecting",count.nack); await consumer.close(); await connection.close(); } @@ -37,6 +38,7 @@ export const connect = (queueUrl: string) => { }); rabbit.on('connection', () => { console.log('Connection successfully (re)established'); +//await ch.close() }); process.once('SIGINT', exitHandler), @@ -114,6 +116,7 @@ export const syncQueue = async ( throw new Error ("the syncer must return a boolean"); } if (!processed) { + count.nack++; if (msg.redelivered) { console.error('already requeued, push to dead-letter', action?.actionId ? 'Action Id:' + action.actionId : '!'); return ConsumerStatus.DROP; @@ -122,6 +125,7 @@ export const syncQueue = async ( console.error('we need to nack and requeue', action?.actionId ? 'Action Id:' + action.actionId : '!'); return ConsumerStatus.REQUEUE; // nack + requeue } + count.ack++; return ConsumerStatus.ACK; } catch (e) { // if the syncer throw an error it's a permanent problem, we need to close @@ -134,6 +138,20 @@ export const syncQueue = async ( throw new Error ("message not properly processed #" +action?.actionId); } ); + + const messageCount = async () => { + const {messageCount} = await sub._ch.queueDeclare({ + queue: sub._queue, + passive: true + }) + console.log("messages in the queue",messageCount); + count.queued= messageCount; + } + + sub.on('ready', async () => { + await messageCount(); + }); + sub.on('error', (err: any) => { // Maybe the consumer was cancelled, or the connection was reset before a // message could be acknowledged.