Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Local Provider optimistic concurrency support #1414

Merged
merged 1 commit into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@boostercloud/framework-core",
"comment": "Add Local Provider concurrency",
"type": "minor"
}
],
"packageName": "@boostercloud/framework-core"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { ApolloClient, gql, NormalizedCacheObject } from '@apollo/client'
import { random } from 'faker'
import { expect } from '../../helper/expect'
import { applicationUnderTest } from './setup'
import 'mocha'
import { ReadModelInterface, UUID } from '@boostercloud/framework-types'
import { waitForIt } from '../../helper/sleep'

describe('Concurrency end-to-end tests', () => {
let client: ApolloClient<NormalizedCacheObject>

before(async () => {
client = applicationUnderTest.graphql.client()
})

context('ReadModels', () => {
describe('With one projection', () => {
it('insert and update generate one ReadModel with version 2', async () => {
const entityId: UUID = random.uuid()

const insertedReadModel = await addConcurrency(client, entityId, 1, 'ConcurrencyReadModel')
expect(insertedReadModel.id).to.be.eq(entityId)
expect(insertedReadModel.boosterMetadata?.version).to.be.eq(1)

const updatedReadModel = await addConcurrency(client, entityId, 2, 'ConcurrencyReadModel')
expect(updatedReadModel.id).to.be.eq(entityId)
expect(updatedReadModel.boosterMetadata?.version).to.be.eq(2)
})
})

describe('With two projections for the same ReadModel', () => {
if (process.env.TESTED_PROVIDER === 'AWS') {
console.log('AWS Provider is not working properly when inserting a ReadModel with two projections') // TODO: Fix AWS Provider
return
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create an issue to track this and add details

}
it('insert and update generate one ReadModel with version 4', async () => {
const entityId: UUID = random.uuid()

const insertedReadModel = await addConcurrency(client, entityId, 2, 'OtherConcurrencyReadModel')
expect(insertedReadModel.id).to.be.eq(entityId)
expect(insertedReadModel.otherId).to.be.eq(entityId)
expect(insertedReadModel.boosterMetadata?.version).to.be.eq(2)

const updatedReadModel = await addConcurrency(client, entityId, 4, 'OtherConcurrencyReadModel')
expect(updatedReadModel.id).to.be.eq(entityId)
expect(updatedReadModel.otherId).to.be.eq(entityId)
expect(updatedReadModel.boosterMetadata?.version).to.be.eq(4)
})
})
})
})

async function addConcurrency(
client: ApolloClient<NormalizedCacheObject>,
entityId: UUID,
expectedVersion: number,
readModelName: string
): Promise<ReadModelInterface> {
await client.mutate({
variables: {
id: entityId,
otherId: entityId,
},
mutation: gql`
mutation AddConcurrency($id: ID!, $otherId: ID!) {
AddConcurrency(input: { id: $id, otherId: $otherId })
}
`,
})

const mutateResult = await waitForIt(
() => {
return client.mutate({
variables: {
id: entityId,
readModelName: readModelName,
},
mutation: gql`
mutation GetConcurrency($id: ID!, $readModelName: String!) {
GetConcurrency(input: { id: $id, readModelName: $readModelName })
}
`,
})
},
(result) =>
result?.data?.GetConcurrency &&
result?.data?.GetConcurrency.length > 0 &&
result?.data?.GetConcurrency[0] &&
(result?.data?.GetConcurrency as Array<ReadModelInterface>).find(
(value: ReadModelInterface) => value.boosterMetadata?.version === expectedVersion
) !== undefined
)

const concurrency = (mutateResult?.data?.GetConcurrency as Array<ReadModelInterface>).find(
(value: ReadModelInterface) => value.boosterMetadata?.version === expectedVersion
)!
expect(concurrency.id).to.be.eq(entityId)
return concurrency
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Command } from '@boostercloud/framework-core'
import { Register, UUID } from '@boostercloud/framework-types'
import { ConcurrencyPersisted } from '../events/concurrency-persisted'

export interface ProjectionDetails {
methodName: string
joinKey: keyof AddConcurrency
}

