diff --git a/package.json b/package.json index d65f260..c4a7b5e 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ucla-irl/ndnts-aux", - "version": "3.0.5", + "version": "4.0.0", "description": "NDNts Auxiliary Package for Web and Deno", "scripts": { "test": "deno test --no-check", diff --git a/src/adaptors/yjs-ndn-adaptor.ts b/src/adaptors/yjs-ndn-adaptor.ts index bb35d5d..813529c 100644 --- a/src/adaptors/yjs-ndn-adaptor.ts +++ b/src/adaptors/yjs-ndn-adaptor.ts @@ -2,6 +2,10 @@ import { SyncAgent } from '../sync-agent/mod.ts'; import * as Y from 'yjs'; import { Awareness } from 'y-protocols/awareness.js'; import { Bundler } from './bundler.ts'; +import { Decoder, Encoder } from '@ndn/tlv'; +import { Component, Data, Name } from '@ndn/packet'; +import { Version } from '@ndn/naming-convention2'; +import { StateVector } from '@ndn/svs'; /** * NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel. @@ -30,14 +34,17 @@ export class NdnSvsAdaptor { public syncAgent: SyncAgent, public readonly doc: Y.Doc, public readonly topic: string, + public readonly snapshotTopic: string = 'snapshot', + public readonly snapshotFrequency: number = 10, useBundler: boolean = false, ) { syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content)); + syncAgent.register('blob', snapshotTopic, (content) => this.handleSnapshotUpdate(content)); doc.on('update', this.callback); if (useBundler) { this.#bundler = new Bundler( Y.mergeUpdates, - (content) => this.syncAgent.publishUpdate(this.topic, content), + (content) => this.publishUpdate(this.topic, content), { thresholdSize: 3000, delayMs: 400, @@ -96,7 +103,106 @@ export class NdnSvsAdaptor { if (this.#bundler) { await this.#bundler.produce(content); } else { - await this.syncAgent.publishUpdate(this.topic, content); + await this.publishUpdate(this.topic, content); + } + } + + private async publishUpdate(topic: string, content: Uint8Array) { + await this.syncAgent.publishUpdate(topic, content); + + const stateVector = this.syncAgent.getUpdateSyncSV(); + let count = 0; + for (const [_id, seq] of stateVector) { + count += seq; + } + + if (count % this.snapshotFrequency == 0) { + const encodedSV = Encoder.encode(stateVector); + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + // NOTE: Currently naming convention is hard-coded. May need organizing. + const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); + // New SVS encodings + const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV)); + + // Snapshot content generation + const content = Y.encodeStateAsUpdate(this.doc); + // its already in UInt8Array (binary), transporting currently without any additional encoding. + // use syncAgent's blob and publish mechanism + await this.syncAgent.publishBlob('snapshot', content, snapshotName, true); + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + // Race Condition note: Testing suggests that the write above with publishBlob() + // is near certainly done before the read happens below. + // Hence no delay is added. + // first segmented object is at /50=%00 + const firstSegmentName = snapshotName.append('50=%00').toString(); + const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); + if (firstSegmentPacketEncoded) { + const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data); + await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); + } + } + } + + async handleSnapshotUpdate(snapshotName: Uint8Array) { + // Maybe it's wise to put this under a try() because it might fail due to network issues. + const decodedSnapshotName = Decoder.decode(snapshotName, Name); + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot'); + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString()); + let oldSVCount = 0; + if (oldSnapshotFirstSegmentEncoded) { + const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data); + const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value, StateVector); + for (const [_id, seq] of oldSnapshotVector) { + oldSVCount += seq; + } + } + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value, StateVector); + let snapshotSVcount = 0; + for (const [_id, seq] of snapshotSV) { + snapshotSVcount += seq; + } + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + // NOTE: From Github Discussion: + // Though, this "update the snapshot response strategy on receiving new snapshot from SVS" logic is somewhat optional in nature. + // It is ran, such that if a blind fetch request reaches an endpoint, endpoint returns a good response. + // Just like snapshot responses, we don't have to guarantee absolute latest when it is about blind fetching. + // hence we can just use a rough "total count" for determining if it needs an update. + if (snapshotSVcount > oldSVCount) { + const firstSegmentName = decodedSnapshotName.append('50=%00').toString(); + // Race Condition Note: The callback to here is faster than + // fetchBlob() finish writing to persistStore. + // (in syncAgent before listener callback to here) + // Tested getBlob() to guarantee item arrival + // But ends up having multiple active sessions of fetchBlob(). bad. + // Hence a delay of 1 second. + await new Promise((r) => setTimeout(r, 1000)); + const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName); + if (firstSegmentPacketEncoded) { + const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data); + // utilize snapshotPrefix above, with the same namingConvention warning. + // this is done to update the key of the prefix so program return latest when blind fetching. + this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket)); + // should set snapshotPrefix to the newest packet. + } else { + console.debug('PersistentStorage doesnt have the snapshot yet. Skipping update.'); + // If the above race condition fails (reads before data arrives), + // 'endpoint's blind fetch mechanism' is not updated to latest, should be fine. + } } } diff --git a/src/sync-agent/sync-agent.ts b/src/sync-agent/sync-agent.ts index e0c74fc..e94f446 100644 --- a/src/sync-agent/sync-agent.ts +++ b/src/sync-agent/sync-agent.ts @@ -1,6 +1,6 @@ import * as endpoint from '@ndn/endpoint'; import type { Forwarder } from '@ndn/fw'; -import { Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet'; +import { Component, Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet'; import { Decoder, Encoder } from '@ndn/tlv'; import { BufferChunkSource, DataProducer, fetch } from '@ndn/segmented-object'; import { concatBuffers, fromHex } from '@ndn/util'; @@ -353,6 +353,19 @@ export class SyncAgent implements AsyncDisposable { async serve(interest: Interest) { const intName = interest.name; + + // NOTE: The following code depend on snapshot naming convention to work. + // Verify this part if there's a change in naming convention. + if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) { + const wire = await this.persistStorage.get(intName.toString()); + if (wire === undefined || wire.length === 0) { + // console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`); + return undefined; + } + const data = Decoder.decode(wire, Data); + return data; + } + if (intName.length <= this.appPrefix.length + 1) { // The name should be at least two components plus app prefix return undefined;