From b54d958fc3722725dbc70cf350604e7af7d733bd Mon Sep 17 00:00:00 2001 From: Bossett Date: Thu, 5 Sep 2024 02:23:12 +0000 Subject: [PATCH] move process limiter --- src/util/subscription.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/util/subscription.ts b/src/util/subscription.ts index fdaa1e0..74b8747 100755 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -16,6 +16,7 @@ import { Database } from '../db' import { Mutex } from 'async-mutex' const includedRecords = new Set(['app.bsky.feed.post']) +let runningEvents = 0 export abstract class FirehoseSubscriptionBase { public sub: Subscription @@ -37,7 +38,7 @@ export abstract class FirehoseSubscriptionBase { const delay = (ms: number) => new Promise((res) => setTimeout(res, ms)) const mutex = new Mutex() - let runningEvents = 0 + await mutex.runExclusive(() => (runningEvents = 0)) try { for await (const evt of this.sub) { @@ -58,7 +59,7 @@ export abstract class FirehoseSubscriptionBase { this.handleEvent(evt).finally(async () => { await mutex.runExclusive(async () => { - runningEvents-- + if (runningEvents > 0) runningEvents-- }) })