Skip to content

Commit

Permalink
feat(database): flush unsynced span
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed May 4, 2024
1 parent 0a199ab commit ab19e21
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 36 deletions.
151 changes: 117 additions & 34 deletions packages/database/src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Context, Logger, Session, Universal } from '@satorijs/satori'
import { Context, Logger, remove, Session, Universal } from '@satorijs/satori'
import { Flatten, Query } from 'minato'
import { Message, SyncFlag } from '.'

Expand All @@ -11,12 +11,26 @@ export enum SyncStatus {
}

interface Span {
front: SpanID
back: SpanID
queue?: boolean
front: Endpoint
back: Endpoint
prev?: Span
next?: Span
data?: Message[]
task?: Promise<void>
latest?: boolean
}

type SpanID = [bigint, string]
namespace Span {
export function from(data: Message[], reverse: boolean) {
if (reverse) data.reverse()
const span = { data } as Span
span.back = [data[0].uid, data[0].id]
span.front = [data[data.length - 1].uid, data[data.length - 1].id]
return span
}
}

type Endpoint = [bigint, string]

export class SyncChannel {
public data: SyncChannel.Data
Expand Down Expand Up @@ -51,7 +65,7 @@ export class SyncChannel {
.execute()
while (data.length) {
const { syncFlag, id: frontId, uid: frontUid } = data.pop()!
const front: SpanID = [frontUid, frontId]
const front: Endpoint = [frontUid, frontId]
if (syncFlag === SyncFlag.BOTH) {
this._spans.push({ front, back: front })
} else if (syncFlag === SyncFlag.FRONT) {
Expand Down Expand Up @@ -96,10 +110,51 @@ export class SyncChannel {
}
}

private async flushSpan(span: Span) {
if (!span.data) return
return span.task ||= this._flushSpan(span)
}

private async _flushSpan(span: Span) {
if (!span.data) return
const data: Partial<Message>[] = span.data.slice()
if (span.next) {
data.unshift({
uid: span.next.front[0],
syncFlag: span.next.front[0] === span.next.back[0] ? SyncFlag.FRONT : SyncFlag.NONE,
})
} else {
span.data[span.data.length - 1].syncFlag = SyncFlag.FRONT
}
if (span.prev) {
data.unshift({
uid: span.prev.back[0],
syncFlag: span.prev.front[0] === span.prev.back[0] ? SyncFlag.BACK : SyncFlag.NONE,
})
} else {
span.data[0].syncFlag = span.data[0].syncFlag ? SyncFlag.BOTH : SyncFlag.BACK
}
await this.ctx.database.upsert('satori.message', data)
if (span.prev && span.next) {
remove(this._spans, span)
remove(this._spans, span.next)
span.prev.front = span.next.front
} else if (span.prev) {
remove(this._spans, span)
span.prev.front = span.front
} else if (span.next) {
remove(this._spans, span)
span.next.back = span.back
}
delete span.data
delete span.prev
delete span.next
}

private async flush() {
while (this._buffer.length) {
const data = this._buffer.splice(0)
if (this._spans[0]?.queue) {
if (this._spans[0]?.latest) {
const { front, back } = this._spans[0]
const last = data.pop()!
await this.ctx.database.upsert('satori.message', [
Expand All @@ -109,24 +164,30 @@ export class SyncChannel {
])
this._spans[0].front = [last.uid, last.id]
} else {
const last = data.pop()!
const first = data.shift()
if (first) {
const back = data.pop()!
let front = data.shift()
if (front) {
await this.ctx.database.upsert('satori.message', [
{ ...first, syncFlag: SyncFlag.BACK },
{ ...front, syncFlag: SyncFlag.BACK },
...data,
{ ...last, syncFlag: SyncFlag.FRONT },
{ ...back, syncFlag: SyncFlag.FRONT },
])
} else {
front = back
await this.ctx.database.upsert('satori.message', [
{ ...last, syncFlag: SyncFlag.BOTH },
{ ...back, syncFlag: SyncFlag.BOTH },
])
}
this._spans.unshift({
front: [front.uid, front.id],
back: [back.uid, back.id],
latest: true,
})
}
}
}

async getMessageList(id: string, count: number, direction: Universal.Direction) {
async getMessageList(id: string, direction: Universal.Direction, limit: number) {
if (this._buffer.some(message => message.id === id)) {
// TODO
} else {
Expand All @@ -137,41 +198,52 @@ export class SyncChannel {
if (message) {
const span = this._spans.find(span => span.front[0] <= message.uid && message.uid <= span.back[0])
if (!span) throw new Error('malformed sync span')
const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, count, 'before')
const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, count, 'after')
if (direction === 'around') {
limit = Math.floor(limit / 2) + 1
}
const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'before')
const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'after')
const [before, after] = await Promise.all([beforeTask, afterTask])
after.shift()
before.shift()
before.reverse()
if (direction === 'after') return after
if (direction === 'before') return before
return [...before, message, ...after]
} else {
const { channelId, platform, assignee } = this.data
const bot = this.ctx.bots[`${platform}:${assignee}`]
const result = await bot.getMessageList(channelId, id, 'around')

Check failure on line 216 in packages/database/src/channel.ts

View workflow job for this annotation

GitHub Actions / lint

'result' is assigned a value but never used
}
}
}

private async syncHistory(span: Span, message: Message | { uid: bigint }, count: number, direction: 'before' | 'after') {
private async syncHistory(nextSpan: Span, message: Message | { uid: bigint }, limit: number, direction: 'before' | 'after') {
const buffer: Message[] = []
const { channelId, platform, assignee } = this.data
const bot = this.ctx.bots[`${platform}:${assignee}`]
const dir = ({
before: {
front: 'front',
back: 'back',
prev: 'prev',
next: 'next',
desc: 'desc',
$lte: '$lte',
$gte: '$gte',
},
after: {
front: 'back',
back: 'front',
prev: 'next',
next: 'prev',
desc: 'asc',
$lte: '$gte',
$gte: '$lte',
},
} as const)[direction]
outer: while (true) {
if ('id' in message && span[dir.front][0] === message.uid) {
if ('id' in message && nextSpan[dir.front][0] === message.uid) {
buffer.push(message)
} else {
const before = await this.ctx.database
Expand All @@ -180,36 +252,47 @@ export class SyncChannel {
...this._baseQuery,
uid: {
[dir.$lte]: message.uid,
[dir.$gte]: span[dir.front][0],
[dir.$gte]: nextSpan[dir.front][0],
},
})
.orderBy('uid', dir.desc)
.limit(count - buffer.length)
.limit(limit - buffer.length)
.execute()
buffer.push(...before)
}
if (buffer.length >= count) return buffer
let next = span[dir.front][1]
while (true) {
if (buffer.length >= limit) return buffer
let next = nextSpan[dir.back][1]
const data: Message[] = []
while (next) {
const result = await bot.getMessageList(channelId, next, direction)
next = result.next!
for (let index = result.data.length - 1; index >= 0; index--) {
const prevSpan = this._spans.find(span => span[dir.back][1] === result.data[index].id)
if (direction === 'before') result.data.reverse()
for (const item of result.data) {
const prevSpan = this._spans.find(span => span[dir.front][1] === item.id)
if (prevSpan) {
span = prevSpan
message = { uid: prevSpan[dir.back][0] }
const _span = Span.from(data, direction === 'before')
_span[dir.next] = nextSpan
_span[dir.prev] = prevSpan
this._spans.push(_span)
this.flushSpan(_span)
nextSpan = prevSpan
message = { uid: prevSpan[dir.front][0] }
continue outer
}
buffer.push(Message.from(result.data[index], platform))
if (buffer.length >= count) return buffer
data.push(Message.from(item, platform))
}
if (data.length + buffer.length >= limit) {
buffer.push(...data)
break
}
next = result.next!
}
const _span = Span.from(data, direction === 'before')
_span[dir.next] = nextSpan
this._spans.push(_span)
this.flushSpan(_span)
return buffer.slice(0, limit)
}
}

toJSON(): SyncChannel.Data {
return this.data
}
}

export namespace SyncChannel {
Expand Down
6 changes: 4 additions & 2 deletions packages/protocol/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const Methods: Dict<Method> = {
'message.update': Method('editMessage', ['channel_id', 'message_id', 'content']),
'message.delete': Method('deleteMessage', ['channel_id', 'message_id']),
'message.get': Method('getMessage', ['channel_id', 'message_id']),
'message.list': Method('getMessageList', ['channel_id', 'next', 'direction']),
'message.list': Method('getMessageList', ['channel_id', 'next', 'direction', 'limit', 'order']),

'reaction.create': Method('createReaction', ['channel_id', 'message_id', 'emoji']),
'reaction.delete': Method('deleteReaction', ['channel_id', 'message_id', 'emoji', 'user_id']),
Expand Down Expand Up @@ -80,13 +80,15 @@ export interface TwoWayList<T> {

export type Direction = 'before' | 'after' | 'around'

export type Order = 'asc' | 'desc'

export interface Methods {
// message
createMessage(channelId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise<Message[]>
sendMessage(channelId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise<string[]>
sendPrivateMessage(userId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise<string[]>
getMessage(channelId: string, messageId: string): Promise<Message>
getMessageList(channelId: string, next?: string, direction?: Direction): Promise<TwoWayList<Message>>
getMessageList(channelId: string, next?: string, direction?: Direction, limit?: number, order?: Order): Promise<TwoWayList<Message>>
getMessageIter(channelId: string): AsyncIterable<Message>
editMessage(channelId: string, messageId: string, content: Element.Fragment): Promise<void>
deleteMessage(channelId: string, messageId: string): Promise<void>
Expand Down

0 comments on commit ab19e21

Please sign in to comment.