From 4dccbdf04f3fa311a44e1ef973a02d7b5cc7c980 Mon Sep 17 00:00:00 2001 From: AASHISH MALIK Date: Wed, 6 Mar 2024 11:04:29 +0530 Subject: [PATCH] feat: algolia v1 proxy --- .../v2/destinations/algolia/rtWorkflow.yaml | 6 +- src/cdk/v2/destinations/algolia/util.js | 71 ++++++++++++++++ src/features.json | 2 +- src/v0/destinations/am/utils.js | 3 +- src/v1/destinations/algolia/networkHandler.js | 85 +++++++++++++++++++ 5 files changed, 163 insertions(+), 4 deletions(-) create mode 100644 src/cdk/v2/destinations/algolia/util.js create mode 100644 src/v1/destinations/algolia/networkHandler.js diff --git a/src/cdk/v2/destinations/algolia/rtWorkflow.yaml b/src/cdk/v2/destinations/algolia/rtWorkflow.yaml index 758a71bf5b..4ededf3d24 100644 --- a/src/cdk/v2/destinations/algolia/rtWorkflow.yaml +++ b/src/cdk/v2/destinations/algolia/rtWorkflow.yaml @@ -2,7 +2,7 @@ bindings: - path: ../../../../v0/destinations/algolia/config - name: handleRtTfSingleEventError path: ../../../../v0/util/index - + - path: ./util steps: - name: validateInput template: | @@ -28,6 +28,10 @@ steps: $.outputs.transform#idx.error.( $.handleRtTfSingleEventError(^[idx], .originalError ?? ., {}) )[] + - name: separateDontBatchEventForProxy + template: | + const payload = $.constructFullPayload(^, $.outputs.successfulEvents); + payload - name: batchSuccessfulEvents description: Batches the successfulEvents template: | diff --git a/src/cdk/v2/destinations/algolia/util.js b/src/cdk/v2/destinations/algolia/util.js new file mode 100644 index 0000000000..e5ca1efc1d --- /dev/null +++ b/src/cdk/v2/destinations/algolia/util.js @@ -0,0 +1,71 @@ +const { BatchUtils } = require('@rudderstack/workflow-engine'); +const { constructPayload } = require('../../../../v0/util'); + +/** + * This fucntion constructs payloads based upon mappingConfig for all calls + * We build context as it has some specific payloads with default values so just breaking them down + * @param {*} message + * @returns + */ +const constructFullPayload = (arr, transformation) => { + const dontBatchEvents = arr.filter((el) => el.metadata.dontBatch); + + return arr; +}; + +const mergeMetadata = (batch) => { + const metadata = []; + batch.forEach((event) => { + metadata.push(event.metadata); + }); + return metadata; +}; + +const getMergedEvents = (batch) => { + const events = []; + batch.forEach((event) => { + events.push(event.output); + }); + return events; +}; + +const batchBuilder = (batch) => ({ + batchedRequest: { + body: { + JSON: { events: getMergedEvents(batch) }, + JSON_ARRAY: {}, + XML: {}, + FORM: {}, + }, + version: '1', + type: 'REST', + method: 'POST', + endpoint: getEndpoint(batch[0].destination.Config), + headers: { + 'Content-Type': 'application/json', + }, + params: {}, + files: {}, + }, + metadata: mergeMetadata(batch), + batched: true, + statusCode: 200, + destination: batch[0].destination, +}); + +/** + * This fucntions make chunk of successful events based on MAX_BATCH_SIZE + * and then build the response for each chunk to be returned as object of an array + * @param {*} events + * @returns + */ +const batchResponseBuilder = (events) => { + const batches = BatchUtils.chunkArrayBySizeAndLength(events, { maxItems: config.MAX_BATCH_SIZE }); + const response = []; + batches.items.forEach((batch) => { + response.push(batchBuilder(batch)); + }); + return response; +}; + +module.exports = { constructFullPayload }; diff --git a/src/features.json b/src/features.json index 5460111a22..316c48ea2b 100644 --- a/src/features.json +++ b/src/features.json @@ -83,5 +83,5 @@ "SPRIG" ], "supportSourceTransformV1": true, - "supportTransformerProxyV1": false + "supportTransformerProxyV1": true } diff --git a/src/v0/destinations/am/utils.js b/src/v0/destinations/am/utils.js index 4d4fd5dc37..190a5c1bae 100644 --- a/src/v0/destinations/am/utils.js +++ b/src/v0/destinations/am/utils.js @@ -123,7 +123,6 @@ const validateEventType = (evType) => { } }; - const userPropertiesPostProcess = (rawPayload) => { const operationList = [ '$setOnce', @@ -187,5 +186,5 @@ module.exports = { getEventId, getUnsetObj, validateEventType, - userPropertiesPostProcess + userPropertiesPostProcess, }; diff --git a/src/v1/destinations/algolia/networkHandler.js b/src/v1/destinations/algolia/networkHandler.js new file mode 100644 index 0000000000..502da1e102 --- /dev/null +++ b/src/v1/destinations/algolia/networkHandler.js @@ -0,0 +1,85 @@ +const { TransformerProxyError } = require('../../../v0/util/errorTypes'); +const { prepareProxyRequest, proxyRequest } = require('../../../adapters/network'); +const { isHttpStatusSuccess, getAuthErrCategoryFromStCode } = require('../../../v0/util/index'); + +const { + processAxiosResponse, + getDynamicErrorType, +} = require('../../../adapters/utils/networkUtils'); +const tags = require('../../../v0/util/tags'); + +const responseHandler = (responseParams) => { + let { destinationResponse, rudderJobMetadata } = responseParams; + const message = `[ALGOLIA Response V1 Handler] - Request Processed Successfully`; + const responseWithIndividualEvents = []; + // response: + // {status: 200, message: 'OK'} + // {response:'[ENOTFOUND] :: DNS lookup failed', status: 400} + // destinationResponse = { + // response: {"status": 422, "message": "EventType must be one of \"click\", \"conversion\" or \"view\""}, status: 422 + // } + const { response, status } = destinationResponse; + + if (isHttpStatusSuccess(status)) { + for (const mData of rudderJobMetadata) { + const proxyOutputObj = { + statusCode: 200, + metadata: mData, + error: 'success', + }; + responseWithIndividualEvents.push(proxyOutputObj); + } + + return { + status, + message, + destinationResponse, + response: responseWithIndividualEvents, + }; + } + + // in case of non 2xx status sending 500 for every event, populate response and update dontBatch to true + const errorMessage = response?.error?.message || response?.message || 'unknown error format'; + let serverStatus = 400; + for (let metadata of rudderJobMetadata) { + //handling case if dontBatch is true, and again we got invalid from destination + if (metadata.dontBatch && status === 422) { + responseWithIndividualEvents.push({ + statusCode: 400, + metadata, + error: errorMessage, + }); + continue; + } + serverStatus = 500; + metadata.dontBatch = true; + responseWithIndividualEvents.push({ + statusCode: 500, + metadata, + error: errorMessage, + }); + } + + // 400 ... 500 ...not going through individual status + + // sending back 500 for retry + throw new TransformerProxyError( + `ALGOLIA: Error transformer proxy v1 during ALGOLIA response transformation`, + serverStatus, + { + [tags.TAG_NAMES.ERROR_TYPE]: getDynamicErrorType(status), + }, + destinationResponse, + getAuthErrCategoryFromStCode(status), + responseWithIndividualEvents, + ); +}; + +function networkHandler() { + this.prepareProxy = prepareProxyRequest; + this.proxy = proxyRequest; + this.processAxiosResponse = processAxiosResponse; + this.responseHandler = responseHandler; +} + +module.exports = { networkHandler };