From 2c5c5845abb03fe481097eef66cbaca0c8a06e24 Mon Sep 17 00:00:00 2001
From: nielm
Date: Thu, 4 Nov 2021 18:22:26 +0100
Subject: [PATCH] Improve error handling; prevent infinite retries for
 permanent failures.

Pub/Sub and Eventarc will retry delivery when the service returns a
failure status, so use status 200 for all non-recoverable errors to
prevent fail/retry loops.

This also handles duplicates caused by Pub/Sub's at-least-once
delivery.
---
 .../bunyanStdoutJsonLog.js         |  3 +-
 cloudrun-malware-scanner/logger.js |  2 +-
 cloudrun-malware-scanner/server.js | 68 +++++++++++--------
 3 files changed, 44 insertions(+), 29 deletions(-)

diff --git a/cloudrun-malware-scanner/bunyanStdoutJsonLog.js b/cloudrun-malware-scanner/bunyanStdoutJsonLog.js
index ed06300..b120cb0 100644
--- a/cloudrun-malware-scanner/bunyanStdoutJsonLog.js
+++ b/cloudrun-malware-scanner/bunyanStdoutJsonLog.js
@@ -18,7 +18,7 @@ const {Writable} = require('stream');
 const process = require('process');
 
 /**
- * @fileoverview This class provides support for streaming your Bunyan logs to
+ * This class provides support for streaming your Bunyan logs to
  * stdout using structured logging for Google Cloud Logging.
  *
  * @class
@@ -79,6 +79,7 @@ class BunyanStdoutJsonLog extends Writable {
     record.severity = BUNYAN_TO_STACKDRIVER.get(Number(record.level));
     return JSON.stringify(record) + '\n';
   }
 
+  // noinspection JSCheckFunctionSignatures
   /**
    * Write the log record.
diff --git a/cloudrun-malware-scanner/logger.js b/cloudrun-malware-scanner/logger.js
index 08cd125..1b1fab8 100644
--- a/cloudrun-malware-scanner/logger.js
+++ b/cloudrun-malware-scanner/logger.js
@@ -13,7 +13,7 @@
 // limitations under the License.
 
 /**
- * @fileoverview Common logging settings.
+ * Create a Bunyan Logger using structured logging to stdout.
  */
 
 const {createLogger} = require('bunyan');
diff --git a/cloudrun-malware-scanner/server.js b/cloudrun-malware-scanner/server.js
index 0d2d71d..3f40e7d 100644
--- a/cloudrun-malware-scanner/server.js
+++ b/cloudrun-malware-scanner/server.js
@@ -1,5 +1,5 @@
 /*
-* Copyright 2019 Google LLC
+* Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -63,27 +63,25 @@ const storage = new Storage();
  * @param {!Response} res The HTTP response object
  */
 app.post('/', async (req, res) => {
-  logger.debug('Request body: %o', req.body);
-
   // Sanity check required values.
   if (!req.body || req.body.kind !== 'storage#object') {
-    handleErrorResponse(res, 400, `${req.body} is not a GCS Storage Object`);
+    handleErrorResponse(res, 200, `${req.body} is not a GCS Storage Object`);
     return;
   }
   const file = req.body;
   try {
     if (!file.name) {
-      handleErrorResponse(res, 400, `file name not specified in ${file}`);
+      handleErrorResponse(res, 200, `file name not specified in ${file}`);
       return;
     }
     if (!file.bucket) {
-      handleErrorResponse(res, 400, `bucket name not specified in ${file}`);
+      handleErrorResponse(res, 200, `bucket name not specified in ${file}`);
      return;
     }
 
     if (file.size > MAX_FILE_SIZE) {
       handleErrorResponse(
-          res, 413,
+          res, 200,
           `file gs://${file.bucket}/${file.name} too large for scanning at ${
             file.size} bytes`,
           file.bucket);
@@ -98,7 +96,9 @@ app.post('/', async (req, res) => {
     }
 
     const gcsFile = storage.bucket(file.bucket).file(file.name);
-    if (! (await gcsFile.exists())) {
+    // File.exists() returns a FileExistsResponse, which is a list with a
+    // single value.
+    if (! (await gcsFile.exists())[0]) {
       handleErrorResponse(res, 200,
           `File: gs://${file.bucket}/${file.name} does not exist`);
       return;
@@ -113,50 +113,59 @@ app.post('/', async (req, res) => {
     try {
       result = await scanner.scanStream(readStream);
     } finally {
+      // Ensure the stream is destroyed in all situations to prevent any
+      // resource leaks.
       readStream.destroy();
     }
     const scanDuration = Date.now() - startTime;
 
     if (clamd.isCleanReply(result)) {
-      // Move document to the bucket that holds clean documents
-      await moveProcessedFile(file.name, true, config);
-
-      // Log scan outcome
       logger.info(`Scan status for gs://${file.bucket}/${file.name}: CLEAN (${
         file.size} bytes in ${scanDuration} ms)`);
-
-      // Respond to API client
-      res.json({status: 'clean', clam_version: clamdVersion});
       metrics.writeScanClean(config.unscanned, config.clean, file.size,
           scanDuration, clamdVersion);
-    } else {
-      // Move document to the bucket that holds infected documents
-      await moveProcessedFile(file.name, false, config);
-      // Log scan outcome for document
+
+      // Move document to the bucket that holds clean documents. This can
+      // fail due to permissions or if the file has been deleted.
+      await moveProcessedFile(file.name, true, config);
+
+      // Respond to API client.
+      res.json({status: 'clean', clam_version: clamdVersion});
+    } else {
       logger.warn(`Scan status for gs://${file.bucket}/${
         file.name}: INFECTED ${result} (${
         file.size} bytes in ${scanDuration} ms)`);
+      metrics.writeScanInfected(config.unscanned, config.quarantined, file.size,
+          scanDuration, clamdVersion);
+
+      // Move document to the bucket that holds infected documents. This can
+      // fail due to permissions or if the file has been deleted.
+      await moveProcessedFile(file.name, false, config);
 
-      // Respond to API client
+      // Respond to API client.
       res.json({
         message: result,
         status: 'infected',
         result: result,
         clam_version: clamdVersion,
       });
-
-      metrics.writeScanInfected(config.unscanned, config.quarantined, file.size,
-          scanDuration, clamdVersion);
     }
   } catch (e) {
     logger.error(
         {err: e},
         `Exception when processing gs://${file.bucket}/${file.name}: %s`,
         e.message);
-    let statusCode = 500;
-    if ((e instanceof ApiError) && e.code) {
-      statusCode = e.code; // ApiError codes match HTTP status codes
+
+    // A 500 will cause Pub/Sub/Eventarc to retry the event.
+    let statusCode = 500;
+
+    if ((e instanceof ApiError) && [403, 404].includes(e.code)) {
+      // Permission denied/file not found can be raised by the stream read
+      // and by the object move. Retrying will not help, so respond with
+      // success, but log the error.
+      statusCode = 200;
     }
+
     handleErrorResponse(res, statusCode,
         `gs://${file.bucket}/${file.name}: ${e.message}`,
         file.bucket);
@@ -187,13 +196,17 @@ ${pkgJson.description}
 
 /**
  * Respond with an error and log the message.
+ *
+ * Note: any non-successful status code will cause the caller (Pub/Sub/Eventarc)
+ * to retry sending the event, so use 200 for non-retryable errors.
+ *
  * @param {Object} res response object
  * @param {number} statusCode
  * @param {string} errorMessage
  * @param {string=} unscannedBucket
  */
 function handleErrorResponse(res, statusCode, errorMessage,
-    unscannedBucket = null) {
+    unscannedBucket = /** @type {string} */ null) {
   logger.error(`Error processing request: ${errorMessage}`);
   res.status(statusCode).json({message: errorMessage, status: 'error'});
   metrics.writeScanFailed(unscannedBucket);
@@ -271,6 +284,7 @@ async function readAndVerifyConfig() {
             `Error in bucket config #${x}: no "${bucket}" bucket defined`);
         success = false;
       }
+      // Check for bucket existence.
       if (!(await storage.bucket(BUCKET_CONFIG.buckets[x][bucket]).exists())) {
         logger.fatal(`Error in bucket config[${x}]: "${bucket}" bucket: ${
           BUCKET_CONFIG.buckets[x][bucket]} does not exist`);
@@ -289,7 +303,7 @@
 async function run() {
   let projectId = process.env.PROJECT_ID;
   if (!projectId) {
-    // get default project ID from GoogleAuth
+    // Metrics needs the project ID, so get it from GoogleAuth.
     projectId = await (new GoogleAuth().getProjectId());
   }
   await metrics.init(projectId);
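
A note on the pattern, for reviewers: Pub/Sub and Eventarc push delivery
treat a success (2xx) response as an acknowledgement and redeliver the event
on any other status. The sketch below is not part of the patch; it isolates
that ack-vs-retry contract in a minimal, self-contained Express service.
handleEvent and isPermanentFailure are illustrative stand-ins, not names
from the scanner code.

const express = require('express');

// Illustrative stand-in for the scan/move pipeline; assumed to throw an
// Error carrying a numeric `code` on failure.
async function handleEvent(event) {
  // ... scan the object, move it to the clean or quarantine bucket ...
}

// Mirrors the patch's test, (e instanceof ApiError) && [403, 404].includes(e.code):
// permission-denied and not-found will fail identically on every retry.
function isPermanentFailure(e) {
  return [403, 404].includes(e.code);
}

const app = express();
app.use(express.json());

app.post('/', async (req, res) => {
  try {
    await handleEvent(req.body);
    res.json({status: 'ok'}); // 200: the event is acked.
  } catch (e) {
    console.error(`Failed to process event: ${e.message}`);
    if (isPermanentFailure(e)) {
      // Permanent failure: ack with 200 so the event is never redelivered;
      // the error is preserved only in logs and metrics.
      res.status(200).json({status: 'error', message: e.message});
    } else {
      // Transient failure: a 500 makes Pub/Sub/Eventarc redeliver later.
      res.status(500).json({status: 'error', message: e.message});
    }
  }
});

app.listen(process.env.PORT || 8080);

The same contract explains the reordering in the scan handler: logging and
metrics now run before moveProcessedFile(), so a failed move (403/404) still
records the scan outcome before the event is acked. The exists() change is a
separate bug fix: in @google-cloud/storage, File.exists() resolves to a
one-element array ([boolean]), so the old `if (! (await gcsFile.exists()))`
tested a truthy array and could never detect a missing file; indexing with
[0] (or destructuring, `const [exists] = await gcsFile.exists();`) tests the
actual boolean.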