Skip to content

Commit

Permalink
fix: tweak batching behaviour (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq authored Sep 9, 2024
2 parents 0a73db1 + af1e481 commit 316e542
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 2 deletions.
5 changes: 5 additions & 0 deletions .changeset/polite-wombats-jam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

fix batching with timeout and yield-remaining
8 changes: 6 additions & 2 deletions src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -517,8 +517,12 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
(batch) => batch.length >= (options?.n ?? 1) || options?.yieldRemaining,
);

if (ready.length === 0 && options?.yieldEmpty) {
yield ok([]);
if (ready.reduce((total, batch) => total + batch.length, 0) === 0) {
if (options?.yieldEmpty) {
// Only yield an empty batch if there are absolutely no items ready to
// be yielded and if the configuration allows it
yield ok([]);
}
} else {
for (const batch of ready) {
const items = batch.splice(0, options?.n ?? batch.length);
Expand Down
27 changes: 27 additions & 0 deletions test/transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ describe("stream transforms", () => {
});

test("timeout bucket no items", async ({ expect }) => {
expect.assertions(2);

const mapper = vi.fn();

$.fromNext(() => new Promise<number>(() => {}))
Expand All @@ -461,6 +463,31 @@ describe("stream transforms", () => {
expect(mapper).toHaveBeenCalledTimes(0);
});

test("yield remaining doesn't incorrectly yield empty", async ({ expect }) => {
expect.assertions(3);

const mapper = vi.fn();

const testItems = [1, 1, 1, 1];
$.fromNext(async () => {
if (testItems.length > 0) {
return testItems.shift();
}

return new Promise(() => {});
})
.batch({ timeout: 100, n: 10, yieldRemaining: true, yieldEmpty: false })
.map(mapper)
.exhaust();

await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(1);
expect(mapper).toHaveBeenNthCalledWith(1, [1, 1, 1, 1]);

await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(1);
});

describe("batch weirdness", () => {
test("5 items, n = 10", async ({ expect }) => {
expect.assertions(1);
Expand Down

0 comments on commit 316e542

Please sign in to comment.