Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds cachedFlatMap operator #20

Merged
merged 15 commits into from
May 7, 2024
Merged
5 changes: 5 additions & 0 deletions .changeset/bright-kangaroos-change.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

catch unhandled errors in `fromNext` stream creation
5 changes: 5 additions & 0 deletions .changeset/green-suns-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Implement `fromCallback` for stream creation
5 changes: 5 additions & 0 deletions .changeset/tame-geese-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Adds the `cachedFlatMap` operator
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ export type {
export type { MaybePromise, Truthy, CallbackOrStream } from "./util";

// Export the `StreamEnd` type
export type { StreamEnd } from "./stream";
export { StreamEnd } from "./stream";

export default Stream;
41 changes: 35 additions & 6 deletions src/stream/base.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { normalise, type Atom, type MaybeAtom, error, unknown } from "../atom";
import { Stream } from ".";
import { Readable, Writable } from "stream";
import { createNodeCallback } from "../util";

/**
* Marker for the end of a stream.
Expand Down Expand Up @@ -84,6 +85,28 @@ export class StreamBase {
throw new TypeError("expected a promise, (async) iterator, or (async) iterable");
}

/**
* Create a stream from a node-style callback. A node-compatible callback function will be
* passed as the first parameter to the callback of this function.
*
* The first parameter provided to the callback (the `error`) will be emitted as an `Error`
* atom, whilst the second parameter (the `value`) will be emitted as an `Ok` atom.
*
* @example
* $.fromCallback((next) => someAsyncMethod(paramA, paramB, next));
*
* @group Creation
*/
static fromCallback<T, E>(cb: (next: (error: E, value: T) => unknown) => void): Stream<T, E> {
// Set up a next function
const [promise, next] = createNodeCallback<T, E>();

// Run the callback
cb(next);

return StreamBase.fromPromise(promise);
}

/**
* Create a stream from a promise. The promise will be `await`ed, and the resulting value only
* ever emitted once.
Expand Down Expand Up @@ -172,12 +195,18 @@ export class StreamBase {
new Readable({
objectMode: true,
async read() {
const value = await next();

if (value === StreamEnd) {
this.push(null);
} else {
this.push(normalise(value));
try {
const value = await next();

// Promise returned as normal
if (value === StreamEnd) {
this.push(null);
} else {
this.push(normalise(value));
}
} catch (e) {
// Promise was rejected, add as an unknown error
this.push(unknown(e, []));
}
},
}),
Expand Down
48 changes: 48 additions & 0 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,54 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
);
}

/**
* Map over each value in the stream, produce a stream from it, cache the resultant stream
* and flatten all the value streams together
*
* @group Higher Order
*/
cachedFlatMap<U>(
cb: (value: T) => MaybePromise<Stream<U, E>>,
keyFn: (value: T) => string | number | symbol,
): Stream<U, E> {
const trace = this.trace("cachedFlatMap");

return this.consume(async function* (it) {
const cache = new Map<PropertyKey, Atom<U, E>[]>();

for await (const atom of it) {
if (!isOk(atom)) {
yield atom;
continue;
}

const key = keyFn(atom.value);
const cachedValues = cache.get(key);

if (cachedValues !== undefined) {
yield* cachedValues;
continue;
}

// Run the flat map handler
const streamAtom = await run(() => cb(atom.value), trace);

// If an error was emitted whilst initialising the new stream, return it
if (!isOk(streamAtom)) {
yield streamAtom;
continue;
}

// Otherwise, consume the iterator
const values = await streamAtom.value.toArray({ atoms: true });

cache.set(key, values);

yield* values;
}
});
}

/**
* Produce a new stream from the stream that has any nested streams flattened
*
Expand Down
2 changes: 1 addition & 1 deletion src/stream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "../atom";
import { HigherOrderStream } from "./higher-order";

export type { StreamEnd } from "./base";
export { StreamEnd } from "./base";

/**
* @template T - Type of the 'values' on the stream.
Expand Down
33 changes: 32 additions & 1 deletion src/util.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import Stream from ".";
import Stream, { type Atom } from ".";

/**
* Maybe it's a promise. Maybe it's not. Who's to say.
Expand Down Expand Up @@ -30,3 +30,34 @@ export async function exhaust(iterable: AsyncIterable<unknown>) {
}
}
}

/**
* Creates a `next` function and associated promise to promise-ify a node style callback. The
* `next` function must be passed as the callback to a function, and the resulting error or value
* will be emitted from the promise. The promise will always resolve.
*
* The error value of the callback (first parameter) will be emitted as an `Error` atom from the
* promise, whilst the value of the callback (second parameter) will be emitted as an `Ok` atom on
* the promise.
*/
export function createNodeCallback<T, E>(): [Promise<Atom<T, E>>, (error: E, value: T) => void] {
// Resolve function to be hoisted out of the promise
let resolve: (atom: Atom<T, E>) => void;

// Create the prom
const promise = new Promise<Atom<T, E>>((res) => {
resolve = res;
});

// Create the next callback
const next = (err: E, value: T) => {
if (err) {
resolve(Stream.error(err));
} else {
resolve(Stream.ok(value));
}
};

// Return a tuple of the promise and next function
return [promise, next];
}
102 changes: 101 additions & 1 deletion test/creation.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, test } from "vitest";
import $ from "../src";
import $, { StreamEnd } from "../src";
import { Readable } from "stream";

describe.concurrent("stream creation", () => {
Expand Down Expand Up @@ -60,4 +60,104 @@ describe.concurrent("stream creation", () => {
expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});
});

describe.concurrent("from callback", () => {
/**
* Sample function that accepts a node-style callback.
*
* @param success - Whether the method should succeed or fail.
* @param cb - Node-style callback to pass error or value to.
*/
function someNodeCallback(
success: boolean,
cb: (error: string | undefined, value?: number) => void,
) {
if (success) {
cb(undefined, 123);
} else {
cb("an error");
}
}

test("value returned from callback", async ({ expect }) => {
expect.assertions(1);

const s = $.fromCallback((next) => {
someNodeCallback(true, next);
});

expect(await s.toArray({ atoms: true })).toEqual([$.ok(123)]);
});

test("error returned from callback", async ({ expect }) => {
expect.assertions(1);

const s = $.fromCallback((next) => {
someNodeCallback(false, next);
});

expect(await s.toArray({ atoms: true })).toEqual([$.error("an error")]);
});
});

describe.concurrent("from next function", () => {
test("simple count up", async ({ expect }) => {
expect.assertions(1);

let i = 0;
const s = $.fromNext(async () => {
if (i < 4) {
return i++;
} else {
return StreamEnd;
}
});

expect(await s.toArray({ atoms: true })).toEqual([$.ok(0), $.ok(1), $.ok(2), $.ok(3)]);
});

test("next atoms produces atoms", async ({ expect }) => {
expect.assertions(1);

const atoms = [$.ok(0), $.error("some error"), $.ok(1), $.unknown("unknown error", [])];
const s = $.fromNext(async () => {
if (atoms.length > 0) {
return atoms.shift();
} else {
return StreamEnd;
}
});

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(0),
$.error("some error"),
$.ok(1),
$.unknown("unknown error", []),
]);
});

test("next catches unhandled errors", async ({ expect }) => {
expect.assertions(1);

let i = 0;
const s = $.fromNext(async () => {
i += 1;

if (i === 1) {
throw "some error";
}

if (i == 2) {
return i;
}

return StreamEnd;
});

expect(await s.toArray({ atoms: true })).toEqual([
$.unknown("some error", []),
$.ok(2),
]);
});
});
});
Loading
Loading