Skip to content

Commit

Permalink
fix(proc): behave correctly on numIdleProcesses: 0 (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbsp authored Nov 10, 2024
1 parent 0d6d5ea commit d2f1ef9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/funny-plants-taste.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

fix(proc): behave correctly on numIdleProcesses: 0
34 changes: 23 additions & 11 deletions agents/src/ipc/proc_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class ProcPool {
closed = false;
controller = new AbortController();
initMutex = new Mutex();
procMutex: MultiMutex;
procMutex?: MultiMutex;
procUnlock?: () => void;
warmedProcQueue = new Queue<JobExecutor>();

Expand All @@ -28,7 +28,9 @@ export class ProcPool {
closeTimeout: number,
) {
this.agent = agent;
this.procMutex = new MultiMutex(numIdleProcesses);
if (numIdleProcesses > 0) {
this.procMutex = new MultiMutex(numIdleProcesses);
}
this.initializeTimeout = initializeTimeout;
this.closeTimeout = closeTimeout;
}
Expand All @@ -42,10 +44,18 @@ export class ProcPool {
}

async launchJob(info: RunningJobInfo) {
const proc = await this.warmedProcQueue.get();
if (this.procUnlock) {
this.procUnlock();
this.procUnlock = undefined;
let proc: JobExecutor;
if (this.procMutex) {
proc = await this.warmedProcQueue.get();
if (this.procUnlock) {
this.procUnlock();
this.procUnlock = undefined;
}
} else {
proc = new ProcJobExecutor(this.agent, this.initializeTimeout, this.closeTimeout);
this.executors.push(proc);
await proc.start();
await proc.initialize();
}
await proc.launchJob(info);
}
Expand Down Expand Up @@ -89,11 +99,13 @@ export class ProcPool {
}

async run(signal: AbortSignal) {
while (!signal.aborted) {
this.procUnlock = await this.procMutex.lock();
const task = this.procWatchTask();
this.tasks.push(task);
task.finally(() => this.tasks.splice(this.tasks.indexOf(task)));
if (this.procMutex) {
while (!signal.aborted) {
this.procUnlock = await this.procMutex.lock();
const task = this.procWatchTask();
this.tasks.push(task);
task.finally(() => this.tasks.splice(this.tasks.indexOf(task)));
}
}
}

Expand Down

0 comments on commit d2f1ef9

Please sign in to comment.