Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local command bus context #66

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@
"lint-staged": {
"*.ts": "eslint --fix",
"*.json": "prettier --write"
}
},
"packageManager": "[email protected]+sha512.60c18acd138bff695d339be6ad13f7e936eea6745660d4cc4a776d5247c540d0edee1a563695c183a66eb917ef88f2b4feb1fc25f32a7adcadc7aaf3438e99c1"
}
11 changes: 7 additions & 4 deletions packages/ddd-toolkit/src/command-bus/command-bus.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@ export interface ICommandClass<C extends ICommand<unknown, unknown>> {
new (payload: unknown): C;
}

export interface ICommandHandler<C extends ICommand<unknown, unknown>> {
handle: (command: C) => Promise<C['_returnType']>;
export interface ICommandHandler<C extends ICommand<unknown, unknown>, TContext = void> {
handle: (command: C, context?: TContext) => Promise<C['_returnType']>;
}

export interface ICommandBus {
register<C extends ICommand<unknown, unknown>>(command: ICommandClass<C>, handler: ICommandHandler<C>): void;
export interface ICommandBus<TContext> {
register<C extends ICommand<unknown, unknown>>(
command: ICommandClass<C>,
handler: ICommandHandler<C, TContext>,
): void;

send<C extends ICommand<unknown, unknown>>(command: C): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export interface IContextManager<TContext> {
wrapWithContext<T>(operation: (context: TContext) => Promise<T>, context?: TContext): Promise<T>;
}
87 changes: 82 additions & 5 deletions packages/ddd-toolkit/src/command-bus/local-command-bus.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Command } from './command';
import { waitFor } from '../utils';
import { ICommandHandler } from './command-bus.interface';
import { ILogger } from '../logger';
import { IContextManager } from './context-manager.interface';

const loggerMock: ILogger = {
log: jest.fn(),
Expand All @@ -24,17 +25,15 @@ class BarCommand extends Command<{ foo: string }> {
}

describe('LocalCommandBus', () => {
afterEach(() => jest.resetAllMocks());

describe('Given an command bus', () => {
let commandBus: LocalCommandBus;
let commandBus: LocalCommandBus<unknown>;

beforeEach(() => {
commandBus = new LocalCommandBus(loggerMock, 3, 100);
});

afterEach(() => {
jest.resetAllMocks();
});

describe('Given no registered handler to foo command', () => {
describe('When send a foo command', () => {
it('Should log warning message', async () => {
Expand Down Expand Up @@ -160,6 +159,84 @@ describe('LocalCommandBus', () => {
});
});

describe('Given a command bus with context manager', () => {
type TContext = { contextKey: string };

let commandBus: LocalCommandBus<TContext>;

class FooContextManager implements IContextManager<TContext> {
public async wrapWithContext<T>(
operation: (context: TContext) => Promise<T>,
existingContext: TContext,
): Promise<T> {
const context: TContext = existingContext || { contextKey: 'foo-context' };
return await operation(context);
}
}

beforeEach(() => {
commandBus = new LocalCommandBus(loggerMock, 3, 100, new FooContextManager());
});

it('should be defined', () => {
expect(commandBus).toBeDefined();
});

describe('Given one registered handler to foo command', () => {
const FooHandlerMock = jest.fn();

class FooCommandHandler implements ICommandHandler<FooCommand> {
async handle(...args: unknown[]) {
return await FooHandlerMock(args);
}
}

beforeEach(() => {
commandBus.register(FooCommand, new FooCommandHandler());
});

describe('When sendSync a foo command', () => {
it('context should be passed to command handler', async () => {
const command = new FooCommand({ foo: 'bar' });
await commandBus.sendSync(command);

expect(FooHandlerMock).toHaveBeenCalledWith([command, { contextKey: 'foo-context' }]);
});
});

describe('When sendSync a foo command with existing context', () => {
it('context should be passed to command handler', async () => {
const command = new FooCommand({ foo: 'bar' });
await commandBus.sendSync(command, { contextKey: 'bar-context' });

expect(FooHandlerMock).toHaveBeenCalledWith([command, { contextKey: 'bar-context' }]);
});
});
});

describe('Given one registered failing handler to foo command', () => {
const FooHandlerMock = jest.fn();

class FooCommandHandler implements ICommandHandler<FooCommand> {
async handle(...args: unknown[]) {
return await FooHandlerMock(args);
}
}

beforeEach(() => {
FooHandlerMock.mockRejectedValue(new Error('ko'));
commandBus.register(FooCommand, new FooCommandHandler());
});

describe('When sendSync a foo command', () => {
it('should throw', async () => {
const command = new FooCommand({ foo: 'bar' });
await expect(() => commandBus.sendSync(command)).rejects.toThrow();
});
});
});
});

it('default retry max attempts should be 0', () => {
const commandBus = new LocalCommandBus(loggerMock);
expect(commandBus['retryMaxAttempts']).toBe(0);
Expand Down
26 changes: 18 additions & 8 deletions packages/ddd-toolkit/src/command-bus/local-command-bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,32 @@ import { ILogger } from '../logger';
import { ICommand, ICommandBus, ICommandClass, ICommandHandler } from './command-bus.interface';
import { ExponentialBackoff, IRetryMechanism } from '../event-bus';
import { inspect } from 'util';
import { IContextManager } from './context-manager.interface';

export class LocalCommandBus implements ICommandBus {
export class LocalCommandBus<TContext = void> implements ICommandBus<TContext> {
private readonly retryMechanism: IRetryMechanism;

private handlers: { [key: string]: ICommandHandler<ICommand<unknown, unknown>> } = {};
private handlers: { [key: string]: ICommandHandler<ICommand<unknown, unknown>, TContext> } = {};

constructor(
private logger: ILogger,
private readonly retryMaxAttempts = 0,
retryInitialDelay = 500,
private readonly contextManager?: IContextManager<TContext>,
) {
this.retryMechanism = new ExponentialBackoff(retryInitialDelay);
}

public register<C extends ICommand<unknown, unknown>>(
command: ICommandClass<C>,
handler: ICommandHandler<C>,
handler: ICommandHandler<C, TContext>,
): void {
if (this.handlers[command.name]) throw new Error(`Command ${command.name} is already registered`);
this.handlers[command.name] = handler;
}

public async send<C extends ICommand<unknown, unknown>>(command: C): Promise<void> {
const handler = this.handlers[command.name] as ICommandHandler<C>;
const handler = this.handlers[command.name] as ICommandHandler<C, TContext>;
if (!handler) {
this.logger.warn(`No handler found for ${command.name}`);
return;
Expand All @@ -34,15 +36,23 @@ export class LocalCommandBus implements ICommandBus {
void this.handleCommand(command, handler);
}

public async sendSync<C extends ICommand<unknown, unknown>>(command: C): Promise<C['_returnType']> {
const handler = this.handlers[command.name] as ICommandHandler<C>;
public async sendSync<C extends ICommand<unknown, unknown>>(
command: C,
context?: TContext,
): Promise<C['_returnType']> {
const handler = this.handlers[command.name] as ICommandHandler<C, TContext>;
if (!handler) throw new Error(`No handler found for ${command.name}`);
return await handler.handle(command);

return this.contextManager
? await this.contextManager.wrapWithContext(async (context) => {
return await handler.handle(command, context);
}, context)
: await handler.handle(command);
}

private async handleCommand<C extends ICommand<unknown, unknown>>(
command: C,
handler: ICommandHandler<C>,
handler: ICommandHandler<C, TContext>,
attempt = 0,
) {
try {
Expand Down