Skip to content

Commit

Permalink
fix: send to konsistent after the transaction committed
Browse files Browse the repository at this point in the history
  • Loading branch information
7sete7 committed Jan 8, 2025
1 parent 8c33b2c commit 573a5f4
Showing 1 changed file with 81 additions and 28 deletions.
109 changes: 81 additions & 28 deletions src/imports/data/data.js
Original file line number Diff line number Diff line change
Expand Up @@ -881,30 +881,54 @@ export async function create({ authTokenId, document, data, contextUser, upsert,
);
}

if (resultRecord != null) {
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
tracingSpan?.addEvent('Sending Konsistent message');
await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { metaName: document, operation: 'create', data: resultRecord });
} else {
try {
tracingSpan?.addEvent('Processing sync Konsistent');
await processIncomingChange(document, resultRecord, 'create', user, resultRecord, dbSession);
} catch (e) {
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
await dbSession.abortTransaction();
return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}
// Process sync Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent !== true || Konsistent.isQueueEnabled === false) {
try {
tracingSpan?.addEvent('Processing sync Konsistent');
await processIncomingChange(document, resultRecord, 'create', user, resultRecord, dbSession);
} catch (e) {
tracingSpan?.addEvent('Error on Konsistent', { error: e.message });
logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`);
await dbSession.abortTransaction();
return errorReturn(`[${document}] Error on Konsistent: ${e.message}`);
}

await eventManager.sendEvent(document, 'create', resultRecord);
return successReturn([dateToString(resultRecord)]);
}

return successReturn([dateToString(resultRecord)]);
}

return errorReturn(`[${document}] Error on insert, there is no affected record`);
});

if (transactionResult != null && transactionResult.success != null) {
tracingSpan?.addEvent('Operation result', omit(transactionResult, ['data']));

// Process events and messages after transaction completes successfully
if (transactionResult.success === true && transactionResult.data?.[0] != null) {
const record = transactionResult.data[0];

// Process Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
tracingSpan?.addEvent('Sending Konsistent message');
try {
await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, {
metaName: document,
operation: 'create',
data: record
});
} catch (e) {
logger.error(e, `Error sending Konsistent message: ${e.message}`);
}
}

// Send events
try {
await eventManager.sendEvent(document, 'create', record);
} catch (e) {
logger.error(e, `Error sending event: ${e.message}`);
}
}

return transactionResult;
}
} catch (e) {
Expand Down Expand Up @@ -1355,17 +1379,14 @@ export async function update({ authTokenId, document, data, contextUser, tracing
await runScriptAfterSave({ script: metaObject.scriptAfterSave, data: updatedRecords, user, extraData: { original: existsRecords } });
}

logger.debug('Processing Konsistent');
// Process sync Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent !== true || Konsistent.isQueueEnabled === false) {
logger.debug('Processing Konsistent');
for await (const record of updatedRecords) {
const original = existsRecords.find(r => r._id === record._id);
const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);
const changedProps = objectsDiff(original, newRecord);

for await (const record of updatedRecords) {
const original = existsRecords.find(r => r._id === record._id);
const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);

const changedProps = objectsDiff(original, newRecord);
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
tracingSpan?.addEvent('Sending Konsistent message');
await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { metaName: document, operation: 'update', data: changedProps });
} else {
try {
tracingSpan?.addEvent('Processing sync Konsistent');
await processIncomingChange(document, record, 'update', user, changedProps, dbSession);
Expand All @@ -1379,7 +1400,6 @@ export async function update({ authTokenId, document, data, contextUser, tracing
}
}

await Promise.all(updateResults.map(({ data }) => eventManager.sendEvent(document, 'update', data)));
const responseData = updatedRecords.map(record => removeUnauthorizedDataForRead(access, record, user, metaObject)).map(record => dateToString(record));

if (emailsToSend.length > 0) {
Expand Down Expand Up @@ -1411,6 +1431,39 @@ export async function update({ authTokenId, document, data, contextUser, tracing

if (transactionResult != null && transactionResult.success != null) {
tracingSpan?.addEvent('Operation result', omit(transactionResult, ['data']));

// Process events and messages after transaction completes successfully
if (transactionResult.success === true && transactionResult.data?.length > 0) {
const updatedRecords = transactionResult.data;

// Process Konsistent
if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) {
tracingSpan?.addEvent('Sending Konsistent messages');
for (const record of updatedRecords) {
try {
const original = existsRecords.find(r => r._id === record._id);
const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']);
const changedProps = objectsDiff(original, newRecord);

await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, {
metaName: document,
operation: 'update',
data: changedProps
});
} catch (e) {
logger.error(e, `Error sending Konsistent message: ${e.message}`);
}
}
}

// Send events
try {
await Promise.all(updatedRecords.map(record => eventManager.sendEvent(document, 'update', record)));
} catch (e) {
logger.error(e, `Error sending events: ${e.message}`);
}
}

return transactionResult;
}
} catch (e) {
Expand Down

0 comments on commit 573a5f4

Please sign in to comment.