Skip to content

Commit

Permalink
skip repost,follow,like
Browse files Browse the repository at this point in the history
  • Loading branch information
Bossett committed Sep 3, 2024
1 parent 04c4e88 commit f22016a
Showing 1 changed file with 194 additions and 185 deletions.
379 changes: 194 additions & 185 deletions src/util/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,185 +1,194 @@
import { Subscription } from '@atproto/xrpc-server'
import { cborToLexRecord, readCar } from '@atproto/repo'
import { BlobRef } from '@atproto/lexicon'
import { ids, lexicons } from '../lexicon/lexicons'
import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post'
import { Record as RepostRecord } from '../lexicon/types/app/bsky/feed/repost'
import { Record as LikeRecord } from '../lexicon/types/app/bsky/feed/like'
import { Record as FollowRecord } from '../lexicon/types/app/bsky/graph/follow'
import {
Commit,
OutputSchema as RepoEvent,
isCommit,
} from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Database } from '../db'

export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>

constructor(public db: Database, public service: string) {
this.sub = new Subscription({
service: service,
method: ids.ComAtprotoSyncSubscribeRepos,
getParams: () => this.getCursor(),
validate: (value: unknown) => {
try {
return lexicons.assertValidXrpcMessage<RepoEvent>(
ids.ComAtprotoSyncSubscribeRepos,
value,
)
} catch (err) {
console.error('repo subscription skipped invalid message', err)
}
},
})
}

abstract handleEvent(evt: RepoEvent): Promise<void>

async run(subscriptionReconnectDelay: number) {
try {
for await (const evt of this.sub) {
try {
this.handleEvent(evt) // no longer awaiting this
} catch (err) {
console.error('repo subscription could not handle message', err)
}
// update stored cursor every 20 events or so
if (isCommit(evt) && evt.seq % 20 === 0) {
await this.updateCursor(evt.seq)
}
}
} catch (err) {
console.error('repo subscription errored', err)
setTimeout(
() => this.run(subscriptionReconnectDelay),
subscriptionReconnectDelay,
)
}
}

async updateCursor(cursor: number) {
await this.db.updateSubStateCursor(this.service, cursor)
}

async getCursor(): Promise<{ cursor?: number }> {
const res = await this.db.getSubStateCursor(this.service)
return res ? { cursor: res.cursor } : {}
}
}

export const getOpsByType = async (evt: Commit): Promise<OperationsByType> => {
const car = await readCar(evt.blocks)
const opsByType: OperationsByType = {
posts: { creates: [], deletes: [] },
reposts: { creates: [], deletes: [] },
likes: { creates: [], deletes: [] },
follows: { creates: [], deletes: [] },
}

for (const op of evt.ops) {
const uri = `at://${evt.repo}/${op.path}`
const [collection] = op.path.split('/')

if (op.action === 'update') continue // updates not supported yet

if (op.action === 'create') {
if (!op.cid) continue
const recordBytes = car.blocks.get(op.cid)
if (!recordBytes) continue
const record = cborToLexRecord(recordBytes)
const create = { uri, cid: op.cid.toString(), author: evt.repo }
if (collection === ids.AppBskyFeedPost && isPost(record)) {
opsByType.posts.creates.push({ record, ...create })
} else if (collection === ids.AppBskyFeedRepost && isRepost(record)) {
opsByType.reposts.creates.push({ record, ...create })
} else if (collection === ids.AppBskyFeedLike && isLike(record)) {
opsByType.likes.creates.push({ record, ...create })
} else if (collection === ids.AppBskyGraphFollow && isFollow(record)) {
opsByType.follows.creates.push({ record, ...create })
}
}

if (op.action === 'delete') {
if (collection === ids.AppBskyFeedPost) {
opsByType.posts.deletes.push({ uri })
} else if (collection === ids.AppBskyFeedRepost) {
opsByType.reposts.deletes.push({ uri })
} else if (collection === ids.AppBskyFeedLike) {
opsByType.likes.deletes.push({ uri })
} else if (collection === ids.AppBskyGraphFollow) {
opsByType.follows.deletes.push({ uri })
}
}
}

return opsByType
}

