Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log additional backlog events #1332

Merged
merged 5 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,9 @@ tags:
* `submission.delete` when a Submission is soft-deleted.
* `submission.purge` when soft-deleted Submissions are purged.
* `submission.restore` when a Submission is restored.
* `submission.reprocess` when an Entity Submission is held in a processing backlog and then removed.
* `submission.backlog.hold` when an Entity Submission is first held in processing backlog.
* `submission.backlog.reprocess` when an Entity Submission is reprocessed and removed from the backlog.
* `submission.backlog.force` when an Entity Submission is force-processed after being in backlog.
* `dataset.create` when a Dataset is created.
* `dataset.update` when a Dataset is updated.
* `dataset.update.publish` when a Dataset is published.
Expand Down
2 changes: 1 addition & 1 deletion lib/model/query/analytics.js
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ FROM duplicateRuns;
const countSubmissionReprocess = () => ({ oneFirst }) => oneFirst(sql`
SELECT COUNT(*)
FROM audits
WHERE "action" = 'submission.reprocess'
WHERE "action" = 'submission.backlog.reprocess'
`);

// Measure how much time entities whose source is a submission.create
Expand Down
6 changes: 2 additions & 4 deletions lib/model/query/audits.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const actionCondition = (action) => {
// The backup action was logged by a backup script that has been removed.
// Even though the script has been removed, the audit log entries it logged
// have not, so we should continue to exclude those.
return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'backup', 'analytics')`;
return sql`action not in ('entity.create', 'entity.bulk.create', 'entity.error', 'entity.update.version', 'entity.update.resolve', 'entity.delete', 'submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.hold', 'submission.backlog.reprocess', 'submission.backlog.force', 'submission.delete', 'submission.restore', 'backup', 'analytics')`;
else if (action === 'user')
return sql`action in ('user.create', 'user.update', 'user.delete', 'user.assignment.create', 'user.assignment.delete', 'user.session.create')`;
else if (action === 'field_key')
Expand All @@ -50,7 +50,7 @@ const actionCondition = (action) => {
else if (action === 'form')
return sql`action in ('form.create', 'form.update', 'form.delete', 'form.restore', 'form.purge', 'form.attachment.update', 'form.submission.export', 'form.update.draft.set', 'form.update.draft.delete', 'form.update.draft.replace', 'form.update.publish', 'upgrade.process.form.entities_version')`;
else if (action === 'submission')
return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.reprocess', 'submission.delete', 'submission.restore', 'submission.purge')`;
return sql`action in ('submission.create', 'submission.update', 'submission.update.version', 'submission.attachment.update', 'submission.backlog.hold', 'submission.backlog.reprocess', 'submission.backlog.force', 'submission.delete', 'submission.restore', 'submission.purge')`;
else if (action === 'dataset')
return sql`action in ('dataset.create', 'dataset.update')`;
else if (action === 'entity')
Expand Down Expand Up @@ -114,8 +114,6 @@ ${extend|| sql`
LEFT JOIN entity_defs AS current_entity_def ON current_entity_def."entityId" = entities.id AND current
`}
WHERE (audits.details->>'submissionId')::INTEGER = ${submissionId}
-- suppress this one event that is used for offline entity ordering/processing
AND audits.action != 'submission.reprocess'
ORDER BY audits."loggedAt" DESC, audits.id DESC
${page(options)}`);

Expand Down
25 changes: 15 additions & 10 deletions lib/model/query/entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ const _createEntity = (dataset, entityData, submissionId, submissionDef, submiss
const _updateEntity = (dataset, entityData, submissionId, submissionDef, submissionDefId, event, forceOutOfOrderProcessing, createSubAsUpdate = false) => async ({ Audits, Entities }) => {
if (!(event.action === 'submission.create'
|| event.action === 'submission.update.version'
|| event.action === 'submission.reprocess'))
|| event.action === 'submission.backlog.reprocess'))
return null;

// Get client version of entity
Expand All @@ -270,7 +270,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss
// Try computing base version.
// But if there is a 404.8 not found error, double-check if the entity never existed or was deleted.
try {
baseEntityDef = await Entities._computeBaseVersion(event.id, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate);
baseEntityDef = await Entities._computeBaseVersion(event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate);
} catch (err) {
if (err.problemCode === 404.8) {
// Look up deleted entity by passing deleted as option argData
Expand Down Expand Up @@ -380,7 +380,7 @@ const _updateEntity = (dataset, entityData, submissionId, submissionDef, submiss

// Used by _updateVerison to figure out the intended base version in Central
// based on the branchId, trunkVersion, and baseVersion in the submission
const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => {
const _computeBaseVersion = (event, dataset, clientEntity, submissionDef, forceOutOfOrderProcessing, createSubAsUpdate) => async ({ Entities }) => {
if (createSubAsUpdate) {
// We are in the special case of force-apply create-as-update. get the latest version.
const latestEntity = await Entities.getById(dataset.id, clientEntity.uuid)
Expand Down Expand Up @@ -427,7 +427,7 @@ const _computeBaseVersion = (eventId, dataset, clientEntity, submissionDef, forc
return latestEntity.aux.currentVersion;
} else {
// If there is no base version and we are not forcing the processing, hold submission in the backlog.
await Entities._holdSubmission(eventId, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
await Entities._holdSubmission(event, submissionDef.submissionId, submissionDef.id, clientEntity.uuid, clientEntity.def.branchId, clientEntity.def.baseVersion);
return null;
}
}
Expand All @@ -448,7 +448,7 @@ const _getFormDefActions = (oneFirst, datasetId, formDefId) => oneFirst(sql`

