Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
hoytech committed Sep 4, 2024
1 parent 97186c2 commit 7f92857
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 36 deletions.
2 changes: 2 additions & 0 deletions src/NegentropyFilterCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <string>

#include <negentropy/storage/BTreeLMDB.h>

#include "golpe.h"

#include "filters.h"
Expand Down
69 changes: 43 additions & 26 deletions src/apps/mesh/cmd_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,39 +50,56 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
tao::json::value filterJson = tao::json::from_string(filterStr);
auto filterCompiled = NostrFilterGroup::unwrapped(filterJson);


bool isFullDbQuery = filterCompiled.isFullDbQuery();
std::optional<uint64_t> treeId;
negentropy::storage::Vector storageVector;

if (!isFullDbQuery) {
DBQuery query(filterJson);
Decompressor decomp;

{
auto txn = env.txn_ro();

uint64_t numEvents = 0;
std::vector<uint64_t> levIds;
auto filterJsonNoTimes = filterJson;
if (filterJsonNoTimes.is_object()) {
filterJsonNoTimes.get_object().erase("since");
filterJsonNoTimes.get_object().erase("until");
}
auto filterJsonNoTimesStr = tao::json::to_string(filterJsonNoTimes);

while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.push_back(levId);
numEvents++;
});
env.foreach_NegentropyFilter(txn, [&](auto &f){
if (f.filter() == filterJsonNoTimesStr) {
treeId = f.primaryKeyId;
return false;
}
return true;
});

if (complete) break;
}
if (!treeId) {
DBQuery query(filterJson);
Decompressor decomp;

std::sort(levIds.begin(), levIds.end());
uint64_t numEvents = 0;
std::vector<uint64_t> levIds;

for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
storageVector.insert(packed.created_at(), packed.id());
}
while (1) {
bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){
levIds.push_back(levId);
numEvents++;
});

LI << "Filter matches " << numEvents << " events";
if (complete) break;
}

storageVector.seal();
std::sort(levIds.begin(), levIds.end());

for (auto levId : levIds) {
auto ev = lookupEventByLevId(txn, levId);
PackedEventView packed(ev.buf);
storageVector.insert(packed.created_at(), packed.id());
}

LI << "Filter matches " << numEvents << " events";

storageVector.seal();
}
}


Expand All @@ -98,8 +115,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
auto txn = env.txn_ro();
std::string neMsg;

if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
if (treeId) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId);

const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
Expand Down Expand Up @@ -151,8 +168,8 @@ void cmd_sync(const std::vector<std::string> &subArgs) {
try {
auto inputMsg = from_hex(msg.at(2).get_string());

if (isFullDbQuery) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0);
if (treeId) {
negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId);

const auto &f = filterCompiled.filters.at(0);
negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
Expand Down
13 changes: 10 additions & 3 deletions src/apps/relay/RelayIngester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,22 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp
if (arr.at(0) == "NEG-OPEN") {
if (arr.get_array().size() < 4) throw herr("negentropy query missing elements");

NostrFilterGroup filter;
auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1;

filter = std::move(NostrFilterGroup::unwrapped(arr.at(2), maxFilterLimit));
auto filterJson = arr.at(2);

NostrFilterGroup filter = NostrFilterGroup::unwrapped(filterJson, maxFilterLimit);
Subscription sub(connId, arr[1].get_string(), std::move(filter));

if (filterJson.is_object()) {
filterJson.get_object().erase("since");
filterJson.get_object().erase("until");
}
std::string filterStr = tao::json::to_string(filterJson);

std::string negPayload = from_hex(arr.at(3).get_string());

tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(negPayload)}});
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(filterStr), std::move(negPayload)}});
} else if (arr.at(0) == "NEG-MSG") {
std::string negPayload = from_hex(arr.at(2).get_string());
tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}});
Expand Down
22 changes: 16 additions & 6 deletions src/apps/relay/RelayNegentropy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct NegentropyViews {

struct StatelessView {
Subscription sub;
uint64_t treeId;
};

using UserView = std::variant<MemoryView, StatelessView>;
Expand All @@ -42,7 +43,7 @@ struct NegentropyViews {
return true;
}

bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) {
bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub, uint64_t treeId) {
{
auto *existing = findView(connId, subId);
if (existing) removeView(connId, subId);
Expand All @@ -55,7 +56,7 @@ struct NegentropyViews {
return false;
}

connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), } });
connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), treeId, } });

return true;
}
Expand Down Expand Up @@ -188,15 +189,24 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
if (auto msg = std::get_if<MsgNegentropy::NegOpen>(&newMsg.msg)) {
auto connId = msg->sub.connId;
auto subId = msg->sub.subId;
std::optional<uint64_t> treeId;

if (msg->sub.filterGroup.isFullDbQuery()) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
env.foreach_NegentropyFilter(txn, [&](auto &f){
if (f.filter() == msg->filterStr) {
treeId = f.primaryKeyId;
return false;
}
return true;
});

if (treeId) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, *treeId);

const auto &f = msg->sub.filterGroup.filters.at(0);
negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
handleReconcile(connId, subId, subStorage, msg->negPayload);

if (!views.addStatelessView(connId, subId, std::move(msg->sub))) {
if (!views.addStatelessView(connId, subId, std::move(msg->sub), *treeId)) {
queries.removeSub(connId, subId);
sendNoticeError(connId, std::string("too many concurrent NEG requests"));
}
Expand Down Expand Up @@ -231,7 +241,7 @@ void RelayServer::runNegentropy(ThreadPool<MsgNegentropy>::Thread &thr) {
}
handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload);
} else if (auto *view = std::get_if<NegentropyViews::StatelessView>(userView)) {
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0);
negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, view->treeId);

const auto &f = view->sub.filterGroup.filters.at(0);
negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1));
Expand Down
1 change: 1 addition & 0 deletions src/apps/relay/RelayServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ struct MsgReqMonitor : NonCopyable {
struct MsgNegentropy : NonCopyable {
struct NegOpen {
Subscription sub;
std::string filterStr;
std::string negPayload;
};

Expand Down
1 change: 0 additions & 1 deletion src/events.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include <secp256k1_schnorrsig.h>
#include <negentropy/storage/BTreeLMDB.h>

#include "golpe.h"

Expand Down

0 comments on commit 7f92857

Please sign in to comment.