type OperationsByType = {
posts: Operations<PostRecord>
reposts: Operations<RepostRecord>
likes: Operations<LikeRecord>
follows: Operations<FollowRecord>
}

type Operations<T = Record<string, unknown>> = {
creates: CreateOp<T>[]
deletes: DeleteOp[]
}

type CreateOp<T> = {
uri: string
cid: string
author: string
record: T
}

type DeleteOp = {
uri: string
}

export const isPost = (obj: unknown): obj is PostRecord => {
return isType(obj, ids.AppBskyFeedPost)
}

export const isRepost = (obj: unknown): obj is RepostRecord => {
return isType(obj, ids.AppBskyFeedRepost)
}

export const isLike = (obj: unknown): obj is LikeRecord => {
return isType(obj, ids.AppBskyFeedLike)
}

export const isFollow = (obj: unknown): obj is FollowRecord => {
return isType(obj, ids.AppBskyGraphFollow)
}

const isType = (obj: unknown, nsid: string) => {
try {
lexicons.assertValidRecord(nsid, fixBlobRefs(obj))
return true
} catch (err) {
return false
}
}

// @TODO right now record validation fails on BlobRefs
// simply because multiple packages have their own copy
// of the BlobRef class, causing instanceof checks to fail.
// This is a temporary solution.
const fixBlobRefs = (obj: unknown): unknown => {
if (Array.isArray(obj)) {
return obj.map(fixBlobRefs)
}
if (obj && typeof obj === 'object') {
if (obj.constructor.name === 'BlobRef') {
const blob = obj as BlobRef
return new BlobRef(blob.ref, blob.mimeType, blob.size, blob.original)
}
return Object.entries(obj).reduce((acc, [key, val]) => {
return Object.assign(acc, { [key]: fixBlobRefs(val) })
}, {} as Record<string, unknown>)
}
return obj
}
import { Subscription } from '@atproto/xrpc-server'
import { cborToLexRecord, readCar } from '@atproto/repo'
import { BlobRef } from '@atproto/lexicon'
import { ids, lexicons } from '../lexicon/lexicons'
import { Record as PostRecord } from '../lexicon/types/app/bsky/feed/post'
import { Record as RepostRecord } from '../lexicon/types/app/bsky/feed/repost'
import { Record as LikeRecord } from '../lexicon/types/app/bsky/feed/like'
import { Record as FollowRecord } from '../lexicon/types/app/bsky/graph/follow'
import {
Commit,
OutputSchema as RepoEvent,
isCommit,
} from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { Database } from '../db'

export abstract class FirehoseSubscriptionBase {
public sub: Subscription<RepoEvent>

constructor(public db: Database, public service: string) {
this.sub = new Subscription({
service: service,
method: ids.ComAtprotoSyncSubscribeRepos,
getParams: () => this.getCursor(),
validate: (value: unknown) => {
try {
return lexicons.assertValidXrpcMessage<RepoEvent>(
ids.ComAtprotoSyncSubscribeRepos,
value,
)
} catch (err) {
console.error('repo subscription skipped invalid message', err)
}
},
})
}

abstract handleEvent(evt: RepoEvent): Promise<void>

async run(subscriptionReconnectDelay: number) {
try {
for await (const evt of this.sub) {
try {
this.handleEvent(evt) // no longer awaiting this
} catch (err) {
console.error('repo subscription could not handle message', err)
}
// update stored cursor every 20 events or so
if (isCommit(evt) && evt.seq % 20 === 0) {
await this.updateCursor(evt.seq)
}
}
} catch (err) {
console.error('repo subscription errored', err)
setTimeout(
() => this.run(subscriptionReconnectDelay),
subscriptionReconnectDelay,
)
}
}

async updateCursor(cursor: number) {
await this.db.updateSubStateCursor(this.service, cursor)
}

async getCursor(): Promise<{ cursor?: number }> {
const res = await this.db.getSubStateCursor(this.service)
return res ? { cursor: res.cursor } : {}
}
}

