Skip to content

Commit

Permalink
Simplify flattened stream error type
Browse files Browse the repository at this point in the history
  • Loading branch information
giraugh committed May 3, 2024
1 parent 10e211e commit 9ac3261
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
const trace = this.trace("flatOp");

return this.consume(async function* (it) {
return this.consume(async function*(it) {
for await (const atom of it) {
const result = filter(atom);

Expand Down Expand Up @@ -67,7 +67,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<U, F> {
this.trace("flatMapAll");

return this.flatOp(filter, cb, async function* (_atom, stream) {
return this.flatOp(filter, cb, async function*(_atom, stream) {
yield* stream;
});
}
Expand Down Expand Up @@ -131,10 +131,10 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @note Any atoms that are not nested streams are emitted as-is
* @group Higher Order
*/
flatten(): T extends Stream<infer U, infer V> ? Stream<U, V | E> : Stream<T, E> {
flatten(): T extends Stream<infer U, E> ? Stream<U, E> : Stream<T, E> {
this.trace("flatten");

return this.consume(async function* (it) {
return this.consume(async function*(it) {
for await (const atom of it) {
// Yield errors/unkowns directly
if (!isOk(atom)) {
Expand All @@ -149,7 +149,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
yield atom;
}
}
}) as T extends Stream<infer U, infer V> ? Stream<U, V | E> : Stream<T, E>;
}) as T extends Stream<infer U, E> ? Stream<U, E> : Stream<T, E>;
}

/**
Expand All @@ -161,7 +161,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
): Stream<T, E> {
this.trace("flatTapAtom");

return this.flatOp(filter, cb, async function* (atom, stream) {
return this.flatOp(filter, cb, async function*(atom, stream) {
await exhaust(stream);

yield atom;
Expand Down Expand Up @@ -193,7 +193,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
otherwise(cbOrStream: CallbackOrStream<T, E>): Stream<T, E> {
return this.consume(async function* (it) {
return this.consume(async function*(it) {
// Count the items being emitted from the iterator
let count = 0;
for await (const atom of it) {
Expand Down Expand Up @@ -228,7 +228,7 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
* @group Higher Order
*/
replaceWith<U, F>(cbOrStream: CallbackOrStream<U, F>): Stream<U, F> {
return this.consume(async function* (it) {
return this.consume(async function*(it) {
// Consume all the items in the stream
await exhaust(it);

Expand Down

0 comments on commit 9ac3261

Please sign in to comment.