feat: concurrent asset fetches (#125)
csmig authored Jun 13, 2024
1 parent ed5f8c3 commit 1009021
Showing 3 changed files with 76 additions and 17 deletions.
1 change: 0 additions & 1 deletion index.js
@@ -129,7 +129,6 @@ async function preflightServices () {
logger.info({ component, message: `preflight token request succeeded`})
const promises = [
api.getCollection(options.collectionId),
api.getCollectionAssets(options.collectionId),
api.getInstalledStigs(),
api.getScapBenchmarkMap()
]
65 changes: 60 additions & 5 deletions lib/api.js
@@ -61,7 +61,7 @@ async function apiRequest({method = 'GET', endpoint, json, authorize = true, ful
}
catch (e) {
// accept a client error for POST /assets if it reports a duplicate name
if (e.response?.statusCode === 400 && e.response?.body?.message === 'Duplicate name') {
if (e.response?.statusCode === 422 && e.response?.body?.message === 'Duplicate name exists') {
logResponse(e.response)
return fullResponse ? e?.response : e?.response?.body
}
@@ -71,13 +71,40 @@
if (e.response?.statusCode === 403) {
Alarm.noGrant(true)
}
else {
else if (e.code === 'ETIMEDOUT' || e.response?.statusCode === 503) {
Alarm.apiOffline(true)
}
throw (e)
}
}

async function apiGetRequestsParallel({endpoints, authorize = true}) {
const requestOptions = {
responseType: 'json',
timeout: {
response: options.responseTimeout
}
}
if (authorize) {
try {
await getToken()
}
catch (e) {
e.component = 'api'
logError(e)
throw(e)
}
requestOptions.headers = {
Authorization: `Bearer ${tokens.access_token}`
}
}
const requests = []
for (const endpoint of endpoints) {
requests.push(got.get(endpoint, requestOptions))
}
return Promise.all(requests)
}
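
A minimal usage sketch of the new apiGetRequestsParallel helper, not part of the commit: the endpoint values are hypothetical, and it assumes options.api holds the API base URL, just as the getCollectionAssets rewrite below assumes when it builds its URLs. Each resolved entry is a full got response with a parsed JSON body; because Promise.all is used, the call rejects as soon as any single request fails.

    const responses = await apiGetRequestsParallel({
      endpoints: [
        `${options.api}/assets?collectionId=1&name=host-a&projection=stigs`, // hypothetical endpoint
        `${options.api}/assets?collectionId=1&name=host-b&projection=stigs`  // hypothetical endpoint
      ]
    })
    for (const response of responses) {
      logResponse(response) // same logging helper used elsewhere in this module
    }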

export async function getScapBenchmarkMap() {
const body = await apiRequest({endpoint: '/stigs/scap-maps'})
cache.scapBenchmarkMap = new Map(body.map(apiScapMap => [apiScapMap.scapBenchmarkId, apiScapMap.benchmarkId]))
@@ -98,9 +125,37 @@ export async function getCollection(collectionId) {
return cache.collection
}

export async function getCollectionAssets(collectionId) {
cache.assets = await apiRequest({endpoint: `/assets?collectionId=${collectionId}&projection=stigs`})
return cache.assets
export async function getCollectionAssets({collectionId, targets}) {

let namedEndpoints = []
let metadataEndpoints = []

if (targets) {
const names = targets.filter(t => !t.metadata.cklHostName).map(t => t.name)
const cklMetadatas = targets.filter(t => t.metadata.cklHostName).map(t => (({cklHostName, cklWebDbSite, cklWebDbInstance}) => ({cklHostName, cklWebDbSite, cklWebDbInstance}))(t.metadata))

namedEndpoints = names.map(name => `${options.api}/assets?collectionId=${collectionId}&name=${encodeURIComponent(name)}&projection=stigs`)
metadataEndpoints = cklMetadatas.map(md => {
let metadataParams = []
for (const metadataKey of Object.keys(md).filter(k=>md[k])) {
metadataParams.push(`metadata=${metadataKey}%3A${encodeURIComponent(md[metadataKey])}`)
}
return `${options.api}/assets?collectionId=${collectionId}&${metadataParams.join('&')}&projection=stigs`
})
const responses = await apiGetRequestsParallel({
endpoints: new Set([...namedEndpoints, ...metadataEndpoints])
})
const assets = []
for (const response of responses) {
logResponse (response)
if (response.body?.[0]) assets.push(response.body[0])
}
return assets
}
else {
cache.assets = await apiRequest({endpoint: `/assets?collectionId=${collectionId}&projection=stigs`})
return cache.assets
}
}
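
For illustration only, a hedged sketch of calling the reworked getCollectionAssets with per-target lookups; the target shape (a name plus optional cklHostName, cklWebDbSite, and cklWebDbInstance metadata) is inferred from the filters above, and all values are hypothetical. Duplicate endpoints are collapsed by the Set, the requests are issued concurrently via apiGetRequestsParallel, and only the first asset in each non-empty response body is kept.

    const assets = await getCollectionAssets({
      collectionId: '1',                  // hypothetical collection
      targets: [
        { name: 'host-a', metadata: {} }, // looked up by asset name
        { name: 'web-01', metadata: {     // looked up by CKL metadata
          cklHostName: 'web-01',
          cklWebDbSite: 'site1',
          cklWebDbInstance: 'db01'
        } }
      ]
    })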

export async function getInstalledStigs() {
27 changes: 16 additions & 11 deletions lib/cargo.js
@@ -13,7 +13,7 @@ async function writer ( taskAsset ) {
const component = 'writer'
try {
logger.debug({
component: component,
component,
message: `${taskAsset.assetProps.name} started`
})

@@ -23,7 +23,7 @@
// GET projection=stigs is an object, we just need the benchmarkIds
r.apiAsset.stigs = r.apiAsset.stigs.map ( stig => stig.benchmarkId )
logger.info({ component: component, message: `asset ${r.created ? 'created' : 'found'}`, asset: r.apiAsset })
// TODO: If created === false, then STIG assignments should be vetted again
// Iterate: If created === false, then STIG assignments should be vetted again
taskAsset.assetProps = r.apiAsset
}

@@ -50,7 +50,7 @@
if (reviews.length > 0) {
const r = await api.postReviews(options.collectionId, taskAsset.assetProps.assetId, reviews)
logger.info({
component: component,
component,
message: `posted reviews`,
asset: { name: taskAsset.assetProps.name, id: taskAsset.assetProps.assetId },
rejected: r.rejected,
@@ -59,7 +59,7 @@
}
else {
logger.warn({
component: component,
component,
message: `no reviews to post`,
asset: { name: taskAsset.assetProps.name, id: taskAsset.assetProps.assetId },
})
@@ -94,24 +94,30 @@ async function writer ( taskAsset ) {
}

async function resultsHandler( parsedResults, cb ) {
const component = 'batch'
try {
batchId++
const isModeScan = options.mode === 'scan'
logger.info({component: component, message: `batch started`, batchId: batchId, size: parsedResults.length})
const apiAssets = await api.getCollectionAssets(options.collectionId)
logger.info({component, message: `batch started`, batchId, size: parsedResults.length})
const apiAssets = await api.getCollectionAssets(
{
collectionId: options.collectionId,
targets: parsedResults.map(pr=>pr.target)
}
)
logger.info({component, message: `asset data received`, batchId, size: apiAssets.length})
const apiStigs = await api.getInstalledStigs()
const tasks = new TaskObject ({ parsedResults, apiAssets, apiStigs, options:options })
logger.info({component, message: `stig data received`, batchId, size: apiStigs.length})
const tasks = new TaskObject ({ parsedResults, apiAssets, apiStigs, options })
isModeScan && tasks.errors.length && addToHistory(tasks.errors.map(e => e.sourceRef))
for ( const taskAsset of tasks.taskAssets.values() ) {
const success = await writer( taskAsset )
isModeScan && success && addToHistory(taskAsset.sourceRefs)
}
logger.info({component: component, message: 'batch ended', batchId: batchId})
logger.info({component, message: 'batch ended', batchId})
cb()
}
catch (e) {
logger.error({component: component, message: e.message, error: serializeError(e)})
logger.error({component, message: 'batch ended', error: serializeError(e), batchId})
cb( e, undefined)
}
}
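
A hedged sketch of the data flow this change relies on, not part of the commit: it assumes each parsed result exposes a target object shaped like the ones getCollectionAssets filters on, with the rest of each parsed result elided and all values illustrative.

    const parsedResults = [
      { target: { name: 'host-a', metadata: {} } },                       // hypothetical parsed file
      { target: { name: 'web-01', metadata: { cklHostName: 'web-01' } } } // hypothetical parsed file
    ]
    // Only the assets referenced by this batch are requested, concurrently,
    // instead of every asset in the collection:
    const apiAssets = await api.getCollectionAssets({
      collectionId: options.collectionId,
      targets: parsedResults.map(pr => pr.target)
    })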
@@ -120,7 +126,6 @@ const cargoQueue = new Queue(resultsHandler, {
id: 'file',
batchSize: options.cargoSize,
batchDelay: options.oneShot ? 0 : options.cargoDelay,
// batchDelayTimeout: options.cargoDelay
})
cargoQueue
.on('batch_failed', (err) => {
