Skip to content

Commit

Permalink
fix: improve yielding in batch operations (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq authored Aug 30, 2024
2 parents ea4b78a + 8d3b6be commit 0f7b944
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/polite-spoons-fail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

fix: inconsistency with batch yielding
26 changes: 15 additions & 11 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,22 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
this.trace("take");

return this.consume(async function* (it) {
if (n <= 0) {
return;
}

let i = 0;

for await (const atom of it) {
if (i >= n) {
break;
}

if (isOk(atom) || options?.atoms === true) {
yield atom;
}

i++;

if (i >= n) {
break;
}
}
});
}
Expand Down Expand Up @@ -508,19 +512,19 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
}

if (result === "timeout" && "timeout" in options) {
if (totalBatchSize > 0) {
// Work out which batches are ready
const ready = Object.values(batches).filter(
(batch) => batch.length >= (options?.n ?? 1),
);
// Work out which batches are ready
const ready = Object.values(batches).filter(
(batch) => batch.length >= (options?.n ?? 1) || options?.yieldRemaining,
);

if (ready.length === 0 && options?.yieldEmpty) {
yield ok([]);
} else {
for (const batch of ready) {
const items = batch.splice(0, options?.n ?? batch.length);
yield ok<T[], E>(items);
totalBatchSize -= items.length;
}
} else if (totalBatchSize === 0 && options?.yieldEmpty) {
yield ok([]);
}
}

Expand Down
70 changes: 70 additions & 0 deletions test/transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -460,5 +460,75 @@ describe("stream transforms", () => {
await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(0);
});

describe("batch weirdness", () => {
test("5 items, n = 10", async ({ expect }) => {
expect.assertions(1);

const s = await $.from([1, 2, 3, 4, 5]).batch({ n: 10 }).toArray();

expect(s).toEqual([]);
});

test("5 items, n = 10, yieldRemaining", async ({ expect }) => {
expect.assertions(1);

const s = await $.from([1, 2, 3, 4, 5])
.batch({ n: 10, yieldRemaining: true })
.toArray();

expect(s).toEqual([[1, 2, 3, 4, 5]]);
});

function createHangingStream() {
let i = 0;
return $.fromNext(() => {
if (i < 5) {
return Promise.resolve(i++);
}

// Hang
return new Promise(() => {});
});
}

test("5 items, n = 10, timeout, yieldRemaining, infinite hang", async ({ expect }) => {
expect.assertions(1);

const a = createHangingStream()
.batch({ n: 10, timeout: 5, yieldRemaining: true })
.take(1)
.toArray();

await vi.advanceTimersByTimeAsync(5);
expect(await a).toEqual([[0, 1, 2, 3, 4]]);
});

test("5 items, n = 10, timeout, yieldEmpty, infinite hang", async ({ expect }) => {
expect.assertions(1);

const a = createHangingStream()
.batch({ n: 10, timeout: 5, yieldEmpty: true })
.take(1)
.toArray();

await vi.advanceTimersByTimeAsync(5);
expect(await a).toEqual([[]]);
});

test("5 items, n = 10, timeout, yieldEmpty, yieldRemaining, infinite hang", async ({
expect,
}) => {
expect.assertions(1);

const a = createHangingStream()
.batch({ n: 10, timeout: 5, yieldRemaining: true, yieldEmpty: true })
.take(2)
.toArray();

await vi.advanceTimersByTimeAsync(10);
expect(await a).toEqual([[0, 1, 2, 3, 4], []]);
});
});
});
});

0 comments on commit 0f7b944

Please sign in to comment.