Skip to content

Commit

Permalink
fix: removing bind and refactoring consumer (#478)
Browse files Browse the repository at this point in the history
* fix: Refactor consumer
- Add missing type declarations
- Remove autoBind usage
- Update passing in-class methods to intervals using arrow functions

* fix: Fix calling processMessage in handleSqsResponse
  • Loading branch information
mogu4iy authored Mar 21, 2024
1 parent 2e92e6f commit e7ef075
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 45 deletions.
22 changes: 0 additions & 22 deletions src/bind.ts

This file was deleted.

53 changes: 30 additions & 23 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import {

import { ConsumerOptions, StopOptions, UpdatableOptions } from "./types.js";
import { TypedEventEmitter } from "./emitter.js";
import { autoBind } from "./bind.js";
import {
SQSError,
TimeoutError,
Expand Down Expand Up @@ -89,7 +88,6 @@ export class Consumer extends TypedEventEmitter {
useQueueUrlAsEndpoint: options.useQueueUrlAsEndpoint ?? true,
region: options.region || process.env.AWS_REGION || "eu-west-1",
});
autoBind(this);
}

/**
Expand Down Expand Up @@ -161,7 +159,7 @@ export class Consumer extends TypedEventEmitter {
return;
}

const exceededTimeout =
const exceededTimeout: boolean =
Date.now() - this.stopRequestedAtTimestamp >
this.pollingCompleteWaitTimeMs;
if (exceededTimeout) {
Expand All @@ -171,7 +169,7 @@ export class Consumer extends TypedEventEmitter {
}

this.emit("waiting_for_polling_to_complete");
setTimeout(this.waitForPollingToComplete, 1000);
setTimeout(() => this.waitForPollingToComplete(), 1000);
}

/**
Expand All @@ -196,7 +194,7 @@ export class Consumer extends TypedEventEmitter {
public updateOption(
option: UpdatableOptions,
value: ConsumerOptions[UpdatableOptions],
) {
): void {
validateOption(option, value, this, true);

this[option] = value;
Expand Down Expand Up @@ -237,7 +235,7 @@ export class Consumer extends TypedEventEmitter {

this.isPolling = true;

let currentPollingTimeout = this.pollingWaitTimeMs;
let currentPollingTimeout: number = this.pollingWaitTimeMs;
this.receiveMessage({
QueueUrl: this.queueUrl,
AttributeNames: this.attributeNames,
Expand All @@ -246,8 +244,10 @@ export class Consumer extends TypedEventEmitter {
WaitTimeSeconds: this.waitTimeSeconds,
VisibilityTimeout: this.visibilityTimeout,
})
.then(this.handleSqsResponse)
.catch((err) => {
.then((output: ReceiveMessageCommandOutput) =>
this.handleSqsResponse(output),
)
.catch((err): void => {
this.emitError(err);
if (isConnectionError(err)) {
logger.debug("authentication_error", {
Expand All @@ -258,16 +258,19 @@ export class Consumer extends TypedEventEmitter {
}
return;
})
.then(() => {
.then((): void => {
if (this.pollingTimeoutId) {
clearTimeout(this.pollingTimeoutId);
}
this.pollingTimeoutId = setTimeout(this.poll, currentPollingTimeout);
this.pollingTimeoutId = setTimeout(
() => this.poll(),
currentPollingTimeout,
);
})
.catch((err) => {
.catch((err): void => {
this.emitError(err);
})
.finally(() => {
.finally((): void => {
this.isPolling = false;
});
}
Expand All @@ -283,7 +286,7 @@ export class Consumer extends TypedEventEmitter {
if (this.preReceiveMessageCallback) {
await this.preReceiveMessageCallback();
}
const result = await this.sqs.send(
const result: ReceiveMessageCommandOutput = await this.sqs.send(
new ReceiveMessageCommand(params),
this.sqsSendOptions,
);
Expand All @@ -309,7 +312,11 @@ export class Consumer extends TypedEventEmitter {
if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
} else {
await Promise.all(response.Messages.map(this.processMessage));
await Promise.all(
response.Messages.map((message: Message) =>
this.processMessage(message),
),
);
}

this.emit("response_processed");
Expand All @@ -333,7 +340,7 @@ export class Consumer extends TypedEventEmitter {
heartbeatTimeoutId = this.startHeartbeat(message);
}

const ackedMessage = await this.executeHandler(message);
const ackedMessage: Message = await this.executeHandler(message);

if (ackedMessage?.MessageId === message.MessageId) {
await this.deleteMessage(message);
Expand Down Expand Up @@ -361,20 +368,20 @@ export class Consumer extends TypedEventEmitter {
let heartbeatTimeoutId: NodeJS.Timeout | undefined = undefined;

try {
messages.forEach((message) => {
messages.forEach((message: Message): void => {
this.emit("message_received", message);
});

if (this.heartbeatInterval) {
heartbeatTimeoutId = this.startHeartbeat(null, messages);
}

const ackedMessages = await this.executeBatchHandler(messages);
const ackedMessages: Message[] = await this.executeBatchHandler(messages);

if (ackedMessages?.length > 0) {
await this.deleteMessageBatch(ackedMessages);

ackedMessages.forEach((message) => {
ackedMessages.forEach((message: Message): void => {
this.emit("message_processed", message);
});
}
Expand Down Expand Up @@ -448,7 +455,7 @@ export class Consumer extends TypedEventEmitter {
): Promise<ChangeMessageVisibilityBatchCommandOutput> {
const params: ChangeMessageVisibilityBatchCommandInput = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Entries: messages.map((message: Message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
VisibilityTimeout: timeout,
Expand Down Expand Up @@ -479,7 +486,7 @@ export class Consumer extends TypedEventEmitter {
let result;

if (this.handleMessageTimeout) {
const pending = new Promise((_, reject) => {
const pending: Promise<void> = new Promise((_, reject): void => {
handleMessageTimeoutId = setTimeout((): void => {
reject(new TimeoutError());
}, this.handleMessageTimeout);
Expand Down Expand Up @@ -518,7 +525,7 @@ export class Consumer extends TypedEventEmitter {
*/
private async executeBatchHandler(messages: Message[]): Promise<Message[]> {
try {
const result = await this.handleMessageBatch(messages);
const result: void | Message[] = await this.handleMessageBatch(messages);

return !this.alwaysAcknowledge && result instanceof Object
? result
Expand Down Expand Up @@ -576,12 +583,12 @@ export class Consumer extends TypedEventEmitter {
return;
}
logger.debug("deleting_messages", {
messageIds: messages.map((msg) => msg.MessageId),
messageIds: messages.map((msg: Message) => msg.MessageId),
});

const deleteParams: DeleteMessageBatchCommandInput = {
QueueUrl: this.queueUrl,
Entries: messages.map((message) => ({
Entries: messages.map((message: Message) => ({
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
})),
Expand Down

0 comments on commit e7ef075

Please sign in to comment.