diff --git a/.changeset/spotty-otters-sniff.md b/.changeset/spotty-otters-sniff.md new file mode 100644 index 0000000..03f6880 --- /dev/null +++ b/.changeset/spotty-otters-sniff.md @@ -0,0 +1,5 @@ +--- +"windpipe": minor +--- + +add `drop` stream transform diff --git a/src/stream/transforms.ts b/src/stream/transforms.ts index ed5876b..acd9bfb 100644 --- a/src/stream/transforms.ts +++ b/src/stream/transforms.ts @@ -226,6 +226,33 @@ export class StreamTransforms extends StreamConsumption { }); } + /** + * Drop the first `n` items from the stream. + * + * @group Transform + */ + drop(n: number, options?: { atoms?: boolean }): Stream { + this.trace("drop"); + + return this.consume(async function* (it) { + let i = 0; + + for await (const atom of it) { + // Skip this atom if only values are desired + if (!options?.atoms && !isOk(atom)) { + continue; + } + + // Only yield if we're beyond the first n items + if (i >= n) { + yield atom; + } + + i++; + } + }); + } + /** * Delay emitting each value on the stream by `ms`. * diff --git a/test/index.test.ts b/test/index.test.ts index 78e15bd..7881ba4 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -167,6 +167,40 @@ describe.concurrent("stream transforms", () => { expect(await s.toArray({ atoms: true })).toEqual([error("an error"), ok(2), ok(4)]); }); }); + + describe.concurrent("drop", () => { + test("multiple values", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, 2, 3, 4, 5]).drop(2); + + expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]); + }); + + test("multiple values with errors", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2); + + expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]); + }); + + test("multiple atoms", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, 2, 3, 4, 5]).drop(2, { atoms: true }); + + expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]); + }); + + test("multiple atoms with errors", async ({ expect }) => { + expect.assertions(1); + + const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2, { atoms: true }); + + expect(await s.toArray({ atoms: true })).toEqual([ok(2), ok(3), ok(4), ok(5)]); + }); + }); }); describe.concurrent("error handling", () => {