Skip to content

Commit

Permalink
fix: emit known and unknown errors onto stream
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq committed Jul 5, 2024
1 parent 784adb4 commit 87515b3
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/weak-worms-flash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

fix: emit known and unknown errors onto node stream
1 change: 1 addition & 0 deletions src/stream/consumption.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ export class StreamConsumption<T, E> extends StreamBase {
for await (const atom of this) {
// Determine whether non-ok values should be filtered out
if (options?.atoms !== true && !isOk(atom)) {
s.emit("error", atom.value);
continue;
}

Expand Down
57 changes: 57 additions & 0 deletions test/consumption.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,63 @@ describe.concurrent("stream consumption", () => {
expect(errors).toHaveLength(1);
});

test("propagate known errors into raw readable stream", async ({ expect }) => {
expect.assertions(1);

const stream = promisifyStream(
$.from([$.ok("a"), $.ok("b"), $.error("some error"), $.ok("c")]).toReadable("raw"),
);

expect(stream).rejects.toEqual("some error");
});

test("propagate known errors into object readable stream", async ({ expect }) => {
expect.assertions(1);

const stream = promisifyStream(
$.from([$.ok("a"), $.ok("b"), $.error("some error"), $.ok("c")]).toReadable(
"object",
),
);

expect(stream).rejects.toEqual("some error");
});

test("propagate multiple errors into readable stream", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([
$.ok("a"),
$.error("an error"),
$.ok("b"),
$.error("some error"),
$.ok("c"),
]).toReadable("object");

// Monitor the stream
const { data, errors } = await emptyStream(stream);

expect(errors).toEqual(["an error", "some error"]);
expect(data).toEqual(["a", "b", "c"]);
});

test("propagate known and unknown errors", async ({ expect }) => {
expect.assertions(2);

const stream = $.from([
$.ok("a"),
$.error("an error"),
$.ok("b"),
$.unknown("unknown error", []),
$.ok("c"),
]).toReadable("object");

// Monitor the stream
const { data, errors } = await emptyStream(stream);

expect(errors).toEqual(["an error", "unknown error"]);
expect(data).toEqual(["a", "b", "c"]);
});
});
});

Expand Down

0 comments on commit 87515b3

Please sign in to comment.