Skip to content

Commit

Permalink
fix: incorrect slicing when making batch (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
andogq authored Jul 11, 2024
2 parents d4eaf0a + a87c801 commit 184b767
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .changeset/pink-lamps-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": patch
---

fix: incorrect splicing when generating batch
2 changes: 1 addition & 1 deletion src/stream/transforms.ts
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ export class StreamTransforms<T, E> extends StreamConsumption<T, E> {
);

for (const batch of ready) {
const items = batch.splice(0, options?.n ?? -1);
const items = batch.splice(0, options?.n ?? batch.length);
yield ok<T[], E>(items);
totalBatchSize -= items.length;
}
Expand Down
29 changes: 29 additions & 0 deletions test/transforms.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -429,5 +429,34 @@ describe("stream transforms", () => {
[2, 4],
]);
});

test("timeout bucket", async ({ expect }) => {
const mapper = vi.fn();

$.from([1, 2, 3, 4, 5, 6])
.batch({ timeout: 100, byBucket: (n) => (n % 2 === 0 ? "even" : "odd") })
.map(mapper)
.exhaust();

await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(2);
expect(mapper).toHaveBeenNthCalledWith(1, [1, 3, 5]);
expect(mapper).toHaveBeenNthCalledWith(2, [2, 4, 6]);
});

test("timeout bucket no items", async ({ expect }) => {
const mapper = vi.fn();

$.fromNext(() => new Promise<number>(() => {}))
.batch({ timeout: 100, byBucket: (n) => (n % 2 === 0 ? "even" : "odd") })
.map(mapper)
.exhaust();

await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(0);

await vi.advanceTimersByTimeAsync(100);
expect(mapper).toHaveBeenCalledTimes(0);
});
});
});

0 comments on commit 184b767

Please sign in to comment.