Skip to content

Commit

Permalink
Remove single option from toReadable() and improve handling of null…
Browse files Browse the repository at this point in the history
… atoms
  • Loading branch information
giraugh committed May 20, 2024
1 parent 8db09c4 commit c284f9f
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 42 deletions.
3 changes: 2 additions & 1 deletion src/stream/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ export class StreamBase {
array = [...array];

return Stream.fromNext(async () => {
return array.shift() ?? StreamBase.StreamEnd;
if (array.length === 0) return StreamBase.StreamEnd;
return array.shift()!;
});
}

Expand Down
38 changes: 16 additions & 22 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ export class StreamConsumption<T, E> extends StreamBase {
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "raw" | "object", options?: { single?: boolean; atoms?: boolean }): Readable;
toReadable(kind: "raw" | "object", options?: { atoms?: boolean }): Readable;

/**
* Produce a readable node stream with the raw values from the stream.
Expand All @@ -155,19 +155,19 @@ export class StreamConsumption<T, E> extends StreamBase {
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "raw", options?: { single?: boolean }): Readable;
toReadable(kind: "raw"): Readable;

/**
* Produce a readable node stream with the values from the stream
* Produce a readable node stream in object mode with the values from the stream
*
* @param options.single - Whether to emit only the first atom
* @param options.atoms - By default, only `ok` values are emitted, however enabling this
* will emit all values.
*
* @note When not using `options.atoms`, any `null` atom values will be skipped when piping to the readable stream
* @see {@link Stream#serialize} if the stream values should be serialized to json
* @group Consumption
*/
toReadable(kind: "object", options?: { single?: boolean; atoms?: boolean }): Readable;
toReadable(kind: "object", options?: { atoms?: boolean }): Readable;

toReadable(
kind: "raw" | "object",
Expand All @@ -181,39 +181,33 @@ export class StreamConsumption<T, E> extends StreamBase {

// Spin off asynchronously so that the stream can be immediately returned
(async () => {
let sentItems = 0;

for await (const atom of this) {
// Determine whether non-ok values should be filtered out
if (options?.atoms !== true && !isOk(atom)) {
continue;
}

// Monitor for multiple values being sent when only one is desired
if (sentItems > 0 && options?.single) {
console.warn(
"indicated that stream would emit a single value, however multiple were emitted (ignoring)",
);
break;
}

// monitor for non raw values when not using object mode
if (
kind === "raw" &&
!(typeof atom.value === "string" || atom.value instanceof Buffer)
) {
s.emit(
"error",
new Error(
`Stream indicated it would emit raw values but emitted a '${typeof atom.value}' object`,
),
);
const message = `Stream indicated it would emit raw values but emitted a '${typeof atom.value}' object`;
console.error(message);
s.emit("error", new Error(message));
break;
}

// Show a warning if any atom value is null
if (!options?.atoms && atom.value === null) {
console.warn(
"Stream attempted to emit a `null` value in object mode which would have ended the stream early. (Skipping)",
);
continue;
}

// Emit atom or atom value
s.push(options?.atoms ? atom : atom.value);
sentItems += 1;
}

// End the stream
Expand Down
27 changes: 8 additions & 19 deletions test/consumption.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,25 @@ describe.concurrent("stream consumption", () => {
expect(values).to.deep.equal([1, 2, 3]);
});

test("single object value", async ({ expect }) => {
test("object atoms", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([1, 2, 3]).toReadable("object", { single: true });
const stream = $.from([$.ok(1), $.ok(2), $.error(3)]).toReadable("object", {
atoms: true,
});
expect(stream).to.be.instanceof(Readable);

const values = await promisifyStream(stream);
expect(values).to.deep.equal([1]);
expect(values).to.deep.equal([$.ok(1), $.ok(2), $.error(3)]);
});

test("object atoms", async ({ expect }) => {
test("null in object stream", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([$.ok(1), $.ok(2), $.error(3)]).toReadable("object", {
atoms: true,
});
const stream = $.from([1, null, 2, 3]).toReadable("object");
expect(stream).to.be.instanceof(Readable);

const values = await promisifyStream(stream);
expect(values).to.deep.equal([$.ok(1), $.ok(2), $.error(3)]);
expect(values).to.deep.equal([1, 2, 3]);
});

test("raw values", async ({ expect }) => {
Expand All @@ -126,16 +125,6 @@ describe.concurrent("stream consumption", () => {
const streamPromise = promisifyStream(stream);
expect(streamPromise).rejects.toBeTruthy();
});

test("single raw value", async ({ expect }) => {
expect.assertions(2);

const stream = $.from(["hello", " ", "world"]).toReadable("raw", { single: true });

expect(stream).to.be.instanceof(Readable);
const values = await promisifyStream(stream);
expect(values.join("")).to.equal("hello");
});
});
});

Expand Down
12 changes: 12 additions & 0 deletions test/creation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,18 @@ describe.concurrent("stream creation", () => {
expect(await s.toArray({ atoms: true })).toEqual([$.ok(1), $.ok(2), $.ok(3)]);
});

test("array with nullish values", async ({ expect }) => {
expect.assertions(1);

const s = $.fromArray([1, null, undefined]);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(null),
$.ok(undefined),
]);
});

test("don't modify original array", async ({ expect }) => {
expect.assertions(2);

Expand Down

0 comments on commit c284f9f

Please sign in to comment.