From 988ebdf57a95eb85d1326f184f34e2b6f62741c2 Mon Sep 17 00:00:00 2001 From: Tom Anderson Date: Mon, 15 Apr 2024 13:46:39 +1000 Subject: [PATCH 1/4] add `drop` stream transform --- src/stream/transforms.ts | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/stream/transforms.ts b/src/stream/transforms.ts index ed5876b..acd9bfb 100644 --- a/src/stream/transforms.ts +++ b/src/stream/transforms.ts @@ -226,6 +226,33 @@ export class StreamTransforms extends StreamConsumption { }); } + /** + * Drop the first `n` items from the stream. + * + * @group Transform + */ + drop(n: number, options?: { atoms?: boolean }): Stream { + this.trace("drop"); + + return this.consume(async function* (it) { + let i = 0; + + for await (const atom of it) { + // Skip this atom if only values are desired + if (!options?.atoms && !isOk(atom)) { + continue; + } + + // Only yield if we're beyond the first n items + if (i >= n) { + yield atom; + } + + i++; + } + }); + } + /** * Delay emitting each value on the stream by `ms`. * From 22723f5dda31c9a4db2a4fc0d8878acef431e913 Mon Sep 17 00:00:00 2001 From: Tom Anderson Date: Mon, 15 Apr 2024 13:48:14 +1000 Subject: [PATCH 2/4] add changeset --- .changeset/spotty-otters-sniff.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/spotty-otters-sniff.md diff --git a/.changeset/spotty-otters-sniff.md b/.changeset/spotty-otters-sniff.md new file mode 100644 index 0000000..03f6880 --- /dev/null +++ b/.changeset/spotty-otters-sniff.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +add `drop` stream transform From 79a9c80da76c20b04a301ed3ba14690a45ab79ad Mon Sep 17 00:00:00 2001 From: Tom Anderson Date: Mon, 15 Apr 2024 13:57:41 +1000 Subject: [PATCH 3/4] add failing test --- test/index.test.ts | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/test/index.test.ts b/test/index.test.ts index 78e15bd..73e2a9e 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -167,6 +167,40 @@ describe.concurrent("stream transforms", () => { expect(await s.toArray({ atoms: true })).toEqual([error("an error"), ok(2), ok(4)]); }); }); + + describe.concurrent("drop", () => { + test("multiple values", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, 2, 3, 4, 5]).drop(2); + + expect(await s.toArray()).toEqual([3, 4, 5]); + }); + + test("multiple values with errors", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2); + + expect(await s.toArray()).toEqual([3, 4, 5]); + }); + + test("multiple atoms", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, 2, 3, 4, 5]).drop(2); + + expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]); + }); + + test("multiple atoms with errors", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2); + + expect(await s.toArray({ atoms: true })).toEqual([ok(2), ok(3), ok(4), ok(5)]); + }); + }); }); describe.concurrent("error handling", () => { From 2509ac256d74a95a222c85ff633e9d7eb0573e4f Mon Sep 17 00:00:00 2001 From: Tom Anderson Date: Mon, 15 Apr 2024 14:03:24 +1000 Subject: [PATCH 4/4] fix failing test --- test/index.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/index.test.ts b/test/index.test.ts index 73e2a9e..7881ba4 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -174,7 +174,7 @@ describe.concurrent("stream transforms", () => { const s = Stream.from([1, 2, 3, 4, 5]).drop(2); - expect(await s.toArray()).toEqual([3, 4, 5]); + expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]); }); test("multiple values with errors", async ({ expect }) => { @@ -182,13 +182,13 @@ describe.concurrent("stream transforms", () => { const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2); - expect(await s.toArray()).toEqual([3, 4, 5]); + expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]); }); test("multiple atoms", async ({ expect }) => { expect.assertions(1); - const s = Stream.from([1, 2, 3, 4, 5]).drop(2); + const s = Stream.from([1, 2, 3, 4, 5]).drop(2, { atoms: true }); expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]); }); @@ -196,7 +196,7 @@ describe.concurrent("stream transforms", () => { test("multiple atoms with errors", async ({ expect }) => { expect.assertions(1); - const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2); + const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2, { atoms: true }); expect(await s.toArray({ atoms: true })).toEqual([ok(2), ok(3), ok(4), ok(5)]); });