diff --git a/src/consumer.ts b/src/consumer.ts index c4bfc25..42f4436 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -251,6 +251,7 @@ export class Consumer extends TypedEventEmitter { this.emitError(err); if (isConnectionError(err)) { logger.debug("authentication_error", { + code: err.code || "Unknown", detail: "There was an authentication error. Pausing before retrying.", }); diff --git a/src/errors.ts b/src/errors.ts index 71a2580..29a5d14 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -36,19 +36,28 @@ class StandardError extends Error { } } +/** + * List of SQS error codes that are considered connection errors. + */ +const CONNECTION_ERRORS = [ + "CredentialsError", + "UnknownEndpoint", + "AWS.SimpleQueueService.NonExistentQueue", + "CredentialsProviderError", + "InvalidAddress", + "InvalidSecurity", + "QueueDoesNotExist", + "RequestThrottled", + "OverLimit", +]; + /** * Checks if the error provided should be treated as a connection error. * @param err The error that was received. */ function isConnectionError(err: Error): boolean { if (err instanceof SQSError) { - return ( - err.statusCode === 403 || - err.code === "CredentialsError" || - err.code === "UnknownEndpoint" || - err.code === "AWS.SimpleQueueService.NonExistentQueue" || - err.code === "CredentialsProviderError" - ); + return err.statusCode === 403 || CONNECTION_ERRORS.includes(err.code); } return false; } diff --git a/test/tests/consumer.test.ts b/test/tests/consumer.test.ts index ab5300a..0a2bcaf 100644 --- a/test/tests/consumer.test.ts +++ b/test/tests/consumer.test.ts @@ -409,6 +409,8 @@ describe("Consumer", () => { }); it("waits before repolling when a credentials error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + const credentialsErr = { name: "CredentialsError", message: "Missing credentials in config", @@ -424,9 +426,16 @@ describe("Consumer", () => { sandbox.assert.calledTwice(errorListener); sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "CredentialsError", + detail: "There was an authentication error. Pausing before retrying.", + }); }); it("waits before repolling when a 403 error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + const invalidSignatureErr = { $metadata: { httpStatusCode: 403, @@ -444,9 +453,16 @@ describe("Consumer", () => { sandbox.assert.calledTwice(errorListener); sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "Unknown", + detail: "There was an authentication error. Pausing before retrying.", + }); }); it("waits before repolling when a UnknownEndpoint error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + const unknownEndpointErr = { name: "UnknownEndpoint", message: @@ -464,9 +480,16 @@ describe("Consumer", () => { sandbox.assert.calledTwice(sqs.send); sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "UnknownEndpoint", + detail: "There was an authentication error. Pausing before retrying.", + }); }); it("waits before repolling when a NonExistentQueue error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + const nonExistentQueueErr = { name: "AWS.SimpleQueueService.NonExistentQueue", message: "The specified queue does not exist for this wsdl version.", @@ -483,9 +506,16 @@ describe("Consumer", () => { sandbox.assert.calledTwice(sqs.send); sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "AWS.SimpleQueueService.NonExistentQueue", + detail: "There was an authentication error. Pausing before retrying.", + }); }); it("waits before repolling when a CredentialsProviderError error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + const credentialsProviderErr = { name: "CredentialsProviderError", message: "Could not load credentials from any providers.", @@ -502,6 +532,141 @@ describe("Consumer", () => { sandbox.assert.calledTwice(sqs.send); sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "CredentialsProviderError", + detail: "There was an authentication error. Pausing before retrying.", + }); + }); + + it("waits before repolling when a InvalidAddress error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + + const credentialsProviderErr = { + name: "InvalidAddress", + message: "The address some-queue-url is not valid for this endpoint.", + }; + sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr); + const errorListener = sandbox.stub(); + consumer.on("error", errorListener); + + consumer.start(); + await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + consumer.stop(); + + sandbox.assert.calledTwice(errorListener); + sandbox.assert.calledTwice(sqs.send); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "InvalidAddress", + detail: "There was an authentication error. Pausing before retrying.", + }); + }); + + it("waits before repolling when a InvalidSecurity error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + + const credentialsProviderErr = { + name: "InvalidSecurity", + message: "The queue is not is not HTTPS and SigV4.", + }; + sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr); + const errorListener = sandbox.stub(); + consumer.on("error", errorListener); + + consumer.start(); + await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + consumer.stop(); + + sandbox.assert.calledTwice(errorListener); + sandbox.assert.calledTwice(sqs.send); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "InvalidSecurity", + detail: "There was an authentication error. Pausing before retrying.", + }); + }); + + it("waits before repolling when a QueueDoesNotExist error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + + const credentialsProviderErr = { + name: "QueueDoesNotExist", + message: "The queue does not exist.", + }; + sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr); + const errorListener = sandbox.stub(); + consumer.on("error", errorListener); + + consumer.start(); + await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + consumer.stop(); + + sandbox.assert.calledTwice(errorListener); + sandbox.assert.calledTwice(sqs.send); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "QueueDoesNotExist", + detail: "There was an authentication error. Pausing before retrying.", + }); + }); + + it("waits before repolling when a RequestThrottled error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + + const credentialsProviderErr = { + name: "RequestThrottled", + message: "Requests have been throttled.", + }; + sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr); + const errorListener = sandbox.stub(); + consumer.on("error", errorListener); + + consumer.start(); + await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + consumer.stop(); + + sandbox.assert.calledTwice(errorListener); + sandbox.assert.calledTwice(sqs.send); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "RequestThrottled", + detail: "There was an authentication error. Pausing before retrying.", + }); + }); + + it("waits before repolling when a RequestThrottled error occurs", async () => { + const loggerDebug = sandbox.stub(logger, "debug"); + + const credentialsProviderErr = { + name: "OverLimit", + message: "An over limit error.", + }; + sqs.send.withArgs(mockReceiveMessage).rejects(credentialsProviderErr); + const errorListener = sandbox.stub(); + consumer.on("error", errorListener); + + consumer.start(); + await clock.tickAsync(AUTHENTICATION_ERROR_TIMEOUT); + consumer.stop(); + + sandbox.assert.calledTwice(errorListener); + sandbox.assert.calledTwice(sqs.send); + sandbox.assert.calledWithMatch(sqs.send.firstCall, mockReceiveMessage); + sandbox.assert.calledWithMatch(sqs.send.secondCall, mockReceiveMessage); + + sandbox.assert.calledWith(loggerDebug, "authentication_error", { + code: "OverLimit", + detail: "There was an authentication error. Pausing before retrying.", + }); }); it("waits before repolling when a polling timeout is set", async () => {