From ab19e2128fc239e8195fa74213d4fea5fd967be3 Mon Sep 17 00:00:00 2001 From: Shigma Date: Sun, 5 May 2024 02:47:08 +0800 Subject: [PATCH] feat(database): flush unsynced span --- packages/database/src/channel.ts | 151 ++++++++++++++++++++++++------- packages/protocol/src/index.ts | 6 +- 2 files changed, 121 insertions(+), 36 deletions(-) diff --git a/packages/database/src/channel.ts b/packages/database/src/channel.ts index 03812f27..670c0b3b 100644 --- a/packages/database/src/channel.ts +++ b/packages/database/src/channel.ts @@ -1,4 +1,4 @@ -import { Context, Logger, Session, Universal } from '@satorijs/satori' +import { Context, Logger, remove, Session, Universal } from '@satorijs/satori' import { Flatten, Query } from 'minato' import { Message, SyncFlag } from '.' @@ -11,12 +11,26 @@ export enum SyncStatus { } interface Span { - front: SpanID - back: SpanID - queue?: boolean + front: Endpoint + back: Endpoint + prev?: Span + next?: Span + data?: Message[] + task?: Promise + latest?: boolean } -type SpanID = [bigint, string] +namespace Span { + export function from(data: Message[], reverse: boolean) { + if (reverse) data.reverse() + const span = { data } as Span + span.back = [data[0].uid, data[0].id] + span.front = [data[data.length - 1].uid, data[data.length - 1].id] + return span + } +} + +type Endpoint = [bigint, string] export class SyncChannel { public data: SyncChannel.Data @@ -51,7 +65,7 @@ export class SyncChannel { .execute() while (data.length) { const { syncFlag, id: frontId, uid: frontUid } = data.pop()! - const front: SpanID = [frontUid, frontId] + const front: Endpoint = [frontUid, frontId] if (syncFlag === SyncFlag.BOTH) { this._spans.push({ front, back: front }) } else if (syncFlag === SyncFlag.FRONT) { @@ -96,10 +110,51 @@ export class SyncChannel { } } + private async flushSpan(span: Span) { + if (!span.data) return + return span.task ||= this._flushSpan(span) + } + + private async _flushSpan(span: Span) { + if (!span.data) return + const data: Partial[] = span.data.slice() + if (span.next) { + data.unshift({ + uid: span.next.front[0], + syncFlag: span.next.front[0] === span.next.back[0] ? SyncFlag.FRONT : SyncFlag.NONE, + }) + } else { + span.data[span.data.length - 1].syncFlag = SyncFlag.FRONT + } + if (span.prev) { + data.unshift({ + uid: span.prev.back[0], + syncFlag: span.prev.front[0] === span.prev.back[0] ? SyncFlag.BACK : SyncFlag.NONE, + }) + } else { + span.data[0].syncFlag = span.data[0].syncFlag ? SyncFlag.BOTH : SyncFlag.BACK + } + await this.ctx.database.upsert('satori.message', data) + if (span.prev && span.next) { + remove(this._spans, span) + remove(this._spans, span.next) + span.prev.front = span.next.front + } else if (span.prev) { + remove(this._spans, span) + span.prev.front = span.front + } else if (span.next) { + remove(this._spans, span) + span.next.back = span.back + } + delete span.data + delete span.prev + delete span.next + } + private async flush() { while (this._buffer.length) { const data = this._buffer.splice(0) - if (this._spans[0]?.queue) { + if (this._spans[0]?.latest) { const { front, back } = this._spans[0] const last = data.pop()! await this.ctx.database.upsert('satori.message', [ @@ -109,24 +164,30 @@ export class SyncChannel { ]) this._spans[0].front = [last.uid, last.id] } else { - const last = data.pop()! - const first = data.shift() - if (first) { + const back = data.pop()! + let front = data.shift() + if (front) { await this.ctx.database.upsert('satori.message', [ - { ...first, syncFlag: SyncFlag.BACK }, + { ...front, syncFlag: SyncFlag.BACK }, ...data, - { ...last, syncFlag: SyncFlag.FRONT }, + { ...back, syncFlag: SyncFlag.FRONT }, ]) } else { + front = back await this.ctx.database.upsert('satori.message', [ - { ...last, syncFlag: SyncFlag.BOTH }, + { ...back, syncFlag: SyncFlag.BOTH }, ]) } + this._spans.unshift({ + front: [front.uid, front.id], + back: [back.uid, back.id], + latest: true, + }) } } } - async getMessageList(id: string, count: number, direction: Universal.Direction) { + async getMessageList(id: string, direction: Universal.Direction, limit: number) { if (this._buffer.some(message => message.id === id)) { // TODO } else { @@ -137,8 +198,11 @@ export class SyncChannel { if (message) { const span = this._spans.find(span => span.front[0] <= message.uid && message.uid <= span.back[0]) if (!span) throw new Error('malformed sync span') - const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, count, 'before') - const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, count, 'after') + if (direction === 'around') { + limit = Math.floor(limit / 2) + 1 + } + const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'before') + const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, limit, 'after') const [before, after] = await Promise.all([beforeTask, afterTask]) after.shift() before.shift() @@ -146,11 +210,15 @@ export class SyncChannel { if (direction === 'after') return after if (direction === 'before') return before return [...before, message, ...after] + } else { + const { channelId, platform, assignee } = this.data + const bot = this.ctx.bots[`${platform}:${assignee}`] + const result = await bot.getMessageList(channelId, id, 'around') } } } - private async syncHistory(span: Span, message: Message | { uid: bigint }, count: number, direction: 'before' | 'after') { + private async syncHistory(nextSpan: Span, message: Message | { uid: bigint }, limit: number, direction: 'before' | 'after') { const buffer: Message[] = [] const { channelId, platform, assignee } = this.data const bot = this.ctx.bots[`${platform}:${assignee}`] @@ -158,6 +226,8 @@ export class SyncChannel { before: { front: 'front', back: 'back', + prev: 'prev', + next: 'next', desc: 'desc', $lte: '$lte', $gte: '$gte', @@ -165,13 +235,15 @@ export class SyncChannel { after: { front: 'back', back: 'front', + prev: 'next', + next: 'prev', desc: 'asc', $lte: '$gte', $gte: '$lte', }, } as const)[direction] outer: while (true) { - if ('id' in message && span[dir.front][0] === message.uid) { + if ('id' in message && nextSpan[dir.front][0] === message.uid) { buffer.push(message) } else { const before = await this.ctx.database @@ -180,36 +252,47 @@ export class SyncChannel { ...this._baseQuery, uid: { [dir.$lte]: message.uid, - [dir.$gte]: span[dir.front][0], + [dir.$gte]: nextSpan[dir.front][0], }, }) .orderBy('uid', dir.desc) - .limit(count - buffer.length) + .limit(limit - buffer.length) .execute() buffer.push(...before) } - if (buffer.length >= count) return buffer - let next = span[dir.front][1] - while (true) { + if (buffer.length >= limit) return buffer + let next = nextSpan[dir.back][1] + const data: Message[] = [] + while (next) { const result = await bot.getMessageList(channelId, next, direction) - next = result.next! - for (let index = result.data.length - 1; index >= 0; index--) { - const prevSpan = this._spans.find(span => span[dir.back][1] === result.data[index].id) + if (direction === 'before') result.data.reverse() + for (const item of result.data) { + const prevSpan = this._spans.find(span => span[dir.front][1] === item.id) if (prevSpan) { - span = prevSpan - message = { uid: prevSpan[dir.back][0] } + const _span = Span.from(data, direction === 'before') + _span[dir.next] = nextSpan + _span[dir.prev] = prevSpan + this._spans.push(_span) + this.flushSpan(_span) + nextSpan = prevSpan + message = { uid: prevSpan[dir.front][0] } continue outer } - buffer.push(Message.from(result.data[index], platform)) - if (buffer.length >= count) return buffer + data.push(Message.from(item, platform)) + } + if (data.length + buffer.length >= limit) { + buffer.push(...data) + break } + next = result.next! } + const _span = Span.from(data, direction === 'before') + _span[dir.next] = nextSpan + this._spans.push(_span) + this.flushSpan(_span) + return buffer.slice(0, limit) } } - - toJSON(): SyncChannel.Data { - return this.data - } } export namespace SyncChannel { diff --git a/packages/protocol/src/index.ts b/packages/protocol/src/index.ts index be527c83..f9e18cfb 100644 --- a/packages/protocol/src/index.ts +++ b/packages/protocol/src/index.ts @@ -34,7 +34,7 @@ export const Methods: Dict = { 'message.update': Method('editMessage', ['channel_id', 'message_id', 'content']), 'message.delete': Method('deleteMessage', ['channel_id', 'message_id']), 'message.get': Method('getMessage', ['channel_id', 'message_id']), - 'message.list': Method('getMessageList', ['channel_id', 'next', 'direction']), + 'message.list': Method('getMessageList', ['channel_id', 'next', 'direction', 'limit', 'order']), 'reaction.create': Method('createReaction', ['channel_id', 'message_id', 'emoji']), 'reaction.delete': Method('deleteReaction', ['channel_id', 'message_id', 'emoji', 'user_id']), @@ -80,13 +80,15 @@ export interface TwoWayList { export type Direction = 'before' | 'after' | 'around' +export type Order = 'asc' | 'desc' + export interface Methods { // message createMessage(channelId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise sendMessage(channelId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise sendPrivateMessage(userId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise getMessage(channelId: string, messageId: string): Promise - getMessageList(channelId: string, next?: string, direction?: Direction): Promise> + getMessageList(channelId: string, next?: string, direction?: Direction, limit?: number, order?: Order): Promise> getMessageIter(channelId: string): AsyncIterable editMessage(channelId: string, messageId: string, content: Element.Fragment): Promise deleteMessage(channelId: string, messageId: string): Promise