Skip to content

Commit

Permalink
close jetstream when queue gets too large, open when queue complete
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Jan 1, 2025
1 parent 4c6ce12 commit 00cdac0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 14 deletions.
2 changes: 2 additions & 0 deletions src/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase {
public intervalId: NodeJS.Timer

async handleEvent(posts) {
if (posts.creates.length === 0 && posts.deletes.length === 0) return

const postsToDelete = posts.deletes.map((del) => del.uri)

// Transform posts in parallel
Expand Down
48 changes: 34 additions & 14 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import WebSocket from 'ws'

import { Semaphore } from 'async-mutex'

const semaphore = new Semaphore(24)
const semaphore = new Semaphore(16)

const includedRecords = new Set(['app.bsky.feed.post'])

Expand All @@ -32,12 +32,19 @@ export abstract class FirehoseSubscriptionBase {
wantedCollections: Array.from(includedRecords.values()),
ws: WebSocket,
cursor: lastSuccessfulCursor,
endpoint: 'wss://jetstream2.us-west.bsky.network/subscribe',
})

this.jetstream.start()

this.jetstream.on('commit', (event) => {
console.log('got event')
eventQueue.push(event)
if (
eventQueue.length > 1000 &&
this.jetstream.ws?.readyState === WebSocket.OPEN
) {
console.log('core: queue too large, closing jetstream...')
this.jetstream.close()
}
})

const processEvent = async (event: any) => {
Expand Down Expand Up @@ -71,22 +78,34 @@ export abstract class FirehoseSubscriptionBase {

const processQueue = async () => {
let handledEvents = 0
let lastSuccessfulCursor = null
let lastSuccessfulCursor = (await this.getCursor()).cursor

while (true) {
while (eventQueue.length === 0) {
await new Promise((resolve) => setTimeout(resolve, 1000))
if (eventQueue.length === 0) {
if (
this.jetstream.ws?.readyState !== WebSocket.OPEN &&
this.jetstream.ws?.readyState !== WebSocket.CONNECTING
) {
console.log('core: jetstream closed, starting...')
this.jetstream.cursor = lastSuccessfulCursor
this.jetstream.start()
} else {
await new Promise((resolve) => setTimeout(resolve, 1))
}
}

const event = eventQueue.shift()
const posts = await processEvent(event)
if (handledEvents >= 1000) {
if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor)
handledEvents = 0
}
while (eventQueue.length > 0) {
const event = eventQueue.shift()
if (!event) continue

const posts = await processEvent(event)
if (handledEvents >= 1000) {
if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor)
handledEvents = 0
}

if (posts.creates.length + posts.deletes.length > 0) {
await semaphore.acquire().then(async ([value, release]) => {
console.log('handling event')
this.handleEvent(posts)
.then(() => {
lastSuccessfulCursor = posts.cursor
Expand All @@ -100,7 +119,8 @@ export abstract class FirehoseSubscriptionBase {
}
}

processQueue()
await new Promise((resolve) => setTimeout(resolve, 10 * 1000))
await processQueue()
}

async updateCursor(cursor: number) {
Expand Down

0 comments on commit 00cdac0

Please sign in to comment.