diff --git a/src/imports/data/data.js b/src/imports/data/data.js index 8d8a668..e44042c 100644 --- a/src/imports/data/data.js +++ b/src/imports/data/data.js @@ -881,30 +881,54 @@ export async function create({ authTokenId, document, data, contextUser, upsert, ); } - if (resultRecord != null) { - if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) { - tracingSpan?.addEvent('Sending Konsistent message'); - await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { metaName: document, operation: 'create', data: resultRecord }); - } else { - try { - tracingSpan?.addEvent('Processing sync Konsistent'); - await processIncomingChange(document, resultRecord, 'create', user, resultRecord, dbSession); - } catch (e) { - tracingSpan?.addEvent('Error on Konsistent', { error: e.message }); - logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`); - await dbSession.abortTransaction(); - return errorReturn(`[${document}] Error on Konsistent: ${e.message}`); - } + // Process sync Konsistent + if (MetaObject.Namespace.plan?.useExternalKonsistent !== true || Konsistent.isQueueEnabled === false) { + try { + tracingSpan?.addEvent('Processing sync Konsistent'); + await processIncomingChange(document, resultRecord, 'create', user, resultRecord, dbSession); + } catch (e) { + tracingSpan?.addEvent('Error on Konsistent', { error: e.message }); + logger.error(e, `Error on processIncomingChange ${document}: ${e.message}`); + await dbSession.abortTransaction(); + return errorReturn(`[${document}] Error on Konsistent: ${e.message}`); } - - await eventManager.sendEvent(document, 'create', resultRecord); - return successReturn([dateToString(resultRecord)]); } + + return successReturn([dateToString(resultRecord)]); } + + return errorReturn(`[${document}] Error on insert, there is no affected record`); }); if (transactionResult != null && transactionResult.success != null) { tracingSpan?.addEvent('Operation result', omit(transactionResult, ['data'])); + + // Process events and messages after transaction completes successfully + if (transactionResult.success === true && transactionResult.data?.[0] != null) { + const record = transactionResult.data[0]; + + // Process Konsistent + if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) { + tracingSpan?.addEvent('Sending Konsistent message'); + try { + await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { + metaName: document, + operation: 'create', + data: record + }); + } catch (e) { + logger.error(e, `Error sending Konsistent message: ${e.message}`); + } + } + + // Send events + try { + await eventManager.sendEvent(document, 'create', record); + } catch (e) { + logger.error(e, `Error sending event: ${e.message}`); + } + } + return transactionResult; } } catch (e) { @@ -1355,17 +1379,14 @@ export async function update({ authTokenId, document, data, contextUser, tracing await runScriptAfterSave({ script: metaObject.scriptAfterSave, data: updatedRecords, user, extraData: { original: existsRecords } }); } - logger.debug('Processing Konsistent'); + // Process sync Konsistent + if (MetaObject.Namespace.plan?.useExternalKonsistent !== true || Konsistent.isQueueEnabled === false) { + logger.debug('Processing Konsistent'); + for await (const record of updatedRecords) { + const original = existsRecords.find(r => r._id === record._id); + const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']); + const changedProps = objectsDiff(original, newRecord); - for await (const record of updatedRecords) { - const original = existsRecords.find(r => r._id === record._id); - const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']); - - const changedProps = objectsDiff(original, newRecord); - if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) { - tracingSpan?.addEvent('Sending Konsistent message'); - await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { metaName: document, operation: 'update', data: changedProps }); - } else { try { tracingSpan?.addEvent('Processing sync Konsistent'); await processIncomingChange(document, record, 'update', user, changedProps, dbSession); @@ -1379,7 +1400,6 @@ export async function update({ authTokenId, document, data, contextUser, tracing } } - await Promise.all(updateResults.map(({ data }) => eventManager.sendEvent(document, 'update', data))); const responseData = updatedRecords.map(record => removeUnauthorizedDataForRead(access, record, user, metaObject)).map(record => dateToString(record)); if (emailsToSend.length > 0) { @@ -1411,6 +1431,39 @@ export async function update({ authTokenId, document, data, contextUser, tracing if (transactionResult != null && transactionResult.success != null) { tracingSpan?.addEvent('Operation result', omit(transactionResult, ['data'])); + + // Process events and messages after transaction completes successfully + if (transactionResult.success === true && transactionResult.data?.length > 0) { + const updatedRecords = transactionResult.data; + + // Process Konsistent + if (MetaObject.Namespace.plan?.useExternalKonsistent === true && Konsistent.isQueueEnabled) { + tracingSpan?.addEvent('Sending Konsistent messages'); + for (const record of updatedRecords) { + try { + const original = existsRecords.find(r => r._id === record._id); + const newRecord = omit(record, ['_id', '_createdAt', '_createdBy', '_updatedAt', '_updatedBy']); + const changedProps = objectsDiff(original, newRecord); + + await queueManager.sendMessage(Konsistent.queue.resource, Konsistent.queue.name, { + metaName: document, + operation: 'update', + data: changedProps + }); + } catch (e) { + logger.error(e, `Error sending Konsistent message: ${e.message}`); + } + } + } + + // Send events + try { + await Promise.all(updatedRecords.map(record => eventManager.sendEvent(document, 'update', record))); + } catch (e) { + logger.error(e, `Error sending events: ${e.message}`); + } + } + return transactionResult; } } catch (e) {