Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:rudderlabs/rudder-transformer in…
Browse files Browse the repository at this point in the history
…to chore.format-standard
  • Loading branch information
Sai Sankeerth committed Feb 13, 2024
2 parents 3b1db74 + a98cabd commit 1cfd84c
Show file tree
Hide file tree
Showing 10 changed files with 871 additions and 674 deletions.
13 changes: 3 additions & 10 deletions src/v0/destinations/am/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ const {
getFieldValueFromMessage,
getValueFromMessage,
deleteObjectProperty,
getErrorRespEvents,
removeUndefinedAndNullValues,
isDefinedAndNotNull,
isAppleFamily,
isDefinedAndNotNullAndNotEmpty,
simpleProcessRouterDest,
isValidInteger,
handleRtTfSingleEventError,
} = require('../../util');
const {
BASE_URL,
Expand All @@ -40,7 +40,6 @@ const {
AMBatchSizeLimit,
AMBatchEventLimit,
} = require('./config');
const tags = require('../../util/tags');

const AMUtils = require('./utils');

Expand Down Expand Up @@ -904,16 +903,10 @@ const batch = (destEvents) => {
// this case shold not happen and should be filtered already
// by the first pass of single event transformation
if (messageEvent && !userId && !deviceId) {
const errorResponse = getErrorRespEvents(
metadata,
400,
const MissingUserIdDeviceIdError = new InstrumentationError(
'Both userId and deviceId cannot be undefined',
{
[tags.TAG_NAMES.ERROR_CATEGORY]: tags.ERROR_CATEGORIES.DATA_VALIDATION,
[tags.TAG_NAMES.ERROR_TYPE]: tags.ERROR_TYPES.INSTRUMENTATION,
},
);
respList.push(errorResponse);
respList.push(handleRtTfSingleEventError(ev, MissingUserIdDeviceIdError, {}));
return;
}
/* check if not a JSON body or (userId length < 5 && batchEventsWithUserIdLengthLowerThanFive is false) or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
defaultBatchRequestConfig,
getSuccessRespEvents,
checkInvalidRtTfEvents,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
CALL_CONVERSION,
Expand Down Expand Up @@ -229,7 +230,7 @@ const processRouterDest = async (inputs, reqMetadata) => {
.concat(storeSalesEventsBatchedResponseList)
.concat(clickCallEvents)
.concat(errorRespList);
return batchedResponseList;
return combineBatchRequestsWithSameJobIds(batchedResponseList);
};

module.exports = { process, processRouterDest };
2 changes: 1 addition & 1 deletion src/v0/destinations/mp/transform.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const {
handleRtTfSingleEventError,
groupEventsByType,
parseConfigArray,
combineBatchRequestsWithSameJobIds,
} = require('../../util');
const {
ConfigCategory,
Expand All @@ -33,7 +34,6 @@ const {
createIdentifyResponse,
isImportAuthCredentialsAvailable,
buildUtmParams,
combineBatchRequestsWithSameJobIds,
groupEventsByEndpoint,
batchEvents,
trimTraits,
Expand Down
91 changes: 0 additions & 91 deletions src/v0/destinations/mp/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,44 +139,6 @@ const isImportAuthCredentialsAvailable = (destination) =>
destination.Config.serviceAccountUserName &&
destination.Config.projectId);

/**
* Finds an existing batch based on metadata JobIds from the provided batch and metadataMap.
* @param {*} batch
* @param {*} metadataMap The map containing metadata items indexed by JobIds.
* @returns
*/
const findExistingBatch = (batch, metadataMap) => {
let existingBatch = null;

// eslint-disable-next-line no-restricted-syntax
for (const metadataItem of batch.metadata) {
if (metadataMap.has(metadataItem.jobId)) {
existingBatch = metadataMap.get(metadataItem.jobId);
break;
}
}

return existingBatch;
};

/**
* Removes duplicate metadata within each merged batch object.
* @param {*} mergedBatches An array of merged batch objects.
*/
const removeDuplicateMetadata = (mergedBatches) => {
mergedBatches.forEach((batch) => {
const metadataSet = new Set();
// eslint-disable-next-line no-param-reassign
batch.metadata = batch.metadata.filter((metadataItem) => {
if (!metadataSet.has(metadataItem.jobId)) {
metadataSet.add(metadataItem.jobId);
return true;
}
return false;
});
});
};

/**
* Builds UTM parameters from a campaign object.
*
Expand Down Expand Up @@ -273,58 +235,6 @@ const batchEvents = (successRespList, maxBatchSize, reqMetadata) => {
});
};

/**
* Combines batched requests with the same JobIds.
* @param {*} inputBatches The array of batched request objects.
* @returns The combined batched requests with merged JobIds.
*
*/
const combineBatchRequestsWithSameJobIds = (inputBatches) => {
const combineBatches = (batches) => {
const clonedBatches = [...batches];
const mergedBatches = [];
const metadataMap = new Map();

clonedBatches.forEach((batch) => {
const existingBatch = findExistingBatch(batch, metadataMap);

if (existingBatch) {
// Merge batchedRequests arrays
existingBatch.batchedRequest = [
...(Array.isArray(existingBatch.batchedRequest)
? existingBatch.batchedRequest
: [existingBatch.batchedRequest]),
...(Array.isArray(batch.batchedRequest) ? batch.batchedRequest : [batch.batchedRequest]),
];

// Merge metadata
batch.metadata.forEach((metadataItem) => {
if (!metadataMap.has(metadataItem.jobId)) {
metadataMap.set(metadataItem.jobId, existingBatch);
}
existingBatch.metadata.push(metadataItem);
});
} else {
mergedBatches.push(batch);
batch.metadata.forEach((metadataItem) => {
metadataMap.set(metadataItem.jobId, batch);
});
}
});

// Remove duplicate metadata within each merged object
removeDuplicateMetadata(mergedBatches);

return mergedBatches;
};
// We need to run this twice because in first pass some batches might not get merged
// and in second pass they might get merged
// Example: [[{jobID:1}, {jobID:2}], [{jobID:3}], [{jobID:1}, {jobID:3}]]
// 1st pass: [[{jobID:1}, {jobID:2}, {jobID:3}], [{jobID:3}]]
// 2nd pass: [[{jobID:1}, {jobID:2}, {jobID:3}]]
return combineBatches(combineBatches(inputBatches));
};

/**
* Trims the traits and contextTraits objects based on the setOnceProperties array and returns an object containing the modified traits, contextTraits, and setOnce properties.
*
Expand Down Expand Up @@ -398,6 +308,5 @@ module.exports = {
groupEventsByEndpoint,
generateBatchedPayloadForArray,
batchEvents,
combineBatchRequestsWithSameJobIds,
trimTraits,
};
Loading

0 comments on commit 1cfd84c

Please sign in to comment.