Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve ergonomics for flat* operators #10

Merged
merged 2 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/nice-chefs-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

add `exhaust` stream consumer
5 changes: 5 additions & 0 deletions .changeset/red-glasses-suffer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

alter `flat*` APIs to simplify handlers
40 changes: 29 additions & 11 deletions src/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import type { Atom, MaybeAtom } from "./atom";
import * as atom from "./atom";
import {
normalise,
ok,
unknown,
isUnknown,
type Atom,
type AtomOk,
type AtomUnknown,
type MaybeAtom,
} from "./atom";
import type { MaybePromise } from "./util";

/**
Expand All @@ -24,17 +32,27 @@ export async function handler<T, E>(
handler: () => MaybePromise<MaybeAtom<T, E>>,
trace: string[],
): Promise<Atom<T, E>> {
try {
// Run the handler
const rawResult = handler();
const result = await run(handler, trace);

if (isUnknown(result)) {
return result;
}

// Normalise the returned promise
const result = await normalisePromise(rawResult);
return normalise(result.value);
}

// Normalise as an atom and return
return atom.normalise(result);
/**
* Run some callback. If it completes successfully, the value will be returned as `AtomOk`. If an
* error is thrown, it will be caught and returned as an `AtomUnknown`. `AtomError` will never be
* produced from this helper.
*/
export async function run<T>(
cb: () => MaybePromise<T>,
trace: string[],
): Promise<AtomOk<T> | AtomUnknown> {
try {
return ok(await normalisePromise(cb())) as AtomOk<T>;
} catch (e) {
// Unknown error thrown, return it
return atom.unknown(e, trace);
return unknown(e, trace) as AtomUnknown;
}
}
9 changes: 9 additions & 0 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Readable } from "node:stream";
import { isOk, type Atom } from "../atom";
import { StreamBase } from "./base";
import { exhaust } from "../util";

