Skip to content
This repository has been archived by the owner on Oct 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #8 from Stormpass/feature/reconnect-on-error
Browse files Browse the repository at this point in the history
allow reconnect on error
  • Loading branch information
Stormpass authored Mar 12, 2024
2 parents 361bcdb + 1a1d90b commit 3253cdb
Show file tree
Hide file tree
Showing 15 changed files with 250 additions and 190 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and

+ upgrade amqplib so you can use it with node10+
+ fixed direct routing publish issues
+ allow reconnect on error
+ manually control node reconnect
+ fixed multi event listener on connection (cause memory leak)

AMQP nodes for node-red

Expand Down
10 changes: 5 additions & 5 deletions examples/AMQP General Usage.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
"name": "",
"broker": "",
"prefetch": 0,
"maxAttempts": 5,
"reconnectOnError": false,
"noAck": true,
"exchangeName": "amq.topic",
"exchangeType": "topic",
Expand Down Expand Up @@ -158,7 +158,7 @@
"name": "",
"broker": "",
"prefetch": 0,
"maxAttempts": 5,
"reconnectOnError": false,
"noAck": true,
"exchangeName": "amq.fanout",
"exchangeType": "fanout",
Expand All @@ -180,7 +180,7 @@
"name": "",
"broker": "",
"prefetch": 0,
"maxAttempts": 5,
"reconnectOnError": false,
"noAck": true,
"exchangeName": "amq.fanout",
"exchangeType": "fanout",
Expand Down Expand Up @@ -269,7 +269,7 @@
"name": "",
"broker": "",
"prefetch": 0,
"maxAttempts": 5,
"reconnectOnError": false,
"noAck": true,
"exchangeName": "amq.direct",
"exchangeType": "direct",
Expand Down Expand Up @@ -301,7 +301,7 @@
"name": "",
"broker": "",
"prefetch": 0,
"maxAttempts": 5,
"reconnectOnError": false,
"noAck": true,
"exchangeName": "amq.headers",
"exchangeType": "headers",
Expand Down
2 changes: 1 addition & 1 deletion examples/AMQP Out With RPC Pattern.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"name": "Listening for RPC",
"broker": "3efe01c6.16a3de",
"prefetch": 0,
"maxAttempts": 5,
"reconnectOnError": false,
"noAck": true,
"exchangeName": "amq.direct",
"exchangeType": "direct",
Expand Down
2 changes: 1 addition & 1 deletion examples/Manual Ack With Complete.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"name": "amqp-in-manual-ack",
"broker": "",
"prefetch": "0",
"maxAttempts": 5,
"reconnectOnError": false,
"exchangeName": "amq.topic",
"exchangeType": "topic",
"exchangeRoutingKey": "acked.with.complete",
Expand Down
2 changes: 1 addition & 1 deletion examples/Manual Ack With Links.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"name": "amqp-in-manual-ack",
"broker": "",
"prefetch": 0,
"maxAttempts": 5,
"reconnectOnError": false,
"exchangeName": "amq.topic",
"exchangeType": "topic",
"exchangeRoutingKey": "acked.with.link",
Expand Down
2 changes: 1 addition & 1 deletion examples/Manual NAck With Links.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"name": "",
"broker": "",
"prefetch": "1",
"maxAttempts": 5,
"reconnectOnError": false,
"noAck": false,
"exchangeName": "",
"exchangeType": "direct",
Expand Down
7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "@stormpass/node-red-contrib-amqp",
"license": "ISC",
"version": "1.3.0",
"description": "Better Amqp nodes for node-red",
"version": "1.4.0",
"description": "Amqp nodes for node-red",
"main": "index.js",
"scripts": {
"start": "nodemon --exec npm run build",
Expand Down Expand Up @@ -37,7 +37,8 @@
},
"contributors": [
"Amo DelBello <[email protected]>",
"Stormpass <[email protected]>"
"Stormpass <[email protected]>",
"Corentin <[email protected]>"
],
"bugs": "https://github.com/stormpass/node-red-contrib-amqp/issues",
"devDependencies": {
Expand Down
3 changes: 1 addition & 2 deletions src/Amqp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export default class Amqp {
name: config.name,
broker: config.broker,
prefetch: config.prefetch,
maxAttempts: config.maxAttempts,
reconnectOnError: config.reconnectOnError,
noAck: config.noAck,
exchange: {
name: config.exchangeName,
Expand Down Expand Up @@ -75,7 +75,6 @@ export default class Amqp {
this.connection.on('error', (e): void => {
// Set node to disconnected status
this.node.status(NODE_STATUS.Disconnected)
this.node.error(`AMQP Connection Error ${e}`, { payload: { error: e, source: 'Amqp' } })
})

/* istanbul ignore next */
Expand Down
10 changes: 7 additions & 3 deletions src/nodes/amqp-in-manual-ack.html
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
name: { value: '' },
broker: { value: '', type: 'amqp-broker' },
prefetch: { value: 0 },
maxAttempts: { value: 5 },
reconnectOnError: { value: false },
noAck: { value: false },
exchangeName: { value: '' },
exchangeType: { value: 'topic' },
Expand Down Expand Up @@ -78,8 +78,9 @@
<input type="text" id="node-input-prefetch" placeholder="0 for no prefetch restriction">
</div>
<div class="form-row">
<label for="node-input-maxAttempts"><i class="fa fa-filter"></i>&nbsp;&nbsp;Max Attempts</label>
<input type="text" id="node-input-maxAttempts" placeholder="0 for no maxAttempts restriction">
<label>&nbsp;&nbsp;</label>
<input id="node-input-reconnectOnError" style="width:20px; vertical-align:baseline; margin-right:5px;" type="checkbox">
<label for="node-input-reconnectOnError" style="width:auto; margin-top:7px;">reconnectOnError</label>
</div>
<hr>
<h4>Exchange Info</h4>
Expand Down Expand Up @@ -179,6 +180,9 @@ <h3>Inputs</h3>
</dt>
<dd><code>requeue</code> option for nacking. If true will nack and requeue the message. If false will nack and dead letter. Defaults to false.</dd>
</dl>
<p>
<code>msg.payload</code> control amqp-broker reconnect by send {"reconnectCall": true},
</p>

<h3>Outputs</h3>
<dl class="message-properties">
Expand Down
167 changes: 93 additions & 74 deletions src/nodes/amqp-in-manual-ack.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import { NodeRedApp, EditorNodeProperties } from 'node-red'
import { NODE_STATUS } from '../constants'
import { ErrorType, NodeType, ManualAckType, AmqpOutNodeDefaults, AmqpInNodeDefaults } from '../types'
import { ErrorType, NodeType, ManualAckType, AmqpOutNodeDefaults, AmqpInNodeDefaults, ErrorLocationEnum } from '../types'
import Amqp from '../Amqp'

module.exports = function (RED: NodeRedApp): void {
function AmqpInManualAck(config: EditorNodeProperties): void {
let reconnectTimeout: NodeJS.Timeout
let reconnect = null;
let connection = null;
let channel = null;

RED.events.once('flows:stopped', () => {
clearTimeout(reconnectTimeout)
})
Expand All @@ -15,109 +19,124 @@ module.exports = function (RED: NodeRedApp): void {
RED.nodes.createNode(this, config)
this.status(NODE_STATUS.Disconnected)

const confgiAmqp: AmqpInNodeDefaults & AmqpOutNodeDefaults = config;

const amqp = new Amqp(RED, this, confgiAmqp)

const maxAttempts = confgiAmqp.maxAttempts;
let totalAttempts = 0;

;(async function initializeNode(self): Promise<void> {
const reconnect = () =>
new Promise<void>(resolve => {
if(maxAttempts === 0 || totalAttempts < maxAttempts) {
reconnectTimeout = setTimeout(async () => {
try {
await initializeNode(self)
resolve()
} catch (e) {
await reconnect()
}
}, 2000)
} else {
self.warn(`Max connection attempts reached (${maxAttempts}). No more connection will be tried.`)
const configAmqp: AmqpInNodeDefaults & AmqpOutNodeDefaults = config;

const amqp = new Amqp(RED, this, configAmqp)

const reconnectOnError = configAmqp.reconnectOnError;

const inputListener = async (msg, _, done) => {
// handle manualAck
if (msg.manualAck) {
const ackMode = msg.manualAck.ackMode

switch (ackMode) {
case ManualAckType.AckAll:
amqp.ackAll()
break
case ManualAckType.Nack:
amqp.nack(msg)
break
case ManualAckType.NackAll:
amqp.nackAll(msg)
break
case ManualAckType.Reject:
amqp.reject(msg)
break
case ManualAckType.Ack:
default:
amqp.ack(msg)
break
}
} else {
amqp.ack(msg)
}
// handle manual reconnect
if (msg.payload && msg.payload.reconnectCall && typeof reconnect === 'function') {
await reconnect()
done && done()
} else {
done && done()
}
}
// receive input reconnectCall
this.on('input', inputListener)
// When the server goes down
this.on('close', async (done: () => void): Promise<void> => {
await amqp.close()
done && done()
})

async function initializeNode(nodeIns) {
reconnect = async () => {
// check the channel and clear all the event listener
if (channel && channel.removeAllListeners) {
channel.removeAllListeners()
channel.close();
channel = null;
}

// check the connection and clear all the event listener
if (connection && connection.removeAllListeners) {
connection.removeAllListeners()
connection.close();
connection = null;
}

// always clear timer before set it;
clearTimeout(reconnectTimeout);
reconnectTimeout = setTimeout(() => {
try {
initializeNode(nodeIns)
} catch (e) {
reconnect()
}
})
}, 2000)
}


try {
totalAttempts++;
if(maxAttempts === 0) {
self.log(`AMQP Connection attempt ${totalAttempts}`);
} else {
self.log(`AMQP Connection attempt ${totalAttempts} on ${maxAttempts}`);
}
const connection = await amqp.connect()

// istanbul ignore else
if (connection) {
const channel = await amqp.initialize()
await amqp.consume()

self.on('input', async (msg, send, done) => {
if (msg.manualAck) {
const ackMode = msg.manualAck.ackMode

switch (ackMode) {
case ManualAckType.AckAll:
amqp.ackAll()
break
case ManualAckType.Nack:
amqp.nack(msg)
break
case ManualAckType.NackAll:
amqp.nackAll(msg)
break
case ManualAckType.Reject:
amqp.reject(msg)
break
case ManualAckType.Ack:
default:
amqp.ack(msg)
break
}
} else {
amqp.ack(msg)
}

/* istanbul ignore else */
done && done()
})

// When the server goes down
self.once('close', async (done: () => void): Promise<void> => {
await amqp.close()
done && done()
})

// When the connection goes down
connection.on('close', async e => {
e && (await reconnect())
})

// When the connection goes down
connection.on('error', async e => {
e && (await reconnect())
e && reconnectOnError && (await reconnect())
nodeIns.error(`Connection error ${e}`, { payload: { error: e, location: ErrorLocationEnum.ConnectionErrorEvent } })
})

// When the channel goes down
channel.on('error', async (e) => {
e && (await reconnect())
e && reconnectOnError && (await reconnect())
nodeIns.error(`Channel error ${e}`, { payload: { error: e, location: ErrorLocationEnum.ChannelErrorEvent } })
})

self.status(NODE_STATUS.Connected)
nodeIns.status(NODE_STATUS.Connected)
}
} catch (e) {
if (e.code === ErrorType.ConnectionRefused || e.isOperational) {
await reconnect()
reconnectOnError && (await reconnect())
} else if (e.code === ErrorType.InvalidLogin) {
self.status(NODE_STATUS.Invalid)
self.error(`AmqpInManualAck() Could not connect to broker ${e}`)
nodeIns.status(NODE_STATUS.Invalid)
nodeIns.error(`AmqpInManualAck() Could not connect to broker ${e}`, { payload: { error: e, location: ErrorLocationEnum.ConnectError } })
} else {
self.status(NODE_STATUS.Error)
self.error(`AmqpInManualAck() ${e}`)
nodeIns.status(NODE_STATUS.Error)
nodeIns.error(`AmqpInManualAck() ${e}`, { payload: { error: e, location: ErrorLocationEnum.ConnectError } })
}
}
})(this)
}

// call
initializeNode(this);
}
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
Expand Down
Loading

0 comments on commit 3253cdb

Please sign in to comment.