// Main submission event processing function, which runs within a transaction
// so any errors can be rolled back and logged as an entity processing error.
const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Datasets, Entities, Submissions, Forms, oneFirst, run }) => {
const _processSubmissionEvent = (event, parentEvent) => async ({ Datasets, Entities, Submissions, Forms, oneFirst, run }) => {
const { submissionId, submissionDefId } = event.details;
const forceOutOfOrderProcessing = parentEvent?.details?.force === true;

Expand Down Expand Up @@ -562,8 +562,7 @@ const _processSubmissionEvent = (event, parentEvent) => async ({ Audits, Dataset
if (nextSub.isDefined() && !forceOutOfOrderProcessing) {
const { submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId, auditId } = nextSub.get();
await Entities._deleteHeldSubmissionByEventId(auditId);
await Audits.log({ id: event.actorId }, 'submission.reprocess', { acteeId: event.acteeId },
{ submissionId: nextSubmissionId, submissionDefId: nextSubmissionDefId });
await Entities.logBacklogEvent('reprocess', event, nextSubmissionId, nextSubmissionDefId);
}
}

Expand Down Expand Up @@ -617,10 +616,13 @@ const _interruptedBranch = (entityId, clientEntity) => async ({ maybeOne }) => {
};

// Used by _computeBaseVersion to hold submissions that are not yet ready to be processed
const _holdSubmission = (eventId, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run }) => run(sql`
const _holdSubmission = (event, submissionId, submissionDefId, entityUuid, branchId, branchBaseVersion) => async ({ run, Entities }) => {
await Entities.logBacklogEvent('hold', event, submissionId, submissionDefId);
await run(sql`
INSERT INTO entity_submission_backlog ("auditId", "submissionId", "submissionDefId", "entityUuid", "branchId", "branchBaseVersion", "loggedAt")
VALUES (${eventId}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
VALUES (${event.id}, ${submissionId}, ${submissionDefId}, ${entityUuid}, ${branchId}, ${branchBaseVersion}, CLOCK_TIMESTAMP())
`);
};

// Check for a currently-held submission by id
const _checkHeldSubmission = (submissionId) => ({ maybeOne }) => maybeOne(sql`
Expand All @@ -642,6 +644,8 @@ const _deleteHeldSubmissionByEventId = (eventId) => ({ run }) => run(sql`
DELETE FROM entity_submission_backlog
WHERE "auditId"=${eventId}`);

const logBacklogEvent = (action, event, submissionId, submissionDefId) => ({ Audits }) =>
Audits.log({ id: event.actorId }, `submission.backlog.${action}`, { acteeId: event.acteeId }, { submissionId, submissionDefId });
Comment on lines +647 to +648
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some day we might want to include entityUuid in here, too, but we're not sure what that would look like so we won't do it now.


////////////////////////////////////////////////////////////////////////////////
// FORCE PROCESSING SUBMISSIONS FROM BACKLOG
Expand All @@ -664,6 +668,7 @@ const _processSingleBacklogEvent = (event) => (container) =>
container.db.transaction(async (trxn) => {
const { Entities } = container.with({ db: trxn });
await Entities._deleteHeldSubmissionByEventId(event.id);
await Entities.logBacklogEvent('force', event, event.details.submissionId, event.details.submissionDefId);
await Entities.processSubmissionEvent(event, { details: { force: true } });
return true;
});
Expand Down Expand Up @@ -854,7 +859,7 @@ module.exports = {
_computeBaseVersion, _interruptedBranch,
_holdSubmission, _checkHeldSubmission,
_getNextHeldSubmissionInBranch, _deleteHeldSubmissionByEventId,
_getHeldSubmissionsAsEvents,
_getHeldSubmissionsAsEvents, logBacklogEvent,
processBacklog, _processSingleBacklogEvent,
processSubmissionEvent, streamForExport,
getDefBySubmissionId,
Expand Down
2 changes: 1 addition & 1 deletion lib/worker/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const jobs = {
'submission.update.version': [ require('./submission').submissionUpdateVersion, require('./entity').createOrUpdateEntityFromSubmission ],

'submission.update': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],
'submission.backlog.reprocess': [ require('./entity').createOrUpdateEntityFromSubmission ],

'form.create': [ require('./form').create ],
'form.update.draft.set': [ require('./form').updateDraftSet ],
Expand Down
2 changes: 1 addition & 1 deletion test/integration/api/audits.js
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ describe('/audits', () => {
});
}));

it('should filter out offline entity submission reprocessing events given action=nonverbose', testService(async (service, container) => {
it('should filter out offline entity submission backlog events given action=nonverbose', testService(async (service, container) => {
const asAlice = await service.login('alice');

await asAlice.post('/v1/projects/1/forms?publish=true')
Expand Down
128 changes: 119 additions & 9 deletions test/integration/api/offline-entities.js
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,104 @@ describe('Offline Entities', () => {
});
}));

it('should not include submission.reprocess event in audit log of held submission', testOfflineEntities(async (service, container) => {
it('should log an event when holding submission in backlog (force update)', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

// Send second update in first
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('branchId=""', `branchId="${branchId}"`)
.replace('one', 'one-update1')
.replace('baseVersion="1"', 'baseVersion="2"')
.replace('<status>arrived</status>', '<status>working</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(2);
body.map(a => a.action).should.eql([
'submission.backlog.hold',
'submission.create'
]);
});

// force process the backlog
await container.Entities.processBacklog(true);

await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(4);
body.map(a => a.action).should.eql([
'entity.update.version',
'submission.backlog.force',
'submission.backlog.hold',
'submission.create'
]);
});
}));

it('should log an event when holding submission in backlog (force update-as-create, then create-as-update)', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

// send update to entity that hasn't been created yet
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two
.replace('create="1"', 'update="1"')
.replace('branchId=""', `branchId="${branchId}"`)
.replace('two', 'two-update')
.replace('baseVersion=""', 'baseVersion="1"')
.replace('<status>new</status>', '<status>checked in</status>')
)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// force process the backlog
await container.Entities.processBacklog(true);

await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(4);
body.map(a => a.action).should.eql([
'entity.create',
'submission.backlog.force',
'submission.backlog.hold',
'submission.create'
]);
});


// Finally send create
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.two)
.set('Content-Type', 'application/xml')
.expect(200);

