From 6d7f44f83c7109ffd4ba2f8630c09a9da6cf50c9 Mon Sep 17 00:00:00 2001 From: Bossett Date: Wed, 4 Sep 2024 23:30:16 +0000 Subject: [PATCH] use p-ratelimit for event limiting --- src/subscription.ts | 10 ---------- src/util/subscription.ts | 11 ++++++++++- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/subscription.ts b/src/subscription.ts index 71004d4b..1a568f25 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -66,14 +66,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { if (!ops) return - const delay = (ms: number) => new Promise((res) => setTimeout(res, ms)) - - while (this.runningEvents > 128) { - await delay(1000) - } - - this.runningEvents++ - const postsToDelete = ops.posts.deletes.map((del) => del.uri) // Transform posts in parallel @@ -134,7 +126,5 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { await this.db.replaceOneURI('post', to_insert.uri, to_insert) }) } - - this.runningEvents-- } } diff --git a/src/util/subscription.ts b/src/util/subscription.ts index 6bb391e8..85a18019 100755 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -15,6 +15,15 @@ import { Database } from '../db' const includedRecords = new Set(['app.bsky.feed.post']) +import { pRateLimit } from 'p-ratelimit' + +const eventLimit = pRateLimit({ + interval: undefined, + rate: undefined, + concurrency: 128, + maxDelay: undefined, +}) + export abstract class FirehoseSubscriptionBase { public sub: Subscription @@ -42,7 +51,7 @@ export abstract class FirehoseSubscriptionBase { if (includedRecords.has(collection)) { try { - this.handleEvent(evt) // no longer awaiting this + await eventLimit(async () => this.handleEvent(evt)) // no longer awaiting this } catch (err) { console.error('repo subscription could not handle message', err) }