diff --git a/src/subscription.ts b/src/subscription.ts index 290cda6..32629b8 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -44,6 +44,8 @@ export class FirehoseSubscription extends FirehoseSubscriptionBase { public intervalId: NodeJS.Timer async handleEvent(posts) { + if (posts.creates.length === 0 && posts.deletes.length === 0) return + const postsToDelete = posts.deletes.map((del) => del.uri) // Transform posts in parallel diff --git a/src/util/subscription.ts b/src/util/subscription.ts index 35be703..4be8e7c 100755 --- a/src/util/subscription.ts +++ b/src/util/subscription.ts @@ -13,7 +13,7 @@ import WebSocket from 'ws' import { Semaphore } from 'async-mutex' -const semaphore = new Semaphore(24) +const semaphore = new Semaphore(16) const includedRecords = new Set(['app.bsky.feed.post']) @@ -32,12 +32,19 @@ export abstract class FirehoseSubscriptionBase { wantedCollections: Array.from(includedRecords.values()), ws: WebSocket, cursor: lastSuccessfulCursor, + endpoint: 'wss://jetstream2.us-west.bsky.network/subscribe', }) - this.jetstream.start() - this.jetstream.on('commit', (event) => { + console.log('got event') eventQueue.push(event) + if ( + eventQueue.length > 1000 && + this.jetstream.ws?.readyState === WebSocket.OPEN + ) { + console.log('core: queue too large, closing jetstream...') + this.jetstream.close() + } }) const processEvent = async (event: any) => { @@ -71,22 +78,34 @@ export abstract class FirehoseSubscriptionBase { const processQueue = async () => { let handledEvents = 0 - let lastSuccessfulCursor = null + let lastSuccessfulCursor = (await this.getCursor()).cursor while (true) { - while (eventQueue.length === 0) { - await new Promise((resolve) => setTimeout(resolve, 1000)) + if (eventQueue.length === 0) { + if ( + this.jetstream.ws?.readyState !== WebSocket.OPEN && + this.jetstream.ws?.readyState !== WebSocket.CONNECTING + ) { + console.log('core: jetstream closed, starting...') + this.jetstream.cursor = lastSuccessfulCursor + this.jetstream.start() + } else { + await new Promise((resolve) => setTimeout(resolve, 1)) + } } - const event = eventQueue.shift() - const posts = await processEvent(event) - if (handledEvents >= 1000) { - if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor) - handledEvents = 0 - } + while (eventQueue.length > 0) { + const event = eventQueue.shift() + if (!event) continue + + const posts = await processEvent(event) + if (handledEvents >= 1000) { + if (lastSuccessfulCursor) this.updateCursor(lastSuccessfulCursor) + handledEvents = 0 + } - if (posts.creates.length + posts.deletes.length > 0) { await semaphore.acquire().then(async ([value, release]) => { + console.log('handling event') this.handleEvent(posts) .then(() => { lastSuccessfulCursor = posts.cursor @@ -100,7 +119,8 @@ export abstract class FirehoseSubscriptionBase { } } - processQueue() + await new Promise((resolve) => setTimeout(resolve, 10 * 1000)) + await processQueue() } async updateCursor(cursor: number) {