From 68ac504250540ef1da5a38e78007e44455e2849b Mon Sep 17 00:00:00 2001 From: Bossett Date: Tue, 31 Dec 2024 19:25:05 +0000 Subject: [PATCH] semaphore to control max events concurrent to manage CPU --- package.json | 1 + src/db/index.ts | 6 ++-- src/subscription.ts | 13 +-------- src/util/subscription.ts | 62 ++++++++++++++++++++++++++++------------ yarn.lock | 12 ++++++++ 5 files changed, 61 insertions(+), 33 deletions(-) diff --git a/package.json b/package.json index d11260fc..ec4ecd46 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@atproto/syntax": "^0.3.0", "@atproto/xrpc-server": "^0.6.3", "@skyware/jetstream": "^0.2.1", + "async-mutex": "^0.5.0", "dotenv": "^16.4.5", "express": "^4.18.2", "follow-redirects": "1.15.8", diff --git a/src/db/index.ts b/src/db/index.ts index a713e98b..8a8c60dd 100755 --- a/src/db/index.ts +++ b/src/db/index.ts @@ -1,3 +1,3 @@ -import dbSingleton from './dbClient' - -export type Database = typeof dbSingleton +import dbSingleton from './dbClient' + +export type Database = typeof dbSingleton diff --git a/src/subscription.ts b/src/subscription.ts index 13b736fd..290cda6d 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -1,9 +1,4 @@ -import { - OutputSchema as RepoEvent, - isCommit, -} from './lexicon/types/com/atproto/sync/subscribeRepos' -import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription' -import dotenv from 'dotenv' +import { FirehoseSubscriptionBase } from './util/subscription' import algos from './algos' import batchUpdate from './addn/batchUpdate' @@ -11,7 +6,6 @@ import batchUpdate from './addn/batchUpdate' import { Database } from './db' import crypto from 'crypto' -import { Post } from './db/schema' import { BskyAgent } from '@atproto/api' export class FirehoseSubscription extends FirehoseSubscriptionBase { @@ -28,11 +22,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { const agent = new BskyAgent({ service: 'https://public.api.bsky.app' }) - dotenv.config() - const handle = `${process.env.FEEDGEN_HANDLE}` - const password = `${process.env.FEEDGEN_PASSWORD}` - - //agent.login({ identifier: handle, password: password }).then(async () => { batchUpdate(agent, 5 * 60 * 1000) Object.keys(algos).forEach((algo) => { diff --git a/src/util/subscription.ts b/src/util/subscription.ts index f71bc414..67bab7f5 100755 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -1,4 +1,3 @@ -import { Subscription } from '@atproto/xrpc-server' import { cborToLexRecord, readCar } from '@atproto/repo' import { BlobRef } from '@atproto/lexicon' import { ids, lexicons } from '../lexicon/lexicons' @@ -6,16 +5,16 @@ import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post' import { Record as RepostRecord } from '../lexicon/types/app/bsky/feed/repost' import { Record as LikeRecord } from '../lexicon/types/app/bsky/feed/like' import { Record as FollowRecord } from '../lexicon/types/app/bsky/graph/follow' -import { - Commit, - OutputSchema as RepoEvent, - isCommit, -} from '../lexicon/types/com/atproto/sync/subscribeRepos' +import { Commit } from '../lexicon/types/com/atproto/sync/subscribeRepos' import { Database } from '../db' -import { Jetstream, CommitType, CommitCreate } from '@skyware/jetstream' +import { Jetstream, CommitType } from '@skyware/jetstream' import WebSocket from 'ws' +import { Semaphore } from 'async-mutex' + +const semaphore = new Semaphore(48) + const includedRecords = new Set(['app.bsky.feed.post']) export abstract class FirehoseSubscriptionBase { @@ -26,8 +25,8 @@ export abstract class FirehoseSubscriptionBase { abstract handleEvent(evt: any): Promise async run(subscriptionReconnectDelay: number) { - let handledEvents = 0 let lastSuccessfulCursor = (await this.getCursor()).cursor + const eventQueue: any[] = [] this.jetstream = new Jetstream({ wantedCollections: Array.from(includedRecords.values()), @@ -38,6 +37,10 @@ export abstract class FirehoseSubscriptionBase { this.jetstream.start() this.jetstream.on('commit', (event) => { + eventQueue.push(event) + }) + + const processEvent = async (event: any) => { const posts = { cursor: event.time_us, creates: [] as { @@ -63,18 +66,41 @@ export abstract class FirehoseSubscriptionBase { }) } - if (handledEvents >= 1000) { - if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor) - handledEvents = 0 - } + return posts + } - if (posts.creates.length + posts.deletes.length > 0) { - this.handleEvent(posts).then(() => { - lastSuccessfulCursor = posts.cursor - handledEvents++ - }) + const processQueue = async () => { + let handledEvents = 0 + let lastSuccessfulCursor = null + + while (true) { + while (eventQueue.length === 0) { + await new Promise((resolve) => setTimeout(resolve, 1000)) + } + + const event = eventQueue.shift() + const posts = await processEvent(event) + if (handledEvents >= 1000) { + if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor) + handledEvents = 0 + } + + if (posts.creates.length + posts.deletes.length > 0) { + await semaphore.acquire().then(async ([value, release]) => { + this.handleEvent(posts) + .then(() => { + lastSuccessfulCursor = posts.cursor + handledEvents++ + }) + .finally(() => { + release() + }) + }) + } } - }) + } + + processQueue() } async updateCursor(cursor: number) { diff --git a/yarn.lock b/yarn.lock index 5f8f4e67..476bb770 100644 --- a/yarn.lock +++ b/yarn.lock @@ -468,6 +468,13 @@ array-union@^2.1.0: resolved "https://registry.yarnpkg.com/array-union/-/array-union-2.1.0.tgz#b798420adbeb1de828d84acd8a2e23d3efe85e8d" integrity sha512-HGyxoOTYUyCM6stUe6EJgnd4EoewAI7zMdfqO+kGjnlZmBDz/cR5pf8r/cR4Wq60sL/p0IkcjUEEPwS3GFrIyw== +async-mutex@^0.5.0: + version "0.5.0" + resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.5.0.tgz#353c69a0b9e75250971a64ac203b0ebfddd75482" + integrity sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA== + dependencies: + tslib "^2.4.0" + asynckit@^0.4.0: version "0.4.0" resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79" @@ -1464,6 +1471,11 @@ tsc-alias@1.8.10: normalize-path "^3.0.0" plimit-lit "^1.2.6" +tslib@^2.4.0: + version "2.8.1" + resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.8.1.tgz#612efe4ed235d567e8aba5f2a5fab70280ade83f" + integrity sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w== + tsx@^4.7.1: version "4.19.0" resolved "https://registry.yarnpkg.com/tsx/-/tsx-4.19.0.tgz#6166cb399b17d14d125e6158d23384045cfdf4f6"