Skip to content

Commit

Permalink
semaphore to control max events concurrent to manage CPU
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Dec 31, 2024
1 parent 130204e commit 68ac504
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 33 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"@atproto/syntax": "^0.3.0",
"@atproto/xrpc-server": "^0.6.3",
"@skyware/jetstream": "^0.2.1",
"async-mutex": "^0.5.0",
"dotenv": "^16.4.5",
"express": "^4.18.2",
"follow-redirects": "1.15.8",
Expand Down
6 changes: 3 additions & 3 deletions src/db/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import dbSingleton from './dbClient'

export type Database = typeof dbSingleton
import dbSingleton from './dbClient'

export type Database = typeof dbSingleton
13 changes: 1 addition & 12 deletions src/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import {
OutputSchema as RepoEvent,
isCommit,
} from './lexicon/types/com/atproto/sync/subscribeRepos'
import { FirehoseSubscriptionBase, getOpsByType } from './util/subscription'
import dotenv from 'dotenv'
import { FirehoseSubscriptionBase } from './util/subscription'

import algos from './algos'
import batchUpdate from './addn/batchUpdate'

import { Database } from './db'

import crypto from 'crypto'
import { Post } from './db/schema'
import { BskyAgent } from '@atproto/api'

export class FirehoseSubscription extends FirehoseSubscriptionBase {
Expand All @@ -28,11 +22,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {

const agent = new BskyAgent({ service: 'https://public.api.bsky.app' })

dotenv.config()
const handle = `${process.env.FEEDGEN_HANDLE}`
const password = `${process.env.FEEDGEN_PASSWORD}`

//agent.login({ identifier: handle, password: password }).then(async () => {
batchUpdate(agent, 5 * 60 * 1000)

Object.keys(algos).forEach((algo) => {
Expand Down
62 changes: 44 additions & 18 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
import { Subscription } from '@atproto/xrpc-server'
import { cborToLexRecord, readCar } from '@atproto/repo'
import { BlobRef } from '@atproto/lexicon'
import { ids, lexicons } from '../lexicon/lexicons'
import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post'
import { Record as RepostRecord } from '../lexicon/types/app/bsky/feed/repost'
import { Record as LikeRecord } from '../lexicon/types/app/bsky/feed/like'
import { Record as FollowRecord } from '../lexicon/types/app/bsky/graph/follow'
import {
Commit,
OutputSchema as RepoEvent,
isCommit,
} from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Commit } from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Database } from '../db'

import { Jetstream, CommitType, CommitCreate } from '@skyware/jetstream'
import { Jetstream, CommitType } from '@skyware/jetstream'
import WebSocket from 'ws'

import { Semaphore } from 'async-mutex'

const semaphore = new Semaphore(48)

const includedRecords = new Set(['app.bsky.feed.post'])

export abstract class FirehoseSubscriptionBase {
Expand All @@ -26,8 +25,8 @@ export abstract class FirehoseSubscriptionBase {
abstract handleEvent(evt: any): Promise<void>

async run(subscriptionReconnectDelay: number) {
let handledEvents = 0
let lastSuccessfulCursor = (await this.getCursor()).cursor
const eventQueue: any[] = []

this.jetstream = new Jetstream({
wantedCollections: Array.from(includedRecords.values()),
Expand All @@ -38,6 +37,10 @@ export abstract class FirehoseSubscriptionBase {
this.jetstream.start()

this.jetstream.on('commit', (event) => {
eventQueue.push(event)
})

const processEvent = async (event: any) => {
const posts = {
cursor: event.time_us,
creates: [] as {
Expand All @@ -63,18 +66,41 @@ export abstract class FirehoseSubscriptionBase {
})
}

if (handledEvents >= 1000) {
if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor)
handledEvents = 0
}
return posts
}

if (posts.creates.length + posts.deletes.length > 0) {
this.handleEvent(posts).then(() => {
lastSuccessfulCursor = posts.cursor
handledEvents++
})
const processQueue = async () => {
let handledEvents = 0
let lastSuccessfulCursor = null

while (true) {
while (eventQueue.length === 0) {
await new Promise((resolve) => setTimeout(resolve, 1000))
}

const event = eventQueue.shift()
const posts = await processEvent(event)
if (handledEvents >= 1000) {
if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor)
handledEvents = 0
}

if (posts.creates.length + posts.deletes.length > 0) {
await semaphore.acquire().then(async ([value, release]) => {
this.handleEvent(posts)
.then(() => {
lastSuccessfulCursor = posts.cursor
handledEvents++
})
.finally(() => {
release()
})
})
}
}
})
}

processQueue()
}

async updateCursor(cursor: number) {
Expand Down
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,13 @@ array-union@^2.1.0:
resolved "https://registry.yarnpkg.com/array-union/-/array-union-2.1.0.tgz#b798420adbeb1de828d84acd8a2e23d3efe85e8d"
integrity sha512-HGyxoOTYUyCM6stUe6EJgnd4EoewAI7zMdfqO+kGjnlZmBDz/cR5pf8r/cR4Wq60sL/p0IkcjUEEPwS3GFrIyw==

async-mutex@^0.5.0:
version "0.5.0"
resolved "https://registry.yarnpkg.com/async-mutex/-/async-mutex-0.5.0.tgz#353c69a0b9e75250971a64ac203b0ebfddd75482"
integrity sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==
dependencies:
tslib "^2.4.0"

asynckit@^0.4.0:
version "0.4.0"
resolved "https://registry.yarnpkg.com/asynckit/-/asynckit-0.4.0.tgz#c79ed97f7f34cb8f2ba1bc9790bcc366474b4b79"
Expand Down Expand Up @@ -1464,6 +1471,11 @@ [email protected]:
normalize-path "^3.0.0"
plimit-lit "^1.2.6"

tslib@^2.4.0:
version "2.8.1"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.8.1.tgz#612efe4ed235d567e8aba5f2a5fab70280ade83f"
integrity sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==

tsx@^4.7.1:
version "4.19.0"
resolved "https://registry.yarnpkg.com/tsx/-/tsx-4.19.0.tgz#6166cb399b17d14d125e6158d23384045cfdf4f6"
Expand Down

0 comments on commit 68ac504

Please sign in to comment.