diff --git a/common/changes/@boostercloud/framework-core/local_concurrency_2023-06-06-14-44.json b/common/changes/@boostercloud/framework-core/local_concurrency_2023-06-06-14-44.json new file mode 100644 index 000000000..dcaecfc37 --- /dev/null +++ b/common/changes/@boostercloud/framework-core/local_concurrency_2023-06-06-14-44.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@boostercloud/framework-core", + "comment": "Add Local Provider concurrency", + "type": "minor" + } + ], + "packageName": "@boostercloud/framework-core" +} \ No newline at end of file diff --git a/packages/framework-integration-tests/integration/provider-unaware/end-to-end/read-model-concurrency.integration.ts b/packages/framework-integration-tests/integration/provider-unaware/end-to-end/read-model-concurrency.integration.ts new file mode 100644 index 000000000..08275a7b7 --- /dev/null +++ b/packages/framework-integration-tests/integration/provider-unaware/end-to-end/read-model-concurrency.integration.ts @@ -0,0 +1,99 @@ +import { ApolloClient, gql, NormalizedCacheObject } from '@apollo/client' +import { random } from 'faker' +import { expect } from '../../helper/expect' +import { applicationUnderTest } from './setup' +import 'mocha' +import { ReadModelInterface, UUID } from '@boostercloud/framework-types' +import { waitForIt } from '../../helper/sleep' + +describe('Concurrency end-to-end tests', () => { + let client: ApolloClient + + before(async () => { + client = applicationUnderTest.graphql.client() + }) + + context('ReadModels', () => { + describe('With one projection', () => { + it('insert and update generate one ReadModel with version 2', async () => { + const entityId: UUID = random.uuid() + + const insertedReadModel = await addConcurrency(client, entityId, 1, 'ConcurrencyReadModel') + expect(insertedReadModel.id).to.be.eq(entityId) + expect(insertedReadModel.boosterMetadata?.version).to.be.eq(1) + + const updatedReadModel = await addConcurrency(client, entityId, 2, 'ConcurrencyReadModel') + expect(updatedReadModel.id).to.be.eq(entityId) + expect(updatedReadModel.boosterMetadata?.version).to.be.eq(2) + }) + }) + + describe('With two projections for the same ReadModel', () => { + if (process.env.TESTED_PROVIDER === 'AWS') { + console.log('AWS Provider is not working properly when inserting a ReadModel with two projections') // TODO: Fix AWS Provider + return + } + it('insert and update generate one ReadModel with version 4', async () => { + const entityId: UUID = random.uuid() + + const insertedReadModel = await addConcurrency(client, entityId, 2, 'OtherConcurrencyReadModel') + expect(insertedReadModel.id).to.be.eq(entityId) + expect(insertedReadModel.otherId).to.be.eq(entityId) + expect(insertedReadModel.boosterMetadata?.version).to.be.eq(2) + + const updatedReadModel = await addConcurrency(client, entityId, 4, 'OtherConcurrencyReadModel') + expect(updatedReadModel.id).to.be.eq(entityId) + expect(updatedReadModel.otherId).to.be.eq(entityId) + expect(updatedReadModel.boosterMetadata?.version).to.be.eq(4) + }) + }) + }) +}) + +async function addConcurrency( + client: ApolloClient, + entityId: UUID, + expectedVersion: number, + readModelName: string +): Promise { + await client.mutate({ + variables: { + id: entityId, + otherId: entityId, + }, + mutation: gql` + mutation AddConcurrency($id: ID!, $otherId: ID!) { + AddConcurrency(input: { id: $id, otherId: $otherId }) + } + `, + }) + + const mutateResult = await waitForIt( + () => { + return client.mutate({ + variables: { + id: entityId, + readModelName: readModelName, + }, + mutation: gql` + mutation GetConcurrency($id: ID!, $readModelName: String!) { + GetConcurrency(input: { id: $id, readModelName: $readModelName }) + } + `, + }) + }, + (result) => + result?.data?.GetConcurrency && + result?.data?.GetConcurrency.length > 0 && + result?.data?.GetConcurrency[0] && + (result?.data?.GetConcurrency as Array).find( + (value: ReadModelInterface) => value.boosterMetadata?.version === expectedVersion + ) !== undefined + ) + + const concurrency = (mutateResult?.data?.GetConcurrency as Array).find( + (value: ReadModelInterface) => value.boosterMetadata?.version === expectedVersion + )! + expect(concurrency.id).to.be.eq(entityId) + return concurrency +} diff --git a/packages/framework-integration-tests/src/commands/add-concurrency.ts b/packages/framework-integration-tests/src/commands/add-concurrency.ts new file mode 100644 index 000000000..2c5e33aa7 --- /dev/null +++ b/packages/framework-integration-tests/src/commands/add-concurrency.ts @@ -0,0 +1,19 @@ +import { Command } from '@boostercloud/framework-core' +import { Register, UUID } from '@boostercloud/framework-types' +import { ConcurrencyPersisted } from '../events/concurrency-persisted' + +export interface ProjectionDetails { + methodName: string + joinKey: keyof AddConcurrency +} + +@Command({ + authorize: 'all', +}) +export class AddConcurrency { + public constructor(readonly id: UUID, readonly otherId: UUID) {} + + public static async handle(command: AddConcurrency, register: Register): Promise { + register.events(new ConcurrencyPersisted(command.id, command.otherId)) + } +} diff --git a/packages/framework-integration-tests/src/commands/get-concurrency.ts b/packages/framework-integration-tests/src/commands/get-concurrency.ts new file mode 100644 index 000000000..228d7ee11 --- /dev/null +++ b/packages/framework-integration-tests/src/commands/get-concurrency.ts @@ -0,0 +1,17 @@ +import { Booster, Command } from '@boostercloud/framework-core' +import { ReadModelInterface, ReadOnlyNonEmptyArray, Register, UUID } from '@boostercloud/framework-types' + +@Command({ + authorize: 'all', +}) +export class GetConcurrency { + public constructor(readonly id: UUID, readonly readModelName: string) {} + + public static async handle( + command: GetConcurrency, + register: Register + ): Promise> { + const config = Booster.config + return await config.provider.readModels.fetch(config, command.readModelName, command.id) + } +} diff --git a/packages/framework-integration-tests/src/entities/concurrency.ts b/packages/framework-integration-tests/src/entities/concurrency.ts new file mode 100644 index 000000000..7e3890e25 --- /dev/null +++ b/packages/framework-integration-tests/src/entities/concurrency.ts @@ -0,0 +1,19 @@ +import { Entity, Reduces } from '@boostercloud/framework-core' +import { UUID } from '@boostercloud/framework-types' +import { ConcurrencyPersisted } from '../events/concurrency-persisted' + +@Entity({ + authorizeReadEvents: 'all', +}) +export class Concurrency { + public constructor(readonly id: UUID, readonly otherId: UUID) {} + + public getId() { + return this.id + } + + @Reduces(ConcurrencyPersisted) + public static persisted(event: ConcurrencyPersisted, currentConcurrency: Concurrency): Concurrency { + return new Concurrency(event.id, event.otherId) + } +} diff --git a/packages/framework-integration-tests/src/events/concurrency-persisted.ts b/packages/framework-integration-tests/src/events/concurrency-persisted.ts new file mode 100644 index 000000000..caaa6b477 --- /dev/null +++ b/packages/framework-integration-tests/src/events/concurrency-persisted.ts @@ -0,0 +1,11 @@ +import { Event } from '@boostercloud/framework-core' +import { UUID } from '@boostercloud/framework-types' + +@Event +export class ConcurrencyPersisted { + public constructor(readonly id: UUID, readonly otherId: UUID) {} + + public entityID(): UUID { + return this.id + } +} diff --git a/packages/framework-integration-tests/src/read-models/concurrency-read-model.ts b/packages/framework-integration-tests/src/read-models/concurrency-read-model.ts new file mode 100644 index 000000000..b056d2979 --- /dev/null +++ b/packages/framework-integration-tests/src/read-models/concurrency-read-model.ts @@ -0,0 +1,18 @@ +import { Projects, ReadModel } from '@boostercloud/framework-core' +import { ProjectionResult, UUID } from '@boostercloud/framework-types' +import { Concurrency } from '../entities/concurrency' + +@ReadModel({ + authorize: 'all', +}) +export class ConcurrencyReadModel { + public constructor(readonly id: UUID) {} + + @Projects(Concurrency, 'id') + public static persisted( + concurrency: Concurrency, + concurrencyReadModel?: ConcurrencyReadModel + ): ProjectionResult { + return new ConcurrencyReadModel(concurrency.id) + } +} diff --git a/packages/framework-integration-tests/src/read-models/other-concurrency-read-model.ts b/packages/framework-integration-tests/src/read-models/other-concurrency-read-model.ts new file mode 100644 index 000000000..71649e39b --- /dev/null +++ b/packages/framework-integration-tests/src/read-models/other-concurrency-read-model.ts @@ -0,0 +1,26 @@ +import { Projects, ReadModel } from '@boostercloud/framework-core' +import { ProjectionResult, UUID } from '@boostercloud/framework-types' +import { Concurrency } from '../entities/concurrency' + +@ReadModel({ + authorize: 'all', +}) +export class OtherConcurrencyReadModel { + public constructor(readonly id: UUID, readonly otherId: UUID) {} + + @Projects(Concurrency, 'id') + public static persisted( + concurrency: Concurrency, + concurrencyReadModel?: OtherConcurrencyReadModel + ): ProjectionResult { + return new OtherConcurrencyReadModel(concurrency.id, concurrency.otherId) + } + + @Projects(Concurrency, 'otherId') + public static persistedByOtherId( + concurrency: Concurrency, + concurrencyReadModel?: OtherConcurrencyReadModel + ): ProjectionResult { + return new OtherConcurrencyReadModel(concurrency.otherId, concurrency.otherId) + } +} diff --git a/packages/framework-provider-azure/src/library/read-model-adapter.ts b/packages/framework-provider-azure/src/library/read-model-adapter.ts index 139245f18..28f52250f 100644 --- a/packages/framework-provider-azure/src/library/read-model-adapter.ts +++ b/packages/framework-provider-azure/src/library/read-model-adapter.ts @@ -55,16 +55,24 @@ async function insertReadModel( id: readModel?.id?.toString(), } as ItemDefinition - await db + const { resource } = await db .database(config.resourceNames.applicationStack) .container(config.resourceNames.forReadModel(readModelName)) .items.create(itemModel) - logger.debug('[ReadModelAdapter#insertReadModel] Read model inserted') + logger.debug( + `Read model ${readModelName} inserted with id ${readModel.id} and metadata ${JSON.stringify( + resource?.boosterMetadata + )}` + ) } catch (err) { const error = err as Error & { code?: unknown } // In case of conflict (The ID provided for a resource on a PUT or POST operation has been taken by an existing resource) we should retry it if (error?.code == AZURE_CONFLICT_ERROR_CODE) { - logger.debug('[ReadModelAdapter#insertReadModel] Read model insert failed with a conflict failure') + logger.warn( + `Read model ${readModelName} insert failed with a conflict failure with id ${ + readModel.id + } and metadata ${JSON.stringify(readModel.boosterMetadata)}` + ) throw new OptimisticConcurrencyUnexpectedVersionError(error?.message) } logger.error('[ReadModelAdapter#insertReadModel] Read model insert failed without a conflict failure') @@ -84,16 +92,24 @@ async function updateReadModel( accessCondition: { condition: readModel.boosterMetadata?.optimisticConcurrencyValue || '*', type: 'IfMatch' }, } as RequestOptions try { - await db + const { resource } = await db .database(config.resourceNames.applicationStack) .container(config.resourceNames.forReadModel(readModelName)) .items.upsert(readModel, options) - logger.debug('[ReadModelAdapter#updateReadModel] Read model updated') + logger.debug( + `Read model ${readModelName} updated with id ${readModel.id} and metadata ${JSON.stringify( + resource?.boosterMetadata + )}` + ) } catch (err) { const error = err as Error & { code?: unknown } // If there is a precondition failure then we should retry it if (error?.code == AZURE_PRECONDITION_FAILED_ERROR) { - logger.debug('[ReadModelAdapter#updateReadModel] Read model update failed with a pre-condition failure') + logger.warn( + `Read model ${readModelName} update failed with a pre-condition failure with id ${ + readModel.id + } and metadata ${JSON.stringify(readModel.boosterMetadata)}` + ) throw new OptimisticConcurrencyUnexpectedVersionError(error?.message) } logger.error('[ReadModelAdapter#updateReadModel] Read model update failed without a pre-condition failure') diff --git a/packages/framework-provider-local/src/library/read-model-adapter.ts b/packages/framework-provider-local/src/library/read-model-adapter.ts index 86a4dc652..2272bacec 100644 --- a/packages/framework-provider-local/src/library/read-model-adapter.ts +++ b/packages/framework-provider-local/src/library/read-model-adapter.ts @@ -9,7 +9,7 @@ import { SortFor, UUID, } from '@boostercloud/framework-types' -import { GraphQLService, ReadModelRegistry } from '../services' +import { GraphQLService, NedbError, ReadModelRegistry, UNIQUE_VIOLATED_ERROR_TYPE } from '../services' import { getLogger } from '@boostercloud/framework-common-helpers' import { queryRecordFor } from './searcher-adapter' @@ -44,17 +44,21 @@ export async function storeReadModel( config: BoosterConfig, readModelName: string, readModel: ReadModelInterface, - _expectedCurrentVersion: number + expectedCurrentVersion: number ): Promise { const logger = getLogger(config, 'read-model-adapter#storeReadModel') logger.debug('Storing readModel ' + JSON.stringify(readModel)) try { - await db.store({ typeName: readModelName, value: readModel } as ReadModelEnvelope) + await db.store({ typeName: readModelName, value: readModel } as ReadModelEnvelope, expectedCurrentVersion) } catch (e) { - const error = e as Error + const error = e as NedbError // The error will be thrown, but in case of a conditional check, we throw the expected error type by the core - // TODO: verify the name of the exception thrown in Local Provider - if (error.name == 'TODO') { + if (error.errorType == UNIQUE_VIOLATED_ERROR_TYPE) { + logger.warn( + `Unique violated storing ReadModel ${JSON.stringify( + readModel + )} and expectedCurrentVersion ${expectedCurrentVersion}` + ) throw new OptimisticConcurrencyUnexpectedVersionError(error.message) } throw e diff --git a/packages/framework-provider-local/src/services/read-model-registry.ts b/packages/framework-provider-local/src/services/read-model-registry.ts index 757884e84..c7b5afa2b 100644 --- a/packages/framework-provider-local/src/services/read-model-registry.ts +++ b/packages/framework-provider-local/src/services/read-model-registry.ts @@ -6,10 +6,15 @@ interface LocalSortedFor { [key: string]: number } +export type NedbError = Error & { [key: string | number | symbol]: unknown } + +export const UNIQUE_VIOLATED_ERROR_TYPE = 'uniqueViolated' + export class ReadModelRegistry { public readonly readModels: DataStore = new DataStore(readModelsDatabase) constructor() { this.readModels.loadDatabase() + this.readModels.ensureIndex({ fieldName: 'uniqueKey', unique: true }) } public async query( @@ -35,18 +40,46 @@ export class ReadModelRegistry { else resolve(docs) }) ) - return queryPromise as Promise> } - public async store(readModel: ReadModelEnvelope): Promise { + public async store(readModel: ReadModelEnvelope, expectedCurrentVersion: number): Promise { + const uniqueReadModel: ReadModelEnvelope & { uniqueKey?: string } = readModel + uniqueReadModel.uniqueKey = `${readModel.typeName}_${readModel.value.id}_${readModel.value.boosterMetadata?.version}` + if (uniqueReadModel.value.boosterMetadata?.version === 1) { + return this.insert(readModel) + } + return this.update(uniqueReadModel, expectedCurrentVersion) + } + + private insert(readModel: ReadModelEnvelope): Promise { + return new Promise((resolve, reject) => { + this.readModels.insert(readModel, (err: unknown) => { + err ? reject(err) : resolve() + }) + }) + } + + private update(readModel: ReadModelEnvelope, expectedCurrentVersion: number): Promise { return new Promise((resolve, reject) => { this.readModels.update( - //use nedb dot notation value.id to match the record (see https://github.com/louischatriot/nedb#finding-documents) - { typeName: readModel.typeName, 'value.id': readModel.value.id }, + { + typeName: readModel.typeName, + 'value.id': readModel.value.id, + 'value.boosterMetadata.version': expectedCurrentVersion, + }, readModel, - { upsert: true }, - (err) => { + { upsert: false, returnUpdatedDocs: true }, + (err: unknown, numAffected: number) => { + if (numAffected === 0) { + const error: NedbError = new Error( + `Can't update readModel ${JSON.stringify( + readModel + )} with expectedCurrentVersion = ${expectedCurrentVersion} . Optimistic concurrency error` + ) as NedbError + error.errorType = UNIQUE_VIOLATED_ERROR_TYPE + reject(error) + } err ? reject(err) : resolve() } ) diff --git a/packages/framework-provider-local/test/helpers/read-model-helper.ts b/packages/framework-provider-local/test/helpers/read-model-helper.ts index b0fa2140e..57f257da5 100644 --- a/packages/framework-provider-local/test/helpers/read-model-helper.ts +++ b/packages/framework-provider-local/test/helpers/read-model-helper.ts @@ -9,6 +9,10 @@ export function createMockReadModelEnvelope(): ReadModelEnvelope { age: random.number(40), foo: random.word(), bar: random.float(), + boosterMetadata: { + version: 1, + schemaVersion: 1, + }, }, typeName: random.word(), } diff --git a/packages/framework-provider-local/test/library/read-model-adapter.test.ts b/packages/framework-provider-local/test/library/read-model-adapter.test.ts index 8f9a4750f..4bfbf96f3 100644 --- a/packages/framework-provider-local/test/library/read-model-adapter.test.ts +++ b/packages/framework-provider-local/test/library/read-model-adapter.test.ts @@ -41,9 +41,9 @@ async function storeMock( const mockUserApp: UserApp = {} as any const graphQLService = new GraphQLService(mockUserApp) stub(graphQLService, 'handleNotificationSubscription') - // @ts-ignore await storeReadModel( graphQLService, + // @ts-ignore mockReadModelRegistry, mockConfig, mockReadModel.typeName, @@ -187,7 +187,7 @@ describe('read-models-adapter', () => { }) it('should call read model registry store', () => { - expect(storeStub).to.have.been.calledWithExactly(mockReadModel) + expect(storeStub).to.have.been.calledWithExactly(mockReadModel, 1) }) it('should log the right debug message', () => { diff --git a/packages/framework-provider-local/test/services/read-model-registry.test.ts b/packages/framework-provider-local/test/services/read-model-registry.test.ts index cd39c03eb..6e7dd57eb 100644 --- a/packages/framework-provider-local/test/services/read-model-registry.test.ts +++ b/packages/framework-provider-local/test/services/read-model-registry.test.ts @@ -34,13 +34,13 @@ describe('the read model registry', () => { const publishPromises: Array> = [] for (let i = 0; i < initialReadModelsCount; i++) { - publishPromises.push(readModelRegistry.store(createMockReadModelEnvelope())) + publishPromises.push(readModelRegistry.store(createMockReadModelEnvelope(), 0)) } await Promise.all(publishPromises) mockReadModel = createMockReadModelEnvelope() - await readModelRegistry.store(mockReadModel) + await readModelRegistry.store(mockReadModel, 1) }) it('should return expected read model', async () => { @@ -189,7 +189,7 @@ describe('the read model registry', () => { readModelRegistry.readModels.remove = stub().yields(null, mockReadModelEnvelope) - await readModelRegistry.store(mockReadModelEnvelope) + await readModelRegistry.store(mockReadModelEnvelope, 1) await readModelRegistry.deleteById(id, mockReadModelEnvelope.typeName) expect(readModelRegistry.readModels.remove).to.have.been.calledWith( @@ -202,13 +202,19 @@ describe('the read model registry', () => { describe('the store method', () => { it('should upsert read models into the read models database', async () => { const readModel: ReadModelEnvelope = createMockReadModelEnvelope() - const expectedQuery = { typeName: readModel.typeName, 'value.id': readModel.value.id } + readModel.value.boosterMetadata!.version = 2 + const expectedQuery = { + typeName: readModel.typeName, + 'value.id': readModel.value.id, + 'value.boosterMetadata.version': 2, + } readModelRegistry.readModels.update = stub().yields(null, readModel) - await readModelRegistry.store(readModel) + await readModelRegistry.store(readModel, 2) expect(readModelRegistry.readModels.update).to.have.been.calledWith(expectedQuery, readModel, { - upsert: true, + upsert: false, + returnUpdatedDocs: true, }) }) @@ -224,7 +230,7 @@ describe('the read model registry', () => { readModelRegistry.readModels.update = stub().yields(error, null) - void expect(readModelRegistry.store(readModel)).to.be.rejectedWith(error) + void expect(readModelRegistry.store(readModel, 1)).to.be.rejectedWith(error) }) }) })