Skip to content

Commit

Permalink
Handle multiple buckets
Browse files Browse the repository at this point in the history
Add configuration handling for multiple source buckets.
  • Loading branch information
nielm committed Nov 15, 2021
1 parent c422943 commit 0a7628d
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 21 deletions.
19 changes: 19 additions & 0 deletions cloudrun-malware-scanner/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"comments": [
"Configuration file for malware-scanner service.",
"----",
"To use the configuration in this file, specify the environmental variable:",
" 'CONFIG_FILE=./config.json'",
"",
"'buckets' is a list of objects specifying source/destination buckets for scanning, allowing the service to handle multiple buckets",
"Each object must have the 3 properties 'unscanned', 'clean' and 'quarantined', specifying the bucket names to use.",
""
],
"buckets": [
{
"unscanned": "unscanned-bucket-name",
"clean": "clean-bucket-name",
"quarantined": "quarantined-bucket-name"
}
]
}
3 changes: 2 additions & 1 deletion cloudrun-malware-scanner/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
"scripts": {
"start": "node server.js",
"test": "echo \"Error: no test specified\" && exit 1",
"eslint": "eslint *.js"
"eslint": "eslint *.js",
"eslint-fix": "eslint --fix *.js"
},
"author": "Google Inc.",
"license": "Apache-2.0",
Expand Down
138 changes: 118 additions & 20 deletions cloudrun-malware-scanner/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,31 @@ const pkgJson = require('./package.json');
const metrics = require('./metrics.js');

const PORT = process.env.PORT || 8080;
const CLOUD_STORAGE_BUCKET = process.env.UNSCANNED_BUCKET;
const CLEAN_BUCKET = process.env.CLEAN_BUCKET;
const QUARANTINED_BUCKET = process.env.QUARANTINED_BUCKET;
const CLAMD_HOST = '127.0.0.1';
const CLAMD_PORT = 3310;
const MAX_FILE_SIZE = 5000000000; // 5GiB

/**
* Configuration object.
*
* Values are read from the environment variable CONFIG_FILE (which specifies a
* JSON file to read the config from) or single-bucket config variables:
* UNSCANNED_BUCKET, CLEAN_BUCKET and QUARANTINED_BUCKET.
* See {@link readAndVerifyConfig}.
*
* @type {{
* buckets: Array<
* {
* unscanned: string,
* clean: string,
* quarantined: string
* }>
* }}
*/
const BUCKET_CONFIG = {
buckets: [],
};

