Skip to content

Commit

Permalink
Implement .toReadable() for streams
Browse files Browse the repository at this point in the history
  • Loading branch information
giraugh committed May 20, 2024
1 parent 035361d commit 8db09c4
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/beige-forks-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Implement `.toReadable()` method for streams
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

89 changes: 89 additions & 0 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export class StreamConsumption<T, E> extends StreamBase {
* @param options.atoms - By default, only `ok` values are serialised, however enabling this
* will serialise all values.
*
* @see {@link Stream#toReadable} if serialisation is not required
* @group Consumption
*/
serialise(options?: { single?: boolean; atoms?: boolean }): Readable {
Expand Down Expand Up @@ -133,4 +134,92 @@ export class StreamConsumption<T, E> extends StreamBase {

return s;
}

/**
* Produce a readable node stream with the values from the stream.
*
* @param kind - What kind of readable stream to produce. When "raw" only strings and buffers can be emitted on the stream. Use "object" to preserve
* objects in the readable stream. Note that object values are not serialised, they are emitted as objects.
* @param options - Options for configuring how atoms are output on the stream
*
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "raw" | "object", options?: { single?: boolean; atoms?: boolean }): Readable;

/**
* Produce a readable node stream with the raw values from the stream.
*
* @param options.single - Whether to emit only the first atom
*
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "raw", options?: { single?: boolean }): Readable;

/**
* Produce a readable node stream with the values from the stream
*
* @param options.single - Whether to emit only the first atom
* @param options.atoms - By default, only `ok` values are emitted, however enabling this
* will emit all values.
*
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "object", options?: { single?: boolean; atoms?: boolean }): Readable;

toReadable(
kind: "raw" | "object",
options: { single?: boolean; atoms?: boolean } = {},
): Readable {
// Set up a new readable stream that does nothing
const s = new Readable({
read() {},
objectMode: kind === "object",
});

// Spin off asynchronously so that the stream can be immediately returned
(async () => {
let sentItems = 0;

for await (const atom of this) {
// Determine whether non-ok values should be filtered out
if (options?.atoms !== true && !isOk(atom)) {
continue;
}

// Monitor for multiple values being sent when only one is desired
if (sentItems > 0 && options?.single) {
console.warn(
"indicated that stream would emit a single value, however multiple were emitted (ignoring)",
);
break;
}

// monitor for non raw values when not using object mode
if (
kind === "raw" &&
!(typeof atom.value === "string" || atom.value instanceof Buffer)
) {
s.emit(
"error",
new Error(
`Stream indicated it would emit raw values but emitted a '${typeof atom.value}' object`,
),
);
break;
}

// Emit atom or atom value
s.push(options?.atoms ? atom : atom.value);
sentItems += 1;
}

// End the stream
s.push(null);
})();

return s;
}
}
78 changes: 77 additions & 1 deletion test/consumption.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { describe, test } from "vitest";
import $ from "../src";
import { Readable } from "node:stream";

describe.concurrent("stream consumption", () => {
describe.concurrent("to array", () => {
describe.concurrent("toArray", () => {
test("values", async ({ expect }) => {
expect.assertions(1);

Expand Down Expand Up @@ -71,4 +72,79 @@ describe.concurrent("stream consumption", () => {
expect(array).toEqual([]);
});
});

describe.concurrent("toReadable", () => {
test("object values", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([1, 2, 3]).toReadable("object");
expect(stream).to.be.instanceof(Readable);

const values = await promisifyStream(stream);
expect(values).to.deep.equal([1, 2, 3]);
});

test("single object value", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([1, 2, 3]).toReadable("object", { single: true });
expect(stream).to.be.instanceof(Readable);

const values = await promisifyStream(stream);
expect(values).to.deep.equal([1]);
});

test("object atoms", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([$.ok(1), $.ok(2), $.error(3)]).toReadable("object", {
atoms: true,
});
expect(stream).to.be.instanceof(Readable);

const values = await promisifyStream(stream);
expect(values).to.deep.equal([$.ok(1), $.ok(2), $.error(3)]);
});

test("raw values", async ({ expect }) => {
expect.assertions(2);

const stream = $.from(["hello", " ", "world"]).toReadable("raw");

expect(stream).to.be.instanceof(Readable);
const values = await promisifyStream(stream);
expect(values.join("")).to.equal("hello world");
});

test("error when using object in raw stream", async ({ expect }) => {
expect.assertions(1);

// Creating the stream wont panic
const stream = $.from([1]).toReadable("raw");

// But reading it will emit an error so this should reject
const streamPromise = promisifyStream(stream);
expect(streamPromise).rejects.toBeTruthy();
});

test("single raw value", async ({ expect }) => {
expect.assertions(2);

const stream = $.from(["hello", " ", "world"]).toReadable("raw", { single: true });

expect(stream).to.be.instanceof(Readable);
const values = await promisifyStream(stream);
expect(values.join("")).to.equal("hello");
});
});
});

function promisifyStream(stream: Readable): Promise<unknown[]> {
const data: unknown[] = [];

return new Promise((resolve, reject) => {
stream.on("data", (value) => data.push(value));
stream.on("error", reject);
stream.on("end", () => resolve(data));
});
}

0 comments on commit 8db09c4

Please sign in to comment.