Skip to content

Commit

Permalink
feat: .flatten() method for streams
Browse files Browse the repository at this point in the history
Merge pull request #12 from clear/feat/stream-flatten
  • Loading branch information
giraugh authored May 3, 2024
2 parents 80dda39 + 9ac3261 commit 73e5e4f
Show file tree
Hide file tree
Showing 9 changed files with 603 additions and 517 deletions.
5 changes: 5 additions & 0 deletions .changeset/silent-parrots-exist.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Implement .flatten() method on streams
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 33 additions & 6 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Stream } from ".";
import { Stream } from ".";
import { isError, isOk, type Atom, isUnknown } from "../atom";
import { run } from "../handler";
import { type CallbackOrStream, type MaybePromise, exhaust } from "../util";
Expand Down Expand Up @@ -34,7 +34,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
const trace = this.trace("flatOp");

return this.consume(async function* (it) {
return this.consume(async function*(it) {
for await (const atom of it) {
const result = filter(atom);

Expand Down Expand Up @@ -67,7 +67,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
this.trace("flatMapAll");

return this.flatOp(filter, cb, async function* (_atom, stream) {
return this.flatOp(filter, cb, async function*(_atom, stream) {
yield* stream;
});
}
Expand Down Expand Up @@ -125,6 +125,33 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
);
}

/**
* Produce a new stream from the stream that has any nested streams flattened
*
* @note Any atoms that are not nested streams are emitted as-is
* @group Higher Order
*/
flatten(): T extends Stream<infer U, E> ? Stream<U, E> : Stream<T, E> {
this.trace("flatten");

return this.consume(async function*(it) {
for await (const atom of it) {
// Yield errors/unkowns directly
if (!isOk(atom)) {
yield atom;
continue;
}

// Yield each atom within nested streams
if (atom.value instanceof Stream) {
yield* atom.value;
} else {
yield atom;
}
}
}) as T extends Stream<infer U, E> ? Stream<U, E> : Stream<T, E>;
}

/**
* Base implementation of the `flatTap` operations.
*/
Expand All @@ -134,7 +161,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<T, E> {
this.trace("flatTapAtom");

return this.flatOp(filter, cb, async function* (atom, stream) {
return this.flatOp(filter, cb, async function*(atom, stream) {
await exhaust(stream);

yield atom;
Expand Down Expand Up @@ -166,7 +193,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
otherwise(cbOrStream: CallbackOrStream<T, E>): Stream<T, E> {
return this.consume(async function* (it) {
return this.consume(async function*(it) {
// Count the items being emitted from the iterator
let count = 0;
for await (const atom of it) {
Expand Down Expand Up @@ -201,7 +228,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
replaceWith<U, F>(cbOrStream: CallbackOrStream<U, F>): Stream<U, F> {
return this.consume(async function* (it) {
return this.consume(async function*(it) {
// Consume all the items in the stream
await exhaust(it);

Expand Down
74 changes: 74 additions & 0 deletions test/consumption.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { describe, test } from "vitest";
import $ from "../src";

describe.concurrent("stream consumption", () => {
describe.concurrent("to array", () => {
test("values", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([1, 2, 3]).toArray();

expect(array).toEqual([1, 2, 3]);
});

test("values with errors on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([
1,
$.error("known"),
2,
3,
$.unknown("$.error", []),
]).toArray();

expect(array).toEqual([1, 2, 3]);
});

test("values with no items on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([]).toArray();

expect(array).toEqual([]);
});

test("atoms", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([1, 2, 3]).toArray({ atoms: true });

expect(array).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("atoms with errors on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([
1,
$.error("known"),
2,
3,
$.unknown("$.error", []),
]).toArray({
atoms: true,
});

expect(array).toEqual([
$.ok(1),
$.error("known"),
$.ok(2),
$.ok(3),
$.unknown("$.error", []),
]);
});

test("atoms with no items on stream", async ({ expect }) => {
expect.assertions(1);

const array = await $.from([]).toArray({ atoms: true });

expect(array).toEqual([]);
});
});
});
63 changes: 63 additions & 0 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import { describe, test } from "vitest";
import $ from "../src";
import { Readable } from "stream";

describe.concurrent("stream creation", () => {
describe.concurrent("from promise", () => {
test("resolving promise to emit value", async ({ expect }) => {
expect.assertions(1);

const s = $.fromPromise(Promise.resolve(10));

expect(await s.toArray({ atoms: true })).toEqual([$.ok(10)]);
});
});

describe.concurrent("from iterator", () => {
test("multi-value generator", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterator(
(function* () {
yield 1;
yield 2;
yield 3;
})(),
);

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("multi-value async generator", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterator(
(async function* () {
yield 1;
yield 2;
yield 3;
})(),
);

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});
});

describe.concurrent("from iterable", () => {
test("array iterable", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterable([1, 2, 3]);

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("readable stream", async ({ expect }) => {
expect.assertions(1);

const s = $.fromIterable(Readable.from([1, 2, 3]));

expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});
});
});
93 changes: 93 additions & 0 deletions test/errors.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { describe, test } from "vitest";
import $ from "../src";

describe.concurrent("error handling", () => {
test("throw in map", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3]).map((n) => {
if (n === 2) {
// Unhandled error
throw new Error("bad number");
} else {
return n;
}
});

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.unknown(new Error("bad number"), ["map"]),
$.ok(3),
]);
});

test("promise rejection in map", async ({ expect }) => {
expect.assertions(1);

async function process(n: number) {
if (n === 2) {
throw new Error("bad number");
} else {
return n;
}
}

const s = $.from([1, 2, 3]).map(process);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.unknown(new Error("bad number"), ["map"]),
$.ok(3),
]);
});

test("track multiple transforms", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3, 4, 5])
.map((n) => {
if (n === 2) {
// Unhandled error
throw new Error("bad number");
} else {
return n;
}
})
.filter((n) => n % 2 === 0);

expect(await s.toArray({ atoms: true })).toEqual([
$.unknown(new Error("bad number"), ["map"]),
$.ok(4),
]);
});

test("error thrown in later transform", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3, 4, 5])
.filter((n) => n > 1)
.map((n) => {
if (n % 2 === 1) {
return n * 10;
} else {
return n;
}
})
.map((n) => {
if (n === 2) {
// Unhandled error
throw new Error("bad number");
} else {
return n;
}
})
.filter((n) => n % 2 === 0);

expect(await s.toArray({ atoms: true })).toEqual([
$.unknown(new Error("bad number"), ["filter", "map", "map"]),
$.ok(30),
$.ok(4),
$.ok(50),
]);
});
});
Loading

0 comments on commit 73e5e4f

Please sign in to comment.