Improve error handling and prevent infinite retries for permanent failures.

Pub/Sub and Eventarc retry delivery whenever the service returns a failure status, so return status 200 for all non-recoverable errors to prevent fail/retry loops. This also copes with duplicate events caused by Pub/Sub's at-least-once delivery. (A minimal sketch of the pattern follows the changed-file summary below.)

nielm committed Nov 15, 2021
1 parent 6dcae6d commit 2c5c584
Showing 3 changed files with 44 additions and 29 deletions.
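
The gist of the change, as a minimal standalone sketch: an HTTP handler invoked by an Eventarc/Pub/Sub push subscription should acknowledge events it can never process successfully by returning 2xx, and reserve non-2xx responses for failures that are worth retrying. Everything below (the route, processEvent, isPermanentFailure) is illustrative and assumes Express; it is not the repository's actual server.js.

    const express = require('express');
    const app = express();
    app.use(express.json());

    app.post('/', async (req, res) => {
      try {
        await processEvent(req.body);
        res.status(200).json({status: 'ok'});
      } catch (e) {
        if (isPermanentFailure(e)) {
          // A 2xx response acknowledges the message, so the push
          // subscription will not redeliver it: log instead of looping.
          console.error('Permanent failure, not retrying:', e.message);
          res.status(200).json({status: 'error', message: e.message});
        } else {
          // A non-2xx response makes Pub/Sub/Eventarc retry the event later.
          res.status(500).json({status: 'error', message: e.message});
        }
      }
    });

    // Hypothetical helpers, stubbed for the sketch.
    async function processEvent(body) { /* validate, scan, move the object */ }
    function isPermanentFailure(e) {
      // e.g. malformed event, missing object, or object too large to scan.
      return Boolean(e) && [400, 403, 404, 413].includes(e.code);
    }

    app.listen(8080);
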
3 changes: 2 additions & 1 deletion cloudrun-malware-scanner/bunyanStdoutJsonLog.js
@@ -18,7 +18,7 @@ const {Writable} = require('stream');
const process = require('process');

/**
* @fileoverview This class provides support for streaming your Bunyan logs to
* This class provides support for streaming your Bunyan logs to
* stdout using structured logging for Google Cloud Logging.
*
* @class
@@ -79,6 +79,7 @@ class BunyanStdoutJsonLog extends Writable {
record.severity = BUNYAN_TO_STACKDRIVER.get(Number(record.level));
return JSON.stringify(record) + '\n';
}
// noinspection JSCheckFunctionSignatures

/**
* Write the log record.
2 changes: 1 addition & 1 deletion cloudrun-malware-scanner/logger.js
@@ -13,7 +13,7 @@
// limitations under the License.

/**
* @fileoverview Common logging settings.
* Create a Bunyan Logger using structured logging to stdout.
*/

const {createLogger} = require('bunyan');
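
Both logging files exist to emit structured logs that Google Cloud Logging can ingest from a Cloud Run container: one JSON object per line on stdout, with Bunyan's numeric level translated to a Cloud Logging severity. A rough sketch of that idea, assuming the bunyan package (the class and mapping below are illustrative, not the repository's code):

    const bunyan = require('bunyan');
    const {Writable} = require('stream');

    // Map Bunyan numeric levels to Cloud Logging severities.
    const SEVERITY = new Map([
      [bunyan.TRACE, 'DEBUG'], [bunyan.DEBUG, 'DEBUG'], [bunyan.INFO, 'INFO'],
      [bunyan.WARN, 'WARNING'], [bunyan.ERROR, 'ERROR'], [bunyan.FATAL, 'CRITICAL'],
    ]);

    class StdoutJsonStream extends Writable {
      constructor() {
        // Bunyan 'raw' streams receive record objects, not pre-serialized strings.
        super({objectMode: true});
      }
      _write(record, encoding, callback) {
        // Cloud Logging parses JSON lines on stdout and honours "severity".
        record.severity = SEVERITY.get(record.level) || 'DEFAULT';
        record.message = record.msg;
        process.stdout.write(JSON.stringify(record) + '\n');
        callback();
      }
    }

    const logger = bunyan.createLogger({
      name: 'malware-scanner',
      streams: [{type: 'raw', stream: new StdoutJsonStream(), level: 'debug'}],
    });

    logger.info('scanner started');
    // -> {"name":"malware-scanner","severity":"INFO","message":"scanner started",...}
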
68 changes: 41 additions & 27 deletions cloudrun-malware-scanner/server.js
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Google LLC
* Copyright 2021 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,27 +63,25 @@ const storage = new Storage();
* @param {!Response} res The HTTP response object
*/
app.post('/', async (req, res) => {
logger.debug('Request body: %o', req.body);

// Sanity check required values.
if (!req.body || req.body.kind !== 'storage#object') {
handleErrorResponse(res, 400, `${req.body} is not a GCS Storage Object`);
handleErrorResponse(res, 200, `${req.body} is not a GCS Storage Object`);
return;
}

const file = req.body;
try {
if (!file.name) {
handleErrorResponse(res, 400, `file name not specified in ${file}`);
handleErrorResponse(res, 200, `file name not specified in ${file}`);
return;
}
if (!file.bucket) {
handleErrorResponse(res, 400, `bucket name not specified in ${file}`);
handleErrorResponse(res, 200, `bucket name not specified in ${file}`);
return;
}
if (file.size > MAX_FILE_SIZE) {
handleErrorResponse(
res, 413,
res, 200,
`file gs://${file.bucket}/${file.name} too large for scanning at ${
file.size} bytes`,
file.bucket);
@@ -98,7 +96,9 @@ app.post('/', async (req, res) => {
}

const gcsFile = storage.bucket(file.bucket).file(file.name);
if (! (await gcsFile.exists())) {
// File.exists() returns a FileExistsResponse, which is a list with a
// single value.
if (! (await gcsFile.exists())[0]) {
handleErrorResponse(res, 200,
`File: gs://${file.bucket}/${file.name} does not exist`);
return;
@@ -113,50 +113,59 @@ app.post('/', async (req, res) => {
try {
result = await scanner.scanStream(readStream);
} finally {
// Ensure stream is destroyed in all situations to prevent any
// resource leaks.
readStream.destroy();
}
const scanDuration = Date.now() - startTime;

if (clamd.isCleanReply(result)) {
// Move document to the bucket that holds clean documents
await moveProcessedFile(file.name, true, config);

// Log scan outcome
logger.info(`Scan status for gs://${file.bucket}/${file.name}: CLEAN (${
file.size} bytes in ${scanDuration} ms)`);

// Respond to API client
res.json({status: 'clean', clam_version: clamdVersion});
metrics.writeScanClean(config.unscanned, config.clean, file.size,
scanDuration, clamdVersion);
} else {
// Move document to the bucket that holds infected documents
await moveProcessedFile(file.name, false, config);

// Log scan outcome for document
// Move document to the bucket that holds clean documents. This can
// fail due to permissions or if the file has been deleted.
await moveProcessedFile(file.name, true, config);

// Respond to API client.
res.json({status: 'clean', clam_version: clamdVersion});
} else {
logger.warn(`Scan status for gs://${file.bucket}/${
file.name}: INFECTED ${result} (${
file.size} bytes in ${scanDuration} ms)`);
metrics.writeScanInfected(config.unscanned, config.quarantined, file.size,
scanDuration, clamdVersion);

// Move document to the bucket that holds infected documents. This can
// fail due to permissions or if the file has been deleted.
await moveProcessedFile(file.name, false, config);

// Respond to API client
// Respond to API client.
res.json({
message: result,
status: 'infected',
result: result,
clam_version: clamdVersion,
});
metrics.writeScanInfected(config.unscanned, config.quarantined, file.size,
scanDuration, clamdVersion);
}
} catch (e) {
logger.error(
{err: e},
`Exception when processing gs://${file.bucket}/${file.name}: %s`,
e.message);
let statusCode = 500;
if ((e instanceof ApiError) && e.code) {
statusCode = e.code; // ApiError codes match HTTP status codes

// A 500 will cause Pubsub/EventArc to retry the event.
let statusCode=500;

if ((e instanceof ApiError) && [403, 404].includes(e.code) ) {
// Permission denied/file not found can be raised by the stream reading
// and by the object move. They cannot be retried, so respond
// with success, but log the error.
statusCode=200;
}

handleErrorResponse(res, statusCode,
`gs://${file.bucket}/${file.name}: ${e.message}`,
file.bucket);
@@ -187,13 +196,17 @@ ${pkgJson.description}

/**
* Respond with an error and log the message
*
* Note: any non-successful status codes will cause the caller (PubSub/Eventarc)
* to retry sending the event, so use 200 for non-retryable errors.
*
* @param {Object} res response object
* @param {number} statusCode
* @param {string} errorMessage
* @param {string=} unscannedBucket
*/
function handleErrorResponse(res, statusCode, errorMessage,
unscannedBucket = null) {
unscannedBucket = /** @type {string} */ null) {
logger.error(`Error processing request: ${errorMessage}`);
res.status(statusCode).json({message: errorMessage, status: 'error'});
metrics.writeScanFailed(unscannedBucket);
Expand Down Expand Up @@ -271,6 +284,7 @@ async function readAndVerifyConfig() {
`Error in bucket config #${x}: no "${bucket}" bucket defined`);
success = false;
}
// Check for bucket existence
if (!(await storage.bucket(BUCKET_CONFIG.buckets[x][bucket]).exists())) {
logger.fatal(`Error in bucket config[${x}]: "${bucket}" bucket: ${
BUCKET_CONFIG.buckets[x][bucket]} does not exist`);
@@ -289,7 +303,7 @@ async function readAndVerifyConfig() {
async function run() {
let projectId = process.env.PROJECT_ID;
if (!projectId) {
// get default project ID from GoogleAuth
// Metrics needs project ID, so get it from GoogleAuth
projectId = await (new GoogleAuth().getProjectId());
}
await metrics.init(projectId);
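
One subtlety behind the exists() fix above: in @google-cloud/storage, File#exists() resolves to a one-element array, so awaiting it and testing the result directly is always truthy; the actual boolean is the first element (Bucket#exists() has the same shape). A small illustrative helper, with placeholder bucket and object names:

    const {Storage} = require('@google-cloud/storage');

    // Returns a plain boolean instead of the always-truthy [boolean] wrapper.
    async function gcsFileExists(bucketName, fileName) {
      const storage = new Storage();
      const [exists] = await storage.bucket(bucketName).file(fileName).exists();
      return exists;
    }

    // Example usage (names are placeholders).
    gcsFileExists('example-unscanned-bucket', 'upload.bin')
        .then((found) => console.log('exists?', found))
        .catch((err) => console.error(err.message));
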
