Skip to content

Commit

Permalink
File Reader: fix rewind bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Sep 28, 2023
1 parent 94cf111 commit 4dfd601
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ async function testRewind(mode: StreamMode) {

await expectNext(0, 100);

nth = await source.rewind(new SeqPos.At(0n)); if (nth instanceof FileErr) { throwNewError(nth); }
expect(nth).toStrictEqual(0);

await expectNext(0, 100);

const eos = await source.next(); if (eos instanceof FileErr) { throwNewError(eos); }
expect(isEndOfStream(eos)).toBe(true);

Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-file/sea-streamer-file-reader/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ export class MessageSource implements ByteSource {
* Returns the current location in terms of N-th beacon.
*/
async rewind(target: SeqPosEnum): Promise<number | FileErr> {
let pos;
let pos: SeqPosEnum;
if (target instanceof SeqPos.Beginning) {
pos = new SeqPos.At(Header.size());
} else if (target instanceof SeqPos.End) {
pos = target;
} else if (target instanceof SeqPos.At) {
if (target.at === 0n) {
pos = Header.size();
pos = new SeqPos.At(Header.size());
} else {
let at = target.at * this.beaconInterval();
if (at < this.knownSize()) {
Expand Down
5 changes: 3 additions & 2 deletions sea-streamer-file/sea-streamer-file-reader/src/subprocess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async function run() {
}
for (let i = 0; i < batchSize; i++) {
const message = await source.next();
if (message instanceof FileErr) { process.send!({ error: message.toString() }); global.error = true; return; }
if (message instanceof FileErr) { process.send!({ error: `${message}` }); global.error = true; return; }
buffer.push(message);
if (isEndOfStream(message)) {
ended = true;
Expand Down Expand Up @@ -200,7 +200,8 @@ async function seek(nth: number) {
}
global.state = State.Seeking as State;
const source = global.source!;
await source.rewind(new SeqPos.At(BigInt(nth)));
const rewinded = await source.rewind(new SeqPos.At(BigInt(nth)));
if (rewinded instanceof FileErr) { process.send!({ error: `${rewinded}` }); global.error = true; return; }
process_log("rewinded");
const payload = new Buffer();
payload.append(SystemBuffer.from(PULSE_MESSAGE));
Expand Down

0 comments on commit 4dfd601

Please sign in to comment.