Fix reconciliation algo
For months now the reconciliation algo has been plagued by bugs
surrounding the ethereumjs/trie library. We've opened many tickets on
GitHub:

- #146
- #131
- ethereumjs/ethereumjs-monorepo#3264
- ethereumjs/ethereumjs-monorepo#3645

The pattern of the problem was always that `trie.root()` somehow
couldn't be found using `trie.checkRoot`, which seemed almost like a
contradiction, especially when doing `await
trie.checkRoot(trie.root())`.

We had initially introduced checkpointing of the trie because of a
rather theoretical problem: what would happen if, during
reconciliation, the trie got updated while, at the same time, it sent
level comparisons to a peer. So for us, checkpointing was primarily a
way to implement atomicity when storing data: we wanted to store the
remote trie's leaves in batches so as not to interrupt the algorithm
while comparing the tries' levels.
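The atomicity pattern described above can be sketched with a toy in-memory store (the method names mirror @ethereumjs/trie's checkpoint/commit/revert, but this is an illustrative model, not the library): writes are buffered in a scratch layer so a batch of leaves lands atomically, or not at all.

```javascript
class CheckpointedStore {
  constructor() {
    this.committed = new Map(); // durable state
    this.scratch = null; // pending batch, if a checkpoint is open
  }
  checkpoint() { this.scratch = new Map(); }
  put(key, value) { (this.scratch ?? this.committed).set(key, value); }
  commit() {
    // Fold the whole batch into durable state at once.
    for (const [k, v] of this.scratch) this.committed.set(k, v);
    this.scratch = null;
  }
  revert() { this.scratch = null; } // drop the batch entirely
  get(key) { return this.scratch?.get(key) ?? this.committed.get(key); }
}

const store = new CheckpointedStore();
store.checkpoint();
store.put("leaf1", "a");
store.revert(); // the batch failed: nothing leaked into durable state
console.log(store.get("leaf1")); // undefined
```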

At the same time, inserting new leaves into such a trie is costly, as
a large share of its hashes has to be recomputed to arrive at a new
root.
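Why an insert is costly can be shown with a toy binary Merkle tree (the symbolic hash and plain binary tree are simplified stand-ins, not keccak or the real Merkle-Patricia trie): changing one leaf forces every hash on its path to the root to be recomputed, so the root changes with it.

```javascript
// Symbolic stand-in for a real hash function like keccak256.
const h = (s) => `h(${s})`;

// Root of a binary Merkle tree (assumes a power-of-two number of leaves).
function merkleRoot(leaves) {
  let level = leaves.map(h);
  while (level.length > 1) {
    const next = [];
    // Each internal node hashes the concatenation of its two children.
    for (let i = 0; i < level.length; i += 2) next.push(h(level[i] + level[i + 1]));
    level = next;
  }
  return level[0];
}

const before = merkleRoot(["a", "b", "c", "d"]);
const after = merkleRoot(["a", "b", "c", "e"]); // one leaf differs
console.log(before !== after); // true: log2(n) internal hashes were redone
```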

However, I think what happened with our implementation of the sync.put
method is that the checkpointing led to the trie writes not being
processed sequentially, which in turn led to all sorts of problems in
the reconciliation.

The reconciliation is purposefully built so that it first synchronizes
old leaves and only then new leaves. While a working reconciliation
has no issues with storing comments, a fundamentally asynchronous
reconciliation will attempt to store comments whose original upvote
hasn't been stored yet, leading to those messages not being processed
initially.
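The ordering dependency above can be sketched as follows (the field names are illustrative, not the project's actual message schema — though "amplify" and "comment" are the message types the store distinguishes): a comment references an upvote, so a sync that delivers comments before their upvotes sees those comments rejected at first.

```javascript
const seenUpvotes = new Set();

function storeMessage(message) {
  if (message.type === "comment" && !seenUpvotes.has(message.parent)) {
    return false; // parent upvote not synced yet: not processed initially
  }
  if (message.type === "amplify") seenUpvotes.add(message.id);
  return true;
}

// An asynchronous, unordered sync can deliver the comment first:
console.log(storeMessage({ type: "comment", parent: "u1" })); // false
console.log(storeMessage({ type: "amplify", id: "u1" }));     // true
console.log(storeMessage({ type: "comment", parent: "u1" })); // true
```

A sequential sync that replays old leaves before new ones never hits the rejecting branch.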

Another big problem ended up being that the ethereumjs/trie library
isn't mature with regard to handling the application shutting down,
and so a lot of the above-mentioned issues actually describe the
ethereumjs/trie library reaching a non-recoverable state.

Funnily enough, however, all it took to fix all of the above problems
was to remove all notions of checkpointing and commits. While this
makes the reconciliation algorithm MUCH slower (because it is now
synchronous), it also made it much more reliable and almost free of
errors in operation.
TimDaub committed Sep 10, 2024
1 parent 806c8a4 commit 52228ec
Showing 7 changed files with 119 additions and 82 deletions.
9 changes: 6 additions & 3 deletions contributing.md
@@ -6,17 +6,20 @@ Hey, thanks for considering to help us out with the project. Before starting to
3. Set your `OPTIMISM_RPC_HTTP_HOST` in the `.env` files
4. Install the dependencies for the API with `npm i` and for the UI `cd src/web && npm i`
5. Sync with the network using `npm run sync` and when you see it is looping over the same block/address it means we are probably done getting up to date with the network's state.
6. Start the server using `npm run dev:anon`
6. Next run the node in reconciliation mode to catch up with the latest state using `npm run reconcile`. This will take a while and the frontend will not work in this mode! There is a log line which gets incremented each time a new message is successfully stored ("Number of messages added: X"). Watch out for this line and cross check with the chart at "Cumulative Total Messages" https://news.kiwistand.com/basics to understand when you're done reconciling.
7. Once you've reached roughly that number of messages, you can start the server using `npm run dev:anon`

```bash
# This is a script to automate the 'Getting started' guide, help us improve it!
# This is a script to automate the 'Getting started' guide. You WON'T be able to just
# copy and paste this, but it may help you to understand the required steps.
mkdir anon cache
cp .env-copy .env
replacement="OPTIMISM_RPC_HTTP_HOST=YOUR_PRIVATE_URL_HERE"
sed -i "1s/.*/$replacement/" .env # sed magic to replace the first line of the .env file with the OPTIMISM stuff
npm i
cd src/web && npm i && cd ..
npm run sync
npm run reconcile
npm run dev:anon
```
## Project layout
@@ -26,4 +29,4 @@ We use ES module (`.mjs` extension) for their simplicity and convenience. The mo

Those files are kind of the center of the project which allows you to understand why all the others exist.

Thanks for the help!
Thanks for the help!
1 change: 1 addition & 0 deletions package.json
@@ -13,6 +13,7 @@
"dev": "cross-env DEBUG=\"*@attestate/*,-@attestate/delegator2*\" concurrently \"npm run watch:web\" \"npm run start\"",
"start": "node -r dotenv/config ./src/launch.mjs",
"watch": "nodemon --watch src/views --exec 'npm run dev:anon'",
"reconcile": "NODE_ENV=reconcile npm run dev:anon",
"watch:web": "cd src/web && npm run dev",
"dev:bootstrap": "cross-env API_PORT=8443 HTTP_PORT=80 DATA_DIR=bootstrap BIND_ADDRESS_V4=\"127.0.0.1\" IS_BOOTSTRAP_NODE=true USE_EPHEMERAL_ID=false npm run dev",
"dev:anon:local": "cross-env API_PORT=8443 HTTP_PORT=4001 DATA_DIR=anonlocal BIND_ADDRESS_V4=\"127.0.0.1\" PORT=0 IS_BOOTSTRAP_NODE=false USE_EPHEMERAL_ID=true npm run dev",
6 changes: 6 additions & 0 deletions readme.md
@@ -46,6 +46,12 @@ In your `.env` file, replace the value of `OPTIMISM_RPC_HTTP_HOST` with your ful

You can also watch the video explaining [how to get started editing the Kiwi News frontend](https://www.loom.com/share/e0e8866450d54c52b161e77907d1ccb9).

## Syncing to the network

Kiwistand isn't venture capital funded and so its algorithm is a bit rough around the edges. Please don't expect a super polished piece of software! We're working with constraints. That said, despite all of the challenges, we're trying to keep the set reconciliation algorithm in the best shape we can! If you're running into issues, please make sure to reach out to Tim on Telegram.

Now, for actually syncing the node, please follow the guide in `Contributing.md`!

## Debugging

Once you're up and running, you might want to submit new links to the network. However, we encourage you to NOT do that on the main net.
145 changes: 84 additions & 61 deletions src/launch.mjs
@@ -21,15 +21,34 @@ import * as newest from "./views/new.mjs";
import * as feeds from "./feeds.mjs";
import * as moderation from "./views/moderation.mjs";

const reconcileMode = env.NODE_ENV === "reconcile";
if (reconcileMode) {
log(`Running in reconciliation mode`);
}

const trie = await store.create();
if (!(await trie.checkRoot(trie.root()))) {
log("Couldn't find root, trying to recover from last valid root");
const lastRoot = await store.getLastValidRoot();

if (lastRoot) {
await trie.root(Buffer.from(lastRoot, "hex"));
log(`Using last root ${lastRoot} to recover`);
console.log(await trie.checkRoot(trie.root()), trie.root());
} else {
log("Failed to recover from missing root");
}
}

const node = await start(config);

await api.launch(trie, node);
await http.launch(trie, node);
if (!reconcileMode) {
await api.launch(trie, node);
await http.launch(trie, node);

crawl(mintCrawlPath);
crawl(delegateCrawlPath);
crawl(mintCrawlPath);
crawl(delegateCrawlPath);
}

// NOTE: We're passing in the trie here as we don't want to make it globally
// available to run more than one node in the tests
@@ -45,66 +64,70 @@ await subscribe(
trie,
);

// NOTE: This request queries all messages in the database to enable caching
// when calling ecrecover on messages' signatures
const from = null;
const amount = null;
const startDatetime = null;
const parser = JSON.parse;
const accounts = await registry.accounts();
const delegations = await registry.delegations();
const href = null;
if (!reconcileMode) {
// NOTE: This request queries all messages in the database to enable caching
// when calling ecrecover on messages' signatures
const from = null;
const amount = null;
const startDatetime = null;
const parser = JSON.parse;
const accounts = await registry.accounts();
const delegations = await registry.delegations();
const href = null;

let upvotes, comments;
await Promise.allSettled([
(async () => {
upvotes = await store.posts(
trie,
from,
amount,
parser,
startDatetime,
accounts,
delegations,
href,
"amplify",
);
})(),
(async () => {
comments = await store.posts(
trie,
from,
amount,
parser,
startDatetime,
accounts,
delegations,
href,
"comment",
);
})(),
]);
let upvotes, comments;
await Promise.allSettled([
store
.posts(
trie,
from,
amount,
parser,
startDatetime,
accounts,
delegations,
href,
"amplify",
)
.then((result) => (upvotes = result))
.catch((error) => console.error("Amplify posts error:", error)),
store
.posts(
trie,
from,
amount,
parser,
startDatetime,
accounts,
delegations,
href,
"comment",
)
.then((result) => (comments = result))
.catch((error) => console.error("Comment posts error:", error)),
]);

const alreadySetup = cache.initialize();
if (!alreadySetup) {
[...upvotes, ...comments].forEach(cache.insertMessage);
}
const alreadySetup = cache.initialize();
if (!alreadySetup) {
[...upvotes, ...comments].forEach(cache.insertMessage);
}

try {
store.cache(upvotes, comments);
} catch (err) {
log(
`launch: An irrecoverable error during upvote caching occurred. "${err.toString()}`,
);
exit(1);
}
try {
store.cache(upvotes, comments);
} catch (err) {
log(
`launch: An irrecoverable error during upvote caching occurred. "${err.toString()}`,
);
exit(1);
}

const urls = await moderation.getFeeds();
await feeds.recompute(urls);
// TODO: Unclear if this is still necessary
setInterval(async () => {
const urls = await moderation.getFeeds();
await feeds.recompute(urls);
// TODO: Unclear if this is still necessary
setInterval(async () => {
await feeds.recompute(urls);
await newest.recompute(trie);
}, 1800000);
await newest.recompute(trie);
}, 1800000);
await newest.recompute(trie);
karma.count(upvotes);
karma.count(upvotes);
}
19 changes: 15 additions & 4 deletions src/store.mjs
@@ -261,6 +261,7 @@ export function upvoteID(identity, link, type) {
return `${utils.getAddress(identity)}|${normalizeUrl(link)}|${type}`;
}

let messagesAdded = 0;
async function atomicPut(trie, message, identity, accounts, delegations) {
const marker = upvoteID(identity, message.href, message.type);
const { canonical, index } = toDigest(message);
@@ -279,6 +280,9 @@ async function atomicPut(trie, message, identity, accounts, delegations) {
}
}

// NOTE on 2024-09-10: Not sure why we're nesting try catchs here, I'm pretty
// sure this wasn't initially intended so if we end up touching this part of
// the code again, it may make sense to remove it.
try {
await trie.put(Buffer.from(index, "hex"), canonical);
try {
@@ -290,7 +294,13 @@ async function atomicPut(trie, message, identity, accounts, delegations) {
} catch (err) {
// NOTE: insertMessage is just a cache, so if this operation fails, we
// want the protocol to continue to execute as normally.
log(`Inserting message threw: ${JSON.stringify(message)}`);
log(
`Inserting message threw: ${JSON.stringify(
message,
)}, and error. If this is an error about inserting the message into the cache, it may be ignored as it is uncritical ${
err.stack
}`,
);
}
// TODO: Remove and replace with SQLite implementation
if (message.type === "comment") {
@@ -310,15 +320,17 @@ async function atomicPut(trie, message, identity, accounts, delegations) {
// cached data structure that gets recomputed upon every restart of the
// application.
const result = upvotes.delete(marker);
let reason = `trie.put failed with "${err.toString()}". Successfully rolled back constraint`;
let reason = `trie.put failed with "${err.stack}". Successfully rolled back constraint`;
if (!result) {
reason = `trie.put failed with "${err.toString()}". Tried to roll back constraint but failed`;
reason = `trie.put failed with "${err.stack}". Tried to roll back constraint but failed`;
}
log(reason);
throw new Error(reason);
}
log(`Stored message with index "${index}"`);
log(`New root: "${trie.root().toString("hex")}"`);
messagesAdded += 1;
log(`Number of messages added: ${messagesAdded}`);

return {
index,
@@ -554,7 +566,6 @@ export async function leaves(
const nodes = [];

let pointer = 0;
log(`leaves: Does trie have checkpoints? "${trie.hasCheckpoints()}"`);
log(`leaves: Trie root "${root.toString("hex")}"`);
for await (const [node] of walkTrieDfs(trie, root, [])) {
if (Number.isInteger(amount) && nodes.length >= amount) {
13 changes: 0 additions & 13 deletions src/sync.mjs
@@ -445,7 +445,6 @@ export function handleLeaves(trie, peerFab) {
return;
}

if (!trie.hasCheckpoints()) trie.checkpoint();
log("handleLeaves: Received leaves and storing them in db");

try {
@@ -458,21 +457,9 @@
await put(trie, message, allowlist, delegations, accounts);
} catch (err) {
elog(err, "handleLeaves: Unexpected error");
await trie.revert();
peerFab.set();
}

// NOTE: While there could be a strategy where we continuously stay in a
// checkpoint the entire time when the synchronization is going one, this
// seems detrimental to the mechanism, in that it introduces a high-stakes
// operation towards the very end where after many minutes of back and
// forth all data is being committed into the trie. So right now it seems
// more robust if we hence open a checkpoint the first time new levels are
// sent, and we close it by the time leaves are being received. While this
// means that practically for every newly received leaf, the
// synchronization starts over again, it sequentializes downloading the
// leaves into many sub tasks which are more likely to succeed.
await trie.commit();
peerFab.set();
});
}
8 changes: 7 additions & 1 deletion src/utils.mjs
@@ -35,5 +35,11 @@ export function elog(err, msg) {
if (msg) {
console.error(`Message: ${msg}`);
}
console.error(`Stack Trace: ${err.stack}`);
if (err && err.stack) {
console.error(`Stack Trace: ${err.stack}`);
} else if (err) {
console.error(`Error: ${err}`);
} else {
console.error(`Error wasn't defined in elog: ${err}`);
}
}
