Skip to content

Commit

Permalink
Merge pull request #10 from clear/feat/simplify-flat-handlers
Browse files Browse the repository at this point in the history
improve ergonomics for `flat*` operators
  • Loading branch information
andogq authored Apr 17, 2024
2 parents 4810d87 + e9ea819 commit ec0db46
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 113 deletions.
5 changes: 5 additions & 0 deletions .changeset/nice-chefs-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

add `exhaust` stream consumer
5 changes: 5 additions & 0 deletions .changeset/red-glasses-suffer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

alter `flat*` APIs to simplify handlers
40 changes: 29 additions & 11 deletions src/handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import type { Atom, MaybeAtom } from "./atom";
import * as atom from "./atom";
import {
normalise,
ok,
unknown,
isUnknown,
type Atom,
type AtomOk,
type AtomUnknown,
type MaybeAtom,
} from "./atom";
import type { MaybePromise } from "./util";

/**
Expand All @@ -24,17 +32,27 @@ export async function handler<T, E>(
handler: () => MaybePromise<MaybeAtom<T, E>>,
trace: string[],
): Promise<Atom<T, E>> {
try {
// Run the handler
const rawResult = handler();
const result = await run(handler, trace);

if (isUnknown(result)) {
return result;
}

// Normalise the returned promise
const result = await normalisePromise(rawResult);
return normalise(result.value);
}

// Normalise as an atom and return
return atom.normalise(result);
/**
* Run some callback. If it completes successfully, the value will be returned as `AtomOk`. If an
* error is thrown, it will be caught and returned as an `AtomUnknown`. `AtomError` will never be
* produced from this helper.
*/
export async function run<T>(
cb: () => MaybePromise<T>,
trace: string[],
): Promise<AtomOk<T> | AtomUnknown> {
try {
return ok(await normalisePromise(cb())) as AtomOk<T>;
} catch (e) {
// Unknown error thrown, return it
return atom.unknown(e, trace);
return unknown(e, trace) as AtomUnknown;
}
}
9 changes: 9 additions & 0 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Readable } from "node:stream";
import { isOk, type Atom } from "../atom";
import { StreamBase } from "./base";
import { exhaust } from "../util";

