Skip to content

Commit

Permalink
File Reader: enhance API
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Sep 27, 2023
1 parent 744430f commit 94cf111
Showing 1 changed file with 73 additions and 25 deletions.
98 changes: 73 additions & 25 deletions sea-streamer-file/sea-streamer-file-reader/src/subprocess.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,30 @@ import { Buffer as SystemBuffer } from "node:buffer";

export const DEFAULT_QUOTA: number = 10000;

export interface CtrlMsg {
cmd: "open" | "more" | "seek" | "exit";
export type CtrlMsg = OpenCmd | RunCmd | MoreCmd | SeekCmd | ExitCmd;

export interface OpenCmd {
cmd: "open";
/** file path */
path?: string;
path: string;
}

export interface RunCmd {
cmd: "run";
}

export interface MoreCmd {
cmd: "more";
}

export interface SeekCmd {
cmd: "seek";
/** n-th beacon */
nth?: number;
nth: number;
}

export interface ExitCmd {
cmd: "exit";
}

export type IpcMessage = MetaUpdate | MessageUpdate;
Expand Down Expand Up @@ -42,6 +60,7 @@ export interface StatusUpdate {

enum State {
Init,
Ready,
Running,
PreSeek,
Seeking,
Expand All @@ -65,14 +84,20 @@ const process_log = (msg: string) => process.send!({ log: msg });

function onMessage(ctrl: CtrlMsg) {
if (ctrl.cmd === "open") {
open(ctrl.path!).then(run);
open(ctrl.path);
} else if (ctrl.cmd === "run") {
run();
} else if (ctrl.cmd === "more") {
sleepFor = 1;
quota = DEFAULT_QUOTA;
} else if (ctrl.cmd === "seek") {
if (global.state === State.Running) {
process_log(`seek ${ctrl.nth}`);
seek(ctrl.nth!).then(run);
process_log(`seek ${ctrl.nth}`);
if (global.state === State.Init) {
untilInit().then(() => seek(ctrl.nth)).then(run);
} else if (global.state === State.Ready) {
seek(ctrl.nth).then(run);
} else if (global.state === State.Running) {
preseek().then(() => seek(ctrl.nth)).then(run);
} else {
process.send!({ error: "Not seekable" }); global.error = true; return;
}
Expand All @@ -98,6 +123,7 @@ async function open(path: string) {
if (source instanceof FileErr) { process.send!({ error: "Failed to read file header" }); global.error = true; return; }
global.source = source;
process.send!({ header: source.fileHeader().toJson() });
global.state = State.Ready as State;
} catch (e) {
process.send!({ error: `Failed to open file: ${e}` }); global.error = true; return;
}
Expand All @@ -107,6 +133,9 @@ async function run() {
if (global.error) {
return;
}
if (global.state === State.Init) {
await untilInit();
}
global.state = State.Running as State;
const source = global.source!;
const batchSize = 100;
Expand Down Expand Up @@ -141,31 +170,50 @@ async function run() {
await source.close();
}

async function seek(nth: number) {
async function untilInit() {
if (global.error) {
return;
}
if (global.state !== State.Init) {
process.send!({ error: "Not Init?" }); global.error = true; return;
}
while (global.state === State.Init) { await sleep(1); }
}

async function preseek() {
if (global.error) {
return;
}
if (global.state !== State.Running) {
process.send!({ error: "Not Running?" }); global.error = true; return;
}
global.state = State.PreSeek as State;
while (global.state === State.PreSeek) { await sleep(10); }
if (global.state === State.Seeking) {
const source = global.source!;
await source.rewind(new SeqPos.At(BigInt(nth)));
process_log("rewinded");
const payload = new Buffer();
payload.append(SystemBuffer.from(PULSE_MESSAGE));
const pulse = new Message(new MessageHeader(
new StreamKey(SEA_STREAMER_INTERNAL),
new ShardId(0n),
new SeqNo(0n),
new Date(),
), payload);
process.send!({ messages: [pulse], status: getStatus() });
quota = DEFAULT_QUOTA;
} else {
process.send!({ error: "Not seeking?" }); global.error = true; return;
if (global.state !== State.Seeking) {
process.send!({ error: "Not Seeking?" }); global.error = true; return;
}
}

async function seek(nth: number) {
if (global.error) {
return;
}
global.state = State.Seeking as State;
const source = global.source!;
await source.rewind(new SeqPos.At(BigInt(nth)));
process_log("rewinded");
const payload = new Buffer();
payload.append(SystemBuffer.from(PULSE_MESSAGE));
const pulse = new Message(new MessageHeader(
new StreamKey(SEA_STREAMER_INTERNAL),
new ShardId(0n),
new SeqNo(0n),
new Date(),
), payload);
process.send!({ messages: [pulse], status: getStatus() });
quota = DEFAULT_QUOTA;
}

function getStatus(): StatusUpdate {
return {
fileSize: global.source!.knownSize(),
Expand Down

0 comments on commit 94cf111

Please sign in to comment.