Skip to content

Commit

Permalink
update queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Sep 4, 2024
1 parent e5ab963 commit 3b843fa
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 52 deletions.
101 changes: 51 additions & 50 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,50 +1,51 @@
{
"name": "bossett-bsky-feeds",
"version": "2.1.0",
"description": "basic bsky feedgen",
"main": "index.js",
"repository": "[email protected]:bossett/bsky-feeds.git",
"author": "Bossett <[email protected]>",
"license": "MIT",
"scripts": {
"publishKeyboards": "tsx scripts/publishKeyboards.ts",
"publishCats": "tsx scripts/publishCats.ts",
"publishScience": "tsx scripts/publishFeedGen.ts",
"publishAuspol": "tsx scripts/publishAuspol.ts",
"publishDads": "tsx scripts/publishDads.ts",
"publishDadsMedia": "tsx scripts/publishDadsMedia.ts",
"publish18plusND": "tsx scripts/publish18plusND.ts",
"publishND": "tsx scripts/publishND.ts",
"publishDiscourse": "tsx scripts/publishDiscourse.ts",
"publishElusive": "tsx scripts/publishElusive.ts",
"publishOverheard": "tsx scripts/publishOverheard.ts",
"publishPAXAus": "tsx scripts/publishPAXAus.ts",
"publishExternal": "tsx scripts/publishExternal.ts",
"publishWords": "tsx scripts/publishWords.ts",
"addSelfToList": "tsx scripts/addSelfToList.ts",
"start": "tsx ./src/index.ts"
},
"dependencies": {
"@atproto/api": "^0.11.2",
"@atproto/identity": "^0.3.2",
"@atproto/lexicon": "^0.3.1",
"@atproto/repo": "^0.3.6",
"@atproto/syntax": "^0.1.5",
"@atproto/xrpc-server": "^0.4.2",
"dotenv": "^16.4.5",
"express": "^4.18.2",
"moize": "^6.1.6",
"mongodb": "^6.3.0",
"multiformats": "^9.9.0",
"node-fetch-native": "^1.6.2",
"p-ratelimit": "^1.0.1",
"semver": "^7.6.0",
"follow-redirects": "1.15.6"
},
"devDependencies": {
"@types/express": "^4.17.21",
"@types/node": "^20.11.28",
"tsx": "^4.7.1",
"typescript": "^5.4.2"
}
}
{
"name": "bossett-bsky-feeds",
"version": "2.1.0",
"description": "basic bsky feedgen",
"main": "index.js",
"repository": "[email protected]:bossett/bsky-feeds.git",
"author": "Bossett <[email protected]>",
"license": "MIT",
"scripts": {
"publishKeyboards": "tsx scripts/publishKeyboards.ts",
"publishCats": "tsx scripts/publishCats.ts",
"publishScience": "tsx scripts/publishFeedGen.ts",
"publishAuspol": "tsx scripts/publishAuspol.ts",
"publishDads": "tsx scripts/publishDads.ts",
"publishDadsMedia": "tsx scripts/publishDadsMedia.ts",
"publish18plusND": "tsx scripts/publish18plusND.ts",
"publishND": "tsx scripts/publishND.ts",
"publishDiscourse": "tsx scripts/publishDiscourse.ts",
"publishElusive": "tsx scripts/publishElusive.ts",
"publishOverheard": "tsx scripts/publishOverheard.ts",
"publishPAXAus": "tsx scripts/publishPAXAus.ts",
"publishExternal": "tsx scripts/publishExternal.ts",
"publishWords": "tsx scripts/publishWords.ts",
"addSelfToList": "tsx scripts/addSelfToList.ts",
"start": "tsx ./src/index.ts"
},
"dependencies": {
"@atproto/api": "^0.11.2",
"@atproto/identity": "^0.3.2",
"@atproto/lexicon": "^0.3.1",
"@atproto/repo": "^0.3.6",
"@atproto/syntax": "^0.1.5",
"@atproto/xrpc-server": "^0.4.2",
"async-mutex": "^0.5.0",
"dotenv": "^16.4.5",
"express": "^4.18.2",
"follow-redirects": "1.15.6",
"moize": "^6.1.6",
"mongodb": "^6.3.0",
"multiformats": "^9.9.0",
"node-fetch-native": "^1.6.2",
"p-ratelimit": "^1.0.1",
"semver": "^7.6.0"
},
"devDependencies": {
"@types/express": "^4.17.21",
"@types/node": "^20.11.28",
"tsx": "^4.7.1",
"typescript": "^5.4.2"
}
}
14 changes: 12 additions & 2 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
} from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Database } from '../db'

import { Mutex } from 'async-mutex'

const includedRecords = new Set(['app.bsky.feed.post'])

export abstract class FirehoseSubscriptionBase {
Expand All @@ -33,6 +35,7 @@ export abstract class FirehoseSubscriptionBase {

async run(subscriptionReconnectDelay: number) {
const delay = (ms: number) => new Promise((res) => setTimeout(res, ms))
const mutex = new Mutex()

let runningEvents = 0

Expand All @@ -47,8 +50,15 @@ export abstract class FirehoseSubscriptionBase {
if (includedRecords.has(collection)) {
while (runningEvents > 128) delay(1000)

runningEvents++
this.handleEvent(evt).finally(() => runningEvents--)
await mutex.runExclusive(async () => {
runningEvents++
})

this.handleEvent(evt).finally(async () => {
await mutex.runExclusive(async () => {
runningEvents--
})
})

// no longer awaiting this
}
Expand Down
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,13 @@ [email protected]:
resolved "https://registry.yarnpkg.com/array-flatten/-/array-flatten-1.1.1.tgz#9a5f699051b1e7073328f2a008968b64ea2955d2"
integrity sha512-PCVAQswWemu6UdxsDFFX/+gVeYqKAod3D3UVm91jHwynguOwAvYPhx8nNlM++NqRcK6CxxpUafjmhIdKiHibqg==

async-mutex@^0.5.0:
version "0.5.0"
resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.5.0.tgz#353c69a0b9e75250971a64ac203b0ebfddd75482"
integrity sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==
dependencies:
tslib "^2.4.0"

asynckit@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
Expand Down Expand Up @@ -1205,6 +1212,11 @@ tslib@^2.1.0:
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.2.tgz#703ac29425e7b37cd6fd456e92404d46d1f3e4ae"
integrity sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==

tslib@^2.4.0:
version "2.7.0"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.7.0.tgz#d9b40c5c40ab59e8738f297df3087bf1a2690c01"
integrity sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==

tsx@^4.7.1:
version "4.7.1"
resolved "https://registry.yarnpkg.com/tsx/-/tsx-4.7.1.tgz#27af6cbf4e1cdfcb9b5425b1c61bb7e668eb5e84"
Expand Down

0 comments on commit 3b843fa

Please sign in to comment.