Skip to content

Commit

Permalink
feat(mikro-orm): introduce transaction hooks
Browse files Browse the repository at this point in the history
closes #2370
  • Loading branch information
derevnjuk committed Jul 11, 2023
1 parent 8390c28 commit f4310a8
Show file tree
Hide file tree
Showing 12 changed files with 259 additions and 115 deletions.
69 changes: 51 additions & 18 deletions docs/tutorials/mikroorm.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,14 @@ The `@Transactional()` decorator allows you to enable a retry policy for the par
```typescript
import {OptimisticLockError} from "@mikro-orm/core";
import {RetryStrategy} from "@tsed/mikro-orm";
import {OverrideProvider} from "@tsed/di";
import {setTimeout} from "timers/promises";

export interface ExponentialBackoffOptions {
maxDepth: number;
}

@OverrideProvider(RetryStrategy)
export class ExponentialBackoff implements RetryStrategy {
private readonly maxDepth = 3;
private depth = 0;

constructor(private readonly options: ExponentialBackoffOptions) {}

public async acquire<T extends (...args: unknown[]) => unknown>(task: T): Promise<ReturnType<T>> {
try {
return (await task()) as ReturnType<T>;
Expand All @@ -323,24 +321,13 @@ export class ExponentialBackoff implements RetryStrategy {
}

private async retry<T extends (...args: unknown[]) => unknown>(task: T): Promise<ReturnType<T>> {
await this.sleep(2 ** this.depth * 50);
await setTimeout(2 ** this.depth * 50);

this.depth += 1;

return this.acquire(task);
}

private sleep(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
}

registerProvider({
provide: RetryStrategy,
useFactory(): ExponentialBackoff {
return new ExponentialBackoff({maxDepth: 3});
}
});
```

`ExponentialBackoff` invokes passed function recursively is contained in a try/catch block. The method returns control to the interceptor if the call to the `task` function succeeds without throwing an exception. If the `task` method fails, the catch block examines the reason for the failure. If it's optimistic locking the code waits for a short delay before retrying the operation.
Expand Down Expand Up @@ -397,6 +384,52 @@ export class SomeSubscriber implements EventSubscriber {
}
```

## Transaction Hooks

The transaction hooks allow you to customize the default transaction behavior. These hooks enable you to execute custom code before and after committing data to the database. These transaction hooks provide a flexible way to extend the default transaction behavior and implement advanced patterns such as the Inbox pattern or domain event dispatching.

### BeforeTransactionCommit Hook

The `BeforeTransactionCommit` interface allows you to define hooks that are executed right before committing data to the database. This hook provides a way to modify data within the same transaction context and perform additional operations before the transaction is committed.

To use the `BeforeTransactionCommit` hook, first, you have to implement the `BeforeTransactionCommit` interface:

```typescript
import {BeforeTransactionCommit} from "@tsed/mikro-orm";
import {EntityManager} from "@mikro-orm/core";
import {Injectable} from "@tsed/di";

@Injectable()
export class Hooks implements BeforeTransactionCommit {
$beforeTransactionCommit(em: EntityManager): Promise<unknown> | unknown {
// Custom code executed before committing data
}
}
```

Then just write your code inside the `$beforeTransactionCommit` method. This code will be executed before the transaction is committed.

### AfterTransactionCommit Hook

The `AfterTransactionCommit` interface allows you to define hooks that are executed right after committing data to the database. This hook enables you to execute code after the data is committed, making multiple transactions.

To use the `AfterTransactionCommit` hook, you have to implement the `AfterTransactionCommit` interface:

```typescript
import {AfterTransactionCommit} from "@tsed/mikro-orm";
import {EntityManager} from "@mikro-orm/core";
import {Injectable} from "@tsed/di";

@Injectable()
export class Hooks implements AfterTransactionCommit {
$afterTransactionCommit(em: EntityManager): Promise<unknown> | unknown {
// Custom code executed after committing data
}
}
```

It's important to note that when using the `AfterTransactionCommit` hook, you need to handle eventual consistency and compensatory actions in case of failures on your own.

## Author

<GithubContributors :users="['derevnjuk']"/>
Expand Down
71 changes: 53 additions & 18 deletions packages/orm/mikro-orm/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,14 @@ The `@Transactional()` decorator allows you to enable a retry policy for the par
```typescript
import {OptimisticLockError} from "@mikro-orm/core";
import {RetryStrategy} from "@tsed/mikro-orm";
import {OverrideProvider} from "@tsed/di";
import {setTimeout} from "timers/promises";

export interface ExponentialBackoffOptions {
maxDepth: number;
}

