Skip to content

Commit

Permalink
Removed additional error type
Browse files Browse the repository at this point in the history
Signed-off-by: Fraser Benjamin <[email protected]>
  • Loading branch information
fraserbenjamin committed Apr 6, 2024
1 parent 275fbce commit 9ecf875
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 34 deletions.
16 changes: 4 additions & 12 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ export class Consumer extends TypedEventEmitter {
private visibilityTimeout: number;
private terminateVisibilityTimeout: boolean;
private waitTimeSeconds: number;
private authenticationErrorTimeout: number;
private connectionErrorTimeout: number;
private pollingWaitTimeMs: number;
private pollingCompleteWaitTimeMs: number;
Expand All @@ -77,9 +76,8 @@ export class Consumer extends TypedEventEmitter {
options.terminateVisibilityTimeout || false;
this.heartbeatInterval = options.heartbeatInterval;
this.waitTimeSeconds = options.waitTimeSeconds ?? 20;
this.authenticationErrorTimeout =
options.authenticationErrorTimeout ?? 10000;
this.connectionErrorTimeout = options.connectionErrorTimeout ?? 10000;
this.connectionErrorTimeout =
options.connectionErrorTimeout ?? 10000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
this.pollingCompleteWaitTimeMs = options.pollingCompleteWaitTimeMs ?? 0;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
Expand Down Expand Up @@ -251,20 +249,14 @@ export class Consumer extends TypedEventEmitter {
)
.catch((err): void => {
this.emitError(err);

if (isConnectionError(err)) {
logger.debug("authentication_error", {
detail:
"There was an authentication error. Pausing before retrying.",
});
currentPollingTimeout = this.authenticationErrorTimeout;
} else {
logger.debug("connection_error", {
detail:
"There was a connection error. Pausing before retrying.",
`${err.code}: There was an connection error. Pausing before retrying.`,
});
currentPollingTimeout = this.connectionErrorTimeout;
}
return;
})
.then((): void => {
if (this.pollingTimeoutId) {
Expand Down
4 changes: 3 additions & 1 deletion src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ function isConnectionError(err: Error): boolean {
if (err instanceof SQSError) {
return (
err.statusCode === 403 ||
err.name === "SQSError" ||
err.code === "CredentialsError" ||
err.code === "UnknownEndpoint" ||
err.code === "AWS.SimpleQueueService.NonExistentQueue" ||
err.code === "CredentialsProviderError"
err.code === "CredentialsProviderError" ||
err.code === "InvalidAddress"
);
}
return false;
Expand Down
41 changes: 20 additions & 21 deletions test/tests/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { logger } from "../../src/logger.js";

const sandbox = sinon.createSandbox();

const AUTHENTICATION_ERROR_TIMEOUT = 20;
const CONNECTION_ERROR_TIMEOUT = 20;
const POLLING_TIMEOUT = 100;
const QUEUE_URL = "some-queue-url";
Expand Down Expand Up @@ -87,7 +86,6 @@ describe("Consumer", () => {
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});
});
Expand Down Expand Up @@ -258,7 +256,7 @@ describe("Consumer", () => {
new Promise((resolve) => setTimeout(resolve, 1000)),
handleMessageTimeout,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.start();
Expand All @@ -283,7 +281,7 @@ describe("Consumer", () => {
throw new Error("unexpected parsing error");
},
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.start();
Expand Down Expand Up @@ -317,7 +315,7 @@ describe("Consumer", () => {
throw new CustomError("unexpected parsing error");
},
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.start();
Expand All @@ -341,7 +339,7 @@ describe("Consumer", () => {
throw customError;
},
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.start();
Expand Down Expand Up @@ -420,7 +418,7 @@ describe("Consumer", () => {
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
await clock.tickAsync(CONNECTION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
Expand All @@ -440,7 +438,7 @@ describe("Consumer", () => {
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
await clock.tickAsync(CONNECTION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
Expand All @@ -459,7 +457,7 @@ describe("Consumer", () => {
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
await clock.tickAsync(CONNECTION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
Expand All @@ -471,6 +469,7 @@ describe("Consumer", () => {
it("waits before repolling when a connection error occurs", async () => {
const unknownEndpointErr = {
name: "SQSError",
code: "SQSError",
message:
"SQS receive message failed: getaddrinfo ENOTFOUND sqs.eu-west-1.amazonaws.com",
};
Expand Down Expand Up @@ -498,7 +497,7 @@ describe("Consumer", () => {
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
await clock.tickAsync(CONNECTION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
Expand All @@ -517,7 +516,7 @@ describe("Consumer", () => {
consumer.on("error", errorListener);

consumer.start();
await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT);
await clock.tickAsync(CONNECTION_ERROR_TIMEOUT);
consumer.stop();

sandbox.assert.calledTwice(errorListener);
Expand All @@ -532,7 +531,7 @@ describe("Consumer", () => {
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
pollingWaitTimeMs: POLLING_TIMEOUT,
});

Expand Down Expand Up @@ -582,7 +581,7 @@ describe("Consumer", () => {
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
preReceiveMessageCallback: preReceiveMessageCallbackStub,
postReceiveMessageCallback: postReceiveMessageCallbackStub,
});
Expand Down Expand Up @@ -618,7 +617,7 @@ describe("Consumer", () => {
region: REGION,
handleMessage,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
shouldDeleteMessages: false,
});

Expand Down Expand Up @@ -723,7 +722,7 @@ describe("Consumer", () => {
AttributeNames: [],
MessageAttributeNames: ["attribute-1", "attribute-2"],
MaxNumberOfMessages: 3,
WaitTimeSeconds: AUTHENTICATION_ERROR_TIMEOUT,
WaitTimeSeconds: CONNECTION_ERROR_TIMEOUT,
VisibilityTimeout: undefined,
}),
);
Expand Down Expand Up @@ -767,7 +766,7 @@ describe("Consumer", () => {
AttributeNames: ["ApproximateReceiveCount"],
MessageAttributeNames: [],
MaxNumberOfMessages: 1,
WaitTimeSeconds: AUTHENTICATION_ERROR_TIMEOUT,
WaitTimeSeconds: CONNECTION_ERROR_TIMEOUT,
VisibilityTimeout: undefined,
}),
);
Expand Down Expand Up @@ -903,7 +902,7 @@ describe("Consumer", () => {
},
batchSize: 2,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.start();
Expand Down Expand Up @@ -939,7 +938,7 @@ describe("Consumer", () => {
},
batchSize: 2,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.start();
Expand All @@ -965,7 +964,7 @@ describe("Consumer", () => {
},
batchSize: 2,
sqs,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.start();
Expand Down Expand Up @@ -1547,7 +1546,7 @@ describe("Consumer", () => {
handleMessage,
sqs,
pollingCompleteWaitTimeMs: 5000,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.on("stopped", handleStop);
Expand Down Expand Up @@ -1601,7 +1600,7 @@ describe("Consumer", () => {
handleMessage,
sqs,
pollingCompleteWaitTimeMs: 500,
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
connectionErrorTimeout: CONNECTION_ERROR_TIMEOUT,
});

consumer.on("stopped", handleStop);
Expand Down

0 comments on commit 9ecf875

Please sign in to comment.