From 3f5899dafa630e2fa6d3c32895c03d4bd71bb592 Mon Sep 17 00:00:00 2001 From: Doug Hoyte Date: Wed, 4 Sep 2024 14:15:16 -0400 Subject: [PATCH] support multiple concurrent negentropy trees --- golpe.yaml | 6 ++ src/NegentropyFilterCache.h | 59 ++++++++++ src/WriterPipeline.h | 4 +- src/apps/dbutils/cmd_delete.cpp | 7 +- src/apps/dbutils/cmd_negentropy.cpp | 122 +++++++++++++++++++++ src/apps/mesh/cmd_sync.cpp | 69 +++++++----- src/apps/relay/RelayCron.cpp | 9 +- src/apps/relay/RelayIngester.cpp | 13 ++- src/apps/relay/RelayNegentropy.cpp | 22 ++-- src/apps/relay/RelayServer.h | 1 + src/apps/relay/RelayWriter.cpp | 3 +- src/events.cpp | 160 ++++++++++++++-------------- src/events.h | 23 +++- src/onAppStartup.cpp | 3 +- 14 files changed, 367 insertions(+), 134 deletions(-) create mode 100644 src/NegentropyFilterCache.h create mode 100644 src/apps/dbutils/cmd_negentropy.cpp diff --git a/golpe.yaml b/golpe.yaml index 839bb414..92d78392 100644 --- a/golpe.yaml +++ b/golpe.yaml @@ -17,6 +17,7 @@ tables: fields: - name: dbVersion - name: endianness + - name: negentropyModificationCounter ## Meta-info of nostr events, suitable for indexing ## Primary key is auto-incremented, called "levId" for Local EVent ID @@ -76,6 +77,11 @@ tables: - name: dict type: ubytes + NegentropyFilter: + fields: + - name: filter + type: string + tablesRaw: ## Raw nostr event JSON, possibly compressed ## keys are levIds diff --git a/src/NegentropyFilterCache.h b/src/NegentropyFilterCache.h new file mode 100644 index 00000000..39f24b92 --- /dev/null +++ b/src/NegentropyFilterCache.h @@ -0,0 +1,59 @@ +#pragma once + +#include + +#include + +#include "golpe.h" + +#include "filters.h" +#include "PackedEvent.h" + + +struct NegentropyFilterCache { + struct FilterInfo { + NostrFilter f; + uint64_t treeId; + }; + + std::vector filters; + uint64_t modificationCounter = 0; + + void ctx(lmdb::txn &txn, const std::function &)> &cb) { + freshenCache(txn); + + std::vector> storages(filters.size()); + + cb([&](const PackedEventView &ev, bool insert){ + for (size_t i = 0; i < filters.size(); i++) { + const auto &filter = filters[i]; + + if (!filter.f.doesMatch(ev)) continue; + + if (!storages[i]) storages[i] = std::make_unique(txn, negentropyDbi, filter.treeId); + + if (insert) storages[i]->insert(ev.created_at(), ev.id()); + else storages[i]->erase(ev.created_at(), ev.id()); + } + }); + } + + private: + void freshenCache(lmdb::txn &txn) { + uint64_t curr = env.lookup_Meta(txn, 1)->negentropyModificationCounter(); + + if (curr != modificationCounter) { + filters.clear(); + + env.foreach_NegentropyFilter(txn, [&](auto &f){ + filters.emplace_back( + NostrFilter(tao::json::from_string(f.filter()), MAX_U64), + f.primaryKeyId + ); + return true; + }); + + modificationCounter = curr; + } + } +}; diff --git a/src/WriterPipeline.h b/src/WriterPipeline.h index a85108e6..3f6ab7f4 100644 --- a/src/WriterPipeline.h +++ b/src/WriterPipeline.h @@ -85,6 +85,8 @@ struct WriterPipeline { writerThread = std::thread([&]() { setThreadName("Writer"); + NegentropyFilterCache neFilterCache; + while (1) { // Debounce @@ -141,7 +143,7 @@ struct WriterPipeline { if (newEventsToProc.size()) { { auto txn = env.txn_rw(); - writeEvents(txn, newEventsToProc); + writeEvents(txn, neFilterCache, newEventsToProc); txn.commit(); } diff --git a/src/apps/dbutils/cmd_delete.cpp b/src/apps/dbutils/cmd_delete.cpp index 058f285d..cdbdcf70 100644 --- a/src/apps/dbutils/cmd_delete.cpp +++ b/src/apps/dbutils/cmd_delete.cpp @@ -68,13 +68,10 @@ void cmd_delete(const std::vector &subArgs) { { auto txn = env.txn_rw(); - negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); + NegentropyFilterCache neFilterCache; - for (auto levId : levIds) { - deleteEvent(txn, levId, negentropyStorage); - } + deleteEvents(txn, neFilterCache, levIds); - negentropyStorage.flush(); txn.commit(); } } diff --git a/src/apps/dbutils/cmd_negentropy.cpp b/src/apps/dbutils/cmd_negentropy.cpp new file mode 100644 index 00000000..7b14955f --- /dev/null +++ b/src/apps/dbutils/cmd_negentropy.cpp @@ -0,0 +1,122 @@ +#include + +#include +#include "golpe.h" + +#include "NegentropyFilterCache.h" +#include "events.h" +#include "DBQuery.h" + + +static const char USAGE[] = +R"( + Usage: + negentropy list + negentropy add + negentropy build +)"; + + +static void increaseModCounter(lmdb::txn &txn) { + auto m = env.lookup_Meta(txn, 1); + if (!m) throw herr("no Meta entry?"); + env.update_Meta(txn, *m, { .negentropyModificationCounter = m->negentropyModificationCounter() + 1 }); +} + + +void cmd_negentropy(const std::vector &subArgs) { + std::map args = docopt::docopt(USAGE, subArgs, true, ""); + + if (args["list"].asBool()) { + auto txn = env.txn_ro(); + + env.foreach_NegentropyFilter(txn, [&](auto &f){ + auto treeId = f.primaryKeyId; + + std::cout << "tree " << treeId << "\n"; + std::cout << " filter: " << f.filter() << "\n"; + + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, treeId); + auto size = storage.size(); + std::cout << " size: " << size << "\n"; + std::cout << " fingerprint: " << to_hex(storage.fingerprint(0, size).sv()) << "\n"; + + return true; + }); + } else if (args["add"].asBool()) { + std::string filterStr = args[""].asString(); + + tao::json::value filterJson = tao::json::from_string(filterStr); + auto compiledFilter = NostrFilterGroup::unwrapped(filterJson); + + if (compiledFilter.filters.size() == 1 && (compiledFilter.filters[0].since != 0 || compiledFilter.filters[0].until != MAX_U64)) { + throw herr("single filters should not have since/until"); + } + if (compiledFilter.filters.size() == 0) throw herr("filter will never match"); + + filterStr = tao::json::to_string(filterJson); // make canonical + + auto txn = env.txn_rw(); + increaseModCounter(txn); + + env.foreach_NegentropyFilter(txn, [&](auto &f){ + if (f.filter() == filterStr) throw herr("filter already exists as tree: ", f.primaryKeyId); + return true; + }); + + uint64_t treeId = env.insert_NegentropyFilter(txn, filterStr); + txn.commit(); + + std::cout << "created tree " << treeId << "\n"; + std::cout << " to populate, run: strfry negentropy build " << treeId << "\n"; + } else if (args["build"].asBool()) { + uint64_t treeId = args[""].asLong(); + + struct Record { + uint64_t created_at; + uint8_t id[32]; + }; + + std::vector recs; + + auto txn = env.txn_rw(); // FIXME: split this into a read-only phase followed by a write + increaseModCounter(txn); + + // Get filter + + std::string filterStr; + + { + auto view = env.lookup_NegentropyFilter(txn, treeId); + if (!view) throw herr("couldn't find treeId: ", treeId); + filterStr = view->filter(); + } + + // Query all matching events + + DBQuery query(tao::json::from_string(filterStr)); + + while (1) { + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + auto ev = lookupEventByLevId(txn, levId); + auto packed = PackedEventView(ev.buf); + recs.push_back({ packed.created_at(), }); + memcpy(recs.back().id, packed.id().data(), 32); + }); + + if (complete) break; + } + + // Store events in negentropy tree + + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, treeId); + + for (const auto &r : recs) { + storage.insert(r.created_at, std::string_view((char*)r.id, 32)); + } + + storage.flush(); + + txn.commit(); + } +} diff --git a/src/apps/mesh/cmd_sync.cpp b/src/apps/mesh/cmd_sync.cpp index 9b017cbd..76ececac 100644 --- a/src/apps/mesh/cmd_sync.cpp +++ b/src/apps/mesh/cmd_sync.cpp @@ -50,39 +50,56 @@ void cmd_sync(const std::vector &subArgs) { tao::json::value filterJson = tao::json::from_string(filterStr); auto filterCompiled = NostrFilterGroup::unwrapped(filterJson); - - bool isFullDbQuery = filterCompiled.isFullDbQuery(); + std::optional treeId; negentropy::storage::Vector storageVector; - if (!isFullDbQuery) { - DBQuery query(filterJson); - Decompressor decomp; + { auto txn = env.txn_ro(); - uint64_t numEvents = 0; - std::vector levIds; + auto filterJsonNoTimes = filterJson; + if (filterJsonNoTimes.is_object()) { + filterJsonNoTimes.get_object().erase("since"); + filterJsonNoTimes.get_object().erase("until"); + } + auto filterJsonNoTimesStr = tao::json::to_string(filterJsonNoTimes); - while (1) { - bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ - levIds.push_back(levId); - numEvents++; - }); + env.foreach_NegentropyFilter(txn, [&](auto &f){ + if (f.filter() == filterJsonNoTimesStr) { + treeId = f.primaryKeyId; + return false; + } + return true; + }); - if (complete) break; - } + if (!treeId) { + DBQuery query(filterJson); + Decompressor decomp; - std::sort(levIds.begin(), levIds.end()); + uint64_t numEvents = 0; + std::vector levIds; - for (auto levId : levIds) { - auto ev = lookupEventByLevId(txn, levId); - PackedEventView packed(ev.buf); - storageVector.insert(packed.created_at(), packed.id()); - } + while (1) { + bool complete = query.process(txn, [&](const auto &sub, uint64_t levId){ + levIds.push_back(levId); + numEvents++; + }); - LI << "Filter matches " << numEvents << " events"; + if (complete) break; + } - storageVector.seal(); + std::sort(levIds.begin(), levIds.end()); + + for (auto levId : levIds) { + auto ev = lookupEventByLevId(txn, levId); + PackedEventView packed(ev.buf); + storageVector.insert(packed.created_at(), packed.id()); + } + + LI << "Filter matches " << numEvents << " events"; + + storageVector.seal(); + } } @@ -98,8 +115,8 @@ void cmd_sync(const std::vector &subArgs) { auto txn = env.txn_ro(); std::string neMsg; - if (isFullDbQuery) { - negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); + if (treeId) { + negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId); const auto &f = filterCompiled.filters.at(0); negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); @@ -151,8 +168,8 @@ void cmd_sync(const std::vector &subArgs) { try { auto inputMsg = from_hex(msg.at(2).get_string()); - if (isFullDbQuery) { - negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, 0); + if (treeId) { + negentropy::storage::BTreeLMDB storageBtree(txn, negentropyDbi, *treeId); const auto &f = filterCompiled.filters.at(0); negentropy::storage::SubRange subStorage(storageBtree, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); diff --git a/src/apps/relay/RelayCron.cpp b/src/apps/relay/RelayCron.cpp index 8ed5c5f8..c3976429 100644 --- a/src/apps/relay/RelayCron.cpp +++ b/src/apps/relay/RelayCron.cpp @@ -49,15 +49,10 @@ void RelayServer::runCron() { if (expiredLevIds.size() > 0) { auto txn = env.txn_rw(); - negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); + NegentropyFilterCache neFilterCache; - uint64_t numDeleted = 0; + uint64_t numDeleted = deleteEvents(txn, neFilterCache, expiredLevIds); - for (auto levId : expiredLevIds) { - if (deleteEvent(txn, levId, negentropyStorage)) numDeleted++; - } - - negentropyStorage.flush(); txn.commit(); if (numDeleted) LI << "Deleted " << numDeleted << " events (ephemeral=" << numEphemeral << " expired=" << numExpired << ")"; diff --git a/src/apps/relay/RelayIngester.cpp b/src/apps/relay/RelayIngester.cpp index 23c2961e..21e89840 100644 --- a/src/apps/relay/RelayIngester.cpp +++ b/src/apps/relay/RelayIngester.cpp @@ -134,15 +134,22 @@ void RelayServer::ingesterProcessNegentropy(lmdb::txn &txn, Decompressor &decomp if (arr.at(0) == "NEG-OPEN") { if (arr.get_array().size() < 4) throw herr("negentropy query missing elements"); - NostrFilterGroup filter; auto maxFilterLimit = cfg().relay__negentropy__maxSyncEvents + 1; - filter = std::move(NostrFilterGroup::unwrapped(arr.at(2), maxFilterLimit)); + auto filterJson = arr.at(2); + + NostrFilterGroup filter = NostrFilterGroup::unwrapped(filterJson, maxFilterLimit); Subscription sub(connId, arr[1].get_string(), std::move(filter)); + if (filterJson.is_object()) { + filterJson.get_object().erase("since"); + filterJson.get_object().erase("until"); + } + std::string filterStr = tao::json::to_string(filterJson); + std::string negPayload = from_hex(arr.at(3).get_string()); - tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(negPayload)}}); + tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegOpen{std::move(sub), std::move(filterStr), std::move(negPayload)}}); } else if (arr.at(0) == "NEG-MSG") { std::string negPayload = from_hex(arr.at(2).get_string()); tpNegentropy.dispatch(connId, MsgNegentropy{MsgNegentropy::NegMsg{connId, SubId(arr[1].get_string()), std::move(negPayload)}}); diff --git a/src/apps/relay/RelayNegentropy.cpp b/src/apps/relay/RelayNegentropy.cpp index 12d198e2..ddc8be94 100644 --- a/src/apps/relay/RelayNegentropy.cpp +++ b/src/apps/relay/RelayNegentropy.cpp @@ -17,6 +17,7 @@ struct NegentropyViews { struct StatelessView { Subscription sub; + uint64_t treeId; }; using UserView = std::variant; @@ -42,7 +43,7 @@ struct NegentropyViews { return true; } - bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub) { + bool addStatelessView(uint64_t connId, const SubId &subId, Subscription &&sub, uint64_t treeId) { { auto *existing = findView(connId, subId); if (existing) removeView(connId, subId); @@ -55,7 +56,7 @@ struct NegentropyViews { return false; } - connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), } }); + connViews.try_emplace(subId, UserView{ StatelessView{ std::move(sub), treeId, } }); return true; } @@ -188,15 +189,24 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { if (auto msg = std::get_if(&newMsg.msg)) { auto connId = msg->sub.connId; auto subId = msg->sub.subId; + std::optional treeId; - if (msg->sub.filterGroup.isFullDbQuery()) { - negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); + env.foreach_NegentropyFilter(txn, [&](auto &f){ + if (f.filter() == msg->filterStr) { + treeId = f.primaryKeyId; + return false; + } + return true; + }); + + if (treeId) { + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, *treeId); const auto &f = msg->sub.filterGroup.filters.at(0); negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); handleReconcile(connId, subId, subStorage, msg->negPayload); - if (!views.addStatelessView(connId, subId, std::move(msg->sub))) { + if (!views.addStatelessView(connId, subId, std::move(msg->sub), *treeId)) { queries.removeSub(connId, subId); sendNoticeError(connId, std::string("too many concurrent NEG requests")); } @@ -231,7 +241,7 @@ void RelayServer::runNegentropy(ThreadPool::Thread &thr) { } handleReconcile(msg->connId, msg->subId, view->storageVector, msg->negPayload); } else if (auto *view = std::get_if(userView)) { - negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, 0); + negentropy::storage::BTreeLMDB storage(txn, negentropyDbi, view->treeId); const auto &f = view->sub.filterGroup.filters.at(0); negentropy::storage::SubRange subStorage(storage, negentropy::Bound(f.since), negentropy::Bound(f.until == MAX_U64 ? MAX_U64 : f.until + 1)); diff --git a/src/apps/relay/RelayServer.h b/src/apps/relay/RelayServer.h index de9b51e3..0224bd01 100644 --- a/src/apps/relay/RelayServer.h +++ b/src/apps/relay/RelayServer.h @@ -122,6 +122,7 @@ struct MsgReqMonitor : NonCopyable { struct MsgNegentropy : NonCopyable { struct NegOpen { Subscription sub; + std::string filterStr; std::string negPayload; }; diff --git a/src/apps/relay/RelayWriter.cpp b/src/apps/relay/RelayWriter.cpp index 73394bc6..3ed1e876 100644 --- a/src/apps/relay/RelayWriter.cpp +++ b/src/apps/relay/RelayWriter.cpp @@ -5,6 +5,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { PluginEventSifter writePolicyPlugin; + NegentropyFilterCache neFilterCache; while(1) { auto newMsgs = thr.inbox.pop_all(); @@ -61,7 +62,7 @@ void RelayServer::runWriter(ThreadPool::Thread &thr) { try { auto txn = env.txn_rw(); - writeEvents(txn, newEvents); + writeEvents(txn, neFilterCache, newEvents); txn.commit(); } catch (std::exception &e) { LE << "Error writing " << newEvents.size() << " events: " << e.what(); diff --git a/src/events.cpp b/src/events.cpp index 893ac260..45102672 100644 --- a/src/events.cpp +++ b/src/events.cpp @@ -227,23 +227,17 @@ std::string_view getEventJson(lmdb::txn &txn, Decompressor &decomp, uint64_t lev +// Do not use externally: does not handle negentropy trees -bool deleteEvent(lmdb::txn &txn, uint64_t levId, negentropy::storage::BTreeLMDB &negentropyStorage) { - auto view = env.lookup_Event(txn, levId); - if (!view) return false; - PackedEventView packed(view->buf); - - negentropyStorage.erase(packed.created_at(), packed.id()); - +bool deleteEventBasic(lmdb::txn &txn, uint64_t levId) { bool deleted = env.dbi_EventPayload.del(txn, lmdb::to_sv(levId)); env.delete_Event(txn, levId); - return deleted; } -void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLevel) { +void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vector &evs, uint64_t logLevel) { std::sort(evs.begin(), evs.end(), [](auto &a, auto &b) { auto aC = a.createdAt(); auto bC = b.createdAt(); @@ -254,95 +248,99 @@ void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLev std::vector levIdsToDelete; std::string tmpBuf; - negentropy::storage::BTreeLMDB negentropyStorage(txn, negentropyDbi, 0); + neFilterCache.ctx(txn, [&](const std::function &updateNegentropy){ + for (size_t i = 0; i < evs.size(); i++) { + auto &ev = evs[i]; - for (size_t i = 0; i < evs.size(); i++) { - auto &ev = evs[i]; + PackedEventView packed(ev.packedStr); - PackedEventView packed(ev.packedStr); - - if (lookupEventById(txn, packed.id()) || (i != 0 && ev.id() == evs[i-1].id())) { - ev.status = EventWriteStatus::Duplicate; - continue; - } + if (lookupEventById(txn, packed.id()) || (i != 0 && ev.id() == evs[i-1].id())) { + ev.status = EventWriteStatus::Duplicate; + continue; + } - if (env.lookup_Event__deletion(txn, std::string(packed.id()) + std::string(packed.pubkey()))) { - ev.status = EventWriteStatus::Deleted; - continue; - } + if (env.lookup_Event__deletion(txn, std::string(packed.id()) + std::string(packed.pubkey()))) { + ev.status = EventWriteStatus::Deleted; + continue; + } - { - std::optional replace; + { + std::optional replace; - if (isReplaceableKind(packed.kind()) || isParamReplaceableKind(packed.kind())) { - packed.foreachTag([&](char tagName, std::string_view tagVal){ - if (tagName != 'd') return true; - replace = std::string(tagVal); - return false; - }); - } + if (isReplaceableKind(packed.kind()) || isParamReplaceableKind(packed.kind())) { + packed.foreachTag([&](char tagName, std::string_view tagVal){ + if (tagName != 'd') return true; + replace = std::string(tagVal); + return false; + }); + } - if (replace) { - auto searchStr = std::string(packed.pubkey()) + *replace; - auto searchKey = makeKey_StringUint64(searchStr, packed.kind()); - - env.generic_foreachFull(txn, env.dbi_Event__replace, searchKey, lmdb::to_sv(MAX_U64), [&](auto k, auto v) { - ParsedKey_StringUint64 parsedKey(k); - if (parsedKey.s == searchStr && parsedKey.n == packed.kind()) { - auto otherEv = lookupEventByLevId(txn, lmdb::from_sv(v)); - - auto thisTimestamp = packed.created_at(); - auto otherPacked = PackedEventView(otherEv.buf); - auto otherTimestamp = otherPacked.created_at(); - - if (otherTimestamp < thisTimestamp || - (otherTimestamp == thisTimestamp && packed.id() < otherPacked.id())) { - if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(otherPacked.id()); - levIdsToDelete.push_back(otherEv.primaryKeyId); - } else { - ev.status = EventWriteStatus::Replaced; + if (replace) { + auto searchStr = std::string(packed.pubkey()) + *replace; + auto searchKey = makeKey_StringUint64(searchStr, packed.kind()); + + env.generic_foreachFull(txn, env.dbi_Event__replace, searchKey, lmdb::to_sv(MAX_U64), [&](auto k, auto v) { + ParsedKey_StringUint64 parsedKey(k); + if (parsedKey.s == searchStr && parsedKey.n == packed.kind()) { + auto otherEv = lookupEventByLevId(txn, lmdb::from_sv(v)); + + auto thisTimestamp = packed.created_at(); + auto otherPacked = PackedEventView(otherEv.buf); + auto otherTimestamp = otherPacked.created_at(); + + if (otherTimestamp < thisTimestamp || + (otherTimestamp == thisTimestamp && packed.id() < otherPacked.id())) { + if (logLevel >= 1) LI << "Deleting event (d-tag). id=" << to_hex(otherPacked.id()); + levIdsToDelete.push_back(otherEv.primaryKeyId); + } else { + ev.status = EventWriteStatus::Replaced; + } } - } - return false; - }, true); + return false; + }, true); + } } - } - if (packed.kind() == 5) { - // Deletion event, delete all referenced events - packed.foreachTag([&](char tagName, std::string_view tagVal){ - if (tagName == 'e') { - auto otherEv = lookupEventById(txn, tagVal); - if (otherEv && PackedEventView(otherEv->buf).pubkey() == packed.pubkey()) { - if (logLevel >= 1) LI << "Deleting event (kind 5). id=" << to_hex(tagVal); - levIdsToDelete.push_back(otherEv->primaryKeyId); + if (packed.kind() == 5) { + // Deletion event, delete all referenced events + packed.foreachTag([&](char tagName, std::string_view tagVal){ + if (tagName == 'e') { + auto otherEv = lookupEventById(txn, tagVal); + if (otherEv && PackedEventView(otherEv->buf).pubkey() == packed.pubkey()) { + if (logLevel >= 1) LI << "Deleting event (kind 5). id=" << to_hex(tagVal); + levIdsToDelete.push_back(otherEv->primaryKeyId); + } } - } - return true; - }); - } + return true; + }); + } - if (ev.status == EventWriteStatus::Pending) { - ev.levId = env.insert_Event(txn, ev.packedStr); + if (ev.status == EventWriteStatus::Pending) { + ev.levId = env.insert_Event(txn, ev.packedStr); - tmpBuf.clear(); - tmpBuf += '\x00'; - tmpBuf += ev.jsonStr; - env.dbi_EventPayload.put(txn, lmdb::to_sv(ev.levId), tmpBuf); + tmpBuf.clear(); + tmpBuf += '\x00'; + tmpBuf += ev.jsonStr; + env.dbi_EventPayload.put(txn, lmdb::to_sv(ev.levId), tmpBuf); - negentropyStorage.insert(ev.createdAt(), ev.id()); + updateNegentropy(PackedEventView(ev.packedStr), true); - ev.status = EventWriteStatus::Written; + ev.status = EventWriteStatus::Written; - // Deletions happen after event was written to ensure levIds are not reused + // Deletions happen after event was written to ensure levIds are not reused - for (auto levId : levIdsToDelete) deleteEvent(txn, levId, negentropyStorage); - levIdsToDelete.clear(); - } + for (auto levId : levIdsToDelete) { + auto evToDel = env.lookup_Event(txn, levId); + if (!evToDel) continue; // already deleted + updateNegentropy(PackedEventView(evToDel->buf), false); + deleteEventBasic(txn, levId); + } - if (levIdsToDelete.size()) throw herr("unprocessed deletion"); - } + levIdsToDelete.clear(); + } - negentropyStorage.flush(); + if (levIdsToDelete.size()) throw herr("unprocessed deletion"); + } + }); } diff --git a/src/events.h b/src/events.h index 9191397f..ccd0b2dc 100644 --- a/src/events.h +++ b/src/events.h @@ -1,11 +1,11 @@ #pragma once #include -#include #include "golpe.h" #include "PackedEvent.h" +#include "NegentropyFilterCache.h" #include "Decompressor.h" @@ -99,6 +99,7 @@ struct EventToWrite { EventToWrite(std::string packedStr, std::string jsonStr, void *userData = nullptr) : packedStr(packedStr), jsonStr(jsonStr), userData(userData) { } + // FIXME: do we need these methods anymore? std::string_view id() { return PackedEventView(packedStr).id(); } @@ -109,5 +110,21 @@ struct EventToWrite { }; -void writeEvents(lmdb::txn &txn, std::vector &evs, uint64_t logLevel = 1); -bool deleteEvent(lmdb::txn &txn, uint64_t levId, negentropy::storage::BTreeLMDB &negentropyStorage); +void writeEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, std::vector &evs, uint64_t logLevel = 1); +bool deleteEventBasic(lmdb::txn &txn, uint64_t levId); + +template +uint64_t deleteEvents(lmdb::txn &txn, NegentropyFilterCache &neFilterCache, const C &levIds) { + uint64_t numDeleted = 0; + + neFilterCache.ctx(txn, [&](const std::function &updateNegentropy){ + for (auto levId : levIds) { + auto evToDel = env.lookup_Event(txn, levId); + if (!evToDel) continue; // already deleted + updateNegentropy(PackedEventView(evToDel->buf), false); + if (deleteEventBasic(txn, levId)) numDeleted++; + } + }); + + return numDeleted; +} diff --git a/src/onAppStartup.cpp b/src/onAppStartup.cpp index 0a522ae6..30d3cd94 100644 --- a/src/onAppStartup.cpp +++ b/src/onAppStartup.cpp @@ -24,7 +24,8 @@ static void dbCheck(lmdb::txn &txn, const std::string &cmd) { auto s = env.lookup_Meta(txn, 1); if (!s) { - env.insert_Meta(txn, CURR_DB_VERSION, 1); + env.insert_Meta(txn, CURR_DB_VERSION, 1, 1); + env.insert_NegentropyFilter(txn, "{}"); return; }