Skip to content
This repository has been archived by the owner on Oct 18, 2024. It is now read-only.

Commit

Permalink
handle channel error and close event
Browse files Browse the repository at this point in the history
  • Loading branch information
Stormpass committed Mar 19, 2024
1 parent 3253cdb commit 5f46436
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@stormpass/node-red-contrib-amqp",
"license": "ISC",
"version": "1.4.0",
"version": "1.4.1",
"description": "Amqp nodes for node-red",
"main": "index.js",
"scripts": {
Expand All @@ -12,7 +12,9 @@
"lint": "eslint \"src/**\"",
"test": "npm run build && mocha",
"test:watch": "mocha -w",
"test:cov": "npm run build && nyc mocha"
"test:cov": "npm run build && nyc mocha",
"login": "npm login --registry https://registry.npmjs.com --auth-type=web",
"publish": "npm publish --registry https://registry.npmjs.com"
},
"keywords": [
"node-red",
Expand Down
9 changes: 7 additions & 2 deletions src/nodes/amqp-in.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,18 @@ module.exports = function (RED: NodeRedApp): void {

// When the connection goes down
connection.on('error', async e => {
e && reconnectOnError && (await reconnect())
reconnectOnError && (await reconnect())
nodeIns.error(`Connection error ${e}`, { payload: { error: e, location: ErrorLocationEnum.ConnectionErrorEvent } })
})

// When the channel goes down
channel.on('close', async () => {
await reconnect()
})

// When the channel goes down
channel.on('error', async (e) => {
e && reconnectOnError && (await reconnect())
reconnectOnError && (await reconnect())
nodeIns.error(`Channel error ${e}`, { payload: { error: e, location: ErrorLocationEnum.ChannelErrorEvent } })
})

Expand Down
22 changes: 13 additions & 9 deletions src/nodes/amqp-out.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,6 @@ module.exports = function (RED: NodeRedApp): void {
amqpProperties,
} = config

// handle reconnect call
if (msg.payload && msg.payload.reconnectCall && typeof reconnect === 'function') {
await reconnect()
done && done()
return;
}

// message properties override config properties
let properties: MessageProperties
try {
Expand Down Expand Up @@ -142,7 +135,7 @@ module.exports = function (RED: NodeRedApp): void {

// istanbul ignore else
if (connection) {
await amqp.initialize()
channel = await amqp.initialize()

// When the server goes down
connection.on('close', async e => {
Expand All @@ -151,10 +144,21 @@ module.exports = function (RED: NodeRedApp): void {

// When the connection goes down
connection.on('error', async e => {
e && reconnectOnError && (await reconnect())
reconnectOnError && (await reconnect())
nodeIns.error(`Connection error ${e}`, { payload: { error: e, location: ErrorLocationEnum.ConnectionErrorEvent } })
})

// When the channel goes down
channel.on('close', async () => {
await reconnect()
})

// When the channel error occur
channel.on('error', async e => {
reconnectOnError && (await reconnect())
nodeIns.error(`Channel error ${e}`, { payload: { error: e, location: ErrorLocationEnum.ChannelErrorEvent } })
})

nodeIns.status(NODE_STATUS.Connected)
}
} catch (e) {
Expand Down

0 comments on commit 5f46436

Please sign in to comment.