diff --git a/.changeset/polite-spoons-fail.md b/.changeset/polite-spoons-fail.md new file mode 100644 index 0000000..8d97b51 --- /dev/null +++ b/.changeset/polite-spoons-fail.md @@ -0,0 +1,5 @@ +--- +"windpipe": patch +--- + +fix: inconsistency with batch yielding diff --git a/src/stream/transforms.ts b/src/stream/transforms.ts index 097a24d..4e2352e 100644 --- a/src/stream/transforms.ts +++ b/src/stream/transforms.ts @@ -240,18 +240,22 @@ export class StreamTransforms extends StreamConsumption { this.trace("take"); return this.consume(async function* (it) { + if (n <= 0) { + return; + } + let i = 0; for await (const atom of it) { - if (i >= n) { - break; - } - if (isOk(atom) || options?.atoms === true) { yield atom; } i++; + + if (i >= n) { + break; + } } }); } @@ -508,19 +512,19 @@ export class StreamTransforms extends StreamConsumption { } if (result === "timeout" && "timeout" in options) { - if (totalBatchSize > 0) { - // Work out which batches are ready - const ready = Object.values(batches).filter( - (batch) => batch.length >= (options?.n ?? 1), - ); + // Work out which batches are ready + const ready = Object.values(batches).filter( + (batch) => batch.length >= (options?.n ?? 1) || options?.yieldRemaining, + ); + if (ready.length === 0 && options?.yieldEmpty) { + yield ok([]); + } else { for (const batch of ready) { const items = batch.splice(0, options?.n ?? batch.length); yield ok(items); totalBatchSize -= items.length; } - } else if (totalBatchSize === 0 && options?.yieldEmpty) { - yield ok([]); } } diff --git a/test/transforms.test.ts b/test/transforms.test.ts index b8f227c..00d89e1 100644 --- a/test/transforms.test.ts +++ b/test/transforms.test.ts @@ -460,5 +460,75 @@ describe("stream transforms", () => { await vi.advanceTimersByTimeAsync(100); expect(mapper).toHaveBeenCalledTimes(0); }); + + describe("batch weirdness", () => { + test("5 items, n = 10", async ({ expect }) => { + expect.assertions(1); + + const s = await $.from([1, 2, 3, 4, 5]).batch({ n: 10 }).toArray(); + + expect(s).toEqual([]); + }); + + test("5 items, n = 10, yieldRemaining", async ({ expect }) => { + expect.assertions(1); + + const s = await $.from([1, 2, 3, 4, 5]) + .batch({ n: 10, yieldRemaining: true }) + .toArray(); + + expect(s).toEqual([[1, 2, 3, 4, 5]]); + }); + + function createHangingStream() { + let i = 0; + return $.fromNext(() => { + if (i < 5) { + return Promise.resolve(i++); + } + + // Hang + return new Promise(() => {}); + }); + } + + test("5 items, n = 10, timeout, yieldRemaining, infinite hang", async ({ expect }) => { + expect.assertions(1); + + const a = createHangingStream() + .batch({ n: 10, timeout: 5, yieldRemaining: true }) + .take(1) + .toArray(); + + await vi.advanceTimersByTimeAsync(5); + expect(await a).toEqual([[0, 1, 2, 3, 4]]); + }); + + test("5 items, n = 10, timeout, yieldEmpty, infinite hang", async ({ expect }) => { + expect.assertions(1); + + const a = createHangingStream() + .batch({ n: 10, timeout: 5, yieldEmpty: true }) + .take(1) + .toArray(); + + await vi.advanceTimersByTimeAsync(5); + expect(await a).toEqual([[]]); + }); + + test("5 items, n = 10, timeout, yieldEmpty, yieldRemaining, infinite hang", async ({ + expect, + }) => { + expect.assertions(1); + + const a = createHangingStream() + .batch({ n: 10, timeout: 5, yieldRemaining: true, yieldEmpty: true }) + .take(2) + .toArray(); + + await vi.advanceTimersByTimeAsync(10); + expect(await a).toEqual([[0, 1, 2, 3, 4], []]); + }); + }); }); });