Skip to content

Commit

Permalink
feat: add passive option to queueFsm
Browse files Browse the repository at this point in the history
allows for checkQueue instead of assertQueue when clients have restricted permissions

fixes issue arobson#183
  • Loading branch information
zlintz committed Dec 5, 2018
1 parent 80b63c0 commit c7bae6b
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 8 deletions.
56 changes: 56 additions & 0 deletions spec/behavior/queue.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
require('../setup.js');
var ampqQueue = require('../../src/amqp/queue');

describe('AMQP Queue', function () {
let amqpChannelMock, options, topology, serializers;

beforeEach(() => {
amqpChannelMock = {
ack: sinon.stub().callsFake(() => Promise.resolve()),
nack: sinon.stub().callsFake(() => Promise.resolve()),
checkQueue: sinon.stub().callsFake(() => Promise.resolve()),
assertQueue: sinon.stub().callsFake(() => Promise.resolve())
};

options = {
uniqueName: 'one-unique-name-coming-up'
};

topology = {
connection: {
getChannel: sinon.stub().callsFake(() => Promise.resolve(amqpChannelMock))
}
};

serializers = sinon.stub();
});

describe('when executing "define"', () => {
describe('when options.passive is not set', () => {
it('calls assertQueue', function () {
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.checkQueue.calledOnce.should.equal(false);
amqpChannelMock.assertQueue.calledOnce.should.equal(true);
});
});
});

describe('when options.passive is true', function () {
it('calls checkQueue instead of assertQueue', async () => {
options.passive = true;
return ampqQueue(options, topology, serializers)
.then((instance) => {
return instance.define();
})
.then(() => {
amqpChannelMock.checkQueue.calledOnce.should.equal(true);
amqpChannelMock.assertQueue.calledOnce.should.equal(false);
});
});
});
});
});
48 changes: 48 additions & 0 deletions spec/integration/addPassiveQueue.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
require('../setup');
const rabbit = require('../../src/index.js');

describe('Adding Queues', function () {
describe('with a connection', function () {
describe('when the queue does not already exist', function () {
it('should error on addQueue in passive mode', function () {
return rabbit.configure({
connection: {
name: 'temp'
}
}).then(() => {
return rabbit.addQueue('no-queue-here', { passive: true }, 'temp')
.then(
() => { throw new Error('Should not have succeeded in the checkQueue call'); },
(err) => {
err.toString().should.contain("Failed to create queue 'no-queue-here' on connection 'temp' with 'Error: Operation failed: QueueDeclare; 404 (NOT-FOUND)");
});
});
});
});

describe('when the queue does exist', function () {
const existingQueueName = 'totes-exists-already';
it('should NOT error on addQueue when in passive mode', function () {
return rabbit.configure({
connection: {
name: 'temp'
},
queues: [
{ name: existingQueueName, connection: 'temp' }
]
}).then(() => {
return rabbit.addQueue(existingQueueName, { passive: true }, 'temp');
});
});

afterEach(function () {
return rabbit.deleteQueue(existingQueueName, 'temp');
});
});

afterEach(function () {
rabbit.reset();
return rabbit.shutdown('temp');
});
});
});
24 changes: 16 additions & 8 deletions src/amqp/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,24 @@ function define (channel, options, subscriber, connectionName) {
deadletter: 'deadLetterExchange',
deadLetter: 'deadLetterExchange',
deadLetterRoutingKey: 'deadLetterRoutingKey'
}, 'subscribe', 'limit', 'noBatch', 'unique');
}, 'subscribe', 'limit', 'noBatch', 'unique', 'passive');
topLog.info("Declaring queue '%s' on connection '%s' with the options: %s",
options.uniqueName, connectionName, JSON.stringify(options));
return channel.assertQueue(options.uniqueName, valid)
.then(function (q) {
if (options.limit) {
channel.prefetch(options.limit);
}
return q;
});

let queuePromise;

if (options.passive) {
queuePromise = channel.checkQueue(options.uniqueName);
} else {
queuePromise = channel.assertQueue(options.uniqueName, valid);
}

return queuePromise.then(function (q) {
if (options.limit) {
channel.prefetch(options.limit);
}
return q;
});
}

function finalize (channel, messages) {
Expand Down
1 change: 1 addition & 0 deletions src/queueFsm.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ var Factory = function (options, connection, topology, serializers, queueFn) {
var Fsm = machina.Fsm.extend({
name: options.name,
uniqueName: options.uniqueName,
passive: options.passive,
responseSubscriptions: {},
signalSubscription: undefined,
subscribed: false,
Expand Down

0 comments on commit c7bae6b

Please sign in to comment.