Skip to content

Commit

Permalink
Remove optimisticConcurrencyValue (#1414)
Browse files Browse the repository at this point in the history
Add rush change
Add local provider concurrency
  • Loading branch information
gonzalojaubert authored Jun 29, 2023
1 parent 3215284 commit b06f34f
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@boostercloud/framework-core",
"comment": "Add Local Provider concurrency",
"type": "minor"
}
],
"packageName": "@boostercloud/framework-core"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { ApolloClient, gql, NormalizedCacheObject } from '@apollo/client'
import { random } from 'faker'
import { expect } from '../../helper/expect'
import { applicationUnderTest } from './setup'
import 'mocha'
import { ReadModelInterface, UUID } from '@boostercloud/framework-types'
import { waitForIt } from '../../helper/sleep'

describe('Concurrency end-to-end tests', () => {
let client: ApolloClient<NormalizedCacheObject>

before(async () => {
client = applicationUnderTest.graphql.client()
})

context('ReadModels', () => {
describe('With one projection', () => {
it('insert and update generate one ReadModel with version 2', async () => {
const entityId: UUID = random.uuid()

const insertedReadModel = await addConcurrency(client, entityId, 1, 'ConcurrencyReadModel')
expect(insertedReadModel.id).to.be.eq(entityId)
expect(insertedReadModel.boosterMetadata?.version).to.be.eq(1)

const updatedReadModel = await addConcurrency(client, entityId, 2, 'ConcurrencyReadModel')
expect(updatedReadModel.id).to.be.eq(entityId)
expect(updatedReadModel.boosterMetadata?.version).to.be.eq(2)
})
})

describe('With two projections for the same ReadModel', () => {
if (process.env.TESTED_PROVIDER === 'AWS') {
console.log('AWS Provider is not working properly when inserting a ReadModel with two projections') // TODO: Fix AWS Provider
return
}
it('insert and update generate one ReadModel with version 4', async () => {
const entityId: UUID = random.uuid()

const insertedReadModel = await addConcurrency(client, entityId, 2, 'OtherConcurrencyReadModel')
expect(insertedReadModel.id).to.be.eq(entityId)
expect(insertedReadModel.otherId).to.be.eq(entityId)
expect(insertedReadModel.boosterMetadata?.version).to.be.eq(2)

const updatedReadModel = await addConcurrency(client, entityId, 4, 'OtherConcurrencyReadModel')
expect(updatedReadModel.id).to.be.eq(entityId)
expect(updatedReadModel.otherId).to.be.eq(entityId)
expect(updatedReadModel.boosterMetadata?.version).to.be.eq(4)
})
})
})
})

async function addConcurrency(
client: ApolloClient<NormalizedCacheObject>,
entityId: UUID,
expectedVersion: number,
readModelName: string
): Promise<ReadModelInterface> {
await client.mutate({
variables: {
id: entityId,
otherId: entityId,
},
mutation: gql`
mutation AddConcurrency($id: ID!, $otherId: ID!) {
AddConcurrency(input: { id: $id, otherId: $otherId })
}
`,
})

const mutateResult = await waitForIt(
() => {
return client.mutate({
variables: {
id: entityId,
readModelName: readModelName,
},
mutation: gql`
mutation GetConcurrency($id: ID!, $readModelName: String!) {
GetConcurrency(input: { id: $id, readModelName: $readModelName })
}
`,
})
},
(result) =>
result?.data?.GetConcurrency &&
result?.data?.GetConcurrency.length > 0 &&
result?.data?.GetConcurrency[0] &&
(result?.data?.GetConcurrency as Array<ReadModelInterface>).find(
(value: ReadModelInterface) => value.boosterMetadata?.version === expectedVersion
) !== undefined
)

const concurrency = (mutateResult?.data?.GetConcurrency as Array<ReadModelInterface>).find(
(value: ReadModelInterface) => value.boosterMetadata?.version === expectedVersion
)!
expect(concurrency.id).to.be.eq(entityId)
return concurrency
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Command } from '@boostercloud/framework-core'
import { Register, UUID } from '@boostercloud/framework-types'
import { ConcurrencyPersisted } from '../events/concurrency-persisted'

export interface ProjectionDetails {
methodName: string
joinKey: keyof AddConcurrency
}

