diff --git a/.changeset/nice-chefs-try.md b/.changeset/nice-chefs-try.md new file mode 100644 index 0000000..4045286 --- /dev/null +++ b/.changeset/nice-chefs-try.md @@ -0,0 +1,5 @@ +--- +"windpipe": patch +--- + +add `exhaust` stream consumer diff --git a/.changeset/red-glasses-suffer.md b/.changeset/red-glasses-suffer.md new file mode 100644 index 0000000..34fb510 --- /dev/null +++ b/.changeset/red-glasses-suffer.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +alter `flat*` APIs to simplify handlers diff --git a/src/handler.ts b/src/handler.ts index cc06c00..d9ef73f 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -1,5 +1,13 @@ -import type { Atom, MaybeAtom } from "./atom"; -import * as atom from "./atom"; +import { + normalise, + ok, + unknown, + isUnknown, + type Atom, + type AtomOk, + type AtomUnknown, + type MaybeAtom, +} from "./atom"; import type { MaybePromise } from "./util"; /** @@ -24,17 +32,27 @@ export async function handler( handler: () => MaybePromise>, trace: string[], ): Promise> { - try { - // Run the handler - const rawResult = handler(); + const result = await run(handler, trace); + + if (isUnknown(result)) { + return result; + } - // Normalise the returned promise - const result = await normalisePromise(rawResult); + return normalise(result.value); +} - // Normalise as an atom and return - return atom.normalise(result); +/** + * Run some callback. If it completes successfully, the value will be returned as `AtomOk`. If an + * error is thrown, it will be caught and returned as an `AtomUnknown`. `AtomError` will never be + * produced from this helper. + */ +export async function run( + cb: () => MaybePromise, + trace: string[], +): Promise | AtomUnknown> { + try { + return ok(await normalisePromise(cb())) as AtomOk; } catch (e) { - // Unknown error thrown, return it - return atom.unknown(e, trace); + return unknown(e, trace) as AtomUnknown; } } diff --git a/src/stream/consumption.ts b/src/stream/consumption.ts index 01d7943..405d8bc 100644 --- a/src/stream/consumption.ts +++ b/src/stream/consumption.ts @@ -1,6 +1,7 @@ import { Readable } from "node:stream"; import { isOk, type Atom } from "../atom"; import { StreamBase } from "./base"; +import { exhaust } from "../util"; export class StreamConsumption extends StreamBase { /** @@ -12,6 +13,14 @@ export class StreamConsumption extends StreamBase { return this.stream[Symbol.asyncIterator](); } + /** + * Completely exhaust the stream, driving it to completion. This is particularly useful when + * side effects of the stream are desired, but the actual values of the stream are not needed. + */ + exhaust(): Promise { + return exhaust(this); + } + /** * Create an async iterator that will emit each value in the stream. * diff --git a/src/stream/higher-order.ts b/src/stream/higher-order.ts index f2acdeb..00d48c0 100644 --- a/src/stream/higher-order.ts +++ b/src/stream/higher-order.ts @@ -1,46 +1,109 @@ import type { Stream } from "."; -import { isError, isOk, type Atom, type MaybeAtom, isUnknown } from "../atom"; -import { handler } from "../handler"; -import type { CallbackOrStream, MaybePromise } from "../util"; +import { isError, isOk, type Atom, isUnknown } from "../atom"; +import { run } from "../handler"; +import { type CallbackOrStream, type MaybePromise, exhaust } from "../util"; import { StreamTransforms } from "./transforms"; -type FlatMapResult = { atom: Atom } | { stream: Promise, E>> }; +function accept(value: T): { accept: T } { + return { accept: value }; +} + +function reject(value: T): { reject: T } { + return { reject: value }; +} + +type FilterResult = { accept: A } | { reject: R }; export class HigherOrderStream extends StreamTransforms { /** - * Map over each value in the stream, produce a stream from it, and flatten all the value - * streams together + * Base implementation of `flat*` operations. In general, all of these methods will filter over + * the type of atom, run some stream-producing callback with it, and then produce a new + * generator to expand into the stream. * - * @group Higher Order + * @template A - The accepted type from the filter. Allows for type narrowing from `Atom` + * into `AtomOk`, etc. + * @template U - The `ok` type of the produced stream. + * @template F - The `error` type of the produced stream. + * @template CT - The `ok` type of the stream produced from the callback. + * @template CE - The `error` type of the stream produced from the callback. */ - flatMap(cb: (value: T) => MaybePromise, E>>): Stream { - const trace = this.trace("flatMap"); + private flatOp( + filter: (atom: Atom) => FilterResult>, + cb: (atom: A) => MaybePromise>, + process: (atom: Atom, stream: Stream) => AsyncGenerator>, + ): Stream { + const trace = this.trace("flatOp"); - return this.flatMapAtom((atom) => { - if (isOk(atom)) { - return { stream: handler(() => cb(atom.value), trace) }; - } else { - return { atom }; + return this.consume(async function* (it) { + for await (const atom of it) { + const result = filter(atom); + + if ("reject" in result) { + yield result.reject; + continue; + } + + // Run the flat map handler + const streamAtom = await run(() => cb(result.accept), trace); + + // If an error was emitted whilst initialising the new stream, return it + if (!isOk(streamAtom)) { + yield streamAtom; + continue; + } + + // Otherwise, consume the iterator + yield* process(atom, streamAtom.value); } }); } + /** + * Internal helper for implementing the other `flatMap` methods. + */ + private flatMapAtom( + filter: (atom: Atom) => FilterResult>, + cb: (atom: A) => MaybePromise>, + ): Stream { + this.trace("flatMapAll"); + + return this.flatOp(filter, cb, async function* (_atom, stream) { + yield* stream; + }); + } + + /** + * Map over each value in the stream, produce a stream from it, and flatten all the value + * streams together + * + * @group Higher Order + */ + flatMap(cb: (value: T) => MaybePromise>): Stream { + this.trace("flatMap"); + + return this.flatMapAtom( + (atom) => (isOk(atom) ? accept(atom) : reject(atom)), + (atom) => { + return cb(atom.value); + }, + ); + } + /** * Map over each error in the stream, produce a stream from it, and flatten all the value * streams together. * * @group Higher Order */ - flatMapError(cb: (value: E) => MaybePromise, F>>): Stream { - const trace = this.trace("flatMapError"); - - return this.flatMapAtom((atom) => { - if (isError(atom)) { - return { stream: handler(() => cb(atom.value), trace) }; - } else { - return { atom }; - } - }); + flatMapError(cb: (value: E) => MaybePromise>): Stream { + this.trace("flatMapError"); + + return this.flatMapAtom( + (atom) => (isError(atom) ? accept(atom) : reject(atom)), + (atom) => { + return cb(atom.value); + }, + ); } /** @@ -50,47 +113,31 @@ export class HigherOrderStream extends StreamTransforms { * @group Higher Order */ flatMapUnknown( - cb: (value: unknown, trace: string[]) => MaybePromise, E>>, + cb: (value: unknown, trace: string[]) => MaybePromise>, ): Stream { const trace = this.trace("flatMapUnknown"); - return this.flatMapAtom((atom) => { - if (isUnknown(atom)) { - return { stream: handler(() => cb(atom.value, atom.trace), trace) }; - } else { - return { atom }; - } - }); + return this.flatMapAtom( + (atom) => (isUnknown(atom) ? accept(atom) : reject(atom)), + (atom) => { + return cb(atom.value, trace); + }, + ); } /** - * Internal helper for implementing the other `flatMap` methods. - * - * The provided callback *must* not be able to throw. It is expected that any user code run - * within is properly handled. + * Base implementation of the `flatTap` operations. */ - private flatMapAtom(cb: (atom: Atom) => FlatMapResult): Stream { - return this.consume(async function* (it) { - for await (const atom of it) { - // Create the new stream - const atomOrStream = cb(atom); - - if ("atom" in atomOrStream) { - yield atomOrStream.atom; - continue; - } - - const stream = await atomOrStream.stream; + private flatTapAtom( + filter: (atom: Atom) => FilterResult>, + cb: (atom: A) => MaybePromise>, + ): Stream { + this.trace("flatTapAtom"); - // If the returned atom isn't ok, emit it back onto the stream - if (!isOk(stream)) { - yield stream; - continue; - } + return this.flatOp(filter, cb, async function* (atom, stream) { + await exhaust(stream); - // Emit the generator of the new stream - yield* stream.value; - } + yield atom; }); } @@ -101,30 +148,13 @@ export class HigherOrderStream extends StreamTransforms { * * @group Higher Order */ - flatTap(cb: (value: T) => MaybePromise, E>>): Stream { - const trace = this.trace("flatTap"); - - return this.consume(async function* (it) { - for await (const atom of it) { - if (!isOk(atom)) { - yield atom; - continue; - } + flatTap(cb: (value: T) => MaybePromise>): Stream { + this.trace("flatTap"); - const streamAtom = await handler(() => cb(atom.value), trace); - - if (isOk(streamAtom)) { - // Consume the resulting stream, and emit the original atom - for await (const _ of streamAtom.value) { - // eslint-ignore no-empty - } - - yield atom; - } else { - yield streamAtom; - } - } - }); + return this.flatTapAtom( + (atom) => (isOk(atom) ? accept(atom) : reject(atom)), + (atom) => cb(atom.value), + ); } /** @@ -173,9 +203,7 @@ export class HigherOrderStream extends StreamTransforms { replaceWith(cbOrStream: CallbackOrStream): Stream { return this.consume(async function* (it) { // Consume all the items in the stream - for await (const _atom of it) { - // eslint-disable-next-line no-empty - } + await exhaust(it); // Replace with the user stream if (typeof cbOrStream === "function") { diff --git a/src/util.ts b/src/util.ts index 44ea9d3..312997c 100644 --- a/src/util.ts +++ b/src/util.ts @@ -15,3 +15,18 @@ export type Truthy = NonNullable>; * Type that may be a callback that resolves to a stream, or just a stream. */ export type CallbackOrStream = (() => Stream) | Stream; + +/** + * Completely exhausts the provided async iterator. + */ +export async function exhaust(iterable: AsyncIterable) { + const it = iterable[Symbol.asyncIterator](); + + // eslint-disable-next-line no-constant-condition + while (true) { + const result = await it.next(); + if (result.done) { + break; + } + } +} diff --git a/test/index.test.ts b/test/index.test.ts index 5a29708..9f6067b 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -326,26 +326,6 @@ describe.concurrent("higher order streams", () => { ]); }); - test("returning stream or error", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3]).flatMap((n) => { - if (n === 2) { - return $.error("number two"); - } - - return $.from(new Array(n).fill(n)); - }); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok(1), - $.error("number two"), - $.ok(3), - $.ok(3), - $.ok(3), - ]); - }); - test("errors already in stream", async ({ expect }) => { expect.assertions(1);