Skip to content

Commit

Permalink
The Jetstream Update
Browse files: browse the repository at this point in the history
  • Loading branch information
Bossett committed Dec 31, 2024
1 parent 8d1c468 commit 15e1c9f
Show file tree
Hide file tree
Showing 9 changed files with 115 additions and 119 deletions.
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,23 @@
"build": "tsc && tsc-alias"
},
"dependencies": {
"@atcute/client": "^2.0.6",
"@atproto/api": "^0.13.5",
"@atproto/identity": "^0.4.1",
"@atproto/lexicon": "^0.4.1",
"@atproto/repo": "^0.5.0",
"@atproto/syntax": "^0.3.0",
"@atproto/xrpc-server": "^0.6.3",
"@skyware/jetstream": "^0.2.1",
"dotenv": "^16.4.5",
"express": "^4.18.2",
"follow-redirects": "1.15.8",
"mongodb": "^6.3.0",
"multiformats": "^9.9.0",
"node-fetch-native": "^1.6.2",
"p-ratelimit": "^1.0.1",
"semver": "^7.6.0"
"semver": "^7.6.0",
"ws": "^8.18.0"
},
"devDependencies": {
"@types/express": "^4.17.21",
Expand Down
4 changes: 2 additions & 2 deletions src/addn/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { pRateLimit } from 'p-ratelimit'
const _limit = pRateLimit({
interval: undefined,
rate: undefined,
concurrency: undefined,
concurrency: 96,
maxDelay: undefined,
})

Expand All @@ -12,7 +12,7 @@ const limit = async <T>(fn: () => Promise<T>, retries = 3): Promise<T> => {
return await _limit(fn)
} catch (error) {
if (retries > 0) {
console.log(`retrying limited call:\n${fn.toString()}`)
console.log(`${error} - retrying limited call:\n${fn.toString()}`)
const delay = (ms: number) => new Promise((res) => setTimeout(res, ms))
await delay(3000)
return await limit(fn, retries - 1)
Expand Down
5 changes: 2 additions & 3 deletions src/algos/cats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ export class manager extends AlgoManager {

if (this.agent === null) return false

let match = false
let matchParts: string[] = []

if (post.embed?.images) {
Expand All @@ -101,9 +100,9 @@ export class manager extends AlgoManager {
const matchString = matchParts.join(' ')

if (matchString.match(this.re) !== null) {
match = true
return true
}

return match
return false
}
}
6 changes: 2 additions & 4 deletions src/algos/keyboards.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@ export class manager extends AlgoManager {
}
if (this.agent === null) return false

let match = false

let matchString = ''

if (post.embed?.images) {
Expand All @@ -213,9 +211,9 @@ export class manager extends AlgoManager {
matchString = `${post.text} ${matchString}`.replace('\n', ' ')

if (matchString.match(this.re) !== null) {
match = true
return true
}

return match
return false
}
}
16 changes: 12 additions & 4 deletions src/algos/paxaus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { AlgoManager } from '../addn/algoManager'
import dotenv from 'dotenv'
import { Post } from '../db/schema'
import dbClient from '../db/dbClient'
import { Database } from '../db'
import { BskyAgent } from '@atproto/api'

dotenv.config()

Expand Down Expand Up @@ -38,10 +40,16 @@ export class manager extends AlgoManager {

public matchTerms: string[] = ['pax aus', 'paxaus', 'pax australia']

public re = new RegExp(
`^.*\\b(${this.matchTerms.join('|')})(es|s)?\\b.*$`,
'ims',
)
public re: RegExp

constructor(db: Database, agent: BskyAgent) {
super(db, agent)

this.re = new RegExp(
`^.*\\b(${this.matchTerms.join('|')})(es|s)?\\b.*$`,
'ims',
)
}

public async periodicTask() {
await this.db.removeTagFromOldPosts(
Expand Down
10 changes: 9 additions & 1 deletion src/algos/twelve-words.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { AlgoManager } from '../addn/algoManager'
import dotenv from 'dotenv'
import { Post } from '../db/schema'
import dbClient from '../db/dbClient'
import { Database } from '../db'
import { BskyAgent } from '@atproto/api'

dotenv.config()

Expand Down Expand Up @@ -37,7 +39,13 @@ export const handler = async (ctx: AppContext, params: QueryParams) => {
export class manager extends AlgoManager {
public name: string = shortname

public re = new RegExp(`^.*\\btransition\\b.*\\b12 words\\b.*$`, 'ims')
public re: RegExp

constructor(db: Database, agent: BskyAgent) {
super(db, agent)

this.re = new RegExp(`^.*\\btransition\\b.*\\b12 words\\b.*$`, 'ims')
}

public async periodicTask() {
return
Expand Down
22 changes: 4 additions & 18 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,11 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
public authorList: string[]
public intervalId: NodeJS.Timer

async handleEvent(evt: RepoEvent) {
if (!isCommit(evt)) return

await Promise.all(this.algoManagers.map((manager) => manager.ready()))

let ops: any
try {
ops = await getOpsByType(evt)
} catch (e) {
console.log(`core: error decoding ops ${e.message}`)
return
}

if (!ops) return

const postsToDelete = ops.posts.deletes.map((del) => del.uri)
async handleEvent(posts) {
const postsToDelete = posts.deletes.map((del) => del.uri)

// Transform posts in parallel
const postsCreated = ops.posts.creates.map((create) => ({
const postsCreated = posts.creates.map((create) => ({
_id: null,
uri: create.uri,
cid: create.cid,
Expand Down Expand Up @@ -142,6 +128,6 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
})
}

Promise.all(dbOperations)
await Promise.all(dbOperations)
}
}
134 changes: 49 additions & 85 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,102 +13,66 @@ import {
} from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Database } from '../db'

import { Jetstream, CommitType, CommitCreate } from '@skyware/jetstream'
import WebSocket from 'ws'

const includedRecords = new Set(['app.bsky.feed.post'])

class Semaphore {
private tasks: (() => void)[] = []
private counter: number
export abstract class FirehoseSubscriptionBase {
public jetstream: Jetstream

constructor(maxConcurrent: number) {
this.counter = maxConcurrent
}
constructor(public db: Database, public service: string) {}

async acquire() {
if (this.counter > 0) {
this.counter--
return
}
await new Promise<void>((resolve) => this.tasks.push(resolve))
}
abstract handleEvent(evt: any): Promise<void>

release() {
this.counter++
if (this.tasks.length > 0) {
const nextTask = this.tasks.shift()
if (nextTask) nextTask()
}
}
}
async run(subscriptionReconnectDelay: number) {
let handledEvents = 0

export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>
private eventQueue: RepoEvent[] = []
private semaphore: Semaphore

constructor(public db: Database, public service: string) {
this.sub = new Subscription({
service: service,
method: ids.ComAtprotoSyncSubscribeRepos,
getParams: () => this.getCursor(),
validate: (value: unknown) => {
return value as RepoEvent
},
heartbeatIntervalMs: 30000,
this.jetstream = new Jetstream({
wantedCollections: Array.from(includedRecords.values()),
ws: WebSocket,
...(await this.getCursor()),
})
this.semaphore = new Semaphore(8)
}

abstract handleEvent(evt: RepoEvent): Promise<void>
this.jetstream.start()

this.jetstream.on('commit', (event) => {
const posts = {
cursor: event.time_us,
creates: [] as {
uri: string
cid: string
author: string
record: any
}[],
deletes: [] as { uri: string }[],
}

async run(subscriptionReconnectDelay: number) {
let handledEvents = 0
try {
for await (const evt of this.sub) {
const commit = evt as Commit

if (Array.isArray(commit.ops) && commit.ops.length > 0) {
if (commit.blocks) {
const [collection] = commit.ops[0].path.split('/')

if (includedRecords.has(collection)) {
handledEvents++
this.eventQueue.push(evt)
if (this.eventQueue.length >= 10) {
await this.processEventQueue()
}
}
}
}
// update stored cursor every 1000 events or so
if (handledEvents > 1000 && Number.isInteger(commit.seq)) {
this.updateCursor(commit.seq).then(() => {
handledEvents = 0
})
}
if (event.commit.operation === CommitType.Create) {
posts.creates.push({
uri: `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`,
cid: event.commit.cid,
author: event.did,
record: event.commit.record,
})
}
if (event.commit.operation === CommitType.Delete) {
posts.deletes.push({
uri: `at://${event.did}/${event.commit.collection}/${event.commit.rkey}`,
})
}

handledEvents++
if (handledEvents >= 1000) {
this.updateCursor(posts.cursor).then(() => {
handledEvents = 0
})
}
} catch (err) {
console.error('repo subscription errored', err)
setTimeout(
() => this.run(subscriptionReconnectDelay),
subscriptionReconnectDelay,
)
}
}

private async processEventQueue() {
const eventsToProcess = this.eventQueue.splice(0, 10)
await Promise.all(
eventsToProcess.map(async (evt) => {
await this.semaphore.acquire()
this.handleEvent(evt)
.catch((err) => {
console.log(`err in handleEvent ${err}`)
})
.finally(() => {
this.semaphore.release()
})
}),
)
if (posts.creates.length > 0 || posts.deletes.length > 0) {
this.handleEvent(posts)
}
})
}

async updateCursor(cursor: number) {
Expand Down
32 changes: 31 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@
# yarn lockfile v1


"@atcute/bluesky@^1.0.6":
version "1.0.11"
resolved "https://registry.yarnpkg.com/@atcute/bluesky/-/bluesky-1.0.11.tgz#0db494f4228a3a660ab4084f0a2e813fa9eef6f9"
integrity sha512-j4wLJIzKWh0gmQZ7s5svhEVRThlzqyfstbM0qM/H1Y+ZIQESx5HtVu+390zAWzZoEKZYSyTM9+ctY39OiEGDQw==

"@atcute/client@^2.0.6":
version "2.0.6"
resolved "https://registry.yarnpkg.com/@atcute/client/-/client-2.0.6.tgz#5a2cc0e879c1b897e2770613c50836f5866f5800"
integrity sha512-mhdqEicGUx0s5HTFOLpz91rcLS9j/g63de0nmAqv7blhU3j+xBf4le54qr2YIXNfnReZI7EwLYLX/YIBez4LGA==

"@atproto/api@^0.13.5":
version "0.13.6"
resolved "https://registry.yarnpkg.com/@atproto/api/-/api-0.13.6.tgz#2500e9d7143e6718089632300c42ce50149f8cd5"
Expand Down Expand Up @@ -319,6 +329,14 @@
"@nodelib/fs.scandir" "2.1.5"
fastq "^1.6.0"

"@skyware/jetstream@^0.2.1":
version "0.2.1"
resolved "https://registry.yarnpkg.com/@skyware/jetstream/-/jetstream-0.2.1.tgz#23caa9601c919092ef92b0f67729b923e00fc25b"
integrity sha512-qmQkBnMYG3+XBTLUDUKTWMS0QpwCFSZh66fvQRn+xEqUQ2CXB2ELo4El0tgVvdT4+glk4nfzVG45L6Op9VURow==
dependencies:
"@atcute/bluesky" "^1.0.6"
partysocket "^1.0.2"

"@types/body-parser@*":
version "1.19.5"
resolved "https://registry.yarnpkg.com/@types/body-parser/-/body-parser-1.19.5.tgz#04ce9a3b677dc8bd681a17da1ab9835dc9d3ede4"
Expand Down Expand Up @@ -727,6 +745,11 @@ event-target-shim@^5.0.0:
resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-5.0.1.tgz#5d4d3ebdf9583d63a5333ce2deb7480ab2b05789"
integrity sha512-i/2XbnSz/uxRCU6+NdVJgKWDTM427+MqYbkQzD321DuCQJUqOuJKIA0IM2+W2xtYHdKOmZ4dR6fExsd4SXL+WQ==

event-target-shim@^6.0.2:
version "6.0.2"
resolved "https://registry.yarnpkg.com/event-target-shim/-/event-target-shim-6.0.2.tgz#ea5348c3618ee8b62ff1d344f01908ee2b8a2b71"
integrity sha512-8q3LsZjRezbFZ2PN+uP+Q7pnHUMmAOziU2vA2OwoFaKIXxlxl38IylhSSgUorWu/rf4er67w0ikBqjBFk/pomA==

events@^3.3.0:
version "3.3.0"
resolved "https://registry.yarnpkg.com/events/-/events-3.3.0.tgz#31a95ad0a924e2d2c419a813aeb2c4e878ea7400"
Expand Down Expand Up @@ -1122,6 +1145,13 @@ parseurl@~1.3.3:
resolved "https://registry.yarnpkg.com/parseurl/-/parseurl-1.3.3.tgz#9da19e7bee8d12dff0513ed5b76957793bc2e8d4"
integrity sha512-CiyeOxFT/JZyN5m0z9PfXw4SCBJ6Sygz1Dpl0wqjlhDEGGBP1GnsUVEL0p63hoG1fcj3fHynXi9NYO4nWOL+qQ==

partysocket@^1.0.2:
version "1.0.3"
resolved "https://registry.yarnpkg.com/partysocket/-/partysocket-1.0.3.tgz#58cf1494d44900a7377592683d171fddd3d9c543"
integrity sha512-7sSojS4oCRK1Fe1h+Sa0Za5dwOf+M9VksQlynD8yqwGpLvnO4oxx9ppmOSeh6CJTMbF5gbnvUQKMK525QSBdBw==
dependencies:
event-target-shim "^6.0.2"

path-to-regexp@0.1.7:
version "0.1.7"
resolved "https://registry.yarnpkg.com/path-to-regexp/-/path-to-regexp-0.1.7.tgz#df604178005f522f15eb4490e7247a1bfaa67f8c"
Expand Down Expand Up @@ -1502,7 +1532,7 @@ whatwg-url@^13.0.0:
tr46 "^4.1.1"
webidl-conversions "^7.0.0"

ws@^8.12.0:
ws@^8.12.0, ws@^8.18.0:
version "8.18.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-8.18.0.tgz#0d7505a6eafe2b0e712d232b42279f53bc289bbc"
integrity sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==
Expand Down

0 comments on commit 15e1c9f

Please sign in to comment.