diff --git a/src/NegentropyFilterCache.h b/src/NegentropyFilterCache.h index 9c5c5cf5..39f24b92 100644 --- a/src/NegentropyFilterCache.h +++ b/src/NegentropyFilterCache.h @@ -2,6 +2,8 @@ #include +#include + #include "golpe.h" #include "filters.h" diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 9b017cbd..76ececac 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -50,39 +50,56 @@ void cmd_sync(const std::vector &subArgs) { tao::json::value filterJson = tao::json::from_string(filterStr); auto filterCompiled = NostrFilterGroup::unwrapped(filterJson); - - bool isFullDbQuery = filterCompiled.isFullDbQuery(); + std::optional treeId; negentropy::storage::Vector storageVector; - if (!isFullDbQuery) { - DBQuery query(filterJson); - Decompressor decomp; + { auto txn = env.txn_ro(); - uint64_t numEvents = 0; - std::vector levIds; + auto filterJsonNoTimes = filterJson; + if (filterJsonNoTimes.is_object()) { + filterJsonNoTimes.get_object().erase("since"); + filterJsonNoTimes.get_object().erase("until"); + } + auto filterJsonNoTimesStr = tao::json::to_string(filterJsonNoTimes); - while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ - levIds.push_back(levId); - numEvents++; - }); + env.foreach_NegentropyFilter(txn, [&](auto &f){ + if (f.filter() == filterJsonNoTimesStr) { + treeId = f.primaryKeyId; + return false; + } + return true; + }); - if (complete) break; - } + if (!treeId) { + DBQuery query(filterJson); + Decompressor decomp; - std::sort(levIds.begin(), levIds.end()); + uint64_t numEvents = 0; + std::vector levIds; - for (auto levId : levIds) { - auto ev = lookupEventByLevId(txn, levId); - PackedEventView packed(ev.buf); - storageVector.insert(packed.created_at(), packed.id()); - } + while (1) { + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + levIds.push_back(levId); + numEvents++; + }); - LI << "Filter matches " << numEvents << " events"; + if (complete) break; + } - storageVector.seal(); + std::sort(levIds.begin(), levIds.end()); + + for (auto levId : levIds) { + auto ev = lookupEventByLevId(txn, levId); + PackedEventView packed(ev.buf); + storageVector.insert(packed.created_at(), packed.id()); + } + + LI << "Filter matches " << numEvents << " events"; + + storageVector.seal(); + } } @@ -98,8 +115,8 @@ void cmd_sync(const std::vector &subArgs) { auto txn = env.txn_ro(); std::string neMsg; - if (isFullDbQuery) { - negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); + if (treeId) { + negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId); const auto &f = filterCompiled.filters.at(0); negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); @@ -151,8 +168,8 @@ void cmd_sync(const std::vector &subArgs) { try { auto inputMsg = from_hex(msg.at(2).get_string()); - if (isFullDbQuery) { - negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); + if (treeId) { + negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId); const auto &f = filterCompiled.filters.at(0); negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); diff --git a/src/apps/relay/RelayIngester.cpp b/src/apps/relay/RelayIngester.cpp index 23c2961e..21e89840 100644 --- a/src/apps/relay/RelayIngester.cpp +++ b/src/apps/relay/RelayIngester.cpp @@ -134,15 +134,22 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp if (arr.at(0) == "NEG-OPEN") { if (arr.get_array().size() < 4) throw herr("negentropy query missing elements"); - NostrFilterGroup filter; auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1; - filter = std::move(NostrFilterGroup::unwrapped(arr.at(2), maxFilterLimit)); + auto filterJson = arr.at(2); + + NostrFilterGroup filter = NostrFilterGroup::unwrapped(filterJson, maxFilterLimit); Subscription sub(connId, arr[1].get_string(), std::move(filter)); + if (filterJson.is_object()) { + filterJson.get_object().erase("since"); + filterJson.get_object().erase("until"); + } + std::string filterStr = tao::json::to_string(filterJson); + std::string negPayload = from_hex(arr.at(3).get_string()); - tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(negPayload)}}); + tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(filterStr), std::move(negPayload)}}); } else if (arr.at(0) == "NEG-MSG") { std::string negPayload = from_hex(arr.at(2).get_string()); tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}}); diff --git a/src/apps/relay/RelayNegentropy.cpp b/src/apps/relay/RelayNegentropy.cpp index 12d198e2..ddc8be94 100644 --- a/src/apps/relay/RelayNegentropy.cpp +++ b/src/apps/relay/RelayNegentropy.cpp @@ -17,6 +17,7 @@ struct NegentropyViews { struct StatelessView { Subscription sub; + uint64_t treeId; }; using UserView = std::variant; @@ -42,7 +43,7 @@ struct NegentropyViews { return true; } - bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) { + bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub, uint64_t treeId) { { auto *existing = findView(connId, subId); if (existing) removeView(connId, subId); @@ -55,7 +56,7 @@ struct NegentropyViews { return false; } - connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), } }); + connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), treeId, } }); return true; } @@ -188,15 +189,24 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { if (auto msg = std::get_if(&newMsg.msg)) { auto connId = msg->sub.connId; auto subId = msg->sub.subId; + std::optional treeId; - if (msg->sub.filterGroup.isFullDbQuery()) { - negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); + env.foreach_NegentropyFilter(txn, [&](auto &f){ + if (f.filter() == msg->filterStr) { + treeId = f.primaryKeyId; + return false; + } + return true; + }); + + if (treeId) { + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, *treeId); const auto &f = msg->sub.filterGroup.filters.at(0); negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); handleReconcile(connId, subId, subStorage, msg->negPayload); - if (!views.addStatelessView(connId, subId, std::move(msg->sub))) { + if (!views.addStatelessView(connId, subId, std::move(msg->sub), *treeId)) { queries.removeSub(connId, subId); sendNoticeError(connId, std::string("too many concurrent NEG requests")); } @@ -231,7 +241,7 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { } handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload); } else if (auto *view = std::get_if(userView)) { - negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, view->treeId); const auto &f = view->sub.filterGroup.filters.at(0); negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index de9b51e3..0224bd01 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -122,6 +122,7 @@ struct MsgReqMonitor : NonCopyable { struct MsgNegentropy : NonCopyable { struct NegOpen { Subscription sub; + std::string filterStr; std::string negPayload; }; diff --git a/src/events.h b/src/events.h index b45593e2..ccd0b2dc 100644 --- a/src/events.h +++ b/src/events.h @@ -1,7 +1,6 @@ #pragma once #include -#include #include "golpe.h"