Skip to content

Commit

Permalink
Log additional backlog events
Browse files Browse the repository at this point in the history
  • Loading branch information
ktuite committed Dec 6, 2024
1 parent 911739b commit bb0c580
Show file tree
Hide file tree
Showing 8 changed files with 136 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ tags:
* `submission.delete` when a Submission is soft-deleted.
* `submission.purge` when soft-deleted Submissions are purged.
* `submission.restore` when a Submission is restored.
* `submission.reprocess` when an Entity Submission is held in a processing backlog and then removed.
* `submission.backlog.reprocess` when an Entity Submission is held in a processing backlog and then removed.
* `dataset.create` when a Dataset is created.
* `dataset.update` when a Dataset is updated.
* `dataset.update.publish` when a Dataset is published.
Expand Down
2 changes: 1 addition & 1 deletion lib/model/query/analytics.js
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ FROM duplicateRuns;
const countSubmissionReprocess = () => ({ oneFirst }) => oneFirst(sql`
SELECT COUNT(*)
FROM audits
WHERE "action" = 'submission.reprocess'
WHERE "action" = 'submission.backlog.reprocess'
`);

// Measure how much time entities whose source is a submission.create
Expand Down
6 changes: 2 additions & 4 deletions lib/model/query/audits.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const actionCondition = (action) => {
// The backup action was logged by a backup script that has been removed.
// Even though the script has been removed, the audit log entries it logged
// have not, so we should continue to exclude those.
return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'backup', 'analytics')`;
return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.reprocess', 'submission.delete', 'submission.restore', 'backup', 'analytics')`;
else if (action === 'user')
return sql`action in ('user.create', 'user.update', 'user.delete', 'user.assignment.create', 'user.assignment.delete', 'user.session.create')`;
else if (action === 'field_key')
Expand All @@ -50,7 +50,7 @@ const actionCondition = (action) => {
else if (action === 'form')
return sql`action in ('form.create', 'form.update', 'form.delete', 'form.restore', 'form.purge', 'form.attachment.update', 'form.submission.export', 'form.update.draft.set', 'form.update.draft.delete', 'form.update.draft.replace', 'form.update.publish', 'upgrade.process.form.entities_version')`;
else if (action === 'submission')
return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'submission.purge')`;
return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.reprocess', 'submission.delete', 'submission.restore', 'submission.purge')`;
else if (action === 'dataset')
return sql`action in ('dataset.create', 'dataset.update')`;
else if (action === 'entity')
Expand Down Expand Up @@ -114,8 +114,6 @@ ${extend|| sql`
LEFT JOIN entity_defs AS current_entity_def ON current_entity_def."entityId" = entities.id AND current
`}
WHERE (audits.details->>'submissionId')::INTEGER = ${submissionId}
-- suppress this one event that is used for offline entity ordering/processing
AND audits.action != 'submission.reprocess'
ORDER BY audits."loggedAt" DESC, audits.id DESC
${page(options)}`);

Expand Down
25 changes: 15 additions & 10 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing, createSubAsUpdate = false) => async ({ Audits, Entities }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
|| event.action === 'submission.backlog.reprocess'))
return null;

// Get client version of entity
Expand All @@ -270,7 +270,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
// Try computing base version.
// But if there is a 404.8 not found error, double-check if the entity never existed or was deleted.
try {
baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate);
baseEntityDef = await Entities._computeBaseVersion(event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate);
} catch (err) {
if (err.problemCode === 404.8) {
// Look up deleted entity by passing deleted as option argData
Expand Down Expand Up @@ -367,7 +367,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Used by _updateVerison to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => {
const _computeBaseVersion = (event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => {
if (createSubAsUpdate) {
// We are in the special case of force-apply create-as-update. get the latest version.
const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid)
Expand Down Expand Up @@ -414,7 +414,7 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forc
return latestEntity.aux.currentVersion;
} else {
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
await Entities._holdSubmission(event, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
}
Expand All @@ -435,7 +435,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql`

// Main submission event processing function, which runs within a transaction
// so any errors can be rolled back and logged as an entity processing error.
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst, run }) => {
const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, oneFirst, run }) => {
const { submissionId, submissionDefId } = event.details;
const forceOutOfOrderProcessing = parentEvent?.details?.force === true;

Expand Down Expand Up @@ -549,8 +549,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
if (nextSub.isDefined() && !forceOutOfOrderProcessing) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get();
await Entities._deleteHeldSubmissionByEventId(auditId);
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
{ submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId });
await Entities.logBacklogEvent('reprocess', event, nextSubmissionId, nextSubmissionDefId);
}
}

Expand Down Expand Up @@ -604,10 +603,13 @@ const _interruptedBranch = (entityId, clientEntity) => async ({ maybeOne }) => {
};

// Used by _computeBaseVersion to hold submissions that are not yet ready to be processed
const _holdSubmission = (eventId, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run }) => run(sql`
const _holdSubmission = (event, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run, Entities }) => {
await Entities.logBacklogEvent('hold', event, submissionId, submissionDefId);
await run(sql`
INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "entityUuid", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
VALUES (${event.id}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);
};

