Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add induceError and induceEnd #310

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
root = true

[*]
indent_style = space
indent_size = 2
end_of_line = LF
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules/
.rpt2_cache/
dist/
dist/
.idea
17 changes: 12 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,27 @@ yarn add @most/adapter
### Adapter<A, B>

```typescript
type Adapter<A, B> = [(event: A) => void, Stream<B>]
type Adapter<A, B> = [(event: A) => void, Stream<B>, (e: Error) => void, () => void]
```

An adapter is an entangled pair of an event stream, and a function to induce (cause) events in that stream.
An adapter is an entangled tuple of an event stream, and a functions to induce (cause) events or an error in that stream, or to end it.

### createAdapter :: () → Adapter<A, A>
### createAdapter :: () → Adapter A, A

Create an Adapter.

```typescript
const [induce, events] = createAdapter<string>()
const [induce, events, induceError] = createAdapter<string>()

// Cause an event with value "hello" to occur in events.
induce('hello')
// Cause an event with value "world" to occur in events.
induce('world')
```
// Cause events to fail with the error
induceError(new Error("oops!"))

const [, eventsToo, , endStream] = createAdapter<any>()

// Cause eventsToo to end w/o a value
endStream()
```
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@most/adapter",
"version": "1.0.0",
"version": "1.1.0",
"main": "dist/index.js",
"module": "dist/index.mjs",
"author": "Brian Cavalier <[email protected]>",
Expand Down
15 changes: 13 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
import { Disposable, Scheduler, Sink, Stream, Time } from '@most/types'

export type Adapter<A, B> = [(event: A) => void, Stream<B>]
export type Adapter<A, B> = [(event: A) => void, Stream<B>, (e: Error) => void, () => void]

export const createAdapter = <A> (): Adapter<A, A> => {
const sinks: { sink: Sink<A>, scheduler: Scheduler }[] = []
return [a => broadcast(sinks, a), new FanoutPortStream(sinks)]
return [
a => broadcast(sinks, a),
new FanoutPortStream(sinks),
broadcastError(sinks),
broadcastEnd(sinks)
]
}

const broadcast = <A> (sinks: { sink: Sink<A>, scheduler: Scheduler }[], a: A): void =>
sinks.slice().forEach(({ sink, scheduler }) => tryEvent(scheduler.currentTime(), a, sink))

const broadcastError = <A> (sinks: { sink: Sink<A>, scheduler: Scheduler }[]) => (e: Error) : void =>
sinks.slice().forEach(({ sink, scheduler }) => sink.error(scheduler.currentTime(), e))

const broadcastEnd = <A> (sinks: { sink: Sink<A>, scheduler: Scheduler }[]) => () : void =>
sinks.slice().forEach(({ sink, scheduler }) => sink.end(scheduler.currentTime()))

export class FanoutPortStream<A> {
constructor (private readonly sinks: { sink: Sink<A>, scheduler: Scheduler }[]) {}

Expand Down
55 changes: 55 additions & 0 deletions test/index-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,59 @@ describe('createAdapter', () => {
assert(f1.called)
assert(f2.called)
})

it ('broadcasts an error event', () :Promise<any> => {
const [induce, s, induceError] = createAdapter()
const sampleError = new Error("sample error")
const f1 = fake()
const f2 = fake()
const s1 = tap(f1, s)
const s2 = tap(f2, s)
const scheduler = newDefaultScheduler()
const retVal = Promise.all([
runEffects(s1, scheduler)
.then(
() => { assert(false); },
(e: Error) => {
assert(e === sampleError);
assert(f1.notCalled);
}
),
runEffects(s2, scheduler)
.then(
() => { assert(false); },
(e: Error) => {
assert(e === sampleError);
assert(f2.notCalled);
}
)
])

induceError(sampleError);
induce(undefined);

return retVal
})

it ('ends all observers after emitting an event', () => {
const [induce, s, , induceEnd] = createAdapter()
const sampleEvent = { foo: 'bar' }
const f1 = fake()
const f2 = fake()
const s1 = tap(f1, s)
const s2 = tap(f2, s)
const scheduler = newDefaultScheduler()
const retVal = Promise.all([
runEffects(s1, scheduler)
.then(() => { assert(f1.calledOnceWith(sampleEvent)) }),
runEffects(s2, scheduler)
.then(() => { assert(f2.calledOnceWith(sampleEvent)) })
])

induce(sampleEvent);
induceEnd();

return retVal
})

})
3 changes: 2 additions & 1 deletion test/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"extends": "../tsconfig",
"compilerOptions": {
"module": "commonjs"
"module": "commonjs",
"lib": ["es2015"]
}
}