Skip to content

Commit

Permalink
wip: adding a new status method
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasgriffintn committed Dec 31, 2023
1 parent 3768e93 commit 6d1d726
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 36 deletions.
26 changes: 18 additions & 8 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,29 @@ export class Consumer extends TypedEventEmitter {
/**
* Returns the current polling state of the consumer: `true` if it is
* actively polling, `false` if it is not.
* @deprecated Use getStatus instead, will be removed in v10
*/
public get isRunning(): boolean {
return !this.stopped;
}

/**
* Returns the current status of the consumer, including whether it is
* actively running, the current polling status, and the number of
* concurrent executions.
*/
public get getStatus(): {
isRunning: boolean;
pollingStatus: POLLING_STATUS;
concurrentExecutions: number;
} {
return {
isRunning: this.isRunning,
pollingStatus: this.pollingStatus,
concurrentExecutions: this.concurrentExecutions
};
}

/**
* Validates and then updates the provided option to the provided value.
* @param option The option to validate and then update
Expand Down Expand Up @@ -306,12 +324,6 @@ export class Consumer extends TypedEventEmitter {
response: ReceiveMessageCommandOutput
): Promise<void> {
if (hasMessages(response)) {
const handlerProcessingDebugger = setInterval(() => {
logger.debug('handler_processing', {
detail: 'The handler is still processing the message(s)...'
});
}, 1000);

this.concurrentExecutions += 1;

if (this.handleMessageBatch) {
Expand All @@ -322,8 +334,6 @@ export class Consumer extends TypedEventEmitter {

this.concurrentExecutions -= 1;

clearInterval(handlerProcessingDebugger);

this.emit('response_processed');
} else if (response) {
this.emit('empty');
Expand Down
48 changes: 20 additions & 28 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,26 @@ describe('Consumer', () => {
});
});

describe('getStatus', async () => {
it('returns the status of the consumer when it is stopped', () => {
assert.equal(consumer.getStatus(), {
isRunning: false,
pollingStatus: 'stopped',
concurrentExecutions: 0
});
});

it('returns the status of the consumer when it is running', () => {
consumer.start();
assert.equal(consumer.getStatus(), {
isRunning: true,
pollingStatus: 'running',
concurrentExecutions: 1
});
consumer.stop();
});
});

describe('updateOption', async () => {
it('updates the visibilityTimeout option and emits an event', () => {
const optionUpdatedListener = sandbox.stub();
Expand Down Expand Up @@ -1703,33 +1723,5 @@ describe('Consumer', () => {
sandbox.assert.calledWithMatch(loggerDebug, 'stopping');
sandbox.assert.calledWithMatch(loggerDebug, 'stopped');
});

it('logs a debug event while the handler is processing, for every second', async () => {
const loggerDebug = sandbox.stub(logger, 'debug');
const clearIntervalSpy = sinon.spy(global, 'clearInterval');

sqs.send.withArgs(mockReceiveMessage).resolves({
Messages: [
{ MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' }
]
});
consumer = new Consumer({
queueUrl: QUEUE_URL,
region: REGION,
handleMessage: () =>
new Promise((resolve) => setTimeout(resolve, 4000)),
sqs
});

consumer.start();
await Promise.all([clock.tickAsync(5000)]);
sandbox.assert.calledOnce(clearIntervalSpy);
consumer.stop();

sandbox.assert.callCount(loggerDebug, 15);
sandbox.assert.calledWith(loggerDebug, 'handler_processing', {
detail: 'The handler is still processing the message(s)...'
});
});
});
});

0 comments on commit 6d1d726

Please sign in to comment.