@OverrideProvider(RetryStrategy)
export class ExponentialBackoff implements RetryStrategy {
private readonly maxDepth = 3;
private depth = 0;

constructor(private readonly options: ExponentialBackoffOptions) {}

public async acquire<T extends (...args: unknown[]) => unknown>(task: T): Promise<ReturnType<T>> {
try {
return (await task()) as ReturnType<T>;
Expand All @@ -343,24 +341,13 @@ export class ExponentialBackoff implements RetryStrategy {
}

private async retry<T extends (...args: unknown[]) => unknown>(task: T): Promise<ReturnType<T>> {
await this.sleep(2 ** this.depth * 50);
await setTimeout(2 ** this.depth * 50);

this.depth += 1;

return this.acquire(task);
}

private sleep(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
}

registerProvider({
provide: RetryStrategy,
useFactory(): ExponentialBackoff {
return new ExponentialBackoff({maxDepth: 3});
}
});
```

`ExponentialBackoff` invokes passed function recursively is contained in a try/catch block. The method returns control to the interceptor if the call to the `task` function succeeds without throwing an exception. If the `task` method fails, the catch block examines the reason for the failure. If it's optimistic locking the code waits for a short delay before retrying the operation.
Expand Down Expand Up @@ -417,6 +404,54 @@ export class SomeSubscriber implements EventSubscriber {
}
```

## Transaction Hooks

The transaction hooks allow you to customize the default transaction behavior. These hooks enable you to execute custom code before and after committing data to the database. These transaction hooks provide a flexible way to extend the default transaction behavior and implement advanced patterns such as the Inbox pattern or domain event dispatching.

### BeforeTransactionCommit Hook

The `BeforeTransactionCommit` interface allows you to define hooks that are executed right before committing data to the database. This hook provides a way to modify data within the same transaction context and perform additional operations before the transaction is committed.

To use the `BeforeTransactionCommit` hook, first, you have to implement the `BeforeTransactionCommit` interface:

```typescript
import {BeforeTransactionCommit} from "@tsed/mikro-orm";
import {EntityManager} from "@mikro-orm/core";
import {Injectable} from "@tsed/di";

@Injectable()
export class Hooks implements BeforeTransactionCommit {
$beforeTransactionCommit(em: EntityManager): Promise<unknown> | unknown {
// Custom code executed before committing data
}
}
```

Then just write your code inside the `$beforeTransactionCommit` method. This code will be executed before the transaction is committed.

### AfterTransactionCommit Hook

The `AfterTransactionCommit` interface allows you to define hooks that are executed right after committing data to the database. This hook enables you to execute code after the data is committed, making multiple transactions.

To use the `AfterTransactionCommit` hook, you have to implement the `AfterTransactionCommit` interface:

```typescript
import {AfterTransactionCommit} from "@tsed/mikro-orm";
import {EntityManager} from "@mikro-orm/core";
import {Injectable} from "@tsed/di";

@Injectable()
export class Hooks implements AfterTransactionCommit {
$afterTransactionCommit(em: EntityManager): Promise<unknown> | unknown {
// Custom code executed after committing data
}
}
```

::: tip Note
When using the `AfterTransactionCommit` hook, you need to handle eventual consistency and compensatory actions in case of failures on your own.
:::

## Contributors

Please read [contributing guidelines here](https://tsed.io/contributing.html)
Expand Down
2 changes: 1 addition & 1 deletion packages/orm/mikro-orm/src/MikroOrmModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
} from "@tsed/di";
import {EventSubscriber, Options} from "@mikro-orm/core";
import {MikroOrmRegistry} from "./services/MikroOrmRegistry";
import {RetryStrategy} from "./services/RetryStrategy";
import {RetryStrategy} from "./interfaces/RetryStrategy";
import {OptimisticLockErrorFilter} from "./filters/OptimisticLockErrorFilter";
import {MikroOrmContext} from "./services/MikroOrmContext";
import {classOf, isFunction, Store} from "@tsed/core";
Expand Down
4 changes: 3 additions & 1 deletion packages/orm/mikro-orm/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ export * from "./decorators/subscriber";
export * from "./decorators/transactional";
export * from "./filters/OptimisticLockErrorFilter";
export * from "./interceptors/TransactionalInterceptor";
export * from "./interfaces/AfterTransactionCommit";
export * from "./interfaces/BeforeTransactionCommit";
export * from "./interfaces/RetryStrategy";
export * from "./services/MikroOrmContext";
export * from "./services/MikroOrmFactory";
export * from "./services/MikroOrmRegistry";
export * from "./services/RetryStrategy";
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import {TransactionalInterceptor} from "./TransactionalInterceptor";
import {anyFunction, anything, deepEqual, instance, mock, objectContaining, reset, verify, when} from "ts-mockito";
import {InterceptorContext} from "@tsed/di";
import {InjectorService, InterceptorContext} from "@tsed/di";
import {Logger} from "@tsed/logger";
import {EntityManager, IsolationLevel, MikroORM, OptimisticLockError} from "@mikro-orm/core";
import {MikroOrmRegistry} from "../services/MikroOrmRegistry";
import {RetryStrategy} from "../services/RetryStrategy";
import {RetryStrategy} from "../interfaces/RetryStrategy";
import {MikroOrmContext} from "../services/MikroOrmContext";

describe("TransactionalInterceptor", () => {
const mockedInjectorService = mock<InjectorService>();
const mockedMikroOrmRegistry = mock<MikroOrmRegistry>();
const mockedMikroOrm = mock<MikroORM>();
const mockedEntityManager = mock<EntityManager>();
Expand All @@ -20,7 +21,8 @@ describe("TransactionalInterceptor", () => {

afterEach(() => {
next.mockReset();
reset<MikroOrmRegistry | MikroORM | EntityManager | Logger | RetryStrategy | MikroOrmContext>(
reset<InjectorService | MikroOrmRegistry | MikroORM | EntityManager | Logger | RetryStrategy | MikroOrmContext>(
mockedInjectorService,
mockedMikroOrmRegistry,
mockedMikroOrm,
mockedEntityManager,
Expand All @@ -33,6 +35,7 @@ describe("TransactionalInterceptor", () => {
when(mockedMikroOrm.em).thenReturn(instance(mockedEntityManager));

transactionalInterceptor = new TransactionalInterceptor(
instance(mockedInjectorService),
instance(mockedMikroOrmRegistry),
instance(mockedMikroOrmContext),
instance(mockedLogger)
Expand Down Expand Up @@ -72,43 +75,44 @@ describe("TransactionalInterceptor", () => {
verify(mockedEntityManager.transactional(anyFunction(), objectContaining({isolationLevel: IsolationLevel.SERIALIZABLE}))).once();
});

it("should set an isolation level by default", async () => {
it("should run within a new context", async () => {
// arrange
const context = {options: {}} as InterceptorContext;
const context = {} as InterceptorContext;
const entityManger = instance(mockedEntityManager);

when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm));
when(mockedMikroOrmContext.has(anything())).thenReturn(true);
when(mockedMikroOrmContext.has(anything())).thenReturn(false);
when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger);
when(mockedMikroOrmContext.run(anything(), anything())).thenCall((_: EntityManager[], func: (...args: unknown[]) => unknown) =>
func()
);

// act
await transactionalInterceptor.intercept(context, next);

// assert
verify(mockedEntityManager.transactional(anyFunction(), objectContaining({isolationLevel: IsolationLevel.READ_COMMITTED}))).once();
verify(mockedMikroOrmContext.run(deepEqual([entityManger]), anyFunction())).twice();
verify(mockedEntityManager.transactional(anyFunction(), objectContaining({}))).once();
});

it("should run within a new context", async () => {
it("should perform a task within a transaction", async () => {
// arrange
const context = {} as InterceptorContext;
const entityManger = instance(mockedEntityManager);

when(mockedMikroOrmRegistry.get(anything())).thenReturn(instance(mockedMikroOrm));
when(mockedMikroOrmContext.has(anything())).thenReturn(false);
when(mockedMikroOrmContext.has(anything())).thenReturn(true);
when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger);
when(mockedMikroOrmContext.run(anything(), anything())).thenCall((_: EntityManager[], func: (...args: unknown[]) => unknown) =>
func()
);
when(mockedEntityManager.transactional(anything(), anything())).thenCall((func: (...args: unknown[]) => unknown) => func());

// act
await transactionalInterceptor.intercept(context, next);

// assert
verify(mockedMikroOrmContext.run(deepEqual([entityManger]), anyFunction())).once();
verify(mockedEntityManager.transactional(anyFunction(), objectContaining({}))).once();
expect(next).toHaveBeenCalled();
});

it("should perform a task within a transaction", async () => {
it("should emit events in the right order", async () => {
// arrange
const context = {} as InterceptorContext;
const entityManger = instance(mockedEntityManager);
Expand All @@ -117,12 +121,17 @@ describe("TransactionalInterceptor", () => {
when(mockedMikroOrmContext.has(anything())).thenReturn(true);
when(mockedMikroOrmContext.get(anything())).thenReturn(entityManger);
when(mockedEntityManager.transactional(anything(), anything())).thenCall((func: (...args: unknown[]) => unknown) => func());
when(mockedMikroOrmContext.run(anything(), anything())).thenCall((_: EntityManager[], func: (...args: unknown[]) => unknown) =>
func()
);

// act
await transactionalInterceptor.intercept(context, next);

// assert
expect(next).toHaveBeenCalled();
verify(mockedInjectorService.alterAsync("$beforeTransactionCommit", entityManger)).calledBefore(
mockedInjectorService.alterAsync("$afterTransactionCommit", entityManger)
);
});

it("should disable an explicit transaction", async () => {
Expand Down Expand Up @@ -190,6 +199,7 @@ describe("TransactionalInterceptor", () => {
it("should apply a retry mechanism if retry is enabled", async () => {
// arrange
transactionalInterceptor = new TransactionalInterceptor(
instance(mockedInjectorService),
instance(mockedMikroOrmRegistry),
instance(mockedMikroOrmContext),
instance(mockedLogger),
Expand Down
Loading

0 comments on commit f4310a8

Please sign in to comment.