Skip to content

Commit

Permalink
use p-ratelimit for event limiting
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Sep 4, 2024
1 parent 2c5a539 commit 6d7f44f
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
10 changes: 0 additions & 10 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {

if (!ops) return

const delay = (ms: number) => new Promise((res) => setTimeout(res, ms))

while (this.runningEvents > 128) {
await delay(1000)
}

this.runningEvents++

const postsToDelete = ops.posts.deletes.map((del) => del.uri)

// Transform posts in parallel
Expand Down Expand Up @@ -134,7 +126,5 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
await this.db.replaceOneURI('post', to_insert.uri, to_insert)
})
}

this.runningEvents--
}
}
11 changes: 10 additions & 1 deletion src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ import { Database } from '../db'

const includedRecords = new Set(['app.bsky.feed.post'])

import { pRateLimit } from 'p-ratelimit'

const eventLimit = pRateLimit({
interval: undefined,
rate: undefined,
concurrency: 128,
maxDelay: undefined,
})

export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>

Expand Down Expand Up @@ -42,7 +51,7 @@ export abstract class FirehoseSubscriptionBase {

if (includedRecords.has(collection)) {
try {
this.handleEvent(evt) // no longer awaiting this
await eventLimit(async () => this.handleEvent(evt)) // no longer awaiting this
} catch (err) {
console.error('repo subscription could not handle message', err)
}
Expand Down

0 comments on commit 6d7f44f

Please sign in to comment.