export class StreamConsumption<T, E> extends StreamBase {
/**
Expand All @@ -12,6 +13,14 @@ export class StreamConsumption<T, E> extends StreamBase {
return this.stream[Symbol.asyncIterator]();
}

/**
* Completely exhaust the stream, driving it to completion. This is particularly useful when
* side effects of the stream are desired, but the actual values of the stream are not needed.
*/
exhaust(): Promise<void> {
return exhaust(this);
}

/**
* Create an async iterator that will emit each value in the stream.
*
Expand Down
192 changes: 110 additions & 82 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
@@ -1,46 +1,109 @@
import type { Stream } from ".";
import { isError, isOk, type Atom, type MaybeAtom, isUnknown } from "../atom";
import { handler } from "../handler";
import type { CallbackOrStream, MaybePromise } from "../util";
import { isError, isOk, type Atom, isUnknown } from "../atom";
import { run } from "../handler";
import { type CallbackOrStream, type MaybePromise, exhaust } from "../util";
import { StreamTransforms } from "./transforms";

type FlatMapResult<T, E> = { atom: Atom<T, E> } | { stream: Promise<Atom<Stream<T, E>, E>> };
function accept<T>(value: T): { accept: T } {
return { accept: value };
}

function reject<T>(value: T): { reject: T } {
return { reject: value };
}

type FilterResult<A, R> = { accept: A } | { reject: R };

export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
/**
* Map over each value in the stream, produce a stream from it, and flatten all the value
* streams together
* Base implementation of `flat*` operations. In general, all of these methods will filter over
* the type of atom, run some stream-producing callback with it, and then produce a new
* generator to expand into the stream.
*
* @group Higher Order
* @template A - The accepted type from the filter. Allows for type narrowing from `Atom<T, E>`
* into `AtomOk<T>`, etc.
* @template U - The `ok` type of the produced stream.
* @template F - The `error` type of the produced stream.
* @template CT - The `ok` type of the stream produced from the callback.
* @template CE - The `error` type of the stream produced from the callback.
*/
flatMap<U>(cb: (value: T) => MaybePromise<MaybeAtom<Stream<U, E>, E>>): Stream<U, E> {
const trace = this.trace("flatMap");
private flatOp<A, U, F, CT, CE>(
filter: (atom: Atom<T, E>) => FilterResult<A, Atom<U, F>>,
cb: (atom: A) => MaybePromise<Stream<CT, CE>>,
process: (atom: Atom<T, E>, stream: Stream<CT, CE>) => AsyncGenerator<Atom<U, F>>,
): Stream<U, F> {
const trace = this.trace("flatOp");

return this.flatMapAtom((atom) => {
if (isOk(atom)) {
return { stream: handler(() => cb(atom.value), trace) };
} else {
return { atom };
return this.consume(async function* (it) {
for await (const atom of it) {
const result = filter(atom);

if ("reject" in result) {
yield result.reject;
continue;
}

// Run the flat map handler
const streamAtom = await run(() => cb(result.accept), trace);

// If an error was emitted whilst initialising the new stream, return it
if (!isOk(streamAtom)) {
yield streamAtom;
continue;
}

// Otherwise, consume the iterator
yield* process(atom, streamAtom.value);
}
});
}

/**
* Internal helper for implementing the other `flatMap` methods.
*/
private flatMapAtom<A, U, F>(
filter: (atom: Atom<T, E>) => FilterResult<A, Atom<U, F>>,
cb: (atom: A) => MaybePromise<Stream<U, F>>,
): Stream<U, F> {
this.trace("flatMapAll");

return this.flatOp(filter, cb, async function* (_atom, stream) {
yield* stream;
});
}

/**
* Map over each value in the stream, produce a stream from it, and flatten all the value
* streams together
*
* @group Higher Order
*/
flatMap<U>(cb: (value: T) => MaybePromise<Stream<U, E>>): Stream<U, E> {
this.trace("flatMap");

return this.flatMapAtom(
(atom) => (isOk(atom) ? accept(atom) : reject(atom)),
(atom) => {
return cb(atom.value);
},
);
}

/**
* Map over each error in the stream, produce a stream from it, and flatten all the value
* streams together.
*
* @group Higher Order
*/
flatMapError<F>(cb: (value: E) => MaybePromise<MaybeAtom<Stream<T, F>, F>>): Stream<T, F> {
const trace = this.trace("flatMapError");

return this.flatMapAtom((atom) => {
if (isError(atom)) {
return { stream: handler(() => cb(atom.value), trace) };
} else {
return { atom };
}
});
flatMapError<F>(cb: (value: E) => MaybePromise<Stream<T, F>>): Stream<T, F> {
this.trace("flatMapError");

return this.flatMapAtom(
(atom) => (isError(atom) ? accept(atom) : reject(atom)),
(atom) => {
return cb(atom.value);
},
);
}

/**
Expand All @@ -50,47 +113,31 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
flatMapUnknown(
cb: (value: unknown, trace: string[]) => MaybePromise<MaybeAtom<Stream<T, E>, E>>,
cb: (value: unknown, trace: string[]) => MaybePromise<Stream<T, E>>,
): Stream<T, E> {
const trace = this.trace("flatMapUnknown");

return this.flatMapAtom((atom) => {
if (isUnknown(atom)) {
return { stream: handler(() => cb(atom.value, atom.trace), trace) };
} else {
return { atom };
}
});
return this.flatMapAtom(
(atom) => (isUnknown(atom) ? accept(atom) : reject(atom)),
(atom) => {
return cb(atom.value, trace);
},
);
}

/**
* Internal helper for implementing the other `flatMap` methods.
*
* The provided callback *must* not be able to throw. It is expected that any user code run
* within is properly handled.
* Base implementation of the `flatTap` operations.
*/
private flatMapAtom<U, F>(cb: (atom: Atom<T, E>) => FlatMapResult<U, F>): Stream<U, F> {
return this.consume(async function* (it) {
for await (const atom of it) {
// Create the new stream
const atomOrStream = cb(atom);

if ("atom" in atomOrStream) {
yield atomOrStream.atom;
continue;
}

const stream = await atomOrStream.stream;
private flatTapAtom<A>(
filter: (atom: Atom<T, E>) => FilterResult<A, Atom<T, E>>,
cb: (atom: A) => MaybePromise<Stream<unknown, unknown>>,
): Stream<T, E> {
this.trace("flatTapAtom");

// If the returned atom isn't ok, emit it back onto the stream
if (!isOk(stream)) {
yield stream;
continue;
}
return this.flatOp(filter, cb, async function* (atom, stream) {
await exhaust(stream);

// Emit the generator of the new stream
yield* stream.value;
}
yield atom;
});
}

Expand All @@ -101,30 +148,13 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
*
* @group Higher Order
*/
flatTap(cb: (value: T) => MaybePromise<MaybeAtom<Stream<unknown, unknown>, E>>): Stream<T, E> {
const trace = this.trace("flatTap");

return this.consume(async function* (it) {
for await (const atom of it) {
if (!isOk(atom)) {
yield atom;
continue;
}
flatTap(cb: (value: T) => MaybePromise<Stream<unknown, unknown>>): Stream<T, E> {
this.trace("flatTap");

const streamAtom = await handler(() => cb(atom.value), trace);

if (isOk(streamAtom)) {
// Consume the resulting stream, and emit the original atom
for await (const _ of streamAtom.value) {
// eslint-ignore no-empty
}

yield atom;
} else {
yield streamAtom;
}
}
});
return this.flatTapAtom(
(atom) => (isOk(atom) ? accept(atom) : reject(atom)),
(atom) => cb(atom.value),
);
}

/**
Expand Down Expand Up @@ -173,9 +203,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
replaceWith<U, F>(cbOrStream: CallbackOrStream<U, F>): Stream<U, F> {
return this.consume(async function* (it) {
// Consume all the items in the stream
for await (const _atom of it) {
// eslint-disable-next-line no-empty
}
await exhaust(it);

// Replace with the user stream
if (typeof cbOrStream === "function") {
Expand Down
15 changes: 15 additions & 0 deletions src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,18 @@ export type Truthy<T> = NonNullable<Exclude<T, false | "">>;
* Type that may be a callback that resolves to a stream, or just a stream.
*/
export type CallbackOrStream<T, E> = (() => Stream<T, E>) | Stream<T, E>;

/**
* Completely exhausts the provided async iterator.
*/
export async function exhaust(iterable: AsyncIterable<unknown>) {
const it = iterable[Symbol.asyncIterator]();

// eslint-disable-next-line no-constant-condition
while (true) {
const result = await it.next();
if (result.done) {
break;
}
}
}
20 changes: 0 additions & 20 deletions test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -326,26 +326,6 @@ describe.concurrent("higher order streams", () => {
]);
});

test("returning stream or error", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3]).flatMap<number>((n) => {
if (n === 2) {
return $.error("number two");
}

return $.from(new Array(n).fill(n));
});

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.error("number two"),
$.ok(3),
$.ok(3),
$.ok(3),
]);
});

test("errors already in stream", async ({ expect }) => {
expect.assertions(1);

Expand Down

0 comments on commit ec0db46

Please sign in to comment.