-
Notifications
You must be signed in to change notification settings - Fork 113
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
a7b30cc
commit 4dccbdf
Showing
5 changed files
with
163 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
const { BatchUtils } = require('@rudderstack/workflow-engine'); | ||
const { constructPayload } = require('../../../../v0/util'); | ||
|
||
/** | ||
* This fucntion constructs payloads based upon mappingConfig for all calls | ||
* We build context as it has some specific payloads with default values so just breaking them down | ||
* @param {*} message | ||
* @returns | ||
*/ | ||
const constructFullPayload = (arr, transformation) => { | ||
const dontBatchEvents = arr.filter((el) => el.metadata.dontBatch); | ||
|
||
return arr; | ||
}; | ||
|
||
const mergeMetadata = (batch) => { | ||
const metadata = []; | ||
batch.forEach((event) => { | ||
metadata.push(event.metadata); | ||
}); | ||
return metadata; | ||
}; | ||
|
||
const getMergedEvents = (batch) => { | ||
const events = []; | ||
batch.forEach((event) => { | ||
events.push(event.output); | ||
}); | ||
return events; | ||
}; | ||
|
||
const batchBuilder = (batch) => ({ | ||
batchedRequest: { | ||
body: { | ||
JSON: { events: getMergedEvents(batch) }, | ||
JSON_ARRAY: {}, | ||
XML: {}, | ||
FORM: {}, | ||
}, | ||
version: '1', | ||
type: 'REST', | ||
method: 'POST', | ||
endpoint: getEndpoint(batch[0].destination.Config), | ||
headers: { | ||
'Content-Type': 'application/json', | ||
}, | ||
params: {}, | ||
files: {}, | ||
}, | ||
metadata: mergeMetadata(batch), | ||
batched: true, | ||
statusCode: 200, | ||
destination: batch[0].destination, | ||
}); | ||
|
||
/** | ||
* This fucntions make chunk of successful events based on MAX_BATCH_SIZE | ||
* and then build the response for each chunk to be returned as object of an array | ||
* @param {*} events | ||
* @returns | ||
*/ | ||
const batchResponseBuilder = (events) => { | ||
const batches = BatchUtils.chunkArrayBySizeAndLength(events, { maxItems: config.MAX_BATCH_SIZE }); | ||
const response = []; | ||
batches.items.forEach((batch) => { | ||
response.push(batchBuilder(batch)); | ||
}); | ||
return response; | ||
}; | ||
|
||
module.exports = { constructFullPayload }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
const { TransformerProxyError } = require('../../../v0/util/errorTypes'); | ||
const { prepareProxyRequest, proxyRequest } = require('../../../adapters/network'); | ||
const { isHttpStatusSuccess, getAuthErrCategoryFromStCode } = require('../../../v0/util/index'); | ||
|
||
const { | ||
processAxiosResponse, | ||
getDynamicErrorType, | ||
} = require('../../../adapters/utils/networkUtils'); | ||
const tags = require('../../../v0/util/tags'); | ||
|
||
const responseHandler = (responseParams) => { | ||
let { destinationResponse, rudderJobMetadata } = responseParams; | ||
const message = `[ALGOLIA Response V1 Handler] - Request Processed Successfully`; | ||
const responseWithIndividualEvents = []; | ||
// response: | ||
// {status: 200, message: 'OK'} | ||
// {response:'[ENOTFOUND] :: DNS lookup failed', status: 400} | ||
// destinationResponse = { | ||
// response: {"status": 422, "message": "EventType must be one of \"click\", \"conversion\" or \"view\""}, status: 422 | ||
// } | ||
const { response, status } = destinationResponse; | ||
|
||
if (isHttpStatusSuccess(status)) { | ||
for (const mData of rudderJobMetadata) { | ||
const proxyOutputObj = { | ||
statusCode: 200, | ||
metadata: mData, | ||
error: 'success', | ||
}; | ||
responseWithIndividualEvents.push(proxyOutputObj); | ||
} | ||
|
||
return { | ||
status, | ||
message, | ||
destinationResponse, | ||
response: responseWithIndividualEvents, | ||
}; | ||
} | ||
|
||
// in case of non 2xx status sending 500 for every event, populate response and update dontBatch to true | ||
const errorMessage = response?.error?.message || response?.message || 'unknown error format'; | ||
let serverStatus = 400; | ||
for (let metadata of rudderJobMetadata) { | ||
//handling case if dontBatch is true, and again we got invalid from destination | ||
if (metadata.dontBatch && status === 422) { | ||
responseWithIndividualEvents.push({ | ||
statusCode: 400, | ||
metadata, | ||
error: errorMessage, | ||
}); | ||
continue; | ||
} | ||
serverStatus = 500; | ||
metadata.dontBatch = true; | ||
responseWithIndividualEvents.push({ | ||
statusCode: 500, | ||
metadata, | ||
error: errorMessage, | ||
}); | ||
} | ||
|
||
// 400 ... 500 ...not going through individual status | ||
|
||
// sending back 500 for retry | ||
throw new TransformerProxyError( | ||
`ALGOLIA: Error transformer proxy v1 during ALGOLIA response transformation`, | ||
serverStatus, | ||
{ | ||
[tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), | ||
}, | ||
destinationResponse, | ||
getAuthErrCategoryFromStCode(status), | ||
responseWithIndividualEvents, | ||
); | ||
}; | ||
|
||
function networkHandler() { | ||
this.prepareProxy = prepareProxyRequest; | ||
this.proxy = proxyRequest; | ||
this.processAxiosResponse = processAxiosResponse; | ||
this.responseHandler = responseHandler; | ||
} | ||
|
||
module.exports = { networkHandler }; |