Skip to content

Commit

Permalink
Merge pull request #1 from clear/feat/drop
Browse files Browse the repository at this point in the history
add `drop` stream transform
  • Loading branch information
andogq authored Apr 15, 2024
2 parents a7f6135 + 2509ac2 commit c995dde
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .changeset/spotty-otters-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

add `drop` stream transform
27 changes: 27 additions & 0 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,33 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
});
}

/**
* Drop the first `n` items from the stream.
*
* @group Transform
*/
drop(n: number, options?: { atoms?: boolean }): Stream<T, E> {
this.trace("drop");

return this.consume(async function* (it) {
let i = 0;

for await (const atom of it) {
// Skip this atom if only values are desired
if (!options?.atoms && !isOk(atom)) {
continue;
}

// Only yield if we're beyond the first n items
if (i >= n) {
yield atom;
}

i++;
}
});
}

/**
* Delay emitting each value on the stream by `ms`.
*
Expand Down
34 changes: 34 additions & 0 deletions test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,40 @@ describe.concurrent("stream transforms", () => {
expect(await s.toArray({ atoms: true })).toEqual([error("an error"), ok(2), ok(4)]);
});
});

describe.concurrent("drop", () => {
test("multiple values", async ({ expect }) => {
expect.assertions(1);

const s = Stream.from([1, 2, 3, 4, 5]).drop(2);

expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]);
});

test("multiple values with errors", async ({ expect }) => {
expect.assertions(1);

const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2);

expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]);
});

test("multiple atoms", async ({ expect }) => {
expect.assertions(1);

const s = Stream.from([1, 2, 3, 4, 5]).drop(2, { atoms: true });

expect(await s.toArray({ atoms: true })).toEqual([ok(3), ok(4), ok(5)]);
});

test("multiple atoms with errors", async ({ expect }) => {
expect.assertions(1);

const s = Stream.from([1, error("some error"), 2, 3, 4, 5]).drop(2, { atoms: true });

expect(await s.toArray({ atoms: true })).toEqual([ok(2), ok(3), ok(4), ok(5)]);
});
});
});

describe.concurrent("error handling", () => {
Expand Down

0 comments on commit c995dde

Please sign in to comment.