diff --git a/.changeset/silent-parrots-exist.md b/.changeset/silent-parrots-exist.md new file mode 100644 index 0000000..1594df0 --- /dev/null +++ b/.changeset/silent-parrots-exist.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +Implement .flatten() method on streams diff --git a/package-lock.json b/package-lock.json index db98915..908784d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "windpipe", - "version": "0.2.0", + "version": "0.6.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "windpipe", - "version": "0.2.0", + "version": "0.6.0", "license": "ISC", "devDependencies": { "@changesets/cli": "^2.27.1", diff --git a/src/stream/higher-order.ts b/src/stream/higher-order.ts index 00d48c0..758e15e 100644 --- a/src/stream/higher-order.ts +++ b/src/stream/higher-order.ts @@ -1,4 +1,4 @@ -import type { Stream } from "."; +import { Stream } from "."; import { isError, isOk, type Atom, isUnknown } from "../atom"; import { run } from "../handler"; import { type CallbackOrStream, type MaybePromise, exhaust } from "../util"; @@ -34,7 +34,7 @@ export class HigherOrderStream extends StreamTransforms { ): Stream { const trace = this.trace("flatOp"); - return this.consume(async function* (it) { + return this.consume(async function*(it) { for await (const atom of it) { const result = filter(atom); @@ -67,7 +67,7 @@ export class HigherOrderStream extends StreamTransforms { ): Stream { this.trace("flatMapAll"); - return this.flatOp(filter, cb, async function* (_atom, stream) { + return this.flatOp(filter, cb, async function*(_atom, stream) { yield* stream; }); } @@ -125,6 +125,33 @@ export class HigherOrderStream extends StreamTransforms { ); } + /** + * Produce a new stream from the stream that has any nested streams flattened + * + * @note Any atoms that are not nested streams are emitted as-is + * @group Higher Order + */ + flatten(): T extends Stream ? Stream : Stream { + this.trace("flatten"); + + return this.consume(async function*(it) { + for await (const atom of it) { + // Yield errors/unkowns directly + if (!isOk(atom)) { + yield atom; + continue; + } + + // Yield each atom within nested streams + if (atom.value instanceof Stream) { + yield* atom.value; + } else { + yield atom; + } + } + }) as T extends Stream ? Stream : Stream; + } + /** * Base implementation of the `flatTap` operations. */ @@ -134,7 +161,7 @@ export class HigherOrderStream extends StreamTransforms { ): Stream { this.trace("flatTapAtom"); - return this.flatOp(filter, cb, async function* (atom, stream) { + return this.flatOp(filter, cb, async function*(atom, stream) { await exhaust(stream); yield atom; @@ -166,7 +193,7 @@ export class HigherOrderStream extends StreamTransforms { * @group Higher Order */ otherwise(cbOrStream: CallbackOrStream): Stream { - return this.consume(async function* (it) { + return this.consume(async function*(it) { // Count the items being emitted from the iterator let count = 0; for await (const atom of it) { @@ -201,7 +228,7 @@ export class HigherOrderStream extends StreamTransforms { * @group Higher Order */ replaceWith(cbOrStream: CallbackOrStream): Stream { - return this.consume(async function* (it) { + return this.consume(async function*(it) { // Consume all the items in the stream await exhaust(it); diff --git a/test/consumption.test.ts b/test/consumption.test.ts new file mode 100644 index 0000000..eff7125 --- /dev/null +++ b/test/consumption.test.ts @@ -0,0 +1,74 @@ +import { describe, test } from "vitest"; +import $ from "../src"; + +describe.concurrent("stream consumption", () => { + describe.concurrent("to array", () => { + test("values", async ({ expect }) => { + expect.assertions(1); + + const array = await $.from([1, 2, 3]).toArray(); + + expect(array).toEqual([1, 2, 3]); + }); + + test("values with errors on stream", async ({ expect }) => { + expect.assertions(1); + + const array = await $.from([ + 1, + $.error("known"), + 2, + 3, + $.unknown("$.error", []), + ]).toArray(); + + expect(array).toEqual([1, 2, 3]); + }); + + test("values with no items on stream", async ({ expect }) => { + expect.assertions(1); + + const array = await $.from([]).toArray(); + + expect(array).toEqual([]); + }); + + test("atoms", async ({ expect }) => { + expect.assertions(1); + + const array = await $.from([1, 2, 3]).toArray({ atoms: true }); + + expect(array).toEqual([$.ok(1), $.ok(2), $.ok(3)]); + }); + + test("atoms with errors on stream", async ({ expect }) => { + expect.assertions(1); + + const array = await $.from([ + 1, + $.error("known"), + 2, + 3, + $.unknown("$.error", []), + ]).toArray({ + atoms: true, + }); + + expect(array).toEqual([ + $.ok(1), + $.error("known"), + $.ok(2), + $.ok(3), + $.unknown("$.error", []), + ]); + }); + + test("atoms with no items on stream", async ({ expect }) => { + expect.assertions(1); + + const array = await $.from([]).toArray({ atoms: true }); + + expect(array).toEqual([]); + }); + }); +}); diff --git a/test/creation.test.ts b/test/creation.test.ts new file mode 100644 index 0000000..60315f6 --- /dev/null +++ b/test/creation.test.ts @@ -0,0 +1,63 @@ +import { describe, test } from "vitest"; +import $ from "../src"; +import { Readable } from "stream"; + +describe.concurrent("stream creation", () => { + describe.concurrent("from promise", () => { + test("resolving promise to emit value", async ({ expect }) => { + expect.assertions(1); + + const s = $.fromPromise(Promise.resolve(10)); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(10)]); + }); + }); + + describe.concurrent("from iterator", () => { + test("multi-value generator", async ({ expect }) => { + expect.assertions(1); + + const s = $.fromIterator( + (function* () { + yield 1; + yield 2; + yield 3; + })(), + ); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); + }); + + test("multi-value async generator", async ({ expect }) => { + expect.assertions(1); + + const s = $.fromIterator( + (async function* () { + yield 1; + yield 2; + yield 3; + })(), + ); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); + }); + }); + + describe.concurrent("from iterable", () => { + test("array iterable", async ({ expect }) => { + expect.assertions(1); + + const s = $.fromIterable([1, 2, 3]); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); + }); + + test("readable stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.fromIterable(Readable.from([1, 2, 3])); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); + }); + }); +}); diff --git a/test/errors.test.ts b/test/errors.test.ts new file mode 100644 index 0000000..1078639 --- /dev/null +++ b/test/errors.test.ts @@ -0,0 +1,93 @@ +import { describe, test } from "vitest"; +import $ from "../src"; + +describe.concurrent("error handling", () => { + test("throw in map", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3]).map((n) => { + if (n === 2) { + // Unhandled error + throw new Error("bad number"); + } else { + return n; + } + }); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.unknown(new Error("bad number"), ["map"]), + $.ok(3), + ]); + }); + + test("promise rejection in map", async ({ expect }) => { + expect.assertions(1); + + async function process(n: number) { + if (n === 2) { + throw new Error("bad number"); + } else { + return n; + } + } + + const s = $.from([1, 2, 3]).map(process); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.unknown(new Error("bad number"), ["map"]), + $.ok(3), + ]); + }); + + test("track multiple transforms", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3, 4, 5]) + .map((n) => { + if (n === 2) { + // Unhandled error + throw new Error("bad number"); + } else { + return n; + } + }) + .filter((n) => n % 2 === 0); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.unknown(new Error("bad number"), ["map"]), + $.ok(4), + ]); + }); + + test("error thrown in later transform", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3, 4, 5]) + .filter((n) => n > 1) + .map((n) => { + if (n % 2 === 1) { + return n * 10; + } else { + return n; + } + }) + .map((n) => { + if (n === 2) { + // Unhandled error + throw new Error("bad number"); + } else { + return n; + } + }) + .filter((n) => n % 2 === 0); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.unknown(new Error("bad number"), ["filter", "map", "map"]), + $.ok(30), + $.ok(4), + $.ok(50), + ]); + }); +}); diff --git a/test/higher-order.test.ts b/test/higher-order.test.ts new file mode 100644 index 0000000..6957b32 --- /dev/null +++ b/test/higher-order.test.ts @@ -0,0 +1,175 @@ +import { describe, test, vi } from "vitest"; +import $ from "../src"; + +describe.concurrent("higher order streams", () => { + describe.concurrent("flat map", () => { + test("returning stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3]).flatMap((n) => $.from(new Array(n).fill(n))); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.ok(2), + $.ok(2), + $.ok(3), + $.ok(3), + $.ok(3), + ]); + }); + + test("errors already in stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([ + $.ok(1), + $.error("known error"), + $.ok(2), + $.unknown("bad error", []), + $.ok(3), + ]).flatMap((n) => $.from(new Array(n).fill(n))); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.error("known error"), + $.ok(2), + $.ok(2), + $.unknown("bad error", []), + $.ok(3), + $.ok(3), + $.ok(3), + ]); + }); + }); + + describe.concurrent("flat tap", () => { + test("simple stream", async ({ expect }) => { + expect.assertions(3); + + const subCallback = vi.fn(); + const callback = vi.fn().mockImplementation((n) => $.of(n * n).tap(subCallback)); + const s = $.from([1, 2, 3, 4]).flatTap(callback); + + // Ensure that the flat tap doesn't alter the emitted stream items + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3), $.ok(4)]); + + // Ensure that the flatTap implementation is called once for each item in the stream + expect(callback).toBeCalledTimes(4); + + // Ensure that the stream returned from flatTap is fully executed + expect(subCallback).toBeCalledTimes(4); + }); + + test("simple stream", async ({ expect }) => { + expect.assertions(2); + + const callback = vi.fn().mockImplementation((n) => $.of(n * n)); + const s = $.from([1, 2, 3, 4]).flatTap(callback); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3), $.ok(4)]); + expect(callback).toBeCalledTimes(4); + }); + }); + + describe.concurrent("otherwise", () => { + test("empty stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]).otherwise($.from([1, 2])); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2)]); + }); + + test("non-empty stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1]).otherwise($.from([2, 3])); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1)]); + }); + + test("empty stream with otherwise function", async ({ expect }) => { + expect.assertions(2); + + const otherwise = vi.fn().mockReturnValue($.from([1, 2])); + + const s = $.from([]).otherwise(otherwise); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2)]); + expect(otherwise).toHaveBeenCalledOnce(); + }); + + test("non-empty stream with otherwise function", async ({ expect }) => { + expect.assertions(2); + + const otherwise = vi.fn().mockReturnValue($.from([2, 3])); + + const s = $.from([1]).otherwise(otherwise); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1)]); + expect(otherwise).not.toHaveBeenCalled(); + }); + + test("stream with known error", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.error("some error")]).otherwise($.from([1])); + + expect(await s.toArray({ atoms: true })).toEqual([$.error("some error")]); + }); + + test("stream with unknown error", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.unknown("some error", [])]).otherwise($.from([1])); + + expect(await s.toArray({ atoms: true })).toEqual([$.unknown("some error", [])]); + }); + }); + + describe.concurrent("flatten", () => { + test("simple nested stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.from([1, 2]), $.from([3, 4])]).flatten(); + + // We should get all values in order + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3), $.ok(4)]); + }); + + test("no effect on already flat stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3, 4]).flatten(); + + // We should get all values in order + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3), $.ok(4)]); + }); + + test("correctly flattens mixed depth stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, $.from([3, 4])]).flatten(); + + // We should get all values in order + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3), $.ok(4)]); + }); + + test("maintains errors from flattened stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.ok($.from([1, 2])), $.error("oh no")]).flatten(); + + // We should get all values in order + expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.error("oh no")]); + }); + + test("flattening an empty stream", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([]).flatten(); + + expect(await s.toArray({ atoms: true })).toEqual([]); + }); + }); +}); diff --git a/test/index.test.ts b/test/index.test.ts deleted file mode 100644 index 9f6067b..0000000 --- a/test/index.test.ts +++ /dev/null @@ -1,509 +0,0 @@ -import { describe, test, vi } from "vitest"; -import $ from "../src"; -import { Readable } from "stream"; - -describe.concurrent("stream creation", () => { - describe.concurrent("from promise", () => { - test("resolving promise to emit value", async ({ expect }) => { - expect.assertions(1); - - const s = $.fromPromise(Promise.resolve(10)); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(10)]); - }); - }); - - describe.concurrent("from iterator", () => { - test("multi-value generator", async ({ expect }) => { - expect.assertions(1); - - const s = $.fromIterator( - (function* () { - yield 1; - yield 2; - yield 3; - })(), - ); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); - }); - - test("multi-value async generator", async ({ expect }) => { - expect.assertions(1); - - const s = $.fromIterator( - (async function* () { - yield 1; - yield 2; - yield 3; - })(), - ); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); - }); - }); - - describe.concurrent("from iterable", () => { - test("array iterable", async ({ expect }) => { - expect.assertions(1); - - const s = $.fromIterable([1, 2, 3]); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); - }); - - test("readable stream", async ({ expect }) => { - expect.assertions(1); - - const s = $.fromIterable(Readable.from([1, 2, 3])); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]); - }); - }); -}); - -describe.concurrent("stream transforms", () => { - describe.concurrent("map", () => { - test("synchronous value", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3]).map((n) => n * 10); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(10), $.ok(20), $.ok(30)]); - }); - - test("synchronous atom", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3]).map((n) => { - if (n === 2) { - return $.error("number 2"); - } else { - return $.ok(n); - } - }); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok(1), - $.error("number 2"), - $.ok(3), - ]); - }); - - test("synchronous mix", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3]).map((n) => { - if (n === 2) { - return $.error("number 2"); - } else { - return n; - } - }); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok(1), - $.error("number 2"), - $.ok(3), - ]); - }); - - test("asynchronous value", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3]).map(async (n) => n * 10); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(10), $.ok(20), $.ok(30)]); - }); - }); - - describe.concurrent("mapError", () => { - test("single error", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([$.error(1), $.ok(2), $.ok(3)]).mapError((_e) => $.ok("error")); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok("error"), $.ok(2), $.ok(3)]); - }); - - test("multiple errors", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([$.error(1), $.ok(2), $.error(3)]).mapError((e) => $.ok("error" + e)); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok("error1"), - $.ok(2), - $.ok("error3"), - ]); - }); - }); - - describe.concurrent("mapUnknown", () => { - test("single unknown", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([$.unknown(1, []), $.ok(2), $.ok(3)]).mapUnknown((e) => $.error(e)); - - expect(await s.toArray({ atoms: true })).toEqual([$.error(1), $.ok(2), $.ok(3)]); - }); - - test("multiple unknown", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([$.unknown(1, []), $.ok(2), $.unknown(3, [])]).mapUnknown((e) => - $.error(e), - ); - - expect(await s.toArray({ atoms: true })).toEqual([$.error(1), $.ok(2), $.error(3)]); - }); - }); - - describe.concurrent("filter", () => { - test("synchronous values", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3, 4]).filter((n) => n % 2 === 0); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(2), $.ok(4)]); - }); - - test("synchronous atoms", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, $.error("an error"), 2, 3, 4]) - // Perform the actual filter operation - .filter((n) => n % 2 === 0); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.error("an error"), - $.ok(2), - $.ok(4), - ]); - }); - }); - - describe.concurrent("drop", () => { - test("multiple values", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3, 4, 5]).drop(2); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(3), $.ok(4), $.ok(5)]); - }); - - test("multiple values with errors", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, $.error("some error"), 2, 3, 4, 5]).drop(2); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(3), $.ok(4), $.ok(5)]); - }); - - test("multiple atoms", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3, 4, 5]).drop(2, { atoms: true }); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(3), $.ok(4), $.ok(5)]); - }); - - test("multiple atoms with errors", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, $.error("some error"), 2, 3, 4, 5]).drop(2, { atoms: true }); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(2), $.ok(3), $.ok(4), $.ok(5)]); - }); - }); -}); - -describe.concurrent("error handling", () => { - test("throw in map", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3]).map((n) => { - if (n === 2) { - // Unhandled error - throw new Error("bad number"); - } else { - return n; - } - }); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok(1), - $.unknown(new Error("bad number"), ["map"]), - $.ok(3), - ]); - }); - - test("promise rejection in map", async ({ expect }) => { - expect.assertions(1); - - async function process(n: number) { - if (n === 2) { - throw new Error("bad number"); - } else { - return n; - } - } - - const s = $.from([1, 2, 3]).map(process); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok(1), - $.unknown(new Error("bad number"), ["map"]), - $.ok(3), - ]); - }); - - test("track multiple transforms", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3, 4, 5]) - .map((n) => { - if (n === 2) { - // Unhandled error - throw new Error("bad number"); - } else { - return n; - } - }) - .filter((n) => n % 2 === 0); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.unknown(new Error("bad number"), ["map"]), - $.ok(4), - ]); - }); - - test("error thrown in later transform", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3, 4, 5]) - .filter((n) => n > 1) - .map((n) => { - if (n % 2 === 1) { - return n * 10; - } else { - return n; - } - }) - .map((n) => { - if (n === 2) { - // Unhandled error - throw new Error("bad number"); - } else { - return n; - } - }) - .filter((n) => n % 2 === 0); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.unknown(new Error("bad number"), ["filter", "map", "map"]), - $.ok(30), - $.ok(4), - $.ok(50), - ]); - }); -}); - -describe.concurrent("higher order streams", () => { - describe.concurrent("flat map", () => { - test("returning stream", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1, 2, 3]).flatMap((n) => $.from(new Array(n).fill(n))); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok(1), - $.ok(2), - $.ok(2), - $.ok(3), - $.ok(3), - $.ok(3), - ]); - }); - - test("errors already in stream", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([ - $.ok(1), - $.error("known error"), - $.ok(2), - $.unknown("bad error", []), - $.ok(3), - ]).flatMap((n) => $.from(new Array(n).fill(n))); - - expect(await s.toArray({ atoms: true })).toEqual([ - $.ok(1), - $.error("known error"), - $.ok(2), - $.ok(2), - $.unknown("bad error", []), - $.ok(3), - $.ok(3), - $.ok(3), - ]); - }); - }); - - describe.concurrent("flat tap", () => { - test("simple stream", async ({ expect }) => { - expect.assertions(3); - - const subCallback = vi.fn(); - const callback = vi.fn().mockImplementation((n) => $.of(n * n).tap(subCallback)); - const s = $.from([1, 2, 3, 4]).flatTap(callback); - - // Ensure that the flat tap doesn't alter the emitted stream items - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3), $.ok(4)]); - - // Ensure that the flatTap implementation is called once for each item in the stream - expect(callback).toBeCalledTimes(4); - - // Ensure that the stream returned from flatTap is fully executed - expect(subCallback).toBeCalledTimes(4); - }); - - test("simple stream", async ({ expect }) => { - expect.assertions(2); - - const callback = vi.fn().mockImplementation((n) => $.of(n * n)); - const s = $.from([1, 2, 3, 4]).flatTap(callback); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3), $.ok(4)]); - expect(callback).toBeCalledTimes(4); - }); - }); - - describe.concurrent("otherwise", () => { - test("empty stream", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([]).otherwise($.from([1, 2])); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2)]); - }); - - test("non-empty stream", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([1]).otherwise($.from([2, 3])); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1)]); - }); - - test("empty stream with otherwise function", async ({ expect }) => { - expect.assertions(2); - - const otherwise = vi.fn().mockReturnValue($.from([1, 2])); - - const s = $.from([]).otherwise(otherwise); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2)]); - expect(otherwise).toHaveBeenCalledOnce(); - }); - - test("non-empty stream with otherwise function", async ({ expect }) => { - expect.assertions(2); - - const otherwise = vi.fn().mockReturnValue($.from([2, 3])); - - const s = $.from([1]).otherwise(otherwise); - - expect(await s.toArray({ atoms: true })).toEqual([$.ok(1)]); - expect(otherwise).not.toHaveBeenCalled(); - }); - - test("stream with known error", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([$.error("some error")]).otherwise($.from([1])); - - expect(await s.toArray({ atoms: true })).toEqual([$.error("some error")]); - }); - - test("stream with unknown error", async ({ expect }) => { - expect.assertions(1); - - const s = $.from([$.unknown("some error", [])]).otherwise($.from([1])); - - expect(await s.toArray({ atoms: true })).toEqual([$.unknown("some error", [])]); - }); - }); -}); - -describe.concurrent("stream consumption", () => { - describe.concurrent("to array", () => { - test("values", async ({ expect }) => { - expect.assertions(1); - - const array = await $.from([1, 2, 3]).toArray(); - - expect(array).toEqual([1, 2, 3]); - }); - - test("values with errors on stream", async ({ expect }) => { - expect.assertions(1); - - const array = await $.from([ - 1, - $.error("known"), - 2, - 3, - $.unknown("$.error", []), - ]).toArray(); - - expect(array).toEqual([1, 2, 3]); - }); - - test("values with no items on stream", async ({ expect }) => { - expect.assertions(1); - - const array = await $.from([]).toArray(); - - expect(array).toEqual([]); - }); - - test("atoms", async ({ expect }) => { - expect.assertions(1); - - const array = await $.from([1, 2, 3]).toArray({ atoms: true }); - - expect(array).toEqual([$.ok(1), $.ok(2), $.ok(3)]); - }); - - test("atoms with errors on stream", async ({ expect }) => { - expect.assertions(1); - - const array = await $.from([ - 1, - $.error("known"), - 2, - 3, - $.unknown("$.error", []), - ]).toArray({ - atoms: true, - }); - - expect(array).toEqual([ - $.ok(1), - $.error("known"), - $.ok(2), - $.ok(3), - $.unknown("$.error", []), - ]); - }); - - test("atoms with no items on stream", async ({ expect }) => { - expect.assertions(1); - - const array = await $.from([]).toArray({ atoms: true }); - - expect(array).toEqual([]); - }); - }); -}); diff --git a/test/transforms.test.ts b/test/transforms.test.ts new file mode 100644 index 0000000..30fe471 --- /dev/null +++ b/test/transforms.test.ts @@ -0,0 +1,158 @@ +import { describe, test } from "vitest"; +import $ from "../src"; + +describe.concurrent("stream transforms", () => { + describe.concurrent("map", () => { + test("synchronous value", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3]).map((n) => n * 10); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(10), $.ok(20), $.ok(30)]); + }); + + test("synchronous atom", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3]).map((n) => { + if (n === 2) { + return $.error("number 2"); + } else { + return $.ok(n); + } + }); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.error("number 2"), + $.ok(3), + ]); + }); + + test("synchronous mix", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3]).map((n) => { + if (n === 2) { + return $.error("number 2"); + } else { + return n; + } + }); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok(1), + $.error("number 2"), + $.ok(3), + ]); + }); + + test("asynchronous value", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3]).map(async (n) => n * 10); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(10), $.ok(20), $.ok(30)]); + }); + }); + + describe.concurrent("mapError", () => { + test("single error", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.error(1), $.ok(2), $.ok(3)]).mapError((_e) => $.ok("error")); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok("error"), $.ok(2), $.ok(3)]); + }); + + test("multiple errors", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.error(1), $.ok(2), $.error(3)]).mapError((e) => $.ok("error" + e)); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.ok("error1"), + $.ok(2), + $.ok("error3"), + ]); + }); + }); + + describe.concurrent("mapUnknown", () => { + test("single unknown", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.unknown(1, []), $.ok(2), $.ok(3)]).mapUnknown((e) => $.error(e)); + + expect(await s.toArray({ atoms: true })).toEqual([$.error(1), $.ok(2), $.ok(3)]); + }); + + test("multiple unknown", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([$.unknown(1, []), $.ok(2), $.unknown(3, [])]).mapUnknown((e) => + $.error(e), + ); + + expect(await s.toArray({ atoms: true })).toEqual([$.error(1), $.ok(2), $.error(3)]); + }); + }); + + describe.concurrent("filter", () => { + test("synchronous values", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3, 4]).filter((n) => n % 2 === 0); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(2), $.ok(4)]); + }); + + test("synchronous atoms", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, $.error("an error"), 2, 3, 4]) + // Perform the actual filter operation + .filter((n) => n % 2 === 0); + + expect(await s.toArray({ atoms: true })).toEqual([ + $.error("an error"), + $.ok(2), + $.ok(4), + ]); + }); + }); + + describe.concurrent("drop", () => { + test("multiple values", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3, 4, 5]).drop(2); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(3), $.ok(4), $.ok(5)]); + }); + + test("multiple values with errors", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, $.error("some error"), 2, 3, 4, 5]).drop(2); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(3), $.ok(4), $.ok(5)]); + }); + + test("multiple atoms", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, 2, 3, 4, 5]).drop(2, { atoms: true }); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(3), $.ok(4), $.ok(5)]); + }); + + test("multiple atoms with errors", async ({ expect }) => { + expect.assertions(1); + + const s = $.from([1, $.error("some error"), 2, 3, 4, 5]).drop(2, { atoms: true }); + + expect(await s.toArray({ atoms: true })).toEqual([$.ok(2), $.ok(3), $.ok(4), $.ok(5)]); + }); + }); +});