From 10090214e0ecf4d98dae52f0e1aacd4c1f130975 Mon Sep 17 00:00:00 2001 From: csmig <33138761+csmig@users.noreply.github.com> Date: Thu, 13 Jun 2024 00:50:17 +0000 Subject: [PATCH] feat: concurrent asset fetches (#125) --- index.js | 1 - lib/api.js | 65 ++++++++++++++++++++++++++++++++++++++++++++++++---- lib/cargo.js | 27 +++++++++++++--------- 3 files changed, 76 insertions(+), 17 deletions(-) diff --git a/index.js b/index.js index 0d5cda2..038514f 100755 --- a/index.js +++ b/index.js @@ -129,7 +129,6 @@ async function preflightServices () { logger.info({ component, message: `preflight token request suceeded`}) const promises = [ api.getCollection(options.collectionId), - api.getCollectionAssets(options.collectionId), api.getInstalledStigs(), api.getScapBenchmarkMap() ] diff --git a/lib/api.js b/lib/api.js index 4865d05..99a611b 100644 --- a/lib/api.js +++ b/lib/api.js @@ -61,7 +61,7 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful } catch (e) { // accept a client error for POST /assets if it reports a duplicate name - if (e.response?.statusCode === 400 && e.response?.body?.message === 'Duplicate name') { + if (e.response?.statusCode === 422 && e.response?.body?.message === 'Duplicate name exists') { logResponse(e.response) return fullResponse ? e?.response : e?.response?.body } @@ -71,13 +71,40 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful if (e.response?.statusCode === 403) { Alarm.noGrant(true) } - else { + else if (e.code === 'ETIMEDOUT' || e.response?.statusCode === 503) { Alarm.apiOffline(true) } throw (e) } } +async function apiGetRequestsParallel({endpoints, authorize = true}) { + const requestOptions = { + responseType: 'json', + timeout: { + response: options.responseTimeout + } + } + if (authorize) { + try { + await getToken() + } + catch (e) { + e.component = 'api' + logError(e) + throw(e) + } + requestOptions.headers = { + Authorization: `Bearer ${tokens.access_token}` + } + } + const requests = [] + for (const endpoint of endpoints) { + requests.push(got.get(endpoint, requestOptions)) + } + return Promise.all(requests) +} + export async function getScapBenchmarkMap() { const body = await apiRequest({endpoint: '/stigs/scap-maps'}) cache.scapBenchmarkMap = new Map(body.map(apiScapMap => [apiScapMap.scapBenchmarkId, apiScapMap.benchmarkId])) @@ -98,9 +125,37 @@ export async function getCollection(collectionId) { return cache.collection } -export async function getCollectionAssets(collectionId) { - cache.assets = await apiRequest({endpoint: `/assets?collectionId=${collectionId}&projection=stigs`}) - return cache.assets +export async function getCollectionAssets({collectionId, targets}) { + + let namedEndpoints = [] + let metadataEndpoints = [] + + if (targets) { + const names = targets.filter(t => !t.metadata.cklHostName).map(t => t.name) + const cklMetadatas = targets.filter(t => t.metadata.cklHostName).map(t => (({cklHostName, cklWebDbSite, cklWebDbInstance}) => ({cklHostName, cklWebDbSite, cklWebDbInstance}))(t.metadata)) + + namedEndpoints = names.map(name => `${options.api}/assets?collectionId=${collectionId}&name=${encodeURIComponent(name)}&projection=stigs`) + metadataEndpoints = cklMetadatas.map(md => { + let metadataParams = [] + for (const metadataKey of Object.keys(md).filter(k=>md[k])) { + metadataParams.push(`metadata=${metadataKey}%3A${encodeURIComponent(md[metadataKey])}`) + } + return `${options.api}/assets?collectionId=${collectionId}&${metadataParams.join('&')}&projection=stigs` + }) + const responses = await apiGetRequestsParallel({ + endpoints: new Set([...namedEndpoints, ...metadataEndpoints]) + }) + const assets = [] + for (const response of responses) { + logResponse (response) + if (response.body?.[0]) assets.push(response.body[0]) + } + return assets + } + else { + cache.assets = await apiRequest({endpoint: `/assets?collectionId=${collectionId}&projection=stigs`}) + return cache.assets + } } export async function getInstalledStigs() { diff --git a/lib/cargo.js b/lib/cargo.js index ffa5814..6c51eb1 100644 --- a/lib/cargo.js +++ b/lib/cargo.js @@ -13,7 +13,7 @@ async function writer ( taskAsset ) { const component = 'writer' try { logger.debug({ - component: component, + component, message: `${taskAsset.assetProps.name} started` }) @@ -23,7 +23,7 @@ async function writer ( taskAsset ) { // GET projection=stigs is an object, we just need the benchmarkIds r.apiAsset.stigs = r.apiAsset.stigs.map ( stig => stig.benchmarkId ) logger.info({ component: component, message: `asset ${r.created ? 'created' : 'found'}`, asset: r.apiAsset }) - // TODO: If created === false, then STIG assignments should be vetted again + // Iterate: If created === false, then STIG assignments should be vetted again taskAsset.assetProps = r.apiAsset } @@ -50,7 +50,7 @@ async function writer ( taskAsset ) { if (reviews.length > 0) { const r = await api.postReviews(options.collectionId, taskAsset.assetProps.assetId, reviews) logger.info({ - component: component, + component, message: `posted reviews`, asset: { name: taskAsset.assetProps.name, id: taskAsset.assetProps.assetId }, rejected: r.rejected, @@ -59,7 +59,7 @@ async function writer ( taskAsset ) { } else { logger.warn({ - component: component, + component, message: `no reviews to post`, asset: { name: taskAsset.assetProps.name, id: taskAsset.assetProps.assetId }, }) @@ -94,24 +94,30 @@ async function writer ( taskAsset ) { } async function resultsHandler( parsedResults, cb ) { - const component = 'batch' try { batchId++ const isModeScan = options.mode === 'scan' - logger.info({component: component, message: `batch started`, batchId: batchId, size: parsedResults.length}) - const apiAssets = await api.getCollectionAssets(options.collectionId) + logger.info({component, message: `batch started`, batchId, size: parsedResults.length}) + const apiAssets = await api.getCollectionAssets( + { + collectionId: options.collectionId, + targets: parsedResults.map(pr=>pr.target) + } + ) + logger.info({component, message: `asset data received`, batchId, size: apiAssets.length}) const apiStigs = await api.getInstalledStigs() - const tasks = new TaskObject ({ parsedResults, apiAssets, apiStigs, options:options }) + logger.info({component, message: `stig data received`, batchId, size: apiStigs.length}) + const tasks = new TaskObject ({ parsedResults, apiAssets, apiStigs, options }) isModeScan && tasks.errors.length && addToHistory(tasks.errors.map(e => e.sourceRef)) for ( const taskAsset of tasks.taskAssets.values() ) { const success = await writer( taskAsset ) isModeScan && success && addToHistory(taskAsset.sourceRefs) } - logger.info({component: component, message: 'batch ended', batchId: batchId}) + logger.info({component, message: 'batch ended', batchId}) cb() } catch (e) { - logger.error({component: component, message: e.message, error: serializeError(e)}) + logger.error({component, message: 'batch ended', error: serializeError(e), batchId}) cb( e, undefined) } } @@ -120,7 +126,6 @@ const cargoQueue = new Queue(resultsHandler, { id: 'file', batchSize: options.cargoSize, batchDelay: options.oneShot ? 0 : options.cargoDelay, - // batchDelayTimeout: options.cargoDelay }) cargoQueue .on('batch_failed', (err) => {