export class StreamConsumption<T, E> extends StreamBase {
/**
Expand All @@ -12,6 +13,14 @@ export class StreamConsumption<T, E> extends StreamBase {
return this.stream[Symbol.asyncIterator]();
}

/**
* Completely exhaust the stream, driving it to completion. This is particularly useful when
* side effects of the stream are desired, but the actual values of the stream are not needed.
*/
exhaust(): Promise<void> {
return exhaust(this);
}

/**
* Create an async iterator that will emit each value in the stream.
*
Expand Down
192 changes: 110 additions & 82 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,109 @@
import type { Stream } from ".";
import { isError, isOk, type Atom, type MaybeAtom, isUnknown } from "../atom";
import { handler } from "../handler";
import type { CallbackOrStream, MaybePromise } from "../util";
import { isError, isOk, type Atom, isUnknown } from "../atom";
import { run } from "../handler";
import { type CallbackOrStream, type MaybePromise, exhaust } from "../util";
import { StreamTransforms } from "./transforms";

type FlatMapResult<T, E> = { atom: Atom<T, E> } | { stream: Promise<Atom<Stream<T, E>, E>> };
function accept<T>(value: T): { accept: T } {
return { accept: value };
}

function reject<T>(value: T): { reject: T } {
return { reject: value };
}

type FilterResult<A, R> = { accept: A } | { reject: R };

export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
/**
* Map over each value in the stream, produce a stream from it, and flatten all the value
* streams together
* Base implementation of `flat*` operations. In general, all of these methods will filter over
* the type of atom, run some stream-producing callback with it, and then produce a new
* generator to expand into the stream.
*
* @group Higher Order
* @template A - The accepted type from the filter. Allows for type narrowing from `Atom<T, E>`
* into `AtomOk<T>`, etc.
* @template U - The `ok` type of the produced stream.
* @template F - The `error` type of the produced stream.
* @template CT - The `ok` type of the stream produced from the callback.
* @template CE - The `error` type of the stream produced from the callback.
*/
flatMap<U>(cb: (value: T) => MaybePromise<MaybeAtom<Stream<U, E>, E>>): Stream<U, E> {
const trace = this.trace("flatMap");
private flatOp<A, U, F, CT, CE>(
filter: (atom: Atom<T, E>) => FilterResult<A, Atom<U, F>>,
cb: (atom: A) => MaybePromise<Stream<CT, CE>>,
process: (atom: Atom<T, E>, stream: Stream<CT, CE>) => AsyncGenerator<Atom<U, F>>,
): Stream<U, F> {
const trace = this.trace("flatOp");

return this.flatMapAtom((atom) => {
if (isOk(atom)) {
return { stream: handler(() => cb(atom.value), trace) };
} else {
return { atom };
return this.consume(async function* (it) {
for await (const atom of it) {
const result = filter(atom);

if ("reject" in result) {
yield result.reject;
continue;
}

// Run the flat map handler
const streamAtom = await run(() => cb(result.accept), trace);

// If an error was emitted whilst initialising the new stream, return it
if (!isOk(streamAtom)) {
yield streamAtom;
continue;
}

// Otherwise, consume the iterator
yield* process(atom, streamAtom.value);
}
});
}

/**
* Internal helper for implementing the other `flatMap` methods.
*/
private flatMapAtom<A, U, F>(
filter: (atom: Atom<T, E>) => FilterResult<A, Atom<U, F>>,
cb: (atom: A) => MaybePromise<Stream<U, F>>,
): Stream<U, F> {
this.trace("flatMapAll");

return this.flatOp(filter, cb, async function* (_atom, stream) {
yield* stream;
});
}

/**
* Map over each value in the stream, produce a stream from it, and flatten all the value
* streams together
*
* @group Higher Order
*/
flatMap<U>(cb: (value: T) => MaybePromise<Stream<U, E>>): Stream<U, E> {
this.trace("flatMap");

return this.flatMapAtom(
(atom) => (isOk(atom) ? accept(atom) : reject(atom)),
(atom) => {
return cb(atom.value);
},
);
}

/**
* Map over each error in the stream, produce a stream from it, and flatten all the value
* streams together.
*
* @group Higher Order
*/
flatMapError<F>(cb: (value: E) => MaybePromise<MaybeAtom<Stream<T, F>, F>>): Stream<T, F> {
const trace = this.trace("flatMapError");

return this.flatMapAtom((atom) => {
if (isError(atom)) {
return { stream: handler(() => cb(atom.value), trace) };
} else {
return { atom };
}
});
flatMapError<F>(cb: (value: E) => MaybePromise<Stream<T, F>>): Stream<T, F> {
this.trace("flatMapError");

return this.flatMapAtom(
(atom) => (isError(atom) ? accept(atom) : reject(atom)),
(atom) => {
return cb(atom.value);
},
);
}

/**
Expand All @@ -50,47 +113,31 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
flatMapUnknown(
cb: (value: unknown, trace: string[]) => MaybePromise<MaybeAtom<Stream<T, E>, E>>,
cb: (value: unknown, trace: string[]) => MaybePromise<Stream<T, E>>,
): Stream<T, E> {
const trace = this.trace("flatMapUnknown");

return this.flatMapAtom((atom) => {
if (isUnknown(atom)) {
return { stream: handler(() => cb(atom.value, atom.trace), trace) };
} else {
return { atom };
}
});
return this.flatMapAtom(
(atom) => (isUnknown(atom) ? accept(atom) : reject(atom)),
(atom) => {
return cb(atom.value, trace);
},
);
}

/**
* Internal helper for implementing the other `flatMap` methods.
*
* The provided callback *must* not be able to throw. It is expected that any user code run
* within is properly handled.
* Base implementation of the `flatTap` operations.
*/
private flatMapAtom<U, F>(cb: (atom: Atom<T, E>) => FlatMapResult<U, F>): Stream<U, F> {
return this.consume(async function* (it) {
for await (const atom of it) {
// Create the new stream
const atomOrStream = cb(atom);

if ("atom" in atomOrStream) {
yield atomOrStream.atom;
continue;
}

const stream = await atomOrStream.stream;
private flatTapAtom<A>(
filter: (atom: Atom<T, E>) => FilterResult<A, Atom<T, E>>,
cb: (atom: A) => MaybePromise<Stream<unknown, unknown>>,
): Stream<T, E> {
this.trace("flatTapAtom");

// If the returned atom isn't ok, emit it back onto the stream
if (!isOk(stream)) {
yield stream;
continue;
}
return this.flatOp(filter, cb, async function* (atom, stream) {
await exhaust(stream);

// Emit the generator of the new stream
yield* stream.value;
}
yield atom;
});
}

Expand All @@ -101,30 +148,13 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
*
* @group Higher Order
*/
flatTap(cb: (value: T) => MaybePromise<MaybeAtom<Stream<unknown, unknown>, E>>): Stream<T, E> {
const trace = this.trace("flatTap");

return this.consume(async function* (it) {
for await (const atom of it) {
if (!isOk(atom)) {
yield atom;
continue;
}
flatTap(cb: (value: T) => MaybePromise<Stream<unknown, unknown>>): Stream<T, E> {
this.trace("flatTap");

const streamAtom = await handler(() => cb(atom.value), trace);

if (isOk(streamAtom)) {
// Consume the resulting stream, and emit the original atom
for await (const _ of streamAtom.value) {
// eslint-ignore no-empty
}

yield atom;
} else {
yield streamAtom;
}
}
});
return this.flatTapAtom(
(atom) => (isOk(atom) ? accept(atom) : reject(atom)),
(atom) => cb(atom.value),
);
}

/**
Expand Down Expand Up @@ -173,9 +203,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
replaceWith<U, F>(cbOrStream: CallbackOrStream<U, F>): Stream<U, F> {
return this.consume(async function* (it) {
// Consume all the items in the stream
for await (const _atom of it) {
// eslint-disable-next-line no-empty
}
await exhaust(it);

// Replace with the user stream
if (typeof cbOrStream === "function") {
Expand Down
15 changes: 15 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ export type Truthy<T> = NonNullable<Exclude<T, false | "">>;
* Type that may be a callback that resolves to a stream, or just a stream.
*/
export type CallbackOrStream<T, E> = (() => Stream<T, E>) | Stream<T, E>;

/**
* Completely exhausts the provided async iterator.
*/
export async function exhaust(iterable: AsyncIterable<unknown>) {
const it = iterable[Symbol.asyncIterator]();

// eslint-disable-next-line no-constant-condition
while (true) {
const result = await it.next();
if (result.done) {
break;
}
}
}
20 changes: 0 additions & 20 deletions test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,26 +326,6 @@ describe.concurrent("higher order streams", () => {
]);
});

test("returning stream or error", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3]).flatMap<number>((n) => {
if (n === 2) {
return $.error("number two");
}

return $.from(new Array(n).fill(n));
});

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.error("number two"),
$.ok(3),
$.ok(3),
$.ok(3),
]);
});

test("errors already in stream", async ({ expect }) => {
expect.assertions(1);

Expand Down
Loading