Skip to content

Commit

Permalink
feat: Implement .collect() for streams (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
giraugh authored May 7, 2024
2 parents 91799c3 + fb81f4c commit 4d6d924
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 6 deletions.
5 changes: 5 additions & 0 deletions .changeset/funny-kiwis-tease.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Add .collect() method to streams
12 changes: 6 additions & 6 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
const trace = this.trace("flatOp");

return this.consume(async function*(it) {
return this.consume(async function* (it) {
for await (const atom of it) {
const result = filter(atom);

Expand Down Expand Up @@ -67,7 +67,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
this.trace("flatMapAll");

return this.flatOp(filter, cb, async function*(_atom, stream) {
return this.flatOp(filter, cb, async function* (_atom, stream) {
yield* stream;
});
}
Expand Down Expand Up @@ -134,7 +134,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
flatten(): T extends Stream<infer U, E> ? Stream<U, E> : Stream<T, E> {
this.trace("flatten");

return this.consume(async function*(it) {
return this.consume(async function* (it) {
for await (const atom of it) {
// Yield errors/unkowns directly
if (!isOk(atom)) {
Expand All @@ -161,7 +161,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<T, E> {
this.trace("flatTapAtom");

return this.flatOp(filter, cb, async function*(atom, stream) {
return this.flatOp(filter, cb, async function* (atom, stream) {
await exhaust(stream);

yield atom;
Expand Down Expand Up @@ -193,7 +193,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
otherwise(cbOrStream: CallbackOrStream<T, E>): Stream<T, E> {
return this.consume(async function*(it) {
return this.consume(async function* (it) {
// Count the items being emitted from the iterator
let count = 0;
for await (const atom of it) {
Expand Down Expand Up @@ -228,7 +228,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
replaceWith<U, F>(cbOrStream: CallbackOrStream<U, F>): Stream<U, F> {
return this.consume(async function*(it) {
return this.consume(async function* (it) {
// Consume all the items in the stream
await exhaust(it);

Expand Down
23 changes: 23 additions & 0 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,29 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
return stream;
}

/**
* Collect the values of the stream atoms into an array then return a stream which emits that array
*
* @note non-ok atoms are emitted as-is, the collected array is always emitted last
* @note empty streams will emit an empty array
* @group Transform
*/
collect(): Stream<T[], E> {
this.trace("collect");

return this.consume(async function* (it) {
const values: T[] = [];
for await (const atom of it) {
if (isOk(atom)) {
values.push(atom.value);
} else {
yield atom;
}
}
yield ok(values);
});
}

/**
* Map over each value in the stream.
*
Expand Down
26 changes: 26 additions & 0 deletions test/transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,32 @@ describe.concurrent("stream transforms", () => {
});
});

describe.concurrent("collect", () => {
test("simple stream without errors", async ({ expect }) => {
expect.assertions(1);

const s = $.from([1, 2, 3]).collect();

expect(await s.toArray({ atoms: true })).toEqual([$.ok([1, 2, 3])]);
});

test("empty stream", async ({ expect }) => {
expect.assertions(1);

const s = $.from([]).collect();

expect(await s.toArray({ atoms: true })).toEqual([$.ok([])]);
});

test("single error", async ({ expect }) => {
expect.assertions(1);

const s = $.from([$.error(1), $.ok(2), $.ok(3)]).collect();

expect(await s.toArray({ atoms: true })).toEqual([$.error(1), $.ok([2, 3])]);
});
});

describe.concurrent("mapError", () => {
test("single error", async ({ expect }) => {
expect.assertions(1);
Expand Down

0 comments on commit 4d6d924

Please sign in to comment.