Skip to content

Commit

Permalink
reorder queue logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Sep 5, 2024
1 parent b54d958 commit ddb3656
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { Mutex } from 'async-mutex'

const includedRecords = new Set(['app.bsky.feed.post'])
let runningEvents = 0
const mutex = new Mutex()
const delay = (ms: number) => new Promise((res) => setTimeout(res, ms))

export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>
Expand All @@ -35,9 +37,6 @@ export abstract class FirehoseSubscriptionBase {
abstract handleEvent(evt: RepoEvent): Promise<void>

async run(subscriptionReconnectDelay: number) {
const delay = (ms: number) => new Promise((res) => setTimeout(res, ms))
const mutex = new Mutex()

await mutex.runExclusive(() => (runningEvents = 0))

try {
Expand All @@ -57,11 +56,15 @@ export abstract class FirehoseSubscriptionBase {
runningEvents++
})

this.handleEvent(evt).finally(async () => {
await mutex.runExclusive(async () => {
if (runningEvents > 0) runningEvents--
this.handleEvent(evt)
.catch((err) => {
console.log(`err in handleEvent ${err}`)
})
.finally(async () => {
await mutex.runExclusive(async () => {
if (runningEvents > 0) runningEvents--
})
})
})

// no longer awaiting this
}
Expand Down

0 comments on commit ddb3656

Please sign in to comment.