export const getOpsByType = async (evt: Commit): Promise<OperationsByType> => {
const car = await readCar(evt.blocks)
const opsByType: OperationsByType = {
posts: { creates: [], deletes: [] },
reposts: { creates: [], deletes: [] },
likes: { creates: [], deletes: [] },
follows: { creates: [], deletes: [] },
}

for (const op of evt.ops) {
const uri = `at://${evt.repo}/${op.path}`
const [collection] = op.path.split('/')

if (
[
'app.bsky.feed.repost',
'app.bsky.graph.follow',
'app.bsky.feed.like',
].includes(collection)
)
continue

if (op.action === 'update') continue // updates not supported yet

if (op.action === 'create') {
if (!op.cid) continue
const recordBytes = car.blocks.get(op.cid)
if (!recordBytes) continue
const record = cborToLexRecord(recordBytes)
const create = { uri, cid: op.cid.toString(), author: evt.repo }
if (collection === ids.AppBskyFeedPost && isPost(record)) {
opsByType.posts.creates.push({ record, ...create })
} else if (collection === ids.AppBskyFeedRepost && isRepost(record)) {
opsByType.reposts.creates.push({ record, ...create })
} else if (collection === ids.AppBskyFeedLike && isLike(record)) {
opsByType.likes.creates.push({ record, ...create })
} else if (collection === ids.AppBskyGraphFollow && isFollow(record)) {
opsByType.follows.creates.push({ record, ...create })
}
}

if (op.action === 'delete') {
if (collection === ids.AppBskyFeedPost) {
opsByType.posts.deletes.push({ uri })
} else if (collection === ids.AppBskyFeedRepost) {
opsByType.reposts.deletes.push({ uri })
} else if (collection === ids.AppBskyFeedLike) {
opsByType.likes.deletes.push({ uri })
} else if (collection === ids.AppBskyGraphFollow) {
opsByType.follows.deletes.push({ uri })
}
}
}

return opsByType
}

type OperationsByType = {
posts: Operations<PostRecord>
reposts: Operations<RepostRecord>
likes: Operations<LikeRecord>
follows: Operations<FollowRecord>
}

type Operations<T = Record<string, unknown>> = {
creates: CreateOp<T>[]
deletes: DeleteOp[]
}

type CreateOp<T> = {
uri: string
cid: string
author: string
record: T
}

type DeleteOp = {
uri: string
}

export const isPost = (obj: unknown): obj is PostRecord => {
return isType(obj, ids.AppBskyFeedPost)
}

export const isRepost = (obj: unknown): obj is RepostRecord => {
return isType(obj, ids.AppBskyFeedRepost)
}

export const isLike = (obj: unknown): obj is LikeRecord => {
return isType(obj, ids.AppBskyFeedLike)
}

export const isFollow = (obj: unknown): obj is FollowRecord => {
return isType(obj, ids.AppBskyGraphFollow)
}

const isType = (obj: unknown, nsid: string) => {
try {
lexicons.assertValidRecord(nsid, fixBlobRefs(obj))
return true
} catch (err) {
return false
}
}

// @TODO right now record validation fails on BlobRefs
// simply because multiple packages have their own copy
// of the BlobRef class, causing instanceof checks to fail.
// This is a temporary solution.
const fixBlobRefs = (obj: unknown): unknown => {
if (Array.isArray(obj)) {
return obj.map(fixBlobRefs)
}
if (obj && typeof obj === 'object') {
if (obj.constructor.name === 'BlobRef') {
const blob = obj as BlobRef
return new BlobRef(blob.ref, blob.mimeType, blob.size, blob.original)
}
return Object.entries(obj).reduce((acc, [key, val]) => {
return Object.assign(acc, { [key]: fixBlobRefs(val) })
}, {} as Record<string, unknown>)
}
return obj
}

0 comments on commit f22016a

Please sign in to comment.