@Command({
authorize: 'all',
})
export class AddConcurrency {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

public static async handle(command: AddConcurrency, register: Register): Promise<void> {
register.events(new ConcurrencyPersisted(command.id, command.otherId))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { Booster, Command } from '@boostercloud/framework-core'
import { ReadModelInterface, ReadOnlyNonEmptyArray, Register, UUID } from '@boostercloud/framework-types'

@Command({
authorize: 'all',
})
export class GetConcurrency {
public constructor(readonly id: UUID, readonly readModelName: string) {}

public static async handle(
command: GetConcurrency,
register: Register
): Promise<ReadOnlyNonEmptyArray<ReadModelInterface>> {
const config = Booster.config
return await config.provider.readModels.fetch(config, command.readModelName, command.id)
}
}
19 changes: 19 additions & 0 deletions packages/framework-integration-tests/src/entities/concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { Entity, Reduces } from '@boostercloud/framework-core'
import { UUID } from '@boostercloud/framework-types'
import { ConcurrencyPersisted } from '../events/concurrency-persisted'

@Entity({
authorizeReadEvents: 'all',
})
export class Concurrency {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

public getId() {
return this.id
}

@Reduces(ConcurrencyPersisted)
public static persisted(event: ConcurrencyPersisted, currentConcurrency: Concurrency): Concurrency {
return new Concurrency(event.id, event.otherId)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { Event } from '@boostercloud/framework-core'
import { UUID } from '@boostercloud/framework-types'

@Event
export class ConcurrencyPersisted {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

public entityID(): UUID {
return this.id
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Projects, ReadModel } from '@boostercloud/framework-core'
import { ProjectionResult, UUID } from '@boostercloud/framework-types'
import { Concurrency } from '../entities/concurrency'

@ReadModel({
authorize: 'all',
})
export class ConcurrencyReadModel {
public constructor(readonly id: UUID) {}

@Projects(Concurrency, 'id')
public static persisted(
concurrency: Concurrency,
concurrencyReadModel?: ConcurrencyReadModel
): ProjectionResult<ConcurrencyReadModel> {
return new ConcurrencyReadModel(concurrency.id)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Projects, ReadModel } from '@boostercloud/framework-core'
import { ProjectionResult, UUID } from '@boostercloud/framework-types'
import { Concurrency } from '../entities/concurrency'

@ReadModel({
authorize: 'all',
})
export class OtherConcurrencyReadModel {
public constructor(readonly id: UUID, readonly otherId: UUID) {}

@Projects(Concurrency, 'id')
public static persisted(
concurrency: Concurrency,
concurrencyReadModel?: OtherConcurrencyReadModel
): ProjectionResult<OtherConcurrencyReadModel> {
return new OtherConcurrencyReadModel(concurrency.id, concurrency.otherId)
}

@Projects(Concurrency, 'otherId')
public static persistedByOtherId(
concurrency: Concurrency,
concurrencyReadModel?: OtherConcurrencyReadModel
): ProjectionResult<OtherConcurrencyReadModel> {
return new OtherConcurrencyReadModel(concurrency.otherId, concurrency.otherId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,24 @@ async function insertReadModel(
id: readModel?.id?.toString(),
} as ItemDefinition

await db
const { resource } = await db
.database(config.resourceNames.applicationStack)
.container(config.resourceNames.forReadModel(readModelName))
.items.create(itemModel)
logger.debug('[ReadModelAdapter#insertReadModel] Read model inserted')
logger.debug(
`Read model ${readModelName} inserted with id ${readModel.id} and metadata ${JSON.stringify(
resource?.boosterMetadata
)}`
)
} catch (err) {
const error = err as Error & { code?: unknown }
// In case of conflict (The ID provided for a resource on a PUT or POST operation has been taken by an existing resource) we should retry it
if (error?.code == AZURE_CONFLICT_ERROR_CODE) {
logger.debug('[ReadModelAdapter#insertReadModel] Read model insert failed with a conflict failure')
logger.warn(
`Read model ${readModelName} insert failed with a conflict failure with id ${
readModel.id
} and metadata ${JSON.stringify(readModel.boosterMetadata)}`
)
throw new OptimisticConcurrencyUnexpectedVersionError(error?.message)
}
logger.error('[ReadModelAdapter#insertReadModel] Read model insert failed without a conflict failure')
Expand All @@ -84,16 +92,24 @@ async function updateReadModel(
accessCondition: { condition: readModel.boosterMetadata?.optimisticConcurrencyValue || '*', type: 'IfMatch' },
} as RequestOptions
try {
await db
const { resource } = await db
.database(config.resourceNames.applicationStack)
.container(config.resourceNames.forReadModel(readModelName))
.items.upsert(readModel, options)
logger.debug('[ReadModelAdapter#updateReadModel] Read model updated')
logger.debug(
`Read model ${readModelName} updated with id ${readModel.id} and metadata ${JSON.stringify(
resource?.boosterMetadata
)}`
)
} catch (err) {
const error = err as Error & { code?: unknown }
// If there is a precondition failure then we should retry it
if (error?.code == AZURE_PRECONDITION_FAILED_ERROR) {
logger.debug('[ReadModelAdapter#updateReadModel] Read model update failed with a pre-condition failure')
logger.warn(
`Read model ${readModelName} update failed with a pre-condition failure with id ${
readModel.id
} and metadata ${JSON.stringify(readModel.boosterMetadata)}`
)
throw new OptimisticConcurrencyUnexpectedVersionError(error?.message)
}
logger.error('[ReadModelAdapter#updateReadModel] Read model update failed without a pre-condition failure')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
SortFor,
UUID,
} from '@boostercloud/framework-types'
import { GraphQLService, ReadModelRegistry } from '../services'
import { GraphQLService, NedbError, ReadModelRegistry, UNIQUE_VIOLATED_ERROR_TYPE } from '../services'
import { getLogger } from '@boostercloud/framework-common-helpers'
import { queryRecordFor } from './searcher-adapter'

Expand Down Expand Up @@ -44,17 +44,21 @@ export async function storeReadModel(
config: BoosterConfig,
readModelName: string,
readModel: ReadModelInterface,
_expectedCurrentVersion: number
expectedCurrentVersion: number
): Promise<void> {
const logger = getLogger(config, 'read-model-adapter#storeReadModel')
logger.debug('Storing readModel ' + JSON.stringify(readModel))
try {
await db.store({ typeName: readModelName, value: readModel } as ReadModelEnvelope)
await db.store({ typeName: readModelName, value: readModel } as ReadModelEnvelope, expectedCurrentVersion)
} catch (e) {
const error = e as Error
const error = e as NedbError
// The error will be thrown, but in case of a conditional check, we throw the expected error type by the core
// TODO: verify the name of the exception thrown in Local Provider
if (error.name == 'TODO') {
if (error.errorType == UNIQUE_VIOLATED_ERROR_TYPE) {
logger.warn(
`Unique violated storing ReadModel ${JSON.stringify(
readModel
)} and expectedCurrentVersion ${expectedCurrentVersion}`
)
throw new OptimisticConcurrencyUnexpectedVersionError(error.message)
}
throw e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,15 @@ interface LocalSortedFor {
[key: string]: number
}

export type NedbError = Error & { [key: string | number | symbol]: unknown }

export const UNIQUE_VIOLATED_ERROR_TYPE = 'uniqueViolated'

export class ReadModelRegistry {
public readonly readModels: DataStore<ReadModelEnvelope> = new DataStore(readModelsDatabase)
constructor() {
this.readModels.loadDatabase()
this.readModels.ensureIndex({ fieldName: 'uniqueKey', unique: true })
}

public async query(
Expand All @@ -35,18 +40,46 @@ export class ReadModelRegistry {
else resolve(docs)
})
)

return queryPromise as Promise<Array<ReadModelEnvelope>>
}

public async store(readModel: ReadModelEnvelope): Promise<void> {
public async store(readModel: ReadModelEnvelope, expectedCurrentVersion: number): Promise<void> {
const uniqueReadModel: ReadModelEnvelope & { uniqueKey?: string } = readModel
uniqueReadModel.uniqueKey = `${readModel.typeName}_${readModel.value.id}_${readModel.value.boosterMetadata?.version}`
if (uniqueReadModel.value.boosterMetadata?.version === 1) {
return this.insert(readModel)
}
return this.update(uniqueReadModel, expectedCurrentVersion)
}

private insert(readModel: ReadModelEnvelope): Promise<void> {
return new Promise((resolve, reject) => {
this.readModels.insert(readModel, (err: unknown) => {
err ? reject(err) : resolve()
})
})
}

private update(readModel: ReadModelEnvelope, expectedCurrentVersion: number): Promise<void> {
return new Promise((resolve, reject) => {
this.readModels.update(
//use nedb dot notation value.id to match the record (see https://github.com/louischatriot/nedb#finding-documents)
{ typeName: readModel.typeName, 'value.id': readModel.value.id },
{
typeName: readModel.typeName,
'value.id': readModel.value.id,
'value.boosterMetadata.version': expectedCurrentVersion,
},
readModel,
{ upsert: true },
(err) => {
{ upsert: false, returnUpdatedDocs: true },
(err: unknown, numAffected: number) => {
if (numAffected === 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though when we have an error updating something, numAffected is always 0. How do you know that if this condition is true, then it must be a optimistic concurrency error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's not a perfect solution but in this case we are updating read models only son we can expect that the updated documents will be greater than 0. In this case, using the query (version = expected version) while updating we will have 0 documents updated if the row was updated by another instance.

const error: NedbError = new Error(
`Can't update readModel ${JSON.stringify(
readModel
)} with expectedCurrentVersion = ${expectedCurrentVersion} . Optimistic concurrency error`
) as NedbError
error.errorType = UNIQUE_VIOLATED_ERROR_TYPE
reject(error)
}
err ? reject(err) : resolve()
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ export function createMockReadModelEnvelope(): ReadModelEnvelope {
age: random.number(40),
foo: random.word(),
bar: random.float(),
boosterMetadata: {
version: 1,
schemaVersion: 1,
},
},
typeName: random.word(),
}
Expand Down
Loading
Loading