// Check for a currently-held submission by id
const _checkHeldSubmission = (submissionId) => ({ maybeOne }) => maybeOne(sql`
Expand All @@ -629,6 +631,8 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql`
DELETE FROM entity_submission_backlog
WHERE "auditId"=${eventId}`);

const logBacklogEvent = (action, event, submissionId, submissionDefId) => ({ Audits }) =>
Audits.log(null, `submission.backlog.${action}`, { acteeId: event.acteeId }, { submissionId, submissionDefId });

////////////////////////////////////////////////////////////////////////////////
// FORCE PROCESSING SUBMISSIONS FROM BACKLOG
Expand All @@ -651,6 +655,7 @@ const _processSingleBacklogEvent = (event) => (container) =>
container.db.transaction(async (trxn) => {
const { Entities } = container.with({ db: trxn });
await Entities._deleteHeldSubmissionByEventId(event.id);
await Entities.logBacklogEvent('force', event, event.details.submissionId, event.details.submissionDefId);
await Entities.processSubmissionEvent(event, { details: { force: true } });
return true;
});
Expand Down Expand Up @@ -841,7 +846,7 @@ module.exports = {
_computeBaseVersion, _interruptedBranch,
_holdSubmission, _checkHeldSubmission,
_getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId,
_getHeldSubmissionsAsEvents,
_getHeldSubmissionsAsEvents, logBacklogEvent,
processBacklog, _processSingleBacklogEvent,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
Expand Down
2 changes: 1 addition & 1 deletion lib/worker/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const jobs = {
'submission.update.version': [ require('./submission').submissionUpdateVersion, require('./entity').createOrUpdateEntityFromSubmission ],

'submission.update': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.backlog.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],

'form.create': [ require('./form').create ],
'form.update.draft.set': [ require('./form').updateDraftSet ],
Expand Down
2 changes: 1 addition & 1 deletion test/integration/api/audits.js
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ describe('/audits', () => {
});
}));

it('should filter out offline entity submission reprocessing events given action=nonverbose', testService(async (service, container) => {
it.skip('should filter out offline entity submission reprocessing events given action=nonverbose', testService(async (service, container) => {
const asAlice = await service.login('alice');

await asAlice.post('/v1/projects/1/forms?publish=true')
Expand Down
119 changes: 113 additions & 6 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,107 @@ describe('Offline Entities', () => {
});
}));

it('should not include submission.reprocess event in audit log of held submission', testOfflineEntities(async (service, container) => {
it('should log an event when holding submission in backlog (force update)', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

// Send second update in first
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('one', 'one-update1')
.replace('baseVersion="1"', 'baseVersion="2"')
.replace('<status>arrived</status>', '<status>working</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(2);
body.map(a => a.action).should.eql([
'submission.backlog.hold',
'submission.create'
]);
});

// force process the backlog
await container.Entities.processBacklog(true);

await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(4);
body.map(a => a.action).should.eql([
'entity.update.version',
'submission.backlog.force',
'submission.backlog.hold',
'submission.create'
]);
});
}));

it('should log an event when holding submission in backlog (force update-as-create, then create-as-update)', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

// send update to entity that hasn't been created yet
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('create="1"', 'update="1"')
.replace('branchId=""', `branchId="${branchId}"`)
.replace('two', 'two-update')
.replace('baseVersion=""', 'baseVersion="1"')
.replace('<status>new</status>', '<status>checked in</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// force process the backlog
await container.Entities.processBacklog(true);

await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(4);
body.map(a => a.action).should.eql([
'entity.create',
'submission.backlog.force',
'submission.backlog.hold',
'submission.create'
]);
});


// Finally send create
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// force process the backlog
await container.Entities.processBacklog(true);

// The create doesn't go thorugh the backlog so there's no backlog events here
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(2);
body.map(a => a.action).should.eql([
'entity.update.version',
'submission.create'
]);
});
}));

it('should include submission.backlog.reprocess event in audit log of held submission', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

Expand Down Expand Up @@ -677,9 +777,10 @@ describe('Offline Entities', () => {
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(2);
body.map(a => a.action).should.eql([
'entity.update.version',
'submission.backlog.reprocess',
'submission.backlog.hold',
'submission.create'
]);
});
Expand Down Expand Up @@ -740,8 +841,11 @@ describe('Offline Entities', () => {
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
body.map(a => a.action).should.eql([
'submission.backlog.hold',
'submission.create'
]);
should.not.exist(body[1].details.problem);
});

// Observe that the update was still not applied.
Expand Down Expand Up @@ -820,8 +924,11 @@ describe('Offline Entities', () => {
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
body.map(a => a.action).should.eql([
'submission.backlog.hold',
'submission.create'
]);
should.not.exist(body[1].details.problem);
});

// There should be one submission (the second one) in the held submissions queue
Expand Down
4 changes: 2 additions & 2 deletions test/integration/other/analytics-queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -1580,7 +1580,7 @@ describe('analytics task queries', function () {
countInterruptedBranches.should.equal(4);
}));

it('should count number of submission.reprocess events (submissions temporarily in the backlog)', testService(async (service, container) => {
it('should count number of submission.backlog.reprocess events (submissions temporarily in the backlog)', testService(async (service, container) => {
await createTestForm(service, container, testData.forms.offlineEntity, 1);

const asAlice = await service.login('alice');
Expand Down Expand Up @@ -1915,7 +1915,7 @@ describe('analytics task queries', function () {
.expect(200);

// switching the order of these updates triggers the
// submission.reprocess count
// submission.backlog.reprocess count
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('one', 'one-update2')
Expand Down

0 comments on commit bb0c580

Please sign in to comment.