support multiple concurrent negentropy trees
hoytech committed Sep 4, 2024
1 parent 0067bea commit 3f5899d
Showing 14 changed files with 367 additions and 134 deletions.
6 changes: 6 additions & 0 deletions golpe.yaml
@@ -17,6 +17,7 @@ tables:
fields:
- name: dbVersion
- name: endianness
- name: negentropyModificationCounter

## Meta-info of nostr events, suitable for indexing
## Primary key is auto-incremented, called "levId" for Local EVent ID
@@ -76,6 +77,11 @@ tables:
- name: dict
type: ubytes

NegentropyFilter:
fields:
- name: filter
type: string

tablesRaw:
## Raw nostr event JSON, possibly compressed
## keys are levIds
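For illustration only (not part of the diff): each NegentropyFilter row stores one canonical filter JSON string, and its auto-assigned primary key doubles as the treeId naming the corresponding on-disk negentropy B-tree. A hypothetical row might hold the filter {"kinds":[30023]}. The new negentropyModificationCounter field in Meta is bumped whenever this table changes, which is how long-lived caches of it (see NegentropyFilterCache below) know to reload.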
59 changes: 59 additions & 0 deletions src/NegentropyFilterCache.h
@@ -0,0 +1,59 @@
#pragma once

#include <string>
#include <vector>
#include <memory>
#include <functional>

#include <negentropy/storage/BTreeLMDB.h>

#include "golpe.h"

#include "filters.h"
#include "PackedEvent.h"


struct NegentropyFilterCache {
struct FilterInfo {
NostrFilter f;
uint64_t treeId;
};

std::vector<FilterInfo> filters;
uint64_t modificationCounter = 0;

void ctx(lmdb::txn &txn, const std::function<void(const std::function<void(const PackedEventView &, bool)> &)> &cb) {
freshenCache(txn);

std::vector<std::unique_ptr<negentropy::storage::BTreeLMDB>> storages(filters.size());

cb([&](const PackedEventView &ev, bool insert){
for (size_t i = 0; i < filters.size(); i++) {
const auto &filter = filters[i];

if (!filter.f.doesMatch(ev)) continue;

if (!storages[i]) storages[i] = std::make_unique<negentropy::storage::BTreeLMDB>(txn, negentropyDbi, filter.treeId);

if (insert) storages[i]->insert(ev.created_at(), ev.id());
else storages[i]->erase(ev.created_at(), ev.id());
}
});
}

private:
void freshenCache(lmdb::txn &txn) {
uint64_t curr = env.lookup_Meta(txn, 1)->negentropyModificationCounter();

if (curr != modificationCounter) {
filters.clear();

env.foreach_NegentropyFilter(txn, [&](auto &f){
filters.emplace_back(
NostrFilter(tao::json::from_string(f.filter()), MAX_U64),
f.primaryKeyId
);
return true;
});

modificationCounter = curr;
}
}
};
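
A minimal usage sketch of the cache above (assumptions: the real call sites are writeEvents()/deleteEvents(), which this excerpt does not show, and eventPayloads is a hypothetical container of packed event buffers). ctx() re-reads the NegentropyFilter table if the Meta counter has moved, then hands the caller a change function that routes each event into every tree whose filter matches, opening each per-filter BTreeLMDB lazily:

    auto txn = env.txn_rw();
    NegentropyFilterCache neFilterCache;

    neFilterCache.ctx(txn, [&](const std::function<void(const PackedEventView &, bool)> &change){
        for (const auto &buf : eventPayloads) {
            PackedEventView ev(buf);

            // ... update the main event tables here ...

            change(ev, true);  // true: insert into every matching tree; false would erase instead
        }
    });

    txn.commit();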
4 changes: 3 additions & 1 deletion src/WriterPipeline.h
@@ -85,6 +85,8 @@ struct WriterPipeline {
writerThread = std::thread([&]() {
setThreadName("Writer");

NegentropyFilterCache neFilterCache;

while (1) {
// Debounce

@@ -141,7 +143,7 @@
if (newEventsToProc.size()) {
{
auto txn = env.txn_rw();
writeEvents(txn, newEventsToProc);
writeEvents(txn, neFilterCache, newEventsToProc);
txn.commit();
}

7 changes: 2 additions & 5 deletions src/apps/dbutils/cmd_delete.cpp
@@ -68,13 +68,10 @@ void cmd_delete(const std::vector<std::string> &subArgs) {

{
auto txn = env.txn_rw();
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
NegentropyFilterCache neFilterCache;

for (auto levId : levIds) {
deleteEvent(txn, levId, negentropyStorage);
}
deleteEvents(txn, neFilterCache, levIds);

negentropyStorage.flush();
txn.commit();
}
}
122 changes: 122 additions & 0 deletions src/apps/dbutils/cmd_negentropy.cpp
@@ -0,0 +1,122 @@
#include <iostream>
#include <cstring>

#include <docopt.h>
#include "golpe.h"

#include "NegentropyFilterCache.h"
#include "events.h"
#include "DBQuery.h"


static const char USAGE[] =
R"(
Usage:
negentropy list
negentropy add <filter>
negentropy build <treeId>
)";


static void increaseModCounter(lmdb::txn &txn) {
auto m = env.lookup_Meta(txn, 1);
if (!m) throw herr("no Meta entry?");
env.update_Meta(txn, *m, { .negentropyModificationCounter = m->negentropyModificationCounter() + 1 });
}


void cmd_negentropy(const std::vector<std::string> &subArgs) {
std::map<std::string, docopt::value> args = docopt::docopt(USAGE, subArgs, true, "");

if (args["list"].asBool()) {
auto txn = env.txn_ro();

env.foreach_NegentropyFilter(txn, [&](auto &f){
auto treeId = f.primaryKeyId;

std::cout << "tree " << treeId << "\n";
std::cout << " filter: " << f.filter() << "\n";

negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, treeId);
auto size = storage.size();
std::cout << " size: " << size << "\n";
std::cout << " fingerprint: " << to_hex(storage.fingerprint(0, size).sv()) << "\n";

return true;
});
} else if (args["add"].asBool()) {
std::string filterStr = args["<filter>"].asString();

tao::json::value filterJson = tao::json::from_string(filterStr);
auto compiledFilter = NostrFilterGroup::unwrapped(filterJson);

if (compiledFilter.filters.size() == 1 && (compiledFilter.filters[0].since != 0 || compiledFilter.filters[0].until != MAX_U64)) {
throw herr("single filters should not have since/until");
}
if (compiledFilter.filters.size() == 0) throw herr("filter will never match");

filterStr = tao::json::to_string(filterJson); // make canonical

auto txn = env.txn_rw();
increaseModCounter(txn);

env.foreach_NegentropyFilter(txn, [&](auto &f){
if (f.filter() == filterStr) throw herr("filter already exists as tree: ", f.primaryKeyId);
return true;
});

uint64_t treeId = env.insert_NegentropyFilter(txn, filterStr);
txn.commit();

std::cout << "created tree " << treeId << "\n";
std::cout << " to populate, run: strfry negentropy build " << treeId << "\n";
} else if (args["build"].asBool()) {
uint64_t treeId = args["<treeId>"].asLong();

struct Record {
uint64_t created_at;
uint8_t id[32];
};

std::vector<Record> recs;

auto txn = env.txn_rw(); // FIXME: split this into a read-only phase followed by a write
increaseModCounter(txn);

// Get filter

std::string filterStr;

{
auto view = env.lookup_NegentropyFilter(txn, treeId);
if (!view) throw herr("couldn't find treeId: ", treeId);
filterStr = view->filter();
}

// Query all matching events

DBQuery query(tao::json::from_string(filterStr));

while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
auto ev = lookupEventByLevId(txn, levId);
auto packed = PackedEventView(ev.buf);
recs.push_back({ packed.created_at(), });
memcpy(recs.back().id, packed.id().data(), 32);
});

if (complete) break;
}

// Store events in negentropy tree

negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, treeId);

for (const auto &r : recs) {
storage.insert(r.created_at, std::string_view((char*)r.id, 32));
}

storage.flush();

txn.commit();
}
}
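
Putting the subcommands together, a typical workflow looks like this (the filter value and treeId are only examples; treeIds are assigned on add):

    strfry negentropy add '{"kinds":[30023]}'   # register the filter, prints the new treeId
    strfry negentropy build 1                    # populate that tree from existing events
    strfry negentropy list                       # show each tree's filter, size, and fingerprint

Note that build currently holds a single read-write transaction for the whole scan; the FIXME above marks this to be split into a read-only query phase followed by a shorter write phase.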
69 changes: 43 additions & 26 deletions src/apps/mesh/cmd_sync.cpp
@@ -50,39 +50,56 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
tao::json::value filterJson = tao::json::from_string(filterStr);
auto filterCompiled = NostrFilterGroup::unwrapped(filterJson);


bool isFullDbQuery = filterCompiled.isFullDbQuery();
std::optional<uint64_t> treeId;
negentropy::storage::Vector storageVector;

if (!isFullDbQuery) {
DBQuery query(filterJson);
Decompressor decomp;

{
auto txn = env.txn_ro();

uint64_t numEvents = 0;
std::vector<uint64_t> levIds;
auto filterJsonNoTimes = filterJson;
if (filterJsonNoTimes.is_object()) {
filterJsonNoTimes.get_object().erase("since");
filterJsonNoTimes.get_object().erase("until");
}
auto filterJsonNoTimesStr = tao::json::to_string(filterJsonNoTimes);

while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.push_back(levId);
numEvents++;
});
env.foreach_NegentropyFilter(txn, [&](auto &f){
if (f.filter() == filterJsonNoTimesStr) {
treeId = f.primaryKeyId;
return false;
}
return true;
});

if (complete) break;
}
if (!treeId) {
DBQuery query(filterJson);
Decompressor decomp;

std::sort(levIds.begin(), levIds.end());
uint64_t numEvents = 0;
std::vector<uint64_t> levIds;

for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
storageVector.insert(packed.created_at(), packed.id());
}
while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.push_back(levId);
numEvents++;
});

LI << "Filter matches " << numEvents << " events";
if (complete) break;
}

storageVector.seal();
std::sort(levIds.begin(), levIds.end());

for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
storageVector.insert(packed.created_at(), packed.id());
}

LI << "Filter matches " << numEvents << " events";

storageVector.seal();
}
}


@@ -98,8 +115,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
std::string neMsg;

if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
if (treeId) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId);

const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
@@ -151,8 +168,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
try {
auto inputMsg = from_hex(msg.at(2).get_string());

if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
if (treeId) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId);

const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
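Two details in this file are worth spelling out: the tree lookup ignores the client's since/until (they are erased before comparing the canonical filter string against NegentropyFilter rows), and those time bounds are then re-applied at sync time through a SubRange over the sealed B-tree. As a worked example with made-up timestamps, a filter with since=1700000000 and until=1700003600 would sync against

    negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(1700000000), negentropy::Bound(1700003601));

where the +1 on until suggests the upper bound is exclusive, so the range covers created_at values 1700000000 through 1700003600 inclusive.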
9 changes: 2 additions & 7 deletions src/apps/relay/RelayCron.cpp
@@ -49,15 +49,10 @@ void RelayServer::runCron() {

if (expiredLevIds.size() > 0) {
auto txn = env.txn_rw();
negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0);
NegentropyFilterCache neFilterCache;

uint64_t numDeleted = 0;
uint64_t numDeleted = deleteEvents(txn, neFilterCache, expiredLevIds);

for (auto levId : expiredLevIds) {
if (deleteEvent(txn, levId, negentropyStorage)) numDeleted++;
}

negentropyStorage.flush();
txn.commit();

if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")";
13 changes: 10 additions & 3 deletions src/apps/relay/RelayIngester.cpp
@@ -134,15 +134,22 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp
if (arr.at(0) == "NEG-OPEN") {
if (arr.get_array().size() < 4) throw herr("negentropy query missing elements");

NostrFilterGroup filter;
auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1;

filter = std::move(NostrFilterGroup::unwrapped(arr.at(2), maxFilterLimit));
auto filterJson = arr.at(2);

NostrFilterGroup filter = NostrFilterGroup::unwrapped(filterJson, maxFilterLimit);
Subscription sub(connId, arr[1].get_string(), std::move(filter));

if (filterJson.is_object()) {
filterJson.get_object().erase("since");
filterJson.get_object().erase("until");
}
std::string filterStr = tao::json::to_string(filterJson);

std::string negPayload = from_hex(arr.at(3).get_string());

tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(negPayload)}});
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(filterStr), std::move(negPayload)}});
} else if (arr.at(0) == "NEG-MSG") {
std::string negPayload = from_hex(arr.at(2).get_string());
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}});
(Diff for the remaining 6 changed files not shown.)