diff --git a/.changeset/beige-forks-clean.md b/.changeset/beige-forks-clean.md new file mode 100644 index 0000000..c3aad08 --- /dev/null +++ b/.changeset/beige-forks-clean.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +Implement `.toReadable()` method for streams diff --git a/package-lock.json b/package-lock.json index 10cfaee..c0c0e14 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "windpipe", - "version": "0.7.0", + "version": "0.8.2", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "windpipe", - "version": "0.7.0", + "version": "0.8.2", "license": "ISC", "devDependencies": { "@changesets/cli": "^2.27.1", diff --git a/src/stream/consumption.ts b/src/stream/consumption.ts index 405d8bc..f94d1e8 100644 --- a/src/stream/consumption.ts +++ b/src/stream/consumption.ts @@ -83,6 +83,7 @@ export class StreamConsumption extends StreamBase { * @param options.atoms - By default, only `ok` values are serialised, however enabling this * will serialise all values. * + * @see {@link Stream#toReadable} if serialisation is not required * @group Consumption */ serialise(options?: { single?: boolean; atoms?: boolean }): Readable { @@ -133,4 +134,92 @@ export class StreamConsumption extends StreamBase { return s; } + + /** + * Produce a readable node stream with the values from the stream. + * + * @param kind - What kind of readable stream to produce. When "raw" only strings and buffers can be emitted on the stream. Use "object" to preserve + * objects in the readable stream. Note that object values are not serialised, they are emitted as objects. + * @param options - Options for configuring how atoms are output on the stream + * + * @see {@link Stream#serialize} if the stream values should be serialized to json + * @group Consumption + */ + toReadable(kind: "raw" | "object", options?: { single?: boolean; atoms?: boolean }): Readable; + + /** + * Produce a readable node stream with the raw values from the stream. + * + * @param options.single - Whether to emit only the first atom + * + * @see {@link Stream#serialize} if the stream values should be serialized to json + * @group Consumption + */ + toReadable(kind: "raw", options?: { single?: boolean }): Readable; + + /** + * Produce a readable node stream with the values from the stream + * + * @param options.single - Whether to emit only the first atom + * @param options.atoms - By default, only `ok` values are emitted, however enabling this + * will emit all values. + * + * @see {@link Stream#serialize} if the stream values should be serialized to json + * @group Consumption + */ + toReadable(kind: "object", options?: { single?: boolean; atoms?: boolean }): Readable; + + toReadable( + kind: "raw" | "object", + options: { single?: boolean; atoms?: boolean } = {}, + ): Readable { + // Set up a new readable stream that does nothing + const s = new Readable({ + read() {}, + objectMode: kind === "object", + }); + + // Spin off asynchronously so that the stream can be immediately returned + (async () => { + let sentItems = 0; + + for await (const atom of this) { + // Determine whether non-ok values should be filtered out + if (options?.atoms !== true && !isOk(atom)) { + continue; + } + + // Monitor for multiple values being sent when only one is desired + if (sentItems > 0 && options?.single) { + console.warn( + "indicated that stream would emit a single value, however multiple were emitted (ignoring)", + ); + break; + } + + // monitor for non raw values when not using object mode + if ( + kind === "raw" && + !(typeof atom.value === "string" || atom.value instanceof Buffer) + ) { + s.emit( + "error", + new Error( + `Stream indicated it would emit raw values but emitted a '${typeof atom.value}' object`, + ), + ); + break; + } + + // Emit atom or atom value + s.push(options?.atoms ? atom : atom.value); + sentItems += 1; + } + + // End the stream + s.push(null); + })(); + + return s; + } } diff --git a/test/consumption.test.ts b/test/consumption.test.ts index eff7125..b382dcc 100644 --- a/test/consumption.test.ts +++ b/test/consumption.test.ts @@ -1,8 +1,9 @@ import { describe, test } from "vitest"; import $ from "../src"; +import { Readable } from "node:stream"; describe.concurrent("stream consumption", () => { - describe.concurrent("to array", () => { + describe.concurrent("toArray", () => { test("values", async ({ expect }) => { expect.assertions(1); @@ -71,4 +72,79 @@ describe.concurrent("stream consumption", () => { expect(array).toEqual([]); }); }); + + describe.concurrent("toReadable", () => { + test("object values", async ({ expect }) => { + expect.assertions(2); + + const stream = $.from([1, 2, 3]).toReadable("object"); + expect(stream).to.be.instanceof(Readable); + + const values = await promisifyStream(stream); + expect(values).to.deep.equal([1, 2, 3]); + }); + + test("single object value", async ({ expect }) => { + expect.assertions(2); + + const stream = $.from([1, 2, 3]).toReadable("object", { single: true }); + expect(stream).to.be.instanceof(Readable); + + const values = await promisifyStream(stream); + expect(values).to.deep.equal([1]); + }); + + test("object atoms", async ({ expect }) => { + expect.assertions(2); + + const stream = $.from([$.ok(1), $.ok(2), $.error(3)]).toReadable("object", { + atoms: true, + }); + expect(stream).to.be.instanceof(Readable); + + const values = await promisifyStream(stream); + expect(values).to.deep.equal([$.ok(1), $.ok(2), $.error(3)]); + }); + + test("raw values", async ({ expect }) => { + expect.assertions(2); + + const stream = $.from(["hello", " ", "world"]).toReadable("raw"); + + expect(stream).to.be.instanceof(Readable); + const values = await promisifyStream(stream); + expect(values.join("")).to.equal("hello world"); + }); + + test("error when using object in raw stream", async ({ expect }) => { + expect.assertions(1); + + // Creating the stream wont panic + const stream = $.from([1]).toReadable("raw"); + + // But reading it will emit an error so this should reject + const streamPromise = promisifyStream(stream); + expect(streamPromise).rejects.toBeTruthy(); + }); + + test("single raw value", async ({ expect }) => { + expect.assertions(2); + + const stream = $.from(["hello", " ", "world"]).toReadable("raw", { single: true }); + + expect(stream).to.be.instanceof(Readable); + const values = await promisifyStream(stream); + expect(values.join("")).to.equal("hello"); + }); + }); }); + +function promisifyStream(stream: Readable): Promise { + const data: unknown[] = []; + + return new Promise((resolve, reject) => { + stream.on("data", (value) => data.push(value)); + stream.on("error", reject); + stream.on("end", () => resolve(data)); + }); +}