Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NDNts-aux snapshot #6

Merged
merged 15 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ucla-irl/ndnts-aux",
"version": "3.0.5",
"version": "4.0.0",
"description": "NDNts Auxiliary Package for Web and Deno",
"scripts": {
"test": "deno test --no-check",
Expand Down
110 changes: 108 additions & 2 deletions src/adaptors/yjs-ndn-adaptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ import { SyncAgent } from '../sync-agent/mod.ts';
import * as Y from 'yjs';
import { Awareness } from 'y-protocols/awareness.js';
import { Bundler } from './bundler.ts';
import { Decoder, Encoder } from '@ndn/tlv';
import { Component, Data, Name } from '@ndn/packet';
import { Version } from '@ndn/naming-convention2';
import { StateVector } from '@ndn/svs';

/**
* NDN SVS Provider for Yjs. Wraps update into `SyncAgent`'s `update` channel.
Expand Down Expand Up @@ -30,14 +34,17 @@ export class NdnSvsAdaptor {
public syncAgent: SyncAgent,
public readonly doc: Y.Doc,
public readonly topic: string,
public readonly snapshotTopic: string = 'snapshot',
public readonly snapshotFrequency: number = 10,
useBundler: boolean = false,
) {
syncAgent.register('update', topic, (content) => this.handleSyncUpdate(content));
syncAgent.register('blob', snapshotTopic, (content) => this.handleSnapshotUpdate(content));
doc.on('update', this.callback);
if (useBundler) {
this.#bundler = new Bundler(
Y.mergeUpdates,
(content) => this.syncAgent.publishUpdate(this.topic, content),
(content) => this.publishUpdate(this.topic, content),
{
thresholdSize: 3000,
delayMs: 400,
Expand Down Expand Up @@ -96,7 +103,106 @@ export class NdnSvsAdaptor {
if (this.#bundler) {
await this.#bundler.produce(content);
} else {
await this.syncAgent.publishUpdate(this.topic, content);
await this.publishUpdate(this.topic, content);
}
}

private async publishUpdate(topic: string, content: Uint8Array) {
await this.syncAgent.publishUpdate(topic, content);

const stateVector = this.syncAgent.getUpdateSyncSV();
let count = 0;
for (const [_id, seq] of stateVector) {
count += seq;
}

if (count % this.snapshotFrequency == 0) {
const encodedSV = Encoder.encode(stateVector);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// NOTE: Currently naming convention is hard-coded. May need organizing.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');
// New SVS encodings
const snapshotName = snapshotPrefix.append(new Component(Version.type, encodedSV));

// Snapshot content generation
const content = Y.encodeStateAsUpdate(this.doc);
// its already in UInt8Array (binary), transporting currently without any additional encoding.
// use syncAgent's blob and publish mechanism
await this.syncAgent.publishBlob('snapshot', content, snapshotName, true);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// Race Condition note: Testing suggests that the write above with publishBlob()
// is near certainly done before the read happens below.
// Hence no delay is added.
// first segmented object is at /50=%00
const firstSegmentName = snapshotName.append('50=%00').toString();
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
await this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

async handleSnapshotUpdate(snapshotName: Uint8Array) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
// Maybe it's wise to put this under a try() because it might fail due to network issues.
const decodedSnapshotName = Decoder.decode(snapshotName, Name);

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotPrefix = this.syncAgent.appPrefix.append('32=snapshot');

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const oldSnapshotFirstSegmentEncoded = await this.syncAgent.persistStorage.get(snapshotPrefix.toString());
let oldSVCount = 0;
if (oldSnapshotFirstSegmentEncoded) {
const oldSnapshotFirstSegment = Decoder.decode(oldSnapshotFirstSegmentEncoded, Data);
const oldSnapshotVector = Decoder.decode(oldSnapshotFirstSegment.name.at(-2).value, StateVector);
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
for (const [_id, seq] of oldSnapshotVector) {
oldSVCount += seq;
}
}

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
const snapshotSV = Decoder.decode(decodedSnapshotName.at(-1).value, StateVector);
let snapshotSVcount = 0;
for (const [_id, seq] of snapshotSV) {
snapshotSVcount += seq;
}

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
// NOTE: From Github Discussion:
// Though, this "update the snapshot response strategy on receiving new snapshot from SVS" logic is somewhat optional in nature.
// It is ran, such that if a blind fetch request reaches an endpoint, endpoint returns a good response.
// Just like snapshot responses, we don't have to guarantee absolute latest when it is about blind fetching.
// hence we can just use a rough "total count" for determining if it needs an update.
if (snapshotSVcount > oldSVCount) {
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
const firstSegmentName = decodedSnapshotName.append('50=%00').toString();
// Race Condition Note: The callback to here is faster than
// fetchBlob() finish writing to persistStore.
// (in syncAgent before listener callback to here)
// Tested getBlob() to guarantee item arrival
// But ends up having multiple active sessions of fetchBlob(). bad.
zjkmxy marked this conversation as resolved.
Show resolved Hide resolved
// Hence a delay of 1 second.
await new Promise((r) => setTimeout(r, 1000));
const firstSegmentPacketEncoded = await this.syncAgent.persistStorage.get(firstSegmentName);
if (firstSegmentPacketEncoded) {
const firstSegmentPacket = Decoder.decode(firstSegmentPacketEncoded, Data);
// utilize snapshotPrefix above, with the same namingConvention warning.
// this is done to update the key of the prefix so program return latest when blind fetching.
this.syncAgent.persistStorage.set(snapshotPrefix.toString(), Encoder.encode(firstSegmentPacket));
// should set snapshotPrefix to the newest packet.
} else {
console.debug('PersistentStorage doesnt have the snapshot yet. Skipping update.');
// If the above race condition fails (reads before data arrives),
// 'endpoint's blind fetch mechanism' is not updated to latest, should be fine.
}
}
}

Expand Down
15 changes: 14 additions & 1 deletion src/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import * as endpoint from '@ndn/endpoint';
import type { Forwarder } from '@ndn/fw';
import { Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Component, Data, type Interest, Name, Signer, type Verifier } from '@ndn/packet';
import { Decoder, Encoder } from '@ndn/tlv';
import { BufferChunkSource, DataProducer, fetch } from '@ndn/segmented-object';
import { concatBuffers, fromHex } from '@ndn/util';
Expand Down Expand Up @@ -353,6 +353,19 @@ export class SyncAgent implements AsyncDisposable {

async serve(interest: Interest) {
const intName = interest.name;

// NOTE: The following code depend on snapshot naming convention to work.
// Verify this part if there's a change in naming convention.
if (intName.get(this.appPrefix.length)?.equals(Component.from('32=snapshot'))) {
const wire = await this.persistStorage.get(intName.toString());
if (wire === undefined || wire.length === 0) {
// console.warn(`A remote peer is fetching a non-existing object: ${intName.toString()}`);
return undefined;
}
const data = Decoder.decode(wire, Data);
return data;
}

if (intName.length <= this.appPrefix.length + 1) {
// The name should be at least two components plus app prefix
return undefined;
Expand Down