await exhaust(container);

// The create doesn't go thorugh the backlog so there's no backlog events here
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(2);
body.map(a => a.action).should.eql([
'entity.update.version',
'submission.create'
]);
});
}));

it('should include submission.backlog.reprocess event in audit log of held submission', testOfflineEntities(async (service, container) => {
const asAlice = await service.login('alice');
const branchId = uuid();

Expand Down Expand Up @@ -677,11 +774,14 @@ describe('Offline Entities', () => {
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one-update1/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(2);
body.map(a => a.action).should.eql([
'entity.update.version',
'submission.backlog.reprocess',
'submission.backlog.hold',
'submission.create'
]);
// actor for update should be the same as submission create actor
body[0].actorId.should.equal(body[3].actorId);
});
}));
});
Expand Down Expand Up @@ -740,8 +840,11 @@ describe('Offline Entities', () => {
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/one/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
body.map(a => a.action).should.eql([
'submission.backlog.hold',
'submission.create'
]);
should.not.exist(body[1].details.problem);
});

// Observe that the update was still not applied.
Expand Down Expand Up @@ -820,8 +923,11 @@ describe('Offline Entities', () => {
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
should.not.exist(body[0].details.problem);
body.map(a => a.action).should.eql([
'submission.backlog.hold',
'submission.create'
]);
should.not.exist(body[1].details.problem);
});

// There should be one submission (the second one) in the held submissions queue
Expand Down Expand Up @@ -896,9 +1002,13 @@ describe('Offline Entities', () => {
await asAlice.get('/v1/projects/1/forms/offlineEntity/submissions/two-update/audits')
.expect(200)
.then(({ body }) => {
body.length.should.equal(1);
body[0].action.should.eql('submission.create');
should.not.exist(body[0].details.problem);
body.map(a => a.action).should.eql([
'submission.backlog.force',
'submission.backlog.hold',
'submission.create'
]);
body[2].action.should.eql('submission.create');
should.not.exist(body[2].details.problem);
});
}));
});
Expand Down
4 changes: 2 additions & 2 deletions test/integration/other/analytics-queries.js
Original file line number Diff line number Diff line change
Expand Up @@ -1580,7 +1580,7 @@ describe('analytics task queries', function () {
countInterruptedBranches.should.equal(4);
}));

it('should count number of submission.reprocess events (submissions temporarily in the backlog)', testService(async (service, container) => {
it('should count number of submission.backlog.reprocess events (submissions temporarily in the backlog)', testService(async (service, container) => {
await createTestForm(service, container, testData.forms.offlineEntity, 1);

const asAlice = await service.login('alice');
Expand Down Expand Up @@ -1915,7 +1915,7 @@ describe('analytics task queries', function () {
.expect(200);

// switching the order of these updates triggers the
// submission.reprocess count
// submission.backlog.reprocess count
await asAlice.post('/v1/projects/1/forms/offlineEntity/submissions')
.send(testData.instances.offlineEntity.one
.replace('one', 'one-update2')
Expand Down
Loading