Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: .flatten() method for streams #12

Merged
merged 5 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/silent-parrots-exist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Implement .flatten() method on streams
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 33 additions & 6 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Stream } from ".";
import { Stream } from ".";
import { isError, isOk, type Atom, isUnknown } from "../atom";
import { run } from "../handler";
import { type CallbackOrStream, type MaybePromise, exhaust } from "../util";
Expand Down Expand Up @@ -34,7 +34,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
const trace = this.trace("flatOp");

return this.consume(async function* (it) {
return this.consume(async function*(it) {
for await (const atom of it) {
const result = filter(atom);

Expand Down Expand Up @@ -67,7 +67,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
this.trace("flatMapAll");

return this.flatOp(filter, cb, async function* (_atom, stream) {
return this.flatOp(filter, cb, async function*(_atom, stream) {
yield* stream;
});
}
Expand Down Expand Up @@ -125,6 +125,33 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
);
}

/**
* Produce a new stream from the stream that has any nested streams flattened
*
* @note Any atoms that are not nested streams are emitted as-is
* @group Higher Order
*/
flatten(): T extends Stream<infer U, E> ? Stream<U, E> : Stream<T, E> {
this.trace("flatten");

return this.consume(async function*(it) {
for await (const atom of it) {
// Yield errors/unkowns directly
if (!isOk(atom)) {
yield atom;
continue;
}

// Yield each atom within nested streams
if (atom.value instanceof Stream) {
yield* atom.value;
} else {
yield atom;
}
}
}) as T extends Stream<infer U, E> ? Stream<U, E> : Stream<T, E>;
}

/**
* Base implementation of the `flatTap` operations.
*/
Expand All @@ -134,7 +161,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<T, E> {
this.trace("flatTapAtom");

return this.flatOp(filter, cb, async function* (atom, stream) {
return this.flatOp(filter, cb, async function*(atom, stream) {
await exhaust(stream);

yield atom;
Expand Down Expand Up @@ -166,7 +193,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
otherwise(cbOrStream: CallbackOrStream<T, E>): Stream<T, E> {
return this.consume(async function* (it) {
return this.consume(async function*(it) {
// Count the items being emitted from the iterator
let count = 0;
for await (const atom of it) {
Expand Down Expand Up @@ -201,7 +228,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
replaceWith<U, F>(cbOrStream: CallbackOrStream<U, F>): Stream<U, F> {
return this.consume(async function* (it) {
return this.consume(async function*(it) {
// Consume all the items in the stream
await exhaust(it);

Expand Down
74 changes: 74 additions & 0 deletions test/consumption.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { describe, test } from "vitest";
import $ from "../src";

describe.concurrent("stream consumption", () => {
describe.concurrent("to array", () => {
test("values", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([1, 2, 3]).toArray();

expect(array).toEqual([1, 2, 3]);
});

test("values with errors on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([
1,
$.error("known"),
2,
3,
$.unknown("$.error", []),
]).toArray();

expect(array).toEqual([1, 2, 3]);
});

test("values with no items on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([]).toArray();

expect(array).toEqual([]);
});

test("atoms", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([1, 2, 3]).toArray({ atoms: true });

expect(array).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("atoms with errors on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([
1,
$.error("known"),
2,
3,
$.unknown("$.error", []),
]).toArray({
atoms: true,
});

expect(array).toEqual([
$.ok(1),
$.error("known"),
$.ok(2),
$.ok(3),
$.unknown("$.error", []),
]);
});

test("atoms with no items on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([]).toArray({ atoms: true });

expect(array).toEqual([]);
});
});
});
63 changes: 63 additions & 0 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { describe, test } from "vitest";
import $ from "../src";
import { Readable } from "stream";

describe.concurrent("stream creation", () => {
describe.concurrent("from promise", () => {
test("resolving promise to emit value", async ({ expect }) => {
expect.assertions(1);

const s = $.fromPromise(Promise.resolve(10));

expect(await s.toArray({ atoms: true })).toEqual([$.ok(10)]);
});
});

describe.concurrent("from iterator", () => {
test("multi-value generator", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterator(
(function* () {
yield 1;
yield 2;
yield 3;
})(),
);

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("multi-value async generator", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterator(
(async function* () {
yield 1;
yield 2;
yield 3;
})(),
);

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});
});

describe.concurrent("from iterable", () => {
test("array iterable", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterable([1, 2, 3]);

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("readable stream", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterable(Readable.from([1, 2, 3]));

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});
});
});
93 changes: 93 additions & 0 deletions test/errors.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { describe, test } from "vitest";
import $ from "../src";

describe.concurrent("error handling", () => {
test("throw in map", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3]).map((n) => {
if (n === 2) {
// Unhandled error
throw new Error("bad number");
} else {
return n;
}
});

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.unknown(new Error("bad number"), ["map"]),
$.ok(3),
]);
});

test("promise rejection in map", async ({ expect }) => {
expect.assertions(1);

async function process(n: number) {
if (n === 2) {
throw new Error("bad number");
} else {
return n;
}
}

const s = $.from([1, 2, 3]).map(process);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.unknown(new Error("bad number"), ["map"]),
$.ok(3),
]);
});

test("track multiple transforms", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3, 4, 5])
.map((n) => {
if (n === 2) {
// Unhandled error
throw new Error("bad number");
} else {
return n;
}
})
.filter((n) => n % 2 === 0);

expect(await s.toArray({ atoms: true })).toEqual([
$.unknown(new Error("bad number"), ["map"]),
$.ok(4),
]);
});

test("error thrown in later transform", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3, 4, 5])
.filter((n) => n > 1)
.map((n) => {
if (n % 2 === 1) {
return n * 10;
} else {
return n;
}
})
.map((n) => {
if (n === 2) {
// Unhandled error
throw new Error("bad number");
} else {
return n;
}
})
.filter((n) => n % 2 === 0);

expect(await s.toArray({ atoms: true })).toEqual([
$.unknown(new Error("bad number"), ["filter", "map", "map"]),
$.ok(30),
$.ok(4),
$.ok(50),
]);
});
});
Loading
Loading