@Command({
authorize: 'all',
})
export class AddConcurrency {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

public static async handle(command: AddConcurrency, register: Register): Promise<void> {
register.events(new ConcurrencyPersisted(command.id, command.otherId))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Booster, Command } from '@boostercloud/framework-core'
import { ReadModelInterface, ReadOnlyNonEmptyArray, Register, UUID } from '@boostercloud/framework-types'

@Command({
authorize: 'all',
})
export class GetConcurrency {
public constructor(readonly id: UUID, readonly readModelName: string) {}

public static async handle(
command: GetConcurrency,
register: Register
): Promise<ReadOnlyNonEmptyArray<ReadModelInterface>> {
const config = Booster.config
return await config.provider.readModels.fetch(config, command.readModelName, command.id)
}
}
19 changes: 19 additions & 0 deletions packages/framework-integration-tests/src/entities/concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Entity, Reduces } from '@boostercloud/framework-core'
import { UUID } from '@boostercloud/framework-types'
import { ConcurrencyPersisted } from '../events/concurrency-persisted'

@Entity({
authorizeReadEvents: 'all',
})
export class Concurrency {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

public getId() {
return this.id
}

@Reduces(ConcurrencyPersisted)
public static persisted(event: ConcurrencyPersisted, currentConcurrency: Concurrency): Concurrency {
return new Concurrency(event.id, event.otherId)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Event } from '@boostercloud/framework-core'
import { UUID } from '@boostercloud/framework-types'

@Event
export class ConcurrencyPersisted {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

public entityID(): UUID {
return this.id
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Projects, ReadModel } from '@boostercloud/framework-core'
import { ProjectionResult, UUID } from '@boostercloud/framework-types'
import { Concurrency } from '../entities/concurrency'

@ReadModel({
authorize: 'all',
})
export class ConcurrencyReadModel {
public constructor(readonly id: UUID) {}

@Projects(Concurrency, 'id')
public static persisted(
concurrency: Concurrency,
concurrencyReadModel?: ConcurrencyReadModel
): ProjectionResult<ConcurrencyReadModel> {
return new ConcurrencyReadModel(concurrency.id)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Projects, ReadModel } from '@boostercloud/framework-core'
import { ProjectionResult, UUID } from '@boostercloud/framework-types'
import { Concurrency } from '../entities/concurrency'

@ReadModel({
authorize: 'all',
})
export class OtherConcurrencyReadModel {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

@Projects(Concurrency, 'id')
public static persisted(
concurrency: Concurrency,
concurrencyReadModel?: OtherConcurrencyReadModel
): ProjectionResult<OtherConcurrencyReadModel> {
return new OtherConcurrencyReadModel(concurrency.id, concurrency.otherId)
}

@Projects(Concurrency, 'otherId')
public static persistedByOtherId(
concurrency: Concurrency,
concurrencyReadModel?: OtherConcurrencyReadModel
): ProjectionResult<OtherConcurrencyReadModel> {
return new OtherConcurrencyReadModel(concurrency.otherId, concurrency.otherId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,24 @@ async function insertReadModel(
id: readModel?.id?.toString(),
} as ItemDefinition

await db
const { resource } = await db
.database(config.resourceNames.applicationStack)
.container(config.resourceNames.forReadModel(readModelName))
.items.create(itemModel)
logger.debug('[ReadModelAdapter#insertReadModel] Read model inserted')
logger.debug(
`Read model ${readModelName} inserted with id ${readModel.id} and metadata ${JSON.stringify(
resource?.boosterMetadata
)}`
)
} catch (err) {
const error = err as Error & { code?: unknown }
// In case of conflict (The ID provided for a resource on a PUT or POST operation has been taken by an existing resource) we should retry it
if (error?.code == AZURE_CONFLICT_ERROR_CODE) {
logger.debug('[ReadModelAdapter#insertReadModel] Read model insert failed with a conflict failure')
logger.warn(
`Read model ${readModelName} insert failed with a conflict failure with id ${
readModel.id
} and metadata ${JSON.stringify(readModel.boosterMetadata)}`
)
throw new OptimisticConcurrencyUnexpectedVersionError(error?.message)
}
logger.error('[ReadModelAdapter#insertReadModel] Read model insert failed without a conflict failure')
Expand All @@ -84,16 +92,24 @@ async function updateReadModel(
accessCondition: { condition: readModel.boosterMetadata?.optimisticConcurrencyValue || '*', type: 'IfMatch' },
} as RequestOptions
try {
await db
const { resource } = await db
.database(config.resourceNames.applicationStack)
.container(config.resourceNames.forReadModel(readModelName))
.items.upsert(readModel, options)
logger.debug('[ReadModelAdapter#updateReadModel] Read model updated')
logger.debug(
`Read model ${readModelName} updated with id ${readModel.id} and metadata ${JSON.stringify(
resource?.boosterMetadata
)}`
)
} catch (err) {
const error = err as Error & { code?: unknown }
// If there is a precondition failure then we should retry it
if (error?.code == AZURE_PRECONDITION_FAILED_ERROR) {
logger.debug('[ReadModelAdapter#updateReadModel] Read model update failed with a pre-condition failure')
logger.warn(
`Read model ${readModelName} update failed with a pre-condition failure with id ${
readModel.id
} and metadata ${JSON.stringify(readModel.boosterMetadata)}`
)
throw new OptimisticConcurrencyUnexpectedVersionError(error?.message)
}
logger.error('[ReadModelAdapter#updateReadModel] Read model update failed without a pre-condition failure')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
SortFor,
UUID,
} from '@boostercloud/framework-types'
import { GraphQLService, ReadModelRegistry } from '../services'
import { GraphQLService, NedbError, ReadModelRegistry, UNIQUE_VIOLATED_ERROR_TYPE } from '../services'
import { getLogger } from '@boostercloud/framework-common-helpers'
import { queryRecordFor } from './searcher-adapter'

Expand Down Expand Up @@ -44,17 +44,21 @@ export async function storeReadModel(
config: BoosterConfig,
readModelName: string,
readModel: ReadModelInterface,
_expectedCurrentVersion: number
expectedCurrentVersion: number
): Promise<void> {
const logger = getLogger(config, 'read-model-adapter#storeReadModel')
logger.debug('Storing readModel ' + JSON.stringify(readModel))
try {
await db.store({ typeName: readModelName, value: readModel } as ReadModelEnvelope)
await db.store({ typeName: readModelName, value: readModel } as ReadModelEnvelope, expectedCurrentVersion)
} catch (e) {
const error = e as Error
const error = e as NedbError
// The error will be thrown, but in case of a conditional check, we throw the expected error type by the core
// TODO: verify the name of the exception thrown in Local Provider
if (error.name == 'TODO') {
if (error.errorType == UNIQUE_VIOLATED_ERROR_TYPE) {
logger.warn(
`Unique violated storing ReadModel ${JSON.stringify(
readModel
)} and expectedCurrentVersion ${expectedCurrentVersion}`
)
throw new OptimisticConcurrencyUnexpectedVersionError(error.message)
}
throw e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ interface LocalSortedFor {
[key: string]: number
}

export type NedbError = Error & { [key: string | number | symbol]: unknown }

export const UNIQUE_VIOLATED_ERROR_TYPE = 'uniqueViolated'

export class ReadModelRegistry {
public readonly readModels: DataStore<ReadModelEnvelope> = new DataStore(readModelsDatabase)
constructor() {
this.readModels.loadDatabase()
this.readModels.ensureIndex({ fieldName: 'uniqueKey', unique: true })
}

public async query(
Expand All @@ -35,18 +40,46 @@ export class ReadModelRegistry {
else resolve(docs)
})
)

return queryPromise as Promise<Array<ReadModelEnvelope>>
}

public async store(readModel: ReadModelEnvelope): Promise<void> {
public async store(readModel: ReadModelEnvelope, expectedCurrentVersion: number): Promise<void> {
const uniqueReadModel: ReadModelEnvelope & { uniqueKey?: string } = readModel
uniqueReadModel.uniqueKey = `${readModel.typeName}_${readModel.value.id}_${readModel.value.boosterMetadata?.version}`
if (uniqueReadModel.value.boosterMetadata?.version === 1) {
return this.insert(readModel)
}
return this.update(uniqueReadModel, expectedCurrentVersion)
}

private insert(readModel: ReadModelEnvelope): Promise<void> {
return new Promise((resolve, reject) => {
this.readModels.insert(readModel, (err: unknown) => {
err ? reject(err) : resolve()
})
})
}

private update(readModel: ReadModelEnvelope, expectedCurrentVersion: number): Promise<void> {
return new Promise((resolve, reject) => {
this.readModels.update(
//use nedb dot notation value.id to match the record (see https://github.com/louischatriot/nedb#finding-documents)
{ typeName: readModel.typeName, 'value.id': readModel.value.id },
{
typeName: readModel.typeName,
'value.id': readModel.value.id,
'value.boosterMetadata.version': expectedCurrentVersion,
},
readModel,
{ upsert: true },
(err) => {
{ upsert: false, returnUpdatedDocs: true },
(err: unknown, numAffected: number) => {
if (numAffected === 0) {
const error: NedbError = new Error(
`Can't update readModel ${JSON.stringify(
readModel
)} with expectedCurrentVersion = ${expectedCurrentVersion} . Optimistic concurrency error`
) as NedbError
error.errorType = UNIQUE_VIOLATED_ERROR_TYPE
reject(error)
}
err ? reject(err) : resolve()
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export function createMockReadModelEnvelope(): ReadModelEnvelope {
age: random.number(40),
foo: random.word(),
bar: random.float(),
boosterMetadata: {
version: 1,
schemaVersion: 1,
},
},
typeName: random.word(),
}
Expand Down
Loading

0 comments on commit b06f34f

Please sign in to comment.