Skip to content

Commit

Permalink
Bug.image plugin read error (#181)
Browse files Browse the repository at this point in the history
* [plugins/image] always mark attachments oriented when errors occur so processing does not stall

* [plugins/image] version patch bump to 1.0.4
  • Loading branch information
restjohn authored Sep 29, 2023
1 parent c7d22fd commit 2e2962c
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 121 deletions.
4 changes: 2 additions & 2 deletions plugins/image/service/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion plugins/image/service/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ngageoint/mage.image.service",
"version": "1.0.3",
"version": "1.0.4",
"description": "Orient images attached to MAGE observations according to EXIF meta-data and generate configurable size thumbnails.",
"main": "lib/index.js",
"scripts": {
Expand Down
192 changes: 132 additions & 60 deletions plugins/image/service/src/processor.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { EventProcessingState, FindUnprocessedImageAttachments, ImageContent, ImageDescriptor, ImagePluginConfig, ImageService, createImagePluginControl, orientAttachmentImage, processImageAttachments, thumbnailAttachmentImage, UnprocessedAttachmentReference, ImagePluginControl, defaultImagePluginConfig } from './processor'
import { EventProcessingState, FindUnprocessedImageAttachments, ImageContent, ImageDescriptor, ImagePluginConfig, ImageService, createImagePluginControl, orientAttachmentImage, processImageAttachments, thumbnailAttachmentImage, UnprocessedAttachmentReference, ImagePluginControl, defaultImagePluginConfig, AttachmentProcessingResult } from './processor'
import { MageEventRepository, MageEventAttrs, MageEventId, MageEvent, copyMageEventAttrs } from '@ngageoint/mage.service/lib/entities/events/entities.events'
import { Attachment, AttachmentContentPatchAttrs, AttachmentId, AttachmentPatchAttrs, AttachmentStore, AttachmentStoreError, AttachmentStoreErrorCode, copyObservationAttrs, copyThumbnailAttrs, EventScopedObservationRepository, FormEntry, Observation, ObservationAttrs, ObservationId, patchAttachment, StagedAttachmentContent, StagedAttachmentContentId, StagedAttachmentContentRef, Thumbnail, ThumbnailContentPatchAttrs } from '@ngageoint/mage.service/lib/entities/observations/entities.observations'
import stream from 'stream'
Expand All @@ -7,6 +7,7 @@ import { BufferWriteable } from './util.spec'
import { FormFieldType } from '@ngageoint/mage.service/lib/entities/events/entities.events.forms'
import _ from 'lodash'
import { PluginStateRepository } from '@ngageoint/mage.service/lib/plugins.api'
import { FindUnprocessedAttachments } from './adapters.db.mongo'

function minutes(x: number): number {
return 1000 * 60 * x
Expand Down Expand Up @@ -68,7 +69,7 @@ function sameAsObservationWithoutDates(expected: Observation): jasmine.Asymmetri
const { createdAt, lastModified, ...expectedAttrs } = copyObservationAttrs(expected)
expectedAttrs.attachments.forEach(x => delete x.lastModified)
return {
asymmetricMatch(actual: any, matchersUtil: jasmine.MatchersUtil) {
asymmetricMatch(actual: any, matchersUtil: jasmine.MatchersUtil): boolean {
const { createdAt, lastModified, ...actualAttrs } = copyObservationAttrs(actual)
actualAttrs.attachments.forEach(x => delete x.lastModified)
return actual instanceof Observation && matchersUtil.equals(expectedAttrs, actualAttrs)
Expand Down Expand Up @@ -116,7 +117,7 @@ function observationWithAttachments(id: ObservationId, event: MageEvent, attachm

const asyncIterableOf = <T>(items: T[]): AsyncIterable<T> => {
return {
async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](): AsyncGenerator<T> {
for (const item of items) {
yield Promise.resolve(item)
}
Expand All @@ -126,10 +127,10 @@ const asyncIterableOf = <T>(items: T[]): AsyncIterable<T> => {

function closeTo(target: number, delta: number): jasmine.AsymmetricMatcher<number> {
return {
asymmetricMatch(other) {
asymmetricMatch(other): boolean {
return Math.abs(target - other) <= delta
},
jasmineToString(prettyPrint) {
jasmineToString(prettyPrint): string {
return `a number within ${delta} of ${target}`
}
}
Expand All @@ -138,7 +139,7 @@ function closeTo(target: number, delta: number): jasmine.AsymmetricMatcher<numbe
/**
* Within 100 milliseconds of `Date.now()`
*/
const closeToNow = () => closeTo(Date.now(), 100)
const closeToNow = (): jasmine.AsymmetricMatcher<number | Date> => closeTo(Date.now(), 100)

describe('processing interval', () => {

Expand Down Expand Up @@ -175,7 +176,7 @@ describe('processing interval', () => {

describe('orient phase', () => {

it('orients the attachment image and patches the attachment', async () => {
it('orients the attachment image and produces an attachment patch', async () => {

const att: Attachment = Object.freeze({
id: '1.123.1',
Expand All @@ -192,10 +193,7 @@ describe('processing interval', () => {
const originalConstentStream = stream.Readable.from(originalContent)
const stagedContent = new StagedAttachmentContent('stage1', new BufferWriteable())
const obsBefore: Observation = observationWithAttachments('1.123', event1, [ att ])
const obsStored: Observation = patchAttachment(obsBefore, att.id, { size: 321321, width: 1000, height: 1200, oriented: true }) as Observation
const attStored = obsStored.attachmentFor(att.id) as Attachment
const obsRepo = observationRepos.get(event1.id)!
obsRepo.patchAttachment.and.resolveTo(obsStored)
attachmentStore.readContent.and.resolveTo(originalConstentStream)
attachmentStore.stagePendingContent.and.resolveTo(stagedContent)
attachmentStore.saveContent.and.resolveTo({ size: 321321, contentLocator: att.contentLocator! })
Expand All @@ -209,22 +207,18 @@ describe('processing interval', () => {
dimensions: { width: 1000, height: 1200 },
}
})
const oriented = await orientAttachmentImage(obsBefore, att.id, imageService, obsRepo, attachmentStore, console) as Observation
const oriented = await orientAttachmentImage(obsBefore, att.id, imageService, attachmentStore, console)
const orientedContent = stagedContent.tempLocation as BufferWriteable

expect(oriented).toBeInstanceOf(Observation)
expect(oriented.attachments).toEqual([
{ ...att, lastModified: attStored.lastModified, size: 321321, width: 1000, height: 1200, oriented: true, contentLocator: att.contentLocator! }
])
expect(oriented.patch).toEqual({ contentType: 'image/jpeg', size: 321321, width: 1000, height: 1200, oriented: true, contentLocator: att.contentLocator! })
expect(orientedContent.content).toEqual(Buffer.from('photo of penguins'))
expect(obsRepo.patchAttachment).toHaveBeenCalledOnceWith(
obsBefore, att.id, { oriented: true, size: 321321, width: 1000, height: 1200, contentType: 'image/jpeg', contentLocator: att.contentLocator! })
expect(imageService.autoOrient).toHaveBeenCalledOnceWith(jasmine.objectContaining({ bytes: originalConstentStream }), stagedContent.tempLocation)
expect(attachmentStore.saveContent).toHaveBeenCalledOnceWith(stagedContent, att.id, obsBefore)
expect(obsRepo.patchAttachment).not.toHaveBeenCalled()
expect(obsRepo.save).not.toHaveBeenCalled()
})

it('marks the attachment oriented if content does not exist', async () => {
it('does not produce an attachment patch if the content does not exist', async () => {

const att: Attachment = Object.freeze({
id: '1.123.1',
Expand All @@ -238,25 +232,18 @@ describe('processing interval', () => {
contentLocator: String(Date.now())
})
const obsBefore: Observation = observationWithAttachments('1.123', event1, [ att ])
const obsStored: Observation = patchAttachment(obsBefore, att.id, { oriented: true }) as Observation
const attStored = obsStored.attachmentFor(att.id) as Attachment
const obsRepo = observationRepos.get(event1.id)!
obsRepo.patchAttachment.and.resolveTo(obsStored)
attachmentStore.readContent.and.resolveTo(null)
const oriented = await orientAttachmentImage(obsBefore, att.id, imageService, obsRepo, attachmentStore, console) as Observation
const oriented = await orientAttachmentImage(obsBefore, att.id, imageService, attachmentStore, console)

expect(oriented).toBeInstanceOf(Observation)
expect(oriented.attachments).toEqual([
{ ...att, oriented: true, lastModified: attStored.lastModified, width: undefined, height: undefined }
])
expect(obsRepo.patchAttachment).toHaveBeenCalledOnceWith(
obsBefore, att.id, { oriented: true })
expect(oriented.patch).toBeUndefined()
expect(imageService.autoOrient).not.toHaveBeenCalled()
expect(attachmentStore.saveContent).not.toHaveBeenCalled()
expect(obsRepo.patchAttachment).not.toHaveBeenCalled()
expect(obsRepo.save).not.toHaveBeenCalled()
})

it('marks the attachment oriented if reading content fails', async () => {
it('does not produce an attachment patch if reading content fails', async () => {

const att: Attachment = Object.freeze({
id: '1.123.1',
Expand All @@ -270,21 +257,48 @@ describe('processing interval', () => {
contentLocator: String(Date.now())
})
const obsBefore: Observation = observationWithAttachments('1.123', event1, [ att ])
const obsStored: Observation = patchAttachment(obsBefore, att.id, { oriented: true }) as Observation
const attStored = obsStored.attachmentFor(att.id) as Attachment
const obsRepo = observationRepos.get(event1.id)!
obsRepo.patchAttachment.and.resolveTo(obsStored)
attachmentStore.readContent.and.resolveTo(new AttachmentStoreError(AttachmentStoreErrorCode.ContentNotFound))
const oriented = await orientAttachmentImage(obsBefore, att.id, imageService, obsRepo, attachmentStore, console) as Observation
const oriented = await orientAttachmentImage(obsBefore, att.id, imageService, attachmentStore, console)

expect(oriented).toBeInstanceOf(Observation)
expect(oriented.attachments).toEqual([
{ ...att, oriented: true, lastModified: attStored.lastModified, width: undefined, height: undefined }
])
expect(obsRepo.patchAttachment).toHaveBeenCalledOnceWith(
obsBefore, att.id, { oriented: true })
expect(oriented.patch).toBeUndefined()
expect(imageService.autoOrient).not.toHaveBeenCalled()
expect(attachmentStore.saveContent).not.toHaveBeenCalled()
expect(obsRepo.patchAttachment).not.toHaveBeenCalled()
expect(obsRepo.save).not.toHaveBeenCalled()
})

it('does not produce an attachment patch if the image service cannot decode the image', async () => {

const att: Attachment = Object.freeze({
id: '1.123.1',
observationFormId: 'form1',
fieldName: 'field1',
oriented: false,
thumbnails: [],
contentType: 'image/jpeg',
name: 'test1.jpeg',
size: 320000,
contentLocator: String(Date.now())
})
const obsBefore: Observation = observationWithAttachments('1.123', event1, [ att ])
const obsRepo = observationRepos.get(event1.id)!
attachmentStore.readContent.and.resolveTo(stream.Readable.from(Buffer.from('corrupted image')))
const staged: StagedAttachmentContent = {
id: 'nawgonwork',
tempLocation: jasmine.createSpyObj<NodeJS.WritableStream>(
'mockPendingContent',
[ 'write', 'end' ]
)
}
attachmentStore.stagePendingContent.and.resolveTo(staged)
imageService.autoOrient.and.resolveTo(new Error('wut is this'))
const oriented = await orientAttachmentImage(obsBefore, att.id, imageService, attachmentStore, console)

expect(oriented.patch).toBeUndefined()
expect(attachmentStore.saveContent).not.toHaveBeenCalled()
expect(staged.tempLocation.write).not.toHaveBeenCalled()
expect(obsRepo.patchAttachment).not.toHaveBeenCalled()
expect(obsRepo.save).not.toHaveBeenCalled()
})

Expand All @@ -294,7 +308,7 @@ describe('processing interval', () => {

describe('thumbnail phase', () => {

it('generates the specified thumbnails and patches the attachment once', async () => {
it('generates the specified thumbnails and produces the attachment patch', async () => {

const att: Attachment = Object.freeze({
id: '1.456.1',
Expand All @@ -314,7 +328,7 @@ describe('processing interval', () => {
constructor(readonly metaData: Thumbnail, stagedContentId: StagedAttachmentContentId) {
this.stagedContent = new StagedAttachmentContent(stagedContentId, new BufferWriteable())
}
get stagedContentBytes() { return (this.stagedContent.tempLocation as BufferWriteable).content }
get stagedContentBytes(): Buffer { return (this.stagedContent.tempLocation as BufferWriteable).content }
}
const salt = String(Date.now())
const expectedThumbs = new Map<number, ExpectedThumb>()
Expand Down Expand Up @@ -349,26 +363,12 @@ describe('processing interval', () => {
sizeInBytes: minDimension * 100
}
})
obsRepo.patchAttachment.and.callFake(async (obs, attId, patch) => {
return patchAttachment(obs, attId, patch) as Observation
})
const obsActual = await thumbnailAttachmentImage(obsBefore, att.id, [ 60, 120, 240 ], imageService, obsRepo, attachmentStore, console) as Observation

expect(copyObservationAttrs(obsActual)).toEqual(
{
...copyObservationAttrs(obsBefore),
lastModified: jasmine.any(Date),
attachments: [
{ ...att, lastModified: jasmine.any(Date), thumbnails: Array.from(expectedThumbs.values()).map(x => x.metaData) }
]
}
)

const thumbResult = await thumbnailAttachmentImage(obsBefore, att.id, [ 60, 120, 240 ], imageService, attachmentStore, console)
expect(thumbResult.patch).toEqual({ thumbnails: Array.from(expectedThumbs.values()).map(x => x.metaData) })
expect(expectedThumbs.get(60)?.stagedContentBytes.toString()).toEqual('big majestic mountains @60')
expect(expectedThumbs.get(120)?.stagedContentBytes.toString()).toEqual('big majestic mountains @120')
expect(expectedThumbs.get(240)?.stagedContentBytes.toString()).toEqual('big majestic mountains @240')
expect(obsRepo.patchAttachment).toHaveBeenCalledOnceWith(obsBefore, att.id, {
thumbnails: Array.from(expectedThumbs.values()).map(x => x.metaData)
})
expect(attachmentStore.stagePendingContent).toHaveBeenCalledTimes(3)
expect(attachmentStore.readContent).toHaveBeenCalledTimes(3)
expect(attachmentStore.readContent.calls.argsFor(0)).toEqual([ att.id, obsBefore ])
Expand All @@ -383,6 +383,7 @@ describe('processing interval', () => {
expect(attachmentStore.saveThumbnailContent).toHaveBeenCalledWith(expectedThumbs.get(120)?.stagedContent, 120, att.id, sameAsObservationWithoutDates(obsStaged))
expect(attachmentStore.saveThumbnailContent).toHaveBeenCalledWith(expectedThumbs.get(240)?.stagedContent, 240, att.id, sameAsObservationWithoutDates(obsStaged))
expect(attachmentStore.saveContent).not.toHaveBeenCalled()
expect(obsRepo.patchAttachment).not.toHaveBeenCalled()
expect(obsRepo.save).not.toHaveBeenCalled()
})

Expand Down Expand Up @@ -791,7 +792,7 @@ describe('processing interval', () => {
processed.processed = true
return processed
},
async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](): AsyncGenerator<TestUnprocessed> {
while (this.cursor < unprocessedAttachments.length - 1) {
if (this.cursor < 0 || this.current?.processed === true) {
this.cursor += 1
Expand Down Expand Up @@ -836,6 +837,77 @@ describe('processing interval', () => {
}
})

it('marks attachment oriented when orient phase does not produce a patch', async () => {

const eventProcessingStates = new Map<MageEventId, EventProcessingState>([
{ event: copyMageEventAttrs(event1), latestAttachmentProcessedTimestamp: Date.now() - 1000 * 60 * 5 },
{ event: copyMageEventAttrs(event2), latestAttachmentProcessedTimestamp: Date.now() - 1000 * 60 * 2 },
].map(x => [ x.event.id, x ]))
const pluginState: ImagePluginConfig = {
enabled: true,
intervalSeconds: 60,
intervalBatchSize: 1000,
thumbnailSizes: []
}
const attachment1: Attachment = {
id: '1.100.1',
fieldName: 'field1',
lastModified: new Date(minutesAgo(6)),
observationFormId: 'form1',
oriented: false,
thumbnails: [],
}
const attachment2: Attachment = {
id: '2.200.1',
fieldName: 'field1',
lastModified: new Date(),
observationFormId: 'form1',
oriented: false,
thumbnails: []
}
const obs1 = observationWithAttachments('1.100', event1, [ attachment1 ])
const obs2 = observationWithAttachments('2.200', event2, [ attachment2 ])
const unprocessedAttachments: UnprocessedAttachmentReference[] = [
{ eventId: obs1.eventId, observationId: obs1.id, attachmentId: attachment1.id },
{ eventId: obs2.eventId, observationId: obs2.id, attachmentId: attachment2.id }
]
const findUnprocessedAttachments = jasmine.createSpy<FindUnprocessedImageAttachments>('findAttachments')
.and.resolveTo(asyncIterableOf(unprocessedAttachments))
const invalidImageBytes = stream.Readable.from(Buffer.from('wut is this'))
const validImageBytes = stream.Readable.from(Buffer.from('goats.png'))
observationRepos.get(event1.id)?.findById.withArgs(obs1.id).and.resolveTo(obs1)
observationRepos.get(event2.id)?.findById.withArgs(obs2.id).and.resolveTo(obs2)
observationRepos.get(event1.id)?.patchAttachment.and.resolveTo(obs1)
observationRepos.get(event2.id)?.patchAttachment.and.resolveTo(obs2)
attachmentStore.readContent.withArgs(jasmine.stringMatching(attachment1.id), jasmine.anything()).and.resolveTo(invalidImageBytes)
attachmentStore.readContent.withArgs(jasmine.stringMatching(attachment2.id), jasmine.anything()).and.resolveTo(validImageBytes)
attachmentStore.stagePendingContent.and.resolveTo({
tempLocation: new BufferWriteable(),
id: 'pending'
})
imageService.autoOrient.and.callFake(async (source: ImageContent, dest: NodeJS.WritableStream) => {
if (source.bytes === invalidImageBytes) {
return new Error('bad image data')
}
return await Promise.resolve<Required<ImageDescriptor>>({
mediaType: `image/png`,
dimensions: { width: 100, height: 120 },
sizeInBytes: 10000
})
})
await processImageAttachments(pluginState, eventProcessingStates,
findUnprocessedAttachments, imageService, eventRepo, observationRepoForEvent, attachmentStore, console)

expect(observationRepos.get(event1.id)?.patchAttachment).toHaveBeenCalledOnceWith(obs1, attachment1.id, { oriented: true })
expect(observationRepos.get(event2.id)?.patchAttachment).toHaveBeenCalledOnceWith(obs2, attachment2.id, {
oriented: true,
contentType: 'image/png',
size: 10000,
width: 100,
height: 120,
})
})

it('does not generate thumbnails if orient fails')

it('updates the attachment after orienting and creating thumbnails')
Expand Down Expand Up @@ -939,7 +1011,7 @@ class BufferAttachmentStore implements AttachmentStore {
* @param count
* @returns
*/
function someRunLoops(count: number = 100): Promise<void> {
function someRunLoops(count = 100): Promise<void> {
return new Promise(function waitRemaining(resolve: () => any, reject: (err: any) => any, remaining: number = count) {
if (remaining === 0) {
return resolve()
Expand Down
Loading

0 comments on commit 2e2962c

Please sign in to comment.