From f6410e795b88a8795a74dc6c08be63295823e682 Mon Sep 17 00:00:00 2001 From: Matthias Seemann Date: Mon, 26 Feb 2024 18:58:52 +0100 Subject: [PATCH] add induceError and induceEnd --- .editorconfig | 6 +++++ .gitignore | 3 ++- README.md | 17 +++++++++----- package.json | 2 +- src/index.ts | 15 +++++++++++-- test/index-test.ts | 55 ++++++++++++++++++++++++++++++++++++++++++++++ test/tsconfig.json | 3 ++- 7 files changed, 91 insertions(+), 10 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..9b63eb7 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,6 @@ +root = true + +[*] +indent_style = space +indent_size = 2 +end_of_line = LF diff --git a/.gitignore b/.gitignore index 9ff2e6a..01caeea 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ node_modules/ .rpt2_cache/ -dist/ \ No newline at end of file +dist/ +.idea diff --git a/README.md b/README.md index ab737de..f0e7cb5 100644 --- a/README.md +++ b/README.md @@ -21,20 +21,27 @@ yarn add @most/adapter ### Adapter ```typescript -type Adapter = [(event: A) => void, Stream] +type Adapter = [(event: A) => void, Stream, (e: Error) => void, () => void] ``` -An adapter is an entangled pair of an event stream, and a function to induce (cause) events in that stream. +An adapter is an entangled tuple of an event stream, and a functions to induce (cause) events or an error in that stream, or to end it. -### createAdapter :: () → Adapter +### createAdapter :: () → Adapter A, A Create an Adapter. ```typescript -const [induce, events] = createAdapter() +const [induce, events, induceError] = createAdapter() // Cause an event with value "hello" to occur in events. induce('hello') // Cause an event with value "world" to occur in events. induce('world') -``` \ No newline at end of file +// Cause events to fail with the error +induceError(new Error("oops!")) + +const [, eventsToo, , endStream] = createAdapter() + +// Cause eventsToo to end w/o a value +endStream() +``` diff --git a/package.json b/package.json index 44426c8..d719d09 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@most/adapter", - "version": "1.0.0", + "version": "1.1.0", "main": "dist/index.js", "module": "dist/index.mjs", "author": "Brian Cavalier ", diff --git a/src/index.ts b/src/index.ts index 832f59e..62fac09 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,15 +1,26 @@ import { Disposable, Scheduler, Sink, Stream, Time } from '@most/types' -export type Adapter = [(event: A) => void, Stream] +export type Adapter = [(event: A) => void, Stream, (e: Error) => void, () => void] export const createAdapter = (): Adapter => { const sinks: { sink: Sink, scheduler: Scheduler }[] = [] - return [a => broadcast(sinks, a), new FanoutPortStream(sinks)] + return [ + a => broadcast(sinks, a), + new FanoutPortStream(sinks), + broadcastError(sinks), + broadcastEnd(sinks) + ] } const broadcast = (sinks: { sink: Sink, scheduler: Scheduler }[], a: A): void => sinks.slice().forEach(({ sink, scheduler }) => tryEvent(scheduler.currentTime(), a, sink)) +const broadcastError = (sinks: { sink: Sink, scheduler: Scheduler }[]) => (e: Error) : void => + sinks.slice().forEach(({ sink, scheduler }) => sink.error(scheduler.currentTime(), e)) + +const broadcastEnd = (sinks: { sink: Sink, scheduler: Scheduler }[]) => () : void => + sinks.slice().forEach(({ sink, scheduler }) => sink.end(scheduler.currentTime())) + export class FanoutPortStream { constructor (private readonly sinks: { sink: Sink, scheduler: Scheduler }[]) {} diff --git a/test/index-test.ts b/test/index-test.ts index a55917d..8d71198 100644 --- a/test/index-test.ts +++ b/test/index-test.ts @@ -19,4 +19,59 @@ describe('createAdapter', () => { assert(f1.called) assert(f2.called) }) + + it ('broadcasts an error event', () :Promise => { + const [induce, s, induceError] = createAdapter() + const sampleError = new Error("sample error") + const f1 = fake() + const f2 = fake() + const s1 = tap(f1, s) + const s2 = tap(f2, s) + const scheduler = newDefaultScheduler() + const retVal = Promise.all([ + runEffects(s1, scheduler) + .then( + () => { assert(false); }, + (e: Error) => { + assert(e === sampleError); + assert(f1.notCalled); + } + ), + runEffects(s2, scheduler) + .then( + () => { assert(false); }, + (e: Error) => { + assert(e === sampleError); + assert(f2.notCalled); + } + ) + ]) + + induceError(sampleError); + induce(undefined); + + return retVal + }) + + it ('ends all observers after emitting an event', () => { + const [induce, s, , induceEnd] = createAdapter() + const sampleEvent = { foo: 'bar' } + const f1 = fake() + const f2 = fake() + const s1 = tap(f1, s) + const s2 = tap(f2, s) + const scheduler = newDefaultScheduler() + const retVal = Promise.all([ + runEffects(s1, scheduler) + .then(() => { assert(f1.calledOnceWith(sampleEvent)) }), + runEffects(s2, scheduler) + .then(() => { assert(f2.calledOnceWith(sampleEvent)) }) + ]) + + induce(sampleEvent); + induceEnd(); + + return retVal + }) + }) diff --git a/test/tsconfig.json b/test/tsconfig.json index 09479fd..f5f4614 100644 --- a/test/tsconfig.json +++ b/test/tsconfig.json @@ -1,6 +1,7 @@ { "extends": "../tsconfig", "compilerOptions": { - "module": "commonjs" + "module": "commonjs", + "lib": ["es2015"] } }