Skip to content

Commit

Permalink
feat: add support for vdm next to fb custom audiences (#3729)
Browse files Browse the repository at this point in the history
* feat: add support for vdm next to fb custom audiences

Add support for the record v2 format for VDM Next.

* chore: add missing tests

* chore: refactor fbca

* chore: update docs

* chore: update docs
  • Loading branch information
koladilip authored Sep 19, 2024
1 parent 0ebeac1 commit f33f525
Show file tree
Hide file tree
Showing 18 changed files with 2,439 additions and 57,181 deletions.
6,115 changes: 1,642 additions & 4,473 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
"@types/koa-bodyparser": "^4.3.10",
"@types/lodash": "^4.14.197",
"@types/node": "^20.2.5",
"@types/supertest": "^6.0.2",
"@typescript-eslint/eslint-plugin": "^5.61.0",
"@typescript-eslint/parser": "^5.59.2",
"axios-mock-adapter": "^1.22.0",
Expand Down
11 changes: 11 additions & 0 deletions src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ type Destination = {
IsConnectionEnabled?: boolean;
};

/**
 * Source-to-destination connection settings delivered alongside an event.
 * Attached (optionally) to ProcessorTransformationRequest and
 * RouterTransformationRequestData so destination code can read
 * connection-scoped configuration.
 */
type Connection = {
  sourceId: string;
  destinationId: string;
  enabled: boolean;
  // Free-form, connection-specific settings; the shape is owned by each
  // destination integration (e.g. fb_custom_audience reads config.destination).
  config: Record<string, unknown>;
  processorEnabled?: boolean;
};

type UserTransformationLibrary = {
VersionID: string;
};
Expand All @@ -151,6 +159,7 @@ type ProcessorTransformationRequest = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
libraries?: UserTransformationLibrary[];
credentials?: Credential[];
};
Expand All @@ -160,6 +169,7 @@ type RouterTransformationRequestData = {
message: object;
metadata: Metadata;
destination: Destination;
connection?: Connection;
};