// Create Clients.
const app = express();
app.use(express.json());
Expand Down Expand Up @@ -71,39 +89,49 @@ app.post('/', async (req, res) => {
file.bucket);
return;
}
if (file.bucket !== CLOUD_STORAGE_BUCKET) {
handleErrorResponse(res, 403, `Incorrect bucket name - ${file.bucket}`);
const config =
BUCKET_CONFIG.buckets.filter((c) => c.unscanned === file.bucket)[0];
if (!config) {
handleErrorResponse(res, 200,
`Bucket name - ${file.bucket} not in config`);
return;
}

const gcsFile = storage.bucket(file.bucket).file(file.name);
if (! (await gcsFile.exists())) {
handleErrorResponse(res, 200,
`File: gs://${file.bucket}/${file.name} does not exist`);
return;
}

const clamdVersion = await getClamVersion();
logger.info(`Scan request for gs://${file.bucket}/${file.name}, (${
file.size} bytes) scanning with clam ${clamdVersion}`);
const startTime = Date.now();
const readStream = await storage.bucket(CLOUD_STORAGE_BUCKET)
.file(file.name)
.createReadStream();
const readStream = await gcsFile.createReadStream();
const result = await scanner.scanStream(readStream);
const scanDuration = Date.now() - startTime;

if (clamd.isCleanReply(result)) {
// Move document to the bucket that holds clean documents
await moveProcessedFile(file.name, true);
await moveProcessedFile(file.name, true, config);

// Log scan outcome
logger.info(`Scan status for gs://${file.bucket}/${file.name}: CLEAN (${
file.size} bytes in ${scanDuration} ms)`);

// Respond to API client
res.json({status: 'clean', clam_version: clamdVersion});
metrics.writeScanClean(CLOUD_STORAGE_BUCKET, CLEAN_BUCKET, file.size,
metrics.writeScanClean(config.unscanned, config.clean, file.size,
scanDuration, clamdVersion);
} else {
// Move document to the bucket that holds infected documents
await moveProcessedFile(file.name, false);
await moveProcessedFile(file.name, false, config);

// Log scan outcome for document
logger.warn(`Scan status for ${(file.name)}: INFECTED ${result}`);
logger.warn(`Scan status for gs://${file.bucket}/${
file.name}: INFECTED ${result} (${
file.size} bytes in ${scanDuration} ms)`);

// Respond to API client
res.json({
Expand All @@ -112,8 +140,8 @@ app.post('/', async (req, res) => {
result: result,
clam_version: clamdVersion,
});
metrics.writeScanInfected(CLOUD_STORAGE_BUCKET, QUARANTINED_BUCKET,
file.size, scanDuration, clamdVersion);
metrics.writeScanInfected(config.unscanned, config.quarantined, file.size,
scanDuration, clamdVersion);
}
} catch (e) {
logger.error(
Expand Down Expand Up @@ -142,7 +170,8 @@ app.post('/', async (req, res) => {
app.get('/', async (req, res) => {
res.status(200)
.type('text/plain')
.send(`${pkgJson.name} version ${pkgJson.version}
.send(
`${pkgJson.name} version ${pkgJson.version}
Using Clam AV version: ${await getClamVersion()}
${pkgJson.description}
Expand Down Expand Up @@ -177,15 +206,78 @@ async function getClamVersion() {
* Move the file to the appropriate bucket.
* @param {string} filename
* @param {boolean} isClean
* @param {!Object} config
*/
async function moveProcessedFile(filename, isClean) {
const srcfile = storage.bucket(CLOUD_STORAGE_BUCKET).file(filename);
async function moveProcessedFile(filename, isClean, config) {
const srcfile = storage.bucket(config.unscanned).file(filename);
const destinationBucketName =
isClean ? `gs://${CLEAN_BUCKET}` : `gs://${QUARANTINED_BUCKET}`;
isClean ? `gs://${config.clean}` : `gs://${config.quarantined}`;
const destinationBucket = storage.bucket(destinationBucketName);
await srcfile.move(destinationBucket);
}

/**
* Read configuration from process environmental variables.
*
* Can be CONFIG_FILE for a JSON file, or single-bucket config
* variables: UNSCANNED_BUCKET, CLEAN_BUCKET and
* QUARANTINED_BUCKET
*/
async function readAndVerifyConfig() {
if (process.env.CONFIG_FILE) {
if (!process.env.CONFIG_FILE.endsWith('.json')) {
throw new Error(`CONFIG_FILE="${
process.env.CONFIG_FILE}" should end with ".json"`);
}
try {
const envConfig = require(process.env.CONFIG_FILE);
BUCKET_CONFIG.buckets = envConfig.buckets;
} catch (e) {
logger.fatal(
{err: e},
`Unable to read JSON file from CONFIG_FILE="${
process.env.CONFIG_FILE}"`);
throw new Error(`Invalid configuration CONFIG_FILE="${
process.env.CONFIG_FILE}"`);
}
} else if (process.env.UNSCANNED_BUCKET && process.env.CLEAN_BUCKET &&
process.env.QUARANTINED_BUCKET) {
// Simple config for single-bucket scanning.
BUCKET_CONFIG.buckets = [{
unscanned: process.env.UNSCANNED_BUCKET,
clean: process.env.CLEAN_BUCKET,
quarantined: process.env.QUARANTINED_BUCKET,
}];
}

if (BUCKET_CONFIG.buckets.length === 0) {
logger.fatal(`No buckets configured for scanning`);
throw new Error('No buckets configured');
}

logger.info("BUCKET_CONFIG: "+JSON.stringify(BUCKET_CONFIG, null, 2));

// Check buckets are specified and exist.
let success = true;
for (let x = 0; x < BUCKET_CONFIG.buckets.length; x++) {
for (const bucket of ['unscanned', 'clean', 'quarantined']) {
if (!BUCKET_CONFIG.buckets[x][bucket]) {
logger.fatal(
`Error in bucket config #${x}: no "${bucket}" bucket defined`);
success = false;
}
if (!(await storage.bucket(BUCKET_CONFIG.buckets[x][bucket]).exists())) {
logger.fatal(`Error in bucket config[${x}]: "${bucket}" bucket: ${
BUCKET_CONFIG.buckets[x][bucket]} does not exist`);
success = false;
}
}
}
if (!success) {
throw new Error('Invalid configuration');
}
}

/**
* Perform async setup and start the app.
*/
Expand All @@ -196,11 +288,17 @@ async function run() {
projectId = await (new GoogleAuth().getProjectId());
}
await metrics.init(projectId);
await readAndVerifyConfig();

app.listen(PORT, () => {
logger.info(
`${pkgJson.name} version ${pkgJson.version} started on port ${PORT}`);
});
}

// Start the service.
run().catch(console.error);
// Start the service, exiting on error.
run().catch((e) => {
logger.fatal(e);
logger.fatal('Exiting');
process.exit(1);
});

0 comments on commit 0a7628d

Please sign in to comment.