From 15e1c9f1282ebe7ebca229a6c860e59fea521762 Mon Sep 17 00:00:00 2001 From: Bossett Date: Tue, 31 Dec 2024 10:00:40 +0000 Subject: [PATCH] The Jetstream Update --- package.json | 5 +- src/addn/rateLimit.ts | 4 +- src/algos/cats.ts | 5 +- src/algos/keyboards.ts | 6 +- src/algos/paxaus.ts | 16 +++-- src/algos/twelve-words.ts | 10 ++- src/subscription.ts | 22 ++----- src/util/subscription.ts | 134 ++++++++++++++------------------------ yarn.lock | 32 ++++++++- 9 files changed, 115 insertions(+), 119 deletions(-) diff --git a/package.json b/package.json index c56eec6..d11260f 100644 --- a/package.json +++ b/package.json @@ -28,12 +28,14 @@ "build": "tsc && tsc-alias" }, "dependencies": { + "@atcute/client": "^2.0.6", "@atproto/api": "^0.13.5", "@atproto/identity": "^0.4.1", "@atproto/lexicon": "^0.4.1", "@atproto/repo": "^0.5.0", "@atproto/syntax": "^0.3.0", "@atproto/xrpc-server": "^0.6.3", + "@skyware/jetstream": "^0.2.1", "dotenv": "^16.4.5", "express": "^4.18.2", "follow-redirects": "1.15.8", @@ -41,7 +43,8 @@ "multiformats": "^9.9.0", "node-fetch-native": "^1.6.2", "p-ratelimit": "^1.0.1", - "semver": "^7.6.0" + "semver": "^7.6.0", + "ws": "^8.18.0" }, "devDependencies": { "@types/express": "^4.17.21", diff --git a/src/addn/rateLimit.ts b/src/addn/rateLimit.ts index bb53a89..22fda34 100644 --- a/src/addn/rateLimit.ts +++ b/src/addn/rateLimit.ts @@ -3,7 +3,7 @@ import { pRateLimit } from 'p-ratelimit' const _limit = pRateLimit({ interval: undefined, rate: undefined, - concurrency: undefined, + concurrency: 96, maxDelay: undefined, }) @@ -12,7 +12,7 @@ const limit = async (fn: () => Promise, retries = 3): Promise => { return await _limit(fn) } catch (error) { if (retries > 0) { - console.log(`retrying limited call:\n${fn.toString()}`) + console.log(`${error} - retrying limited call:\n${fn.toString()}`) const delay = (ms: number) => new Promise((res) => setTimeout(res, ms)) await delay(3000) return await limit(fn, retries - 1) diff --git a/src/algos/cats.ts b/src/algos/cats.ts index ad7e84d..8ecc22a 100644 --- a/src/algos/cats.ts +++ b/src/algos/cats.ts @@ -75,7 +75,6 @@ export class manager extends AlgoManager { if (this.agent === null) return false - let match = false let matchParts: string[] = [] if (post.embed?.images) { @@ -101,9 +100,9 @@ export class manager extends AlgoManager { const matchString = matchParts.join(' ') if (matchString.match(this.re) !== null) { - match = true + return true } - return match + return false } } diff --git a/src/algos/keyboards.ts b/src/algos/keyboards.ts index e84acbb..0530e5e 100644 --- a/src/algos/keyboards.ts +++ b/src/algos/keyboards.ts @@ -189,8 +189,6 @@ export class manager extends AlgoManager { } if (this.agent === null) return false - let match = false - let matchString = '' if (post.embed?.images) { @@ -213,9 +211,9 @@ export class manager extends AlgoManager { matchString = `${post.text} ${matchString}`.replace('\n', ' ') if (matchString.match(this.re) !== null) { - match = true + return true } - return match + return false } } diff --git a/src/algos/paxaus.ts b/src/algos/paxaus.ts index aeb5d4f..473b7a7 100644 --- a/src/algos/paxaus.ts +++ b/src/algos/paxaus.ts @@ -4,6 +4,8 @@ import { AlgoManager } from '../addn/algoManager' import dotenv from 'dotenv' import { Post } from '../db/schema' import dbClient from '../db/dbClient' +import { Database } from '../db' +import { BskyAgent } from '@atproto/api' dotenv.config() @@ -38,10 +40,16 @@ export class manager extends AlgoManager { public matchTerms: string[] = ['pax aus', 'paxaus', 'pax australia'] - public re = new RegExp( - `^.*\\b(${this.matchTerms.join('|')})(es|s)?\\b.*$`, - 'ims', - ) + public re: RegExp + + constructor(db: Database, agent: BskyAgent) { + super(db, agent) + + this.re = new RegExp( + `^.*\\b(${this.matchTerms.join('|')})(es|s)?\\b.*$`, + 'ims', + ) + } public async periodicTask() { await this.db.removeTagFromOldPosts( diff --git a/src/algos/twelve-words.ts b/src/algos/twelve-words.ts index c891e70..92f8910 100644 --- a/src/algos/twelve-words.ts +++ b/src/algos/twelve-words.ts @@ -4,6 +4,8 @@ import { AlgoManager } from '../addn/algoManager' import dotenv from 'dotenv' import { Post } from '../db/schema' import dbClient from '../db/dbClient' +import { Database } from '../db' +import { BskyAgent } from '@atproto/api' dotenv.config() @@ -37,7 +39,13 @@ export const handler = async (ctx: AppContext, params: QueryParams) => { export class manager extends AlgoManager { public name: string = shortname - public re = new RegExp(`^.*\\btransition\\b.*\\b12 words\\b.*$`, 'ims') + public re: RegExp + + constructor(db: Database, agent: BskyAgent) { + super(db, agent) + + this.re = new RegExp(`^.*\\btransition\\b.*\\b12 words\\b.*$`, 'ims') + } public async periodicTask() { return diff --git a/src/subscription.ts b/src/subscription.ts index fc30749..a5ad557 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -54,25 +54,11 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { public authorList: string[] public intervalId: NodeJS.Timer - async handleEvent(evt: RepoEvent) { - if (!isCommit(evt)) return - - await Promise.all(this.algoManagers.map((manager) => manager.ready())) - - let ops: any - try { - ops = await getOpsByType(evt) - } catch (e) { - console.log(`core: error decoding ops ${e.message}`) - return - } - - if (!ops) return - - const postsToDelete = ops.posts.deletes.map((del) => del.uri) + async handleEvent(posts) { + const postsToDelete = posts.deletes.map((del) => del.uri) // Transform posts in parallel - const postsCreated = ops.posts.creates.map((create) => ({ + const postsCreated = posts.creates.map((create) => ({ _id: null, uri: create.uri, cid: create.cid, @@ -142,6 +128,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { }) } - Promise.all(dbOperations) + await Promise.all(dbOperations) } } diff --git a/src/util/subscription.ts b/src/util/subscription.ts index 0cc3b95..60921d4 100755 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -13,102 +13,66 @@ import { } from '../lexicon/types/com/atproto/sync/subscribeRepos' import { Database } from '../db' +import { Jetstream, CommitType, CommitCreate } from '@skyware/jetstream' +import WebSocket from 'ws' + const includedRecords = new Set(['app.bsky.feed.post']) -class Semaphore { - private tasks: (() => void)[] = [] - private counter: number +export abstract class FirehoseSubscriptionBase { + public jetstream: Jetstream - constructor(maxConcurrent: number) { - this.counter = maxConcurrent - } + constructor(public db: Database, public service: string) {} - async acquire() { - if (this.counter > 0) { - this.counter-- - return - } - await new Promise((resolve) => this.tasks.push(resolve)) - } + abstract handleEvent(evt: any): Promise - release() { - this.counter++ - if (this.tasks.length > 0) { - const nextTask = this.tasks.shift() - if (nextTask) nextTask() - } - } -} + async run(subscriptionReconnectDelay: number) { + let handledEvents = 0 -export abstract class FirehoseSubscriptionBase { - public sub: Subscription - private eventQueue: RepoEvent[] = [] - private semaphore: Semaphore - - constructor(public db: Database, public service: string) { - this.sub = new Subscription({ - service: service, - method: ids.ComAtprotoSyncSubscribeRepos, - getParams: () => this.getCursor(), - validate: (value: unknown) => { - return value as RepoEvent - }, - heartbeatIntervalMs: 30000, + this.jetstream = new Jetstream({ + wantedCollections: Array.from(includedRecords.values()), + ws: WebSocket, + ...(await this.getCursor()), }) - this.semaphore = new Semaphore(8) - } - abstract handleEvent(evt: RepoEvent): Promise + this.jetstream.start() + + this.jetstream.on('commit', (event) => { + const posts = { + cursor: event.time_us, + creates: [] as { + uri: string + cid: string + author: string + record: any + }[], + deletes: [] as { uri: string }[], + } - async run(subscriptionReconnectDelay: number) { - let handledEvents = 0 - try { - for await (const evt of this.sub) { - const commit = evt as Commit - - if (Array.isArray(commit.ops) && commit.ops.length > 0) { - if (commit.blocks) { - const [collection] = commit.ops[0].path.split('/') - - if (includedRecords.has(collection)) { - handledEvents++ - this.eventQueue.push(evt) - if (this.eventQueue.length >= 10) { - await this.processEventQueue() - } - } - } - } - // update stored cursor every 1000 events or so - if (handledEvents > 1000 && Number.isInteger(commit.seq)) { - this.updateCursor(commit.seq).then(() => { - handledEvents = 0 - }) - } + if (event.commit.operation === CommitType.Create) { + posts.creates.push({ + uri: `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`, + cid: event.commit.cid, + author: event.did, + record: event.commit.record, + }) + } + if (event.commit.operation === CommitType.Delete) { + posts.deletes.push({ + uri: `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`, + }) + } + + handledEvents++ + if (handledEvents >= 1000) { + this.updateCursor(posts.cursor).then(() => { + handledEvents = 0 + }) } - } catch (err) { - console.error('repo subscription errored', err) - setTimeout( - () => this.run(subscriptionReconnectDelay), - subscriptionReconnectDelay, - ) - } - } - private async processEventQueue() { - const eventsToProcess = this.eventQueue.splice(0, 10) - await Promise.all( - eventsToProcess.map(async (evt) => { - await this.semaphore.acquire() - this.handleEvent(evt) - .catch((err) => { - console.log(`err in handleEvent ${err}`) - }) - .finally(() => { - this.semaphore.release() - }) - }), - ) + if (posts.creates.length > 0 || posts.deletes.length > 0) { + this.handleEvent(posts) + } + }) } async updateCursor(cursor: number) { diff --git a/yarn.lock b/yarn.lock index cad6f46..5f8f4e6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2,6 +2,16 @@ # yarn lockfile v1 +"@atcute/bluesky@^1.0.6": + version "1.0.11" + resolved "https://registry.yarnpkg.com/@atcute/bluesky/-/bluesky-1.0.11.tgz#0db494f4228a3a660ab4084f0a2e813fa9eef6f9" + integrity sha512-j4wLJIzKWh0gmQZ7s5svhEVRThlzqyfstbM0qM/H1Y+ZIQESx5HtVu+390zAWzZoEKZYSyTM9+ctY39OiEGDQw== + +"@atcute/client@^2.0.6": + version "2.0.6" + resolved "https://registry.yarnpkg.com/@atcute/client/-/client-2.0.6.tgz#5a2cc0e879c1b897e2770613c50836f5866f5800" + integrity sha512-mhdqEicGUx0s5HTFOLpz91rcLS9j/g63de0nmAqv7blhU3j+xBf4le54qr2YIXNfnReZI7EwLYLX/YIBez4LGA== + "@atproto/api@^0.13.5": version "0.13.6" resolved "https://registry.yarnpkg.com/@atproto/api/-/api-0.13.6.tgz#2500e9d7143e6718089632300c42ce50149f8cd5" @@ -319,6 +329,14 @@ "@nodelib/fs.scandir" "2.1.5" fastq "^1.6.0" +"@skyware/jetstream@^0.2.1": + version "0.2.1" + resolved "https://registry.yarnpkg.com/@skyware/jetstream/-/jetstream-0.2.1.tgz#23caa9601c919092ef92b0f67729b923e00fc25b" + integrity sha512-qmQkBnMYG3+XBTLUDUKTWMS0QpwCFSZh66fvQRn+xEqUQ2CXB2ELo4El0tgVvdT4+glk4nfzVG45L6Op9VURow== + dependencies: + "@atcute/bluesky" "^1.0.6" + partysocket "^1.0.2" + "@types/body-parser@*": version "1.19.5" resolved "https://registry.yarnpkg.com/@types/body-parser/-/body-parser-1.19.5.tgz#04ce9a3b677dc8bd681a17da1ab9835dc9d3ede4" @@ -727,6 +745,11 @@ event-target-shim@^5.0.0: resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789" integrity sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ== +event-target-shim@^6.0.2: + version "6.0.2" + resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-6.0.2.tgz#ea5348c3618ee8b62ff1d344f01908ee2b8a2b71" + integrity sha512-8q3LsZjRezbFZ2PN+uP+Q7pnHUMmAOziU2vA2OwoFaKIXxlxl38IylhSSgUorWu/rf4er67w0ikBqjBFk/pomA== + events@^3.3.0: version "3.3.0" resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400" @@ -1122,6 +1145,13 @@ parseurl@~1.3.3: resolved "https://registry.yarnpkg.com/parseurl/-/parseurl-1.3.3.tgz#9da19e7bee8d12dff0513ed5b76957793bc2e8d4" integrity sha512-CiyeOxFT/JZyN5m0z9PfXw4SCBJ6Sygz1Dpl0wqjlhDEGGBP1GnsUVEL0p63hoG1fcj3fHynXi9NYO4nWOL+qQ== +partysocket@^1.0.2: + version "1.0.3" + resolved "https://registry.yarnpkg.com/partysocket/-/partysocket-1.0.3.tgz#58cf1494d44900a7377592683d171fddd3d9c543" + integrity sha512-7sSojS4oCRK1Fe1h+Sa0Za5dwOf+M9VksQlynD8yqwGpLvnO4oxx9ppmOSeh6CJTMbF5gbnvUQKMK525QSBdBw== + dependencies: + event-target-shim "^6.0.2" + path-to-regexp@0.1.7: version "0.1.7" resolved "https://registry.yarnpkg.com/path-to-regexp/-/path-to-regexp-0.1.7.tgz#df604178005f522f15eb4490e7247a1bfaa67f8c" @@ -1502,7 +1532,7 @@ whatwg-url@^13.0.0: tr46 "^4.1.1" webidl-conversions "^7.0.0" -ws@^8.12.0: +ws@^8.12.0, ws@^8.18.0: version "8.18.0" resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.0.tgz#0d7505a6eafe2b0e712d232b42279f53bc289bbc" integrity sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==