type RouterTransformationRequest = {
Expand Down Expand Up @@ -350,6 +360,7 @@ export {
DeliveryJobState,
DeliveryV0Response,
DeliveryV1Response,
Connection,
Destination,
ErrorDetailer,
MessageIdMetadataMap,
Expand Down
6 changes: 4 additions & 2 deletions src/v0/destinations/fb_custom_audience/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,11 @@ const subTypeFields = [
'CONTACT_IMPORTER',
'DATA_FILE',
];
// as per real time experimentation maximum 500 users can be added at a time
// const MAX_USER_COUNT = 500; (using from destination definition)

const USER_ADD = 'add';
const USER_DELETE = 'remove';
// https://developers.facebook.com/docs/marketing-api/audiences/guides/custom-audiences/
const MAX_USER_COUNT = 10000;
/* No official documentation is available for this, but through trial
and error we found that 65000 bytes is the maximum allowed payload size; we use 60000 just to be sure batching is done properly
*/
Expand All @@ -102,6 +103,7 @@ module.exports = {
schemaFields,
USER_ADD,
USER_DELETE,
MAX_USER_COUNT,
typeFields,
subTypeFields,
maxPayloadSize,
Expand Down
140 changes: 90 additions & 50 deletions src/v0/destinations/fb_custom_audience/recordTransform.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/* eslint-disable no-const-assign */
const lodash = require('lodash');
const get = require('get-value');
const { InstrumentationError, ConfigurationError } = require('@rudderstack/integrations-lib');
const { schemaFields } = require('./config');
const { MappedToDestinationKey } = require('../../../constants');
const { schemaFields, MAX_USER_COUNT } = require('./config');
const stats = require('../../../util/stats');
const {
getDestinationExternalIDInfoForRetl,
isDefinedAndNotNullAndNotEmpty,
checkSubsetOfArray,
returnArrayOfSubarrays,
getSuccessRespEvents,
isEventSentByVDMV2Flow,
isEventSentByVDMV1Flow,
} = require('../../util');
const { getErrorResponse, createFinalResponse } = require('../../util/recordUtils');
const {
Expand All @@ -20,6 +20,7 @@ const {
batchingWithPayloadSize,
responseBuilderSimple,
getDataSource,
generateAppSecretProof,
} = require('./util');

const processRecordEventArray = (
Expand All @@ -31,7 +32,7 @@ const processRecordEventArray = (
prepareParams,
destination,
operation,
operationAudienceId,
audienceId,
) => {
const toSendEvents = [];
const metadata = [];
Expand Down Expand Up @@ -88,7 +89,7 @@ const processRecordEventArray = (
operationCategory: operation,
};

const builtResponse = responseBuilderSimple(wrappedResponse, operationAudienceId);
const builtResponse = responseBuilderSimple(wrappedResponse, audienceId);

toSendEvents.push(builtResponse);
});
Expand All @@ -99,49 +100,26 @@ const processRecordEventArray = (
return response;
};

async function processRecordInputs(groupedRecordInputs) {
const { destination } = groupedRecordInputs[0];
const { message } = groupedRecordInputs[0];
const {
isHashRequired,
accessToken,
disableFormat,
type,
subType,
isRaw,
maxUserCount,
audienceId,
} = destination.Config;
function preparePayload(events, config) {
const { audienceId, userSchema, isRaw, type, subType, isHashRequired, disableFormat } = config;
const { destination } = events[0];
const { accessToken, appSecret } = destination.Config;
const prepareParams = {
access_token: accessToken,
};

// maxUserCount validation
const maxUserCountNumber = parseInt(maxUserCount, 10);
if (Number.isNaN(maxUserCountNumber)) {
throw new ConfigurationError('Batch size must be an Integer.');
if (isDefinedAndNotNullAndNotEmpty(appSecret)) {
const dateNow = Date.now();
prepareParams.appsecret_time = Math.floor(dateNow / 1000); // Get current Unix time in seconds
prepareParams.appsecret_proof = generateAppSecretProof(accessToken, appSecret, dateNow);
}

// audience id validation
let operationAudienceId = audienceId;
const mappedToDestination = get(message, MappedToDestinationKey);
if (mappedToDestination) {
const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
operationAudienceId = objectType;
}
if (!isDefinedAndNotNullAndNotEmpty(operationAudienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
const cleanUserSchema = userSchema.map((field) => field.trim());

// user schema validation
let { userSchema } = destination.Config;
if (mappedToDestination) {
userSchema = getSchemaForEventMappedToDest(message);
}
if (!Array.isArray(userSchema)) {
userSchema = [userSchema];
if (!isDefinedAndNotNullAndNotEmpty(audienceId)) {
throw new ConfigurationError('Audience ID is a mandatory field');
}
if (!checkSubsetOfArray(schemaFields, userSchema)) {
if (!checkSubsetOfArray(schemaFields, cleanUserSchema)) {
throw new ConfigurationError('One or more of the schema fields are not supported');
}

Expand All @@ -156,7 +134,7 @@ async function processRecordInputs(groupedRecordInputs) {
paramsPayload.data_source = dataSource;
}

const groupedRecordsByAction = lodash.groupBy(groupedRecordInputs, (record) =>
const groupedRecordsByAction = lodash.groupBy(events, (record) =>
record.message.action?.toLowerCase(),
);

Expand All @@ -167,55 +145,55 @@ async function processRecordInputs(groupedRecordInputs) {
if (groupedRecordsByAction.delete) {
const deleteRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.delete,
maxUserCountNumber,
MAX_USER_COUNT,
);
deleteResponse = processRecordEventArray(
deleteRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'remove',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.insert) {
const insertRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.insert,
maxUserCountNumber,
MAX_USER_COUNT,
);

insertResponse = processRecordEventArray(
insertRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

if (groupedRecordsByAction.update) {
const updateRecordChunksArray = returnArrayOfSubarrays(
groupedRecordsByAction.update,
maxUserCountNumber,
MAX_USER_COUNT,
);
updateResponse = processRecordEventArray(
updateRecordChunksArray,
userSchema,
cleanUserSchema,
isHashRequired,
disableFormat,
paramsPayload,
prepareParams,
destination,
'add',
operationAudienceId,
audienceId,
);
}

Expand All @@ -225,6 +203,7 @@ async function processRecordInputs(groupedRecordInputs) {
deleteResponse,
insertResponse,
updateResponse,

errorResponse,
);
if (finalResponse.length === 0) {
Expand All @@ -235,6 +214,67 @@ async function processRecordInputs(groupedRecordInputs) {
return finalResponse;
}

/**
 * Handles record events produced by the VDM v1 / classic flow.
 * Audience id and user schema default to the destination config; when the
 * event came through the VDM v1 (retl) flow they are overridden by values
 * carried on the event itself.
 * @param {Array} groupedRecordInputs - batch of record events sharing one destination
 * @returns {Array} transformed responses produced by preparePayload
 */
function processRecordInputsV1(groupedRecordInputs) {
  const [firstInput] = groupedRecordInputs;
  const { destination, message } = firstInput;
  const { isHashRequired, disableFormat, type, subType, isRaw, audienceId, userSchema } =
    destination.Config;

  let resolvedAudienceId = audienceId;
  let resolvedUserSchema = userSchema;
  if (isEventSentByVDMV1Flow(firstInput)) {
    // retl flow: the external-id object type is the audience id, and the
    // schema is derived from the event's destination mapping.
    const { objectType } = getDestinationExternalIDInfoForRetl(message, 'FB_CUSTOM_AUDIENCE');
    resolvedAudienceId = objectType;
    resolvedUserSchema = getSchemaForEventMappedToDest(message);
  }

  return preparePayload(groupedRecordInputs, {
    audienceId: resolvedAudienceId,
    userSchema: resolvedUserSchema,
    isRaw,
    type,
    subType,
    isHashRequired,
    disableFormat,
  });
}

/**
 * Handles record events produced by the VDM Next (v2) flow, where the
 * destination-facing settings arrive on the connection object.
 * Ref: https://www.notion.so/rudderstacks/VDM-V2-Final-Config-and-Record-EventPayload-8cc80f3d88ad46c7bc43df4b87a0bbff
 * @param {Array} groupedRecordInputs - batch of record events sharing one connection
 * @returns {Array} transformed responses produced by preparePayload
 */
const processRecordInputsV2 = (groupedRecordInputs) => {
  const [{ connection, message }] = groupedRecordInputs;
  const { isHashRequired, disableFormat, type, subType, isRaw, audienceId } =
    connection.config.destination;
  // In the v2 record format the user schema is implied by the identifier keys
  // of the first event; left undefined when identifiers are absent.
  const firstIdentifiers = message?.identifiers;
  const userSchema = firstIdentifiers ? Object.keys(firstIdentifiers) : undefined;
  // Downstream processing reads `message.fields`, so mirror identifiers there
  // without mutating the incoming records.
  const events = groupedRecordInputs.map((record) => ({
    ...record,
    message: {
      ...record.message,
      fields: record.message.identifiers,
    },
  }));
  return preparePayload(events, {
    audienceId,
    userSchema,
    isRaw,
    type,
    subType,
    isHashRequired,
    disableFormat,
  });
};

/**
 * Entry point for record-event batches: routes the whole batch to the
 * VDM v2 or v1 handler based on the flow that produced the first event
 * (a batch is expected to be homogeneous).
 * @param {Array} groupedRecordInputs - batch of record events
 * @returns {Array} transformed responses
 */
function processRecordInputs(groupedRecordInputs) {
  return isEventSentByVDMV2Flow(groupedRecordInputs[0])
    ? processRecordInputsV2(groupedRecordInputs)
    : processRecordInputsV1(groupedRecordInputs);
}

// Public surface of this module: only the flow-dispatching entry point is
// exported; the v1/v2 handlers and payload helpers stay internal.
module.exports = {
  processRecordInputs,
};
Loading

0 comments on commit f33f525

Please sign in to comment.