From 272974e9caede58bf0e671a8e4310033aacfc952 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 17 Apr 2024 08:53:33 -0600 Subject: [PATCH 1/2] refactor: update deps cid, multihash, and ipld The rust crate libipld is deprecated and ipld-core is the new _blessed_ ipld implementation in rust. This refactor updates our code to use this new dependency as well as up to date versions of the cid and multihash crates. There are no behavior changes in this PR. --- Cargo.lock | 312 +++--- Cargo.toml | 20 +- beetle/iroh-bitswap/Cargo.toml | 3 +- beetle/iroh-bitswap/src/block.rs | 2 +- beetle/iroh-bitswap/src/error.rs | 2 +- beetle/iroh-bitswap/src/prefix.rs | 2 +- beetle/iroh-bitswap/src/protocol.rs | 4 +- beetle/iroh-car/Cargo.toml | 6 +- beetle/iroh-car/src/error.rs | 4 +- beetle/iroh-car/src/header.rs | 34 +- beetle/iroh-car/src/reader.rs | 13 +- beetle/iroh-util/Cargo.toml | 1 + beetle/iroh-util/src/lib.rs | 6 +- core/Cargo.toml | 2 + core/src/event_id.rs | 7 +- core/src/interest.rs | 6 +- core/src/jws.rs | 3 +- event/Cargo.toml | 7 +- event/src/event.rs | 14 +- kubo-rpc/Cargo.toml | 5 +- kubo-rpc/src/dag.rs | 17 +- kubo-rpc/src/http.rs | 27 +- kubo-rpc/src/ipfs_metrics.rs | 2 +- kubo-rpc/src/lib.rs | 49 +- one/Cargo.toml | 6 +- one/src/events.rs | 2 +- one/src/lib.rs | 21 +- recon/Cargo.toml | 2 +- recon/src/sha256a.rs | 12 +- store/Cargo.toml | 8 +- store/src/sql/event.rs | 2 +- store/src/sqlite/model.rs | 1365 +++++++++++++++++++++++++++ store/src/tests/mod.rs | 15 +- 33 files changed, 1689 insertions(+), 292 deletions(-) create mode 100644 store/src/sqlite/model.rs diff --git a/Cargo.lock b/Cargo.lock index 36be95abe..89cf92f28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -964,7 +964,7 @@ dependencies = [ "hex", "http", "iri-string", - "libipld 0.14.0", + "libipld", "serde", "serde_with 2.3.3", "siwe", @@ -1113,7 +1113,7 @@ dependencies = [ "anyhow", "async-trait", "base64 0.21.7", - "cid 0.10.1", + "cid 0.11.1", "did-method-key", "did-pkh", "expect-test", @@ -1122,6 +1122,8 @@ dependencies = [ "libp2p-identity", "minicbor", "multibase 0.9.1", + "multihash-codetable", + "multihash-derive 0.9.0", "once_cell", "regex", "serde", @@ -1133,7 +1135,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", - "unsigned-varint 0.7.2", + "unsigned-varint 0.8.0", ] [[package]] @@ -1143,12 +1145,13 @@ dependencies = [ "anyhow", "ceramic-core", "expect-test", - "libipld 0.16.0", - "multihash 0.18.1", + "ipld-core", + "multihash-codetable", "once_cell", "rand 0.8.5", "serde", "serde_ipld_dagcbor", + "serde_ipld_dagjson", "serde_json", "test-log", "tokio", @@ -1167,22 +1170,25 @@ dependencies = [ "ceramic-kubo-rpc-server", "ceramic-metadata", "ceramic-metrics", - "cid 0.10.1", + "cid 0.11.1", "dag-jose", "expect-test", "futures-util", "go-parse-duration", "hex", + "ipld-core", + "ipld-dagpb", "iroh-bitswap", "iroh-rpc-client", "itertools 0.11.0", - "libipld 0.16.0", "libp2p", "libp2p-identity", "mockall", "multiaddr", "prometheus-client", "serde", + "serde_ipld_dagcbor", + "serde_ipld_dagjson", "serde_json", "swagger", "thiserror", @@ -1262,7 +1268,7 @@ dependencies = [ "ceramic-p2p", "ceramic-store", "chrono", - "cid 0.10.1", + "cid 0.11.1", "clap 4.5.4", "dirs", "enum-as-inner", @@ -1280,7 +1286,9 @@ dependencies = [ "minicbor", "multiaddr", "multibase 0.9.1", - "multihash 0.18.1", + "multihash 0.19.1", + "multihash-codetable", + "multihash-derive 0.9.0", "names", "ordered-float", "prometheus-client", @@ -1309,7 +1317,7 @@ dependencies = [ "ceramic-core", "ceramic-metrics", "ceramic-store", - 
"cid 0.10.1", + "cid 0.11.1", "criterion2", "futures", "futures-util", @@ -1345,21 +1353,23 @@ dependencies = [ "ceramic-api", "ceramic-core", "ceramic-metrics", - "cid 0.10.1", + "cid 0.11.1", "criterion2", "expect-test", "futures", "hex", + "ipld-core", "iroh-bitswap", "iroh-car", "itertools 0.12.1", - "libipld 0.16.0", - "libipld-cbor 0.16.0", - "multihash 0.18.1", + "multibase 0.9.1", + "multihash 0.19.1", + "multihash-codetable", "paste", "prometheus-client", "rand 0.8.5", "recon", + "serde_ipld_dagcbor", "sqlx", "test-log", "thiserror", @@ -1458,16 +1468,16 @@ dependencies = [ [[package]] name = "cid" -version = "0.10.1" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd94671561e36e4e7de75f753f577edafb0e7c05d6e4547229fdf7938fbcd2c3" +checksum = "3147d8272e8fa0ccd29ce51194dd98f79ddfb8191ba9e3409884e751798acf3a" dependencies = [ "core2", "multibase 0.9.1", - "multihash 0.18.1", + "multihash 0.19.1", "serde", "serde_bytes", - "unsigned-varint 0.7.2", + "unsigned-varint 0.8.0", ] [[package]] @@ -1876,14 +1886,13 @@ dependencies = [ [[package]] name = "dag-jose" -version = "0.1.4" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc227c836e4e02549bdb97bf2715c853ea9f142822c0d79320bd5586d421817b" +checksum = "670dfb345d79d058bd6dd526373dbdcad8a9a4d54b1c96d6489214672331107d" dependencies = [ "anyhow", - "base64 0.21.7", "base64-url", - "libipld 0.16.0", + "ipld-core", "serde", "serde_derive", "serde_ipld_dagcbor", @@ -3605,6 +3614,29 @@ dependencies = [ "winreg", ] +[[package]] +name = "ipld-core" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ede82a79e134f179f4b29b5fdb1eb92bd1b38c4dfea394c539051150a21b9b" +dependencies = [ + "cid 0.11.1", + "serde", + "serde_bytes", +] + +[[package]] +name = "ipld-dagpb" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "500af0d645ad3c26b544d2ec77c87fefec9319f5eda567d0993b9db59708994f" +dependencies = [ + "bytes 1.6.0", + "ipld-core", + "quick-protobuf", + "thiserror", +] + [[package]] name = "ipnet" version = "2.9.0" @@ -3643,10 +3675,10 @@ dependencies = [ "async-channel 1.9.0", "async-stream", "async-trait", - "asynchronous-codec 0.6.2", + "asynchronous-codec 0.7.0", "bytes 1.6.0", "ceramic-metrics", - "cid 0.10.1", + "cid 0.11.1", "criterion2", "deadqueue", "derivative", @@ -3655,7 +3687,8 @@ dependencies = [ "keyed_priority_queue", "libp2p", "libp2p-identity", - "multihash 0.18.1", + "multihash-codetable", + "multihash-derive 0.9.0", "num_enum", "prost 0.11.9", "prost-build", @@ -3669,19 +3702,21 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", - "unsigned-varint 0.7.2", + "unsigned-varint 0.8.0", ] [[package]] name = "iroh-car" version = "0.15.0" dependencies = [ - "cid 0.10.1", + "cid 0.11.1", "futures", "integer-encoding", - "libipld 0.16.0", - "libipld-cbor 0.16.0", - "multihash 0.18.1", + "ipld-core", + "multihash 0.19.1", + "multihash-codetable", + "serde", + "serde_ipld_dagcbor", "thiserror", "tokio", ] @@ -3693,7 +3728,7 @@ dependencies = [ "anyhow", "async-stream", "bytes 1.6.0", - "cid 0.10.1", + "cid 0.11.1", "futures", "iroh-rpc-types", "libp2p", @@ -3710,7 +3745,7 @@ version = "0.15.0" dependencies = [ "anyhow", "bytes 1.6.0", - "cid 0.10.1", + "cid 0.11.1", "derive_more", "libp2p", "quic-rpc", @@ -3723,7 +3758,8 @@ dependencies = [ name = "iroh-util" version = "0.15.0" dependencies = [ - "cid 0.10.1", + "cid 0.11.1", + 
"multihash-codetable", "nix 0.26.4", "temp-env", "testdir", @@ -4175,35 +4211,17 @@ dependencies = [ "async-trait", "cached", "fnv", - "libipld-cbor 0.14.0", - "libipld-cbor-derive 0.14.0", - "libipld-core 0.14.0", - "libipld-json 0.14.0", - "libipld-macro 0.14.0", + "libipld-cbor", + "libipld-cbor-derive", + "libipld-core", + "libipld-json", + "libipld-macro", "log", "multihash 0.16.3", "parking_lot", "thiserror", ] -[[package]] -name = "libipld" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1ccd6b8ffb3afee7081fcaec00e1b099fd1c7ccf35ba5729d88538fcc3b4599" -dependencies = [ - "fnv", - "libipld-cbor 0.16.0", - "libipld-cbor-derive 0.16.0", - "libipld-core 0.16.0", - "libipld-json 0.16.0", - "libipld-macro 0.16.0", - "libipld-pb", - "log", - "multihash 0.18.1", - "thiserror", -] - [[package]] name = "libipld-cbor" version = "0.14.0" @@ -4211,18 +4229,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8dd1ab68c9d26f20c7d0dfea6eecbae8c00359875210001b33ca27d4a02f3d09" dependencies = [ "byteorder", - "libipld-core 0.14.0", - "thiserror", -] - -[[package]] -name = "libipld-cbor" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77d98c9d1747aa5eef1cf099cd648c3fd2d235249f5fed07522aaebc348e423b" -dependencies = [ - "byteorder", - "libipld-core 0.16.0", + "libipld-core", "thiserror", ] @@ -4239,19 +4246,6 @@ dependencies = [ "synstructure", ] -[[package]] -name = "libipld-cbor-derive" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d5ba3a729b72973e456a1812b0afe2e176a376c1836cc1528e9fc98ae8cb838" -dependencies = [ - "proc-macro-crate", - "proc-macro2", - "quote", - "syn 1.0.109", - "synstructure", -] - [[package]] name = "libipld-core" version = "0.14.0" @@ -4267,73 +4261,25 @@ dependencies = [ "thiserror", ] -[[package]] -name = "libipld-core" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5acd707e8d8b092e967b2af978ed84709eaded82b75effe6cb6f6cc797ef8158" -dependencies = [ - "anyhow", - "cid 0.10.1", - "core2", - "multibase 0.9.1", - "multihash 0.18.1", - "serde", - "thiserror", -] - [[package]] name = "libipld-json" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18aa481a87f084d98473dd9ece253a9569c762b75f6bbba8217d54e48c9d63b3" dependencies = [ - "libipld-core 0.14.0", + "libipld-core", "multihash 0.16.3", "serde", "serde_json", ] -[[package]] -name = "libipld-json" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25856def940047b07b25c33d4e66d248597049ab0202085215dc4dca0487731c" -dependencies = [ - "libipld-core 0.16.0", - "multihash 0.18.1", - "serde", - "serde_json", -] - [[package]] name = "libipld-macro" version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "852c011562ae5059b67c3a917f9f5945af5a68df8e39ede4444fff33274d25e2" dependencies = [ - "libipld-core 0.14.0", -] - -[[package]] -name = "libipld-macro" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71171c54214f866ae6722f3027f81dff0931e600e5a61e6b1b6a49ca0b5ed4ae" -dependencies = [ - "libipld-core 0.16.0", -] - -[[package]] -name = "libipld-pb" -version = "0.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3f2d0f866c4cd5dc9aa8068c429ba478d2882a3a4b70ab56f7e9a0eddf5d16f" -dependencies = [ 
- "bytes 1.6.0", - "libipld-core 0.16.0", - "quick-protobuf", - "thiserror", + "libipld-core", ] [[package]] @@ -5237,7 +5183,7 @@ dependencies = [ "blake3", "core2", "digest 0.10.7", - "multihash-derive", + "multihash-derive 0.8.0", "serde", "serde-big-array", "sha2 0.10.8", @@ -5247,39 +5193,64 @@ dependencies = [ [[package]] name = "multihash" -version = "0.18.1" +version = "0.19.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "076d548d76a0e2a0d4ab471d0b1c36c577786dfc4471242035d97a12a735c492" +dependencies = [ + "core2", + "serde", + "unsigned-varint 0.7.2", +] + +[[package]] +name = "multihash-codetable" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfd8a792c1694c6da4f68db0a9d707c72bd260994da179e6030a5dcee00bb815" +checksum = "c11bbdf904c8be009e82ff968c4dab84388cbafc45dfaff61936eca4bf40f1b5" dependencies = [ "blake2b_simd 1.0.2", "blake2s_simd", "blake3", "core2", "digest 0.10.7", - "multihash-derive", - "serde", - "serde-big-array", + "multihash-derive 0.9.0", + "ripemd", + "sha1", "sha2 0.10.8", "sha3", - "unsigned-varint 0.7.2", + "strobe-rs", ] [[package]] -name = "multihash" -version = "0.19.1" +name = "multihash-derive" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "076d548d76a0e2a0d4ab471d0b1c36c577786dfc4471242035d97a12a735c492" +checksum = "fc076939022111618a5026d3be019fd8b366e76314538ff9a1b59ffbcbf98bcd" dependencies = [ - "core2", - "serde", - "unsigned-varint 0.7.2", + "proc-macro-crate", + "proc-macro-error", + "proc-macro2", + "quote", + "syn 1.0.109", + "synstructure", ] [[package]] name = "multihash-derive" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc076939022111618a5026d3be019fd8b366e76314538ff9a1b59ffbcbf98bcd" +checksum = "890e72cb7396cb99ed98c1246a97b243cc16394470d94e0bc8b0c2c11d84290e" +dependencies = [ + "core2", + "multihash 0.19.1", + "multihash-derive-impl", +] + +[[package]] +name = "multihash-derive-impl" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38685e08adb338659871ecfc6ee47ba9b22dcc8abcf6975d379cc49145c3040" dependencies = [ "proc-macro-crate", "proc-macro-error", @@ -6686,7 +6657,7 @@ dependencies = [ "libp2p", "libp2p-identity", "libp2p-swarm-test", - "multihash 0.18.1", + "multihash-codetable", "pin-project", "pretty", "prometheus-client", @@ -6873,6 +6844,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "ripemd" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd124222d17ad93a644ed9d011a40f4fb64aa54275c08cc216524a9ea82fb09f" +dependencies = [ + "digest 0.10.7", +] + [[package]] name = "ripemd160" version = "0.9.1" @@ -7252,16 +7232,27 @@ dependencies = [ [[package]] name = "serde_ipld_dagcbor" -version = "0.3.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2433e94ffb5977e67afbd75733abd6ada1c4f497125894a8c89b3fdc5fd6a058" +checksum = "bb1eedfc9e48051a90d79e189dea2303b7c0df82f03e154ae85bf2ceea957972" dependencies = [ "cbor4ii", - "cid 0.8.6", + "ipld-core", "scopeguard", "serde", ] +[[package]] +name = "serde_ipld_dagjson" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3359b47ba7f4a306ef5984665e10539e212e97217afa489437d533208eecda36" +dependencies = [ + "ipld-core", + "serde", + "serde_json", +] + [[package]] name = "serde_jcs" 
version = "0.1.0" @@ -8171,7 +8162,7 @@ checksum = "0982f62c7860922026a9d9edc6c604de79693ee4c5c6bd65be11e2ff66b1df09" dependencies = [ "base64 0.12.3", "chrono", - "libipld 0.14.0", + "libipld", "serde", "serde_json", "serde_with 1.14.0", @@ -8197,7 +8188,7 @@ dependencies = [ "chrono", "flate2", "iref", - "libipld 0.14.0", + "libipld", "multihash 0.16.3", "reqwest", "serde", @@ -8271,6 +8262,19 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "strobe-rs" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fabb238a1cccccfa4c4fb703670c0d157e1256c1ba695abf1b93bd2bb14bab2d" +dependencies = [ + "bitflags 1.3.2", + "byteorder", + "keccak", + "subtle", + "zeroize", +] + [[package]] name = "strsim" version = "0.8.0" @@ -9119,6 +9123,10 @@ name = "unsigned-varint" version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" +dependencies = [ + "asynchronous-codec 0.7.0", + "bytes 1.6.0", +] [[package]] name = "untrusted" diff --git a/Cargo.toml b/Cargo.toml index 52417bed6..b364c845d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ async-channel = "1.7.1" async-recursion = "1" async-stream = "0.3" async-trait = "0.1" -asynchronous-codec = "0.6" +asynchronous-codec = "0.7" axum = "0.6" backoff = "0.4" base64 = "0.20.0" @@ -50,13 +50,14 @@ bytesize = "1.1" ceramic-api = { path = "./api" } ceramic-api-server = { path = "./api-server" } ceramic-core = { path = "./core" } +ceramic-event = { path = "./event" } ceramic-kubo-rpc-server = { path = "./kubo-rpc-server" } ceramic-metadata = { path = "./metadata" } ceramic-metrics = { path = "./metrics" } ceramic-one = { path = "./one" } ceramic-p2p = { path = "./p2p" } ceramic-store = { path = "./store" } -cid = { version = "0.10", features = ["serde-codec"] } +cid = { version = "0.11", features = ["serde-codec"] } clap = { version = "4", features = ["derive", "env"] } clap_mangen = "0.2.2" console = { version = "0.15", default-features = false } @@ -64,7 +65,7 @@ console-subscriber = "0.2" criterion2 = "0.7.0" crossterm = "0.25" ctrlc = "3.2.2" -dag-jose = "0.1.3" +dag-jose = "0.2" deadqueue = "0.2.3" derivative = "2.2" derive_more = "0.99.17" @@ -88,6 +89,8 @@ hyper = { version = "0.14", features = ["full"] } ignore = "0.4.18" indicatif = "0.17.1" integer-encoding = "3.0" +ipld-core = "0.4" +ipld-dagpb = "0.2" iroh-bitswap = { path = "./beetle/iroh-bitswap" } iroh-car = { path = "./beetle/iroh-car" } iroh-p2p = { version = "0.2.0", path = "./beetle/iroh-p2p" } @@ -96,8 +99,6 @@ iroh-rpc-types = { path = "./beetle/iroh-rpc-types" } iroh-util = { path = "./beetle/iroh-util" } keyed_priority_queue = "0.4.1" lazy_static = "1.4" -libipld = "0.16" -libipld-cbor = "0.16" libp2p = { version = "0.53", default-features = false } libp2p-identity = { version = "0.2", features = ["peerid", "ed25519"] } lru = "0.10" @@ -108,7 +109,9 @@ minicbor = { version = "0.19.1", features = ["std", "derive", "half"] } mockall = "0.11.4" multiaddr = "0.18" multibase = "0.9" -multihash = { version = "0.18", features = ["identity"] } +multihash = { version = "0.19" } +multihash-codetable = { version = "0.1", features = ["sha2", "sha3"] } +multihash-derive = { version = "0.9" } names = { version = "0.14.0", default-features = false } nix = "0.26" num_enum = "0.5.7" @@ -138,7 +141,8 @@ serde = { version = "1.0", features = ["derive"] } serde-error = "0.1.2" serde_bytes = "0.11" serde_cbor = "0.11.2" 
-serde_ipld_dagcbor = "0.3" +serde_ipld_dagcbor = "0.6" +serde_ipld_dagjson = "0.2" serde_json = "1.0.87" serde_qs = "0.10.1" serde_with = "2.1" @@ -189,7 +193,7 @@ tracing-subscriber = { version = "0.3", features = [ ] } tracing-test = { version = "0.2" } trust-dns-resolver = "0.22.0" -unsigned-varint = "0.7" +unsigned-varint = "0.8" url = "2.2.2" urlencoding = "2.1" uuid = { version = "1.8.0", features = ["v4"] } diff --git a/beetle/iroh-bitswap/Cargo.toml b/beetle/iroh-bitswap/Cargo.toml index e92ee0274..616187c0a 100644 --- a/beetle/iroh-bitswap/Cargo.toml +++ b/beetle/iroh-bitswap/Cargo.toml @@ -28,7 +28,8 @@ futures.workspace = true iroh-util.workspace = true keyed_priority_queue.workspace = true libp2p = { workspace = true, features = ["ping"] } -multihash.workspace = true +multihash-codetable.workspace = true +multihash-derive.workspace = true num_enum.workspace = true prost.workspace = true rand.workspace = true diff --git a/beetle/iroh-bitswap/src/block.rs b/beetle/iroh-bitswap/src/block.rs index bc5ded00b..904938237 100644 --- a/beetle/iroh-bitswap/src/block.rs +++ b/beetle/iroh-bitswap/src/block.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use bytes::Bytes; use cid::Cid; -use multihash::{Code, MultihashDigest}; +use multihash_codetable::{Code, MultihashDigest}; /// A wrapper around bytes with their `Cid`. #[derive(Clone, Eq, PartialEq, PartialOrd, Ord)] diff --git a/beetle/iroh-bitswap/src/error.rs b/beetle/iroh-bitswap/src/error.rs index f95080e32..f00717460 100644 --- a/beetle/iroh-bitswap/src/error.rs +++ b/beetle/iroh-bitswap/src/error.rs @@ -11,7 +11,7 @@ pub enum Error { #[error("Error while parsing cid: {0}")] Cid(#[from] cid::Error), #[error("Error while parsing multihash: {0}")] - Multihash(#[from] multihash::Error), + Multihash(#[from] multihash_derive::UnsupportedCode), #[error("Invalid block presence type {0}")] InvalidBlockPresenceType(#[from] num_enum::TryFromPrimitiveError), #[error("Invalid want type {0}")] diff --git a/beetle/iroh-bitswap/src/prefix.rs b/beetle/iroh-bitswap/src/prefix.rs index ff69e9a80..4cd8fee24 100644 --- a/beetle/iroh-bitswap/src/prefix.rs +++ b/beetle/iroh-bitswap/src/prefix.rs @@ -1,7 +1,7 @@ use std::convert::TryFrom; use cid::{self, Cid, Version}; -use multihash::{Code, MultihashDigest}; +use multihash_codetable::{Code, MultihashDigest}; use unsigned_varint::{decode as varint_decode, encode as varint_encode}; use crate::error::Error; diff --git a/beetle/iroh-bitswap/src/protocol.rs b/beetle/iroh-bitswap/src/protocol.rs index 9eec63236..8428612df 100644 --- a/beetle/iroh-bitswap/src/protocol.rs +++ b/beetle/iroh-bitswap/src/protocol.rs @@ -153,10 +153,10 @@ impl BitswapCodec { } impl Encoder for BitswapCodec { - type Item = BitswapMessage; + type Item<'a> = BitswapMessage; type Error = BitswapHandlerError; - fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> { + fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> { tracing::trace!("sending message protocol: {:?}\n{:?}", self.protocol, item); let message = match self.protocol { diff --git a/beetle/iroh-car/Cargo.toml b/beetle/iroh-car/Cargo.toml index cd0916a4c..93ab198d4 100644 --- a/beetle/iroh-car/Cargo.toml +++ b/beetle/iroh-car/Cargo.toml @@ -12,13 +12,14 @@ publish = false cid.workspace = true futures.workspace = true integer-encoding = { workspace = true, features = ["tokio_async"] } -libipld.workspace = true -libipld-cbor.workspace = true +serde_ipld_dagcbor.workspace = true +serde.workspace = true 
thiserror.workspace = true tokio = { workspace = true, features = ["io-util"] } [dev-dependencies] multihash.workspace = true +multihash-codetable.workspace = true tokio = { workspace = true, features = [ "macros", "sync", @@ -26,5 +27,6 @@ tokio = { workspace = true, features = [ "fs", "io-util", ] } +ipld-core.workspace = true [features] diff --git a/beetle/iroh-car/src/error.rs b/beetle/iroh-car/src/error.rs index f228b9ef6..d5af1838d 100644 --- a/beetle/iroh-car/src/error.rs +++ b/beetle/iroh-car/src/error.rs @@ -1,3 +1,5 @@ +use std::collections::TryReserveError; + use thiserror::Error; /// Car utility error @@ -10,7 +12,7 @@ pub enum Error { #[error("Io error: {0}")] Io(#[from] std::io::Error), #[error("Cbor encoding error: {0}")] - Cbor(#[from] libipld::error::Error), + Cbor(#[from] serde_ipld_dagcbor::error::EncodeError), #[error("ld read too large {0}")] LdReadTooLarge(usize), } diff --git a/beetle/iroh-car/src/header.rs b/beetle/iroh-car/src/header.rs index 7717f5179..33fe77e5a 100644 --- a/beetle/iroh-car/src/header.rs +++ b/beetle/iroh-car/src/header.rs @@ -1,6 +1,5 @@ use cid::Cid; -use libipld::codec::Codec; -use libipld_cbor::DagCborCodec; +use serde::{Deserialize, Serialize}; use crate::error::Error; @@ -17,9 +16,8 @@ impl CarHeader { } pub fn decode(buffer: &[u8]) -> Result { - let header: CarHeaderV1 = DagCborCodec - .decode(buffer) - .map_err(|e| Error::Parsing(e.to_string()))?; + let header: CarHeaderV1 = + serde_ipld_dagcbor::from_slice(buffer).map_err(|e| Error::Parsing(e.to_string()))?; if header.roots.is_empty() { return Err(Error::Parsing("empty CAR file".to_owned())); @@ -37,7 +35,7 @@ impl CarHeader { pub fn encode(&self) -> Result, Error> { match self { CarHeader::V1(ref header) => { - let res = DagCborCodec.encode(header)?; + let res = serde_ipld_dagcbor::to_vec(header)?; Ok(res) } } @@ -57,11 +55,9 @@ impl CarHeader { } /// CAR file header version 1. 
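[editor note] The CarHeaderV1 change below drops libipld's DagCbor derive and its per-field #[ipld] attributes in favor of ordinary serde derives: serde_ipld_dagcbor encodes a struct as a DAG-CBOR map keyed by field name, which already matches the CARv1 header layout, and Cid serializes through the cid crate's serde-codec feature enabled in the workspace Cargo.toml. A sketch of the pattern (struct name is illustrative):

use serde::{Deserialize, Serialize};

// Before: #[derive(libipld::DagCbor)] with #[ipld] on every field.
// After: plain serde derives are sufficient for the same wire format.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct HeaderSketch {
    roots: Vec<cid::Cid>,
    version: u64,
}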
-#[derive(Debug, Clone, Default, libipld::DagCbor, PartialEq, Eq)] +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct CarHeaderV1 { - #[ipld] pub roots: Vec, - #[ipld] pub version: u64, } @@ -80,25 +76,23 @@ impl From> for CarHeaderV1 { #[cfg(test)] mod tests { - use libipld::codec::{Decode, Encode}; - use libipld_cbor::DagCborCodec; - use multihash::MultihashDigest; + use ipld_core::{codec::Codec, ipld::Ipld}; + use multihash_codetable::{Code, MultihashDigest}; + use serde_ipld_dagcbor::codec::DagCborCodec; use super::*; #[test] fn symmetric_header_v1() { - let digest = multihash::Code::Blake2b256.digest(b"test"); - let cid = Cid::new_v1(DagCborCodec.into(), digest); + let digest = Code::Sha2_256.digest(b"test"); + let cid = Cid::new_v1(>::CODE, digest); let header = CarHeaderV1::from(vec![cid]); - let mut bytes = Vec::new(); - header.encode(DagCborCodec, &mut bytes).unwrap(); + let bytes = serde_ipld_dagcbor::to_vec(&header).unwrap(); - assert_eq!( - CarHeaderV1::decode(DagCborCodec, &mut std::io::Cursor::new(&bytes)).unwrap(), - header - ); + let decoded: CarHeaderV1 = serde_ipld_dagcbor::from_slice(&bytes).unwrap(); + + assert_eq!(decoded, header); } } diff --git a/beetle/iroh-car/src/reader.rs b/beetle/iroh-car/src/reader.rs index 700baba1b..907c347b1 100644 --- a/beetle/iroh-car/src/reader.rs +++ b/beetle/iroh-car/src/reader.rs @@ -64,8 +64,9 @@ mod tests { use cid::Cid; use futures::TryStreamExt; - use libipld_cbor::DagCborCodec; - use multihash::MultihashDigest; + use ipld_core::{codec::Codec, ipld::Ipld}; + use multihash_codetable::{Code, MultihashDigest}; + use serde_ipld_dagcbor::codec::DagCborCodec; use crate::{header::CarHeaderV1, writer::CarWriter}; @@ -73,11 +74,11 @@ mod tests { #[tokio::test] async fn car_write_read() { - let digest_test = multihash::Code::Blake2b256.digest(b"test"); - let cid_test = Cid::new_v1(DagCborCodec.into(), digest_test); + let digest_test = Code::Sha2_256.digest(b"test"); + let cid_test = Cid::new_v1(>::CODE, digest_test); - let digest_foo = multihash::Code::Blake2b256.digest(b"foo"); - let cid_foo = Cid::new_v1(DagCborCodec.into(), digest_foo); + let digest_foo = Code::Sha2_256.digest(b"foo"); + let cid_foo = Cid::new_v1(>::CODE, digest_foo); let header = CarHeader::V1(CarHeaderV1::from(vec![cid_foo])); diff --git a/beetle/iroh-util/Cargo.toml b/beetle/iroh-util/Cargo.toml index d6debb0a4..d2064624a 100644 --- a/beetle/iroh-util/Cargo.toml +++ b/beetle/iroh-util/Cargo.toml @@ -10,6 +10,7 @@ publish = false [dependencies] cid.workspace = true +multihash-codetable.workspace = true [dev-dependencies] temp-env.workspace = true diff --git a/beetle/iroh-util/src/lib.rs b/beetle/iroh-util/src/lib.rs index 454b652ee..c98be835a 100644 --- a/beetle/iroh-util/src/lib.rs +++ b/beetle/iroh-util/src/lib.rs @@ -1,7 +1,5 @@ -use cid::{ - multihash::{Code, MultihashDigest}, - Cid, -}; +use cid::Cid; +use multihash_codetable::{Code, MultihashDigest}; /// Verifies that the provided bytes hash to the given multihash. 
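[editor note] The import swap above reflects that cid 0.11 no longer re-exports a hasher table; Code and MultihashDigest now come from multihash-codetable. A hedged sketch of minting a CID under the new layout — the DAG_CBOR constant and helper name are illustrative, not from this patch:

use cid::Cid;
use multihash_codetable::{Code, MultihashDigest};

// Digest via the external code table, then wrap in a CIDv1.
fn sha256_dag_cbor_cid(data: &[u8]) -> Cid {
    const DAG_CBOR: u64 = 0x71; // multicodec code for dag-cbor
    Cid::new_v1(DAG_CBOR, Code::Sha2_256.digest(data))
}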
pub fn verify_hash(cid: &Cid, bytes: &[u8]) -> Option { diff --git a/core/Cargo.toml b/core/Cargo.toml index 7aba33249..6ed17f48e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -19,6 +19,8 @@ int-enum = "0.5.0" libp2p-identity.workspace = true minicbor.workspace = true multibase.workspace = true +multihash-codetable.workspace = true +multihash-derive.workspace = true once_cell.workspace = true regex.workspace = true serde.workspace = true diff --git a/core/src/event_id.rs b/core/src/event_id.rs index e99682296..7ecf33941 100644 --- a/core/src/event_id.rs +++ b/core/src/event_id.rs @@ -15,10 +15,9 @@ //! hash bytes, 32 bytes #![warn(missing_docs, missing_debug_implementations, clippy::all)] -use cid::{ - multihash::{Hasher, Sha2_256}, - Cid, -}; +use cid::Cid; +use multihash_codetable::Sha2_256; +use multihash_derive::Hasher; use serde::{Deserialize, Serialize}; use std::{ cmp::{Eq, Ord}, diff --git a/core/src/interest.rs b/core/src/interest.rs index 9cbe75959..0b8264646 100644 --- a/core/src/interest.rs +++ b/core/src/interest.rs @@ -1,8 +1,9 @@ //! Interest is a structure that declares a range of data in which a node is interested. use anyhow::Result; -use cid::multihash::{Hasher, Sha2_256}; use minicbor::{Decoder, Encoder}; use multibase::Base; +use multihash_codetable::Sha2_256; +use multihash_derive::Hasher; use serde::{Deserialize, Serialize}; use std::{fmt::Display, str::FromStr}; @@ -200,6 +201,7 @@ impl Builder { // // TODO: Emperically measure performance of this size. const INITIAL_VEC_CAPACITY: usize = 256; + let mut hasher = Sha2_256::default(); hasher.update(sort_key.as_bytes()); // sha256 is 32 bytes safe to unwrap to [u8; 32] @@ -208,7 +210,7 @@ impl Builder { encoder // Encode last 8 bytes of the sort_key hash .bytes(&hash[hash.len() - 8..]) - .expect("sort_key should cbor encode"); + .expect("sep_key should cbor encode"); Builder { state: WithSortKey { encoder }, } diff --git a/core/src/jws.rs b/core/src/jws.rs index c1f27da4e..be2684b6d 100644 --- a/core/src/jws.rs +++ b/core/src/jws.rs @@ -70,8 +70,9 @@ impl Jws { mod tests { use super::*; use crate::{DagCborEncoded, DidDocument, Jwk, JwkSigner}; - use cid::multihash::{Code, MultihashDigest}; use cid::Cid; + use multihash_codetable::Code; + use multihash_codetable::MultihashDigest; use ssi::did::DIDMethod; use ssi::did::Source; use ssi::jwk::Params; diff --git a/event/Cargo.toml b/event/Cargo.toml index 6d4501eb3..6390f9fca 100644 --- a/event/Cargo.toml +++ b/event/Cargo.toml @@ -12,17 +12,18 @@ publish = false [dependencies] anyhow.workspace = true ceramic-core = { path = "../core" } -multihash.workspace = true +ipld-core.workspace = true +multihash-codetable.workspace = true once_cell.workspace = true rand = "0.8.5" serde.workspace = true +serde_json.workspace = true [dev-dependencies] serde_ipld_dagcbor.workspace = true -serde_json.workspace = true +serde_ipld_dagjson.workspace = true tokio = { workspace = true, features = ["fs", "macros", "rt"] } tracing.workspace = true tracing-subscriber.workspace = true test-log.workspace = true -libipld.workspace = true expect-test.workspace = true diff --git a/event/src/event.rs b/event/src/event.rs index e72949a4f..5f65149f0 100644 --- a/event/src/event.rs +++ b/event/src/event.rs @@ -1,7 +1,7 @@ use crate::args::UnsignedEvent; use anyhow::Result; use ceramic_core::{Cid, DagCborEncoded, Jws, Signer}; -use multihash::{Code, MultihashDigest}; +use multihash_codetable::{Code, MultihashDigest}; use serde::Serialize; // 
https://github.com/multiformats/multicodec/blob/master/table.csv @@ -43,7 +43,9 @@ mod tests { use ceramic_core::JwkSigner; use expect_test::expect; - use libipld::{cbor::DagCborCodec, json::DagJsonCodec, prelude::Codec, Ipld}; + use ipld_core::{codec::Codec, ipld::Ipld}; + use serde_ipld_dagcbor::codec::DagCborCodec; + use serde_ipld_dagjson::codec::DagJsonCodec; use std::str::FromStr; use test_log::test; @@ -92,8 +94,8 @@ mod tests { .unwrap(); let args = EventArgs::new_with_parent(&signer, &model); let evt = args.init().unwrap(); - let data: Ipld = DagCborCodec.decode(evt.encoded.as_ref()).unwrap(); - let encoded = DagJsonCodec.encode(&data).unwrap(); + let data: Ipld = DagCborCodec::decode_from_slice(evt.encoded.as_ref()).unwrap(); + let encoded = DagJsonCodec::encode_to_vec(&data).unwrap(); expect![[r#" { "header": { @@ -132,8 +134,8 @@ mod tests { let protected: serde_json::Value = serde_json::from_slice(protected.as_ref()).unwrap(); assert!(protected.as_object().unwrap().contains_key("kid")); - let post_data: Ipld = DagCborCodec.decode(evt.linked_block.as_ref()).unwrap(); - let encoded = DagJsonCodec.encode(&post_data).unwrap(); + let post_data: Ipld = serde_ipld_dagcbor::from_slice(evt.linked_block.as_ref()).unwrap(); + let encoded = serde_ipld_dagjson::to_vec(&post_data).unwrap(); let post_data: serde_json::Value = serde_json::from_slice(&encoded).unwrap(); let post_data = post_data.as_object().unwrap().get("data").unwrap(); assert_eq!(post_data, &data); diff --git a/kubo-rpc/Cargo.toml b/kubo-rpc/Cargo.toml index 5a53121b4..841ef98fd 100644 --- a/kubo-rpc/Cargo.toml +++ b/kubo-rpc/Cargo.toml @@ -25,13 +25,15 @@ hex.workspace = true iroh-rpc-client.workspace = true iroh-bitswap.workspace = true itertools = "0.11.0" -libipld.workspace = true +ipld-core.workspace = true libp2p-identity.workspace = true libp2p.workspace = true multiaddr.workspace = true prometheus-client.workspace = true serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } +serde_ipld_dagcbor.workspace = true +serde_ipld_dagjson.workspace = true swagger.workspace = true thiserror.workspace = true tokio.workspace = true @@ -43,3 +45,4 @@ expect-test.workspace = true tracing-test.workspace = true mockall.workspace = true async-stream.workspace = true +ipld-dagpb.workspace = true diff --git a/kubo-rpc/src/dag.rs b/kubo-rpc/src/dag.rs index 0fc3668e7..1d7c5c397 100644 --- a/kubo-rpc/src/dag.rs +++ b/kubo-rpc/src/dag.rs @@ -1,24 +1,21 @@ //! Implements the dag endpoints. -use libipld::{ - prelude::{Codec, Encode}, - Ipld, -}; +use ipld_core::{codec::Codec, ipld::Ipld}; use crate::{error::Error, IpfsDep}; use crate::{Cid, IpfsPath}; /// Get a DAG node from IPFS. 
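[editor note] The dag::get rewrite below trades a codec value argument for a type parameter: ipld-core's Codec trait is generic over the decoded type and exposes associated encode_to_vec/decode_from_slice functions, which this patch also uses in kubo-rpc's http and lib modules. A minimal sketch of the same pattern (helper name is illustrative; assumes anyhow, a workspace dependency, for error plumbing):

use ipld_core::{codec::Codec, ipld::Ipld};
use serde_ipld_dagcbor::codec::DagCborCodec;
use serde_ipld_dagjson::codec::DagJsonCodec;

// Decode DAG-CBOR bytes into Ipld, then re-encode as DAG-JSON.
// The codec is selected by type parameter, not by passing a codec value.
fn cbor_to_json(bytes: &[u8]) -> anyhow::Result<Vec<u8>> {
    let data: Ipld = DagCborCodec::decode_from_slice(bytes)?;
    Ok(DagJsonCodec::encode_to_vec(&data)?)
}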
-#[tracing::instrument(skip(client, output_codec))] -pub async fn get(client: T, ipfs_path: &IpfsPath, output_codec: C) -> Result, Error> +#[tracing::instrument(skip(client))] +pub async fn get(client: T, ipfs_path: &IpfsPath) -> Result, Error> where T: IpfsDep, - C: Codec, - Ipld: Encode, + C: Codec, + >::Error: std::error::Error + Send + Sync + 'static, { let (_cid, data) = client.get(ipfs_path).await?; - let mut bytes: Vec = Vec::new(); - data.encode(output_codec, &mut bytes) + let bytes = C::encode_to_vec(&data) + .map_err(anyhow::Error::from) .map_err(Error::Internal)?; Ok(bytes) } diff --git a/kubo-rpc/src/http.rs b/kubo-rpc/src/http.rs index 9533a0788..57baef053 100644 --- a/kubo-rpc/src/http.rs +++ b/kubo-rpc/src/http.rs @@ -21,10 +21,11 @@ use ceramic_kubo_rpc_server::{ }; use cid::Cid; use go_parse_duration::parse_duration; -use libipld::{cbor::DagCborCodec, json::DagJsonCodec}; use libp2p::{Multiaddr, PeerId}; use multiaddr::Protocol; use serde::Serialize; +use serde_ipld_dagcbor::codec::DagCborCodec; +use serde_ipld_dagjson::codec::DagJsonCodec; use swagger::{ApiError, ByteArray}; use tracing::{instrument, Level}; @@ -163,12 +164,12 @@ where let ipfs_path = try_or_bad_request!(IpfsPath::from_str(&arg), DagGetPostResponse); match output_codec.unwrap_or(Codecs::DagJson) { Codecs::DagJson => Ok(DagGetPostResponse::Success(ByteArray( - dag::get(self.ipfs.clone(), &ipfs_path, DagJsonCodec) + dag::get::<_, DagJsonCodec>(self.ipfs.clone(), &ipfs_path) .await .map_err(to_api_error)?, ))), Codecs::DagCbor => Ok(DagGetPostResponse::Success(ByteArray( - dag::get(self.ipfs.clone(), &ipfs_path, DagCborCodec) + dag::get::<_, DagCborCodec>(self.ipfs.clone(), &ipfs_path) .await .map_err(to_api_error)?, ))), @@ -364,14 +365,15 @@ struct ErrorJson<'a> { #[cfg(test)] mod tests { - use std::{collections::HashMap, io::Cursor}; + use std::collections::HashMap; use super::*; use crate::{tests::MockIpfsDepTest, PeerInfo}; use bytes::Bytes; use ceramic_metadata::Version; - use libipld::{pb::DagPbCodec, prelude::Decode, Ipld}; + use ipld_core::codec::Codec; + use ipld_dagpb::DagPbCodec; use mockall::predicate; use tracing_test::traced_test; @@ -642,9 +644,8 @@ mod tests { async fn dag_get_json() { // Test data from: // https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_data_some - let data = Ipld::decode( - DagPbCodec, - &mut Cursor::new(hex::decode("0a050001020304").expect("should be valid hex data")), + let data = DagPbCodec::decode_from_slice( + &hex::decode("0a050001020304").expect("should be valid hex data"), ) .expect("should be valid dag-pb data"); @@ -685,9 +686,8 @@ mod tests { async fn dag_get_cbor() { // Test data from: // https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_data_some - let data = Ipld::decode( - DagPbCodec, - &mut Cursor::new(hex::decode("0a050001020304").expect("should be valid hex data")), + let data = DagPbCodec::decode_from_slice( + &hex::decode("0a050001020304").expect("should be valid hex data"), ) .expect("should be valid dag-pb data"); @@ -1004,9 +1004,8 @@ mod tests { async fn pin_add() { // Test data from: // https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_data_some - let data = Ipld::decode( - DagPbCodec, - &mut Cursor::new(hex::decode("0a050001020304").expect("should be valid hex data")), + let data = DagPbCodec::decode_from_slice( + &hex::decode("0a050001020304").expect("should be valid hex data"), ) .expect("should be valid dag-pb data"); diff --git a/kubo-rpc/src/ipfs_metrics.rs b/kubo-rpc/src/ipfs_metrics.rs index 
67a216200..bf4832c11 100644 --- a/kubo-rpc/src/ipfs_metrics.rs +++ b/kubo-rpc/src/ipfs_metrics.rs @@ -6,7 +6,7 @@ use ceramic_metadata::Version; use ceramic_metrics::Recorder; use cid::Cid; use futures_util::Future; -use libipld::Ipld; +use ipld_core::ipld::Ipld; use libp2p_identity::PeerId; use multiaddr::Multiaddr; use prometheus_client::{ diff --git a/kubo-rpc/src/lib.rs b/kubo-rpc/src/lib.rs index 7be92c380..3e17b66fe 100644 --- a/kubo-rpc/src/lib.rs +++ b/kubo-rpc/src/lib.rs @@ -8,7 +8,6 @@ use std::{ collections::HashMap, fmt::{self, Display, Formatter}, - io::Cursor, path::PathBuf, }; use std::{str::FromStr, sync::Arc}; @@ -16,15 +15,17 @@ use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Context}; use async_trait::async_trait; use dag_jose::DagJoseCodec; +use ipld_core::{codec::Codec, ipld::IpldIndex}; use iroh_rpc_client::P2pClient; -use libipld::{cbor::DagCborCodec, json::DagJsonCodec, prelude::Decode}; +use serde_ipld_dagcbor::codec::DagCborCodec; +use serde_ipld_dagjson::codec::DagJsonCodec; use tracing::instrument; // Pub use any types we export as part of an trait or struct pub use bytes::Bytes; pub use ceramic_metadata::Version; pub use cid::Cid; -pub use libipld::Ipld; +pub use ipld_core::ipld::Ipld; pub use libp2p::Multiaddr; pub use libp2p_identity::PeerId; @@ -294,19 +295,27 @@ where let parts = path.tail(); for part in parts.iter().filter(|s| !s.is_empty()) { // Parse part as an integer and if that fails parse as a string into an index. - let index: libipld::ipld::IpldIndex = if let Ok(i) = part.parse::() { + let index: IpldIndex = if let Ok(i) = part.parse::() { i.into() } else { part.clone().into() }; current.path = current.path.join(part); - current.data = current.data.take(index).map_err(|_| { - Error::Invalid(anyhow!( + current.data = current + .data + .take(index) + .map_err(|_| { + Error::Invalid(anyhow!( + "IPLD resolve error: Couldn't find part {} in path '{}'", + part, + parts.join("/") + )) + })? + .ok_or(Error::Invalid(anyhow!( "IPLD resolve error: Couldn't find part {} in path '{}'", part, parts.join("/") - )) - })?; + )))?; // Check if we have found a link and follow it if let Ipld::Link(c) = current.data { @@ -332,20 +341,16 @@ where #[instrument(skip(self))] async fn load_cid(&self, cid: Cid) -> Result { let bytes = self.load_cid_bytes(cid).await?; - let data = match cid.codec() { - //TODO(nathanielc): create constants for these - // dag-cbor - 0x71 => { - Ipld::decode(DagCborCodec, &mut Cursor::new(&bytes)).map_err(Error::Internal)? - } - // dag-json - 0x0129 => { - Ipld::decode(DagJsonCodec, &mut Cursor::new(&bytes)).map_err(Error::Internal)? - } - // dag-jose - 0x85 => { - Ipld::decode(DagJoseCodec, &mut Cursor::new(&bytes)).map_err(Error::Internal)? 
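[editor note] The match on cid.codec() being replaced here swaps the hardcoded multicodec literals (0x71 dag-cbor, 0x0129 dag-json, 0x85 dag-jose) for each codec's own CODE constant, so the table.csv values live in one place. A small sanity-check sketch (test name is illustrative):

#[test]
fn codec_codes_match_multicodec_table() {
    use dag_jose::DagJoseCodec;
    use ipld_core::{codec::Codec, ipld::Ipld};
    use serde_ipld_dagcbor::codec::DagCborCodec;
    use serde_ipld_dagjson::codec::DagJsonCodec;

    // Same multicodec codes as the literals they replace.
    assert_eq!(<DagCborCodec as Codec<Ipld>>::CODE, 0x71);
    assert_eq!(<DagJsonCodec as Codec<Ipld>>::CODE, 0x0129);
    assert_eq!(<DagJoseCodec as Codec<Ipld>>::CODE, 0x85);
}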
- } + let data: Ipld = match cid.codec() { + >::CODE => DagCborCodec::decode_from_slice(&bytes) + .map_err(anyhow::Error::from) + .map_err(Error::Internal)?, + >::CODE => DagJsonCodec::decode_from_slice(&bytes) + .map_err(anyhow::Error::from) + .map_err(Error::Internal)?, + >::CODE => DagJoseCodec::decode_from_slice(&bytes) + .map_err(anyhow::Error::from) + .map_err(Error::Internal)?, _ => return Err(Error::Invalid(anyhow!("unsupported codec {}", cid.codec()))), }; let path = PathBuf::new(); diff --git a/one/Cargo.toml b/one/Cargo.toml index 77004881d..50763a22f 100644 --- a/one/Cargo.toml +++ b/one/Cargo.toml @@ -22,20 +22,23 @@ chrono = "0.4.31" cid.workspace = true clap.workspace = true dirs = "5.0.1" +enum-as-inner = "0.6.0" futures.workspace = true git-version = "0.3" glob = "0.3.1" hex.workspace = true home = "0.5" hyper.workspace = true +iroh-bitswap.workspace = true iroh-rpc-client.workspace = true iroh-rpc-types.workspace = true -iroh-bitswap.workspace = true libp2p.workspace = true minicbor.workspace = true multiaddr.workspace = true multibase.workspace = true multihash.workspace = true +multihash-codetable.workspace = true +multihash-derive.workspace = true names.workspace = true ordered-float = "4.1.1" prometheus-client.workspace = true @@ -49,7 +52,6 @@ tokio-metrics = { version = "0.3.1", features = ["rt"] } tokio-prometheus-client = "0.1" tokio.workspace = true tracing.workspace = true -enum-as-inner = "0.6.0" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator.workspace = true diff --git a/one/src/events.rs b/one/src/events.rs index a93d92573..80ebdbb06 100644 --- a/one/src/events.rs +++ b/one/src/events.rs @@ -413,7 +413,7 @@ mod tests { use super::*; use crate::ethereum_rpc::EthRpc; use ceramic_store::SqlitePool; - use multihash::{Code, MultihashDigest}; + use multihash_codetable::{Code, MultihashDigest}; struct HardCodedEthRpc {} impl EthRpc for HardCodedEthRpc { diff --git a/one/src/lib.rs b/one/src/lib.rs index 9acde61de..aa18aca2c 100644 --- a/one/src/lib.rs +++ b/one/src/lib.rs @@ -20,7 +20,9 @@ use ceramic_p2p::{load_identity, DiskStorage, Keychain, Libp2pConfig}; use clap::{Args, Parser, Subcommand, ValueEnum}; use futures::StreamExt; use multibase::Base; -use multihash::{Code, Hasher, Multihash, MultihashDigest}; +use multihash::Multihash; +use multihash_codetable::Code; +use multihash_derive::Hasher; use recon::{FullInterests, Recon, ReconInterestProvider, Server, Sha256a}; use signal_hook::consts::signal::*; use signal_hook_tokio::Signals; @@ -626,17 +628,20 @@ impl Info { } } -async fn current_exe_hash() -> Result { +async fn current_exe_hash() -> Result> { if cfg!(debug_assertions) { // Debug builds can be 1GB+, so do we not want to spend the time to hash them. // Return a fake hash. - let mut hash = multihash::Identity256::default(); - // Spells debug when base64 url encoded with some leading padding. - hash.update(&[00, 117, 230, 238, 130]); - Ok(Code::Identity.wrap(hash.finalize())?) + Ok(Multihash::<32>::wrap( + // Identity hash code + 0, + // Spells debug when base64 url encoded with some leading padding. 
+ &[00, 117, 230, 238, 130], + ) + .expect("hardcoded digest should fit in 32 bytes")) } else { let exe_path = env::current_exe()?; - let mut hasher = multihash::Sha2_256::default(); + let mut hasher = multihash_codetable::Sha2_256::default(); let mut f = tokio::fs::File::open(exe_path).await?; let mut buffer = vec![0; 4096]; @@ -648,6 +653,6 @@ async fn current_exe_hash() -> Result { hasher.update(&buffer[..bytes_read]); } let hash = hasher.finalize(); - Ok(Code::Sha2_256.wrap(hash)?) + Ok(Multihash::<32>::wrap(Code::Sha2_256.into(), hash)?) } } diff --git a/recon/Cargo.toml b/recon/Cargo.toml index 1776a97f7..e1acc3516 100644 --- a/recon/Cargo.toml +++ b/recon/Cargo.toml @@ -18,7 +18,7 @@ futures.workspace = true hex = "0.4.3" libp2p-identity.workspace = true libp2p.workspace = true -multihash.workspace = true +multihash-codetable.workspace = true prometheus-client.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/recon/src/sha256a.rs b/recon/src/sha256a.rs index 0bb1cfddf..b9995bfbc 100644 --- a/recon/src/sha256a.rs +++ b/recon/src/sha256a.rs @@ -1,5 +1,5 @@ use ::serde::{de, Deserialize, Deserializer, Serialize, Serializer}; -use multihash::{Hasher, Sha2_256}; +use multihash_codetable::{Code, MultihashDigest}; use serde::de::Visitor; use std::fmt::{self, Debug}; use std::{convert::From, fmt::Formatter}; @@ -134,11 +134,13 @@ impl<'de> Deserialize<'de> for Sha256a { impl AssociativeHash for Sha256a { fn digest(key: &K) -> Self { - let mut hasher = Sha2_256::default(); - hasher.update(key.as_bytes()); // sha256 is 32 bytes safe to unwrap to [u8; 32] - let bytes: &[u8; 32] = hasher.finalize().try_into().unwrap(); - bytes.into() + let bytes: [u8; 32] = Code::Sha2_256 + .digest(key.as_bytes()) + .digest() + .try_into() + .unwrap(); + (&bytes).into() } fn as_bytes(&self) -> [u8; 32] { diff --git a/store/Cargo.toml b/store/Cargo.toml index b6ed96c82..0522d00d2 100644 --- a/store/Cargo.toml +++ b/store/Cargo.toml @@ -22,6 +22,7 @@ iroh-bitswap.workspace = true iroh-car.workspace = true itertools = "0.12.0" multihash.workspace = true +multihash-codetable.workspace = true prometheus-client.workspace = true thiserror.workspace = true recon.workspace = true @@ -32,10 +33,11 @@ tracing.workspace = true [dev-dependencies] criterion2 = { workspace = true, features = ["async", "async_tokio"] } expect-test.workspace = true -libipld.workspace = true -libipld-cbor.workspace = true -rand.workspace = true +ipld-core.workspace = true +multibase.workspace = true paste = "1.0" +rand.workspace = true +serde_ipld_dagcbor.workspace = true test-log.workspace = true tmpdir.workspace = true tokio.workspace = true diff --git a/store/src/sql/event.rs b/store/src/sql/event.rs index bf6ca48bd..b03d8e897 100644 --- a/store/src/sql/event.rs +++ b/store/src/sql/event.rs @@ -8,7 +8,7 @@ use ceramic_core::{event_id::InvalidEventId, EventId, RangeOpen}; use cid::Cid; use iroh_bitswap::Block; use iroh_car::CarReader; -use multihash::{Code, Multihash, MultihashDigest}; +use multihash_codetable::{Code, Multihash, MultihashDigest}; use recon::{ AssociativeHash, Error as ReconError, HashCount, InsertResult, Key, ReconItem, Result as ReconResult, Sha256a, diff --git a/store/src/sqlite/model.rs b/store/src/sqlite/model.rs new file mode 100644 index 000000000..128bf765b --- /dev/null +++ b/store/src/sqlite/model.rs @@ -0,0 +1,1365 @@ +use std::{collections::BTreeSet, marker::PhantomData}; + +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use ceramic_core::{EventId, 
RangeOpen}; +use cid::Cid; +use iroh_bitswap::Block; +use iroh_car::{CarHeader, CarReader, CarWriter}; +use itertools::{process_results, Itertools}; +use multihash_codetable::{Code, MultihashDigest}; +use recon::{AssociativeHash, HashCount, InsertResult, Key, ReconItem, Sha256a}; +use sqlx::Row; +use tracing::instrument; + +use crate::{DbTx, SqlitePool}; + +/// Unified implementation of [`recon::Store`] and [`iroh_bitswap::Store`] that can expose the +/// individual blocks from the CAR files directly. +#[derive(Clone, Debug)] +pub struct ModelStore +where + H: AssociativeHash, +{ + pool: SqlitePool, + hash: PhantomData, +} + +#[derive(Debug)] +struct BlockRow { + cid: Cid, + root: bool, + bytes: Vec, +} + +type EventIdError = >>::Error; + +impl ModelStore +where + H: AssociativeHash + std::convert::From<[u32; 8]>, +{ + /// Create an instance of the store initializing any neccessary tables. + pub async fn new(pool: SqlitePool) -> Result { + let store = ModelStore { + pool, + hash: PhantomData, + }; + store.create_table_if_not_exists().await?; + Ok(store) + } + + /// Initialize the recon table. + async fn create_table_if_not_exists(&self) -> Result<()> { + const CREATE_STORE_KEY_TABLE: &str = "CREATE TABLE IF NOT EXISTS model_key ( + key BLOB, -- network_id sort_value controller StreamID height event_cid + ahash_0 INTEGER, -- the ahash is decomposed as [u32; 8] + ahash_1 INTEGER, + ahash_2 INTEGER, + ahash_3 INTEGER, + ahash_4 INTEGER, + ahash_5 INTEGER, + ahash_6 INTEGER, + ahash_7 INTEGER, + value_retrieved BOOL, -- indicates if we have the value + PRIMARY KEY(key) + )"; + const CREATE_VALUE_RETRIEVED_INDEX: &str = + "CREATE INDEX IF NOT EXISTS idx_key_value_retrieved + ON model_key (key, value_retrieved)"; + + const CREATE_MODEL_BLOCK_TABLE: &str = "CREATE TABLE IF NOT EXISTS model_block ( + key BLOB, -- network_id sort_value controller StreamID height event_cid + cid BLOB, -- the cid of the Block as bytes no 0x00 prefix + idx INTEGER, -- the index of the block in the CAR file + root BOOL, -- when true the block is a root in the CAR file + bytes BLOB, -- the Block + PRIMARY KEY(key, cid) + )"; + // TODO should this include idx or not? + const CREATE_BLOCK_ORDER_INDEX: &str = "CREATE INDEX IF NOT EXISTS idx_model_block_cid + ON model_block (cid)"; + + let mut tx = self.pool.tx().await?; + sqlx::query(CREATE_STORE_KEY_TABLE) + .execute(&mut *tx) + .await?; + sqlx::query(CREATE_VALUE_RETRIEVED_INDEX) + .execute(&mut *tx) + .await?; + sqlx::query(CREATE_MODEL_BLOCK_TABLE) + .execute(&mut *tx) + .await?; + sqlx::query(CREATE_BLOCK_ORDER_INDEX) + .execute(&mut *tx) + .await?; + + tx.commit().await?; + Ok(()) + } + + async fn insert_item(&self, item: &ReconItem<'_, EventId>) -> Result<(bool, bool)> { + let mut tx = self.pool.writer().begin().await?; + let (new_key, new_val) = self.insert_item_int(item, &mut tx).await?; + tx.commit().await?; + Ok((new_key, new_val)) + } + + async fn range_with_values_int( + &self, + left_fencepost: &EventId, + right_fencepost: &EventId, + offset: usize, + limit: usize, + ) -> Result)> + Send + 'static>> { + let query = sqlx::query( + " + SELECT + model_block.key, model_block.cid, model_block.root, model_block.bytes + FROM ( + SELECT + key + FROM model_key + WHERE + key > ? AND key < ? + AND value_retrieved = true + ORDER BY + key ASC + LIMIT + ? + OFFSET + ? 
+ ) key + JOIN + model_block + ON + key.key = model_block.key + ORDER BY model_block.key, model_block.idx + ;", + ); + let all_blocks = query + .bind(left_fencepost.as_bytes()) + .bind(right_fencepost.as_bytes()) + .bind(limit as i64) + .bind(offset as i64) + .fetch_all(self.pool.reader()) + .await?; + + // Consume all block into groups of blocks by their key. + let all_blocks: Vec<(EventId, Vec)> = process_results( + all_blocks.into_iter().map( + |row| -> Result<(EventId, cid::CidGeneric<64>, bool, Vec), anyhow::Error> { + let event_id = EventId::try_from(row.get::, _>(0))?; + let cid = Cid::read_bytes(row.get::<&[u8], _>(1))?; + Ok((event_id, cid, row.get(2), row.get(3))) + }, + ), + |blocks| { + blocks + .group_by(|(key, _, _, _)| key.clone()) + .into_iter() + .map(|(key, group)| { + ( + key, + group + .map(|(_key, cid, root, bytes)| BlockRow { cid, root, bytes }) + .collect::>(), + ) + }) + .collect() + }, + )?; + + let mut values: Vec<(EventId, Vec)> = Vec::new(); + for (key, blocks) in all_blocks { + if let Some(value) = self.rebuild_car(blocks).await? { + values.push((key.clone(), value)); + } + } + Ok(Box::new(values.into_iter())) + } + + async fn value_for_key_int(&self, key: &EventId) -> Result>> { + let query = sqlx::query( + " + SELECT + cid, root, bytes + FROM model_block + WHERE + key=? + ORDER BY idx + ;", + ); + let blocks = query + .bind(key.as_bytes()) + .fetch_all(self.pool.reader()) + .await?; + self.rebuild_car( + blocks + .into_iter() + .map(|row| { + Cid::read_bytes(row.get::<&[u8], _>(0)) + .map_err(anyhow::Error::from) + .map(|cid| BlockRow { + cid, + root: row.get(1), + bytes: row.get(2), + }) + }) + .collect::>>()?, + ) + .await + } + + /// returns (new_key, new_val) tuple + async fn insert_item_int( + &self, + item: &ReconItem<'_, EventId>, + conn: &mut DbTx<'_>, + ) -> Result<(bool, bool)> { + // we insert the value first as it's possible we already have the key and can skip that step + // as it happens in a transaction, we'll roll back the value insert if the key insert fails and try again + if let Some(val) = item.value { + // Check if the value_retrieved flag is set and report if the key already exists. + let (key_exists, value_exists) = self.is_value_retrieved_int(item.key, conn).await?; + + if !value_exists { + self.update_value_retrieved_int(item.key, conn).await?; + + // Put each block from the car file + let mut reader = CarReader::new(val).await?; + let roots: BTreeSet = reader.header().roots().iter().cloned().collect(); + let mut idx = 0; + while let Some((cid, data)) = reader.next_block().await? { + self.insert_block_int( + item.key, + idx, + roots.contains(&cid), + cid, + &data.into(), + conn, + ) + .await?; + idx += 1; + } + } + + if key_exists { + return Ok((false, true)); + } + } + let new_key = self + .insert_key_int(item.key, item.value.is_some(), conn) + .await?; + Ok((new_key, item.value.is_some())) + } + + // Read the value_retrieved column. 
Report if the key exists and the value exists + async fn is_value_retrieved_int( + &self, + key: &EventId, + conn: &mut DbTx<'_>, + ) -> Result<(bool, bool)> { + let query = sqlx::query("SELECT value_retrieved FROM model_key WHERE key = ?"); + let row = query + .bind(key.as_bytes()) + .fetch_optional(&mut **conn) + .await?; + if let Some(row) = row { + Ok((true, row.get(0))) + } else { + Ok((false, false)) + } + } + + // set value_retrieved to true and return if the key already exists + async fn update_value_retrieved_int(&self, key: &EventId, conn: &mut DbTx<'_>) -> Result<()> { + let update = sqlx::query("UPDATE model_key SET value_retrieved = true WHERE key = ?"); + update.bind(key.as_bytes()).execute(&mut **conn).await?; + Ok(()) + } + + // store a block in the db. + async fn insert_block_int( + &self, + key: &EventId, + idx: i32, + root: bool, + cid: Cid, + blob: &Bytes, + conn: &mut DbTx<'_>, + ) -> Result<()> { + let hash = match cid.hash().code() { + 0x12 => Code::Sha2_256.digest(blob), + 0x1b => Code::Keccak256.digest(blob), + 0x11 => return Err(anyhow!("Sha1 not supported")), + _ => { + return Err(anyhow!( + "multihash type {:#x} not Sha2_256, Keccak256", + cid.hash().code(), + )) + } + }; + if cid.hash().to_bytes() != hash.to_bytes() { + return Err(anyhow!( + "cid did not match blob {} != {}", + hex::encode(cid.hash().to_bytes()), + hex::encode(hash.to_bytes()) + )); + } + + sqlx::query("INSERT INTO model_block (key, idx, root, cid, bytes) VALUES (?, ?, ?, ?, ?)") + .bind(key.as_bytes()) + .bind(idx) + .bind(root) + .bind(cid.to_bytes()) + .bind(blob.to_vec()) + .execute(&mut **conn) + .await?; + Ok(()) + } + + async fn insert_key_int( + &self, + key: &EventId, + has_value: bool, + conn: &mut DbTx<'_>, + ) -> Result { + let key_insert = sqlx::query( + "INSERT INTO model_key ( + key, + ahash_0, ahash_1, ahash_2, ahash_3, + ahash_4, ahash_5, ahash_6, ahash_7, + value_retrieved + ) VALUES ( + ?, + ?, ?, ?, ?, + ?, ?, ?, ?, + ? + );", + ); + + let hash = Sha256a::digest(key); + let resp = key_insert + .bind(key.as_bytes()) + .bind(hash.as_u32s()[0]) + .bind(hash.as_u32s()[1]) + .bind(hash.as_u32s()[2]) + .bind(hash.as_u32s()[3]) + .bind(hash.as_u32s()[4]) + .bind(hash.as_u32s()[5]) + .bind(hash.as_u32s()[6]) + .bind(hash.as_u32s()[7]) + .bind(has_value) + .execute(&mut **conn) + .await; + match resp { + std::result::Result::Ok(_rows) => Ok(true), + Err(sqlx::Error::Database(err)) => { + if err.is_unique_violation() { + Ok(false) + } else { + Err(sqlx::Error::Database(err).into()) + } + } + Err(err) => Err(err.into()), + } + } + + async fn rebuild_car(&self, blocks: Vec) -> Result>> { + if blocks.is_empty() { + return Ok(None); + } + + let size = blocks.iter().fold(0, |sum, row| sum + row.bytes.len()); + let roots: Vec = blocks + .iter() + .filter(|row| row.root) + .map(|row| row.cid) + .collect(); + // Reconstruct the car file + // TODO figure out a better capacity calculation + let mut car = Vec::with_capacity(size + 100 * blocks.len()); + let mut writer = CarWriter::new(CarHeader::V1(roots.into()), &mut car); + for BlockRow { + cid, + bytes, + root: _, + } in blocks + { + writer.write(cid, bytes).await?; + } + writer.finish().await?; + Ok(Some(car)) + } + + /// Returns all the keys found after the given row_id. + /// Uses the rowid of the value (block) table and makes sure to flatten keys + /// when there are multiple blocks for a single key. 
This relies on the fact that + /// we insert the blocks in order inside a transaction and that we don't delete, which + /// means that the all the entries for a key will be contiguous. + pub async fn new_keys_since_value_rowid( + &self, + row_id: i64, + limit: i64, + ) -> Result<(i64, Vec)> { + let query = sqlx::query( + "WITH entries AS ( + SELECT key, MAX(rowid) as max_rowid + FROM model_block + WHERE rowid >= ? -- we return rowid+1 so we must match it next search + GROUP BY key + ORDER BY rowid + LIMIT ? + ) + SELECT + key, + (SELECT MAX(max_rowid) + 1 FROM entries) as new_highwater_mark + from entries;", + ); + let rows = query + .bind(row_id) + .bind(limit) + .fetch_all(self.pool.reader()) + .await?; + // every row has the same new_highwater_mark value + let row_id: i64 = rows + .first() + .and_then(|r| r.get("new_highwater_mark")) + .unwrap_or(row_id); + let rows = rows + .into_iter() + .map(|row| { + let bytes: Vec = row.get(0); + EventId::try_from(bytes) + }) + .collect::, EventIdError>>()?; + + Ok((row_id, rows)) + } +} + +#[async_trait] +impl recon::Store for ModelStore +where + H: AssociativeHash, +{ + type Key = EventId; + type Hash = H; + + /// Returns true if the key was new. The value is always updated if included + async fn insert(&mut self, item: ReconItem<'_, Self::Key>) -> Result { + let (new, _new_val) = self.insert_item(&item).await?; + Ok(new) + } + + /// Insert new keys into the key space. + /// Returns true if a key did not previously exist. + async fn insert_many<'a, I>(&mut self, items: I) -> Result + where + I: ExactSizeIterator> + Send + Sync, + { + match items.len() { + 0 => Ok(InsertResult::new(vec![], 0)), + _ => { + let mut results = vec![false; items.len()]; + let mut new_val_cnt = 0; + let mut tx = self.pool.writer().begin().await?; + + for (idx, item) in items.enumerate() { + let (new_key, new_val) = self.insert_item_int(&item, &mut tx).await?; + results[idx] = new_key; + if new_val { + new_val_cnt += 1; + } + } + tx.commit().await?; + Ok(InsertResult::new(results, new_val_cnt)) + } + } + } + + /// return the hash and count for a range + #[instrument(skip(self))] + async fn hash_range( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + ) -> Result> { + if left_fencepost >= right_fencepost { + return Ok(HashCount::new(H::identity(), 0)); + } + + let query = sqlx::query( + "SELECT + TOTAL(ahash_0) & 0xFFFFFFFF, TOTAL(ahash_1) & 0xFFFFFFFF, + TOTAL(ahash_2) & 0xFFFFFFFF, TOTAL(ahash_3) & 0xFFFFFFFF, + TOTAL(ahash_4) & 0xFFFFFFFF, TOTAL(ahash_5) & 0xFFFFFFFF, + TOTAL(ahash_6) & 0xFFFFFFFF, TOTAL(ahash_7) & 0xFFFFFFFF, + COUNT(1) + FROM model_key WHERE key > ? AND key < ?;", + ); + let row = query + .bind(left_fencepost.as_bytes()) + .bind(right_fencepost.as_bytes()) + .fetch_one(self.pool.reader()) + .await?; + let bytes: [u32; 8] = [ + row.get(0), + row.get(1), + row.get(2), + row.get(3), + row.get(4), + row.get(5), + row.get(6), + row.get(7), + ]; + let count: i64 = row.get(8); // sql int type is signed + let count: u64 = count + .try_into() + .expect("COUNT(1) should never return a negative number"); + Ok(HashCount::new(H::from(bytes), count)) + } + + #[instrument(skip(self))] + async fn range( + &mut self, + left_fencepost: &Self::Key, + right_fencepost: &Self::Key, + offset: usize, + limit: usize, + ) -> Result + Send + 'static>> { + let query = sqlx::query( + " + SELECT + key + FROM + model_key + WHERE + key > ? AND key < ? + ORDER BY + key ASC + LIMIT + ? 
+
+    #[instrument(skip(self))]
+    async fn range(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+        offset: usize,
+        limit: usize,
+    ) -> Result<Box<dyn Iterator<Item = Self::Key> + Send + 'static>> {
+        let query = sqlx::query(
+            "
+            SELECT
+                key
+            FROM
+                model_key
+            WHERE
+                key > ? AND key < ?
+            ORDER BY
+                key ASC
+            LIMIT
+                ?
+            OFFSET
+                ?;
+            ",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .bind(limit as i64)
+            .bind(offset as i64)
+            .fetch_all(self.pool.reader())
+            .await?;
+        let rows = rows
+            .into_iter()
+            .map(|row| {
+                let bytes: Vec<u8> = row.get(0);
+                EventId::try_from(bytes)
+            })
+            .collect::<Result<Vec<EventId>, EventIdError>>()?;
+        Ok(Box::new(rows.into_iter()))
+    }
+
+    #[instrument(skip(self))]
+    async fn range_with_values(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+        offset: usize,
+        limit: usize,
+    ) -> Result<Box<dyn Iterator<Item = (Self::Key, Vec<u8>)> + Send + 'static>> {
+        self.range_with_values_int(left_fencepost, right_fencepost, offset, limit)
+            .await
+    }
+
+    /// Return the number of keys within the range.
+    #[instrument(skip(self))]
+    async fn count(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<usize> {
+        let query = sqlx::query(
+            "
+            SELECT
+                count(key)
+            FROM
+                model_key
+            WHERE
+                key > ? AND key < ?
+            ;",
+        );
+        let row = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_one(self.pool.reader())
+            .await?;
+        Ok(row.get::<'_, i64, _>(0) as usize)
+    }
+
+    /// Return the first key within the range.
+    #[instrument(skip(self))]
+    async fn first(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<Option<Self::Key>> {
+        let query = sqlx::query(
+            "
+            SELECT
+                key
+            FROM
+                model_key
+            WHERE
+                key > ? AND key < ?
+            ORDER BY
+                key ASC
+            LIMIT
+                1
+            ;",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        Ok(rows
+            .first()
+            .map(|row| {
+                let bytes: Vec<u8> = row.get(0);
+                EventId::try_from(bytes)
+            })
+            .transpose()?)
+    }
+
+    #[instrument(skip(self))]
+    async fn last(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<Option<Self::Key>> {
+        let query = sqlx::query(
+            "
+            SELECT
+                key
+            FROM
+                model_key
+            WHERE
+                key > ? AND key < ?
+            ORDER BY
+                key DESC
+            LIMIT
+                1
+            ;",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        Ok(rows
+            .first()
+            .map(|row| {
+                let bytes: Vec<u8> = row.get(0);
+                EventId::try_from(bytes)
+            })
+            .transpose()?)
+    }
+
+    #[instrument(skip(self))]
+    async fn first_and_last(
+        &mut self,
+        left_fencepost: &Self::Key,
+        right_fencepost: &Self::Key,
+    ) -> Result<Option<(Self::Key, Self::Key)>> {
+        let query = sqlx::query(
+            "
+            SELECT first.key, last.key
+            FROM
+                (
+                    SELECT key
+                    FROM model_key
+                    WHERE
+                        key > ? AND key < ?
+                    ORDER BY key ASC
+                    LIMIT 1
+                ) as first
+            JOIN
+                (
+                    SELECT key
+                    FROM model_key
+                    WHERE
+                        key > ? AND key < ?
+                    ORDER BY key DESC
+                    LIMIT 1
+                ) as last
+            ;",
+        );
+        let rows = query
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .bind(left_fencepost.as_bytes())
+            .bind(right_fencepost.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        if let Some(row) = rows.first() {
+            let first = EventId::try_from(row.get::<Vec<u8>, _>(0))?;
+            let last = EventId::try_from(row.get::<Vec<u8>, _>(1))?;
+            Ok(Some((first, last)))
+        } else {
+            Ok(None)
+        }
+    }
+
+    #[instrument(skip(self))]
+    async fn value_for_key(&mut self, key: &Self::Key) -> Result<Option<Vec<u8>>> {
+        self.value_for_key_int(key).await
+    }
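+
+    // All of the range queries above use exclusive fenceposts
+    // (key > ? AND key < ?), so neither fencepost is ever returned.
+    // Hypothetical sketch of paging through a full keyspace with `range`:
+    //
+    //     let mut offset = 0;
+    //     loop {
+    //         let page: Vec<EventId> = store
+    //             .range(&EventId::min_value(), &EventId::max_value(), offset, 100)
+    //             .await?
+    //             .collect();
+    //         if page.is_empty() {
+    //             break;
+    //         }
+    //         offset += page.len();
+    //     }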
+
+    #[instrument(skip(self))]
+    async fn keys_with_missing_values(
+        &mut self,
+        range: RangeOpen<Self::Key>,
+    ) -> Result<Vec<Self::Key>> {
+        if range.start >= range.end {
+            return Ok(vec![]);
+        }
+        let query = sqlx::query(
+            "
+            SELECT key
+            FROM model_key
+            WHERE
+                key > ?
+                AND key < ?
+                AND value_retrieved = false
+            ;",
+        );
+        let rows = query
+            .bind(range.start.as_bytes())
+            .bind(range.end.as_bytes())
+            .fetch_all(self.pool.reader())
+            .await?;
+        Ok(rows
+            .into_iter()
+            .map(|row| EventId::try_from(row.get::<Vec<u8>, _>(0)))
+            .collect::<Result<Vec<EventId>, EventIdError>>()?)
+    }
+}
+
+#[async_trait]
+impl iroh_bitswap::Store for ModelStore<Sha256a> {
+    async fn get_size(&self, cid: &Cid) -> Result<usize> {
+        Ok(
+            sqlx::query("SELECT length(bytes) FROM model_block WHERE cid = ?;")
+                .bind(cid.to_bytes())
+                .fetch_one(self.pool.reader())
+                .await?
+                .get::<'_, i64, _>(0) as usize,
+        )
+    }
+
+    async fn get(&self, cid: &Cid) -> Result<Block> {
+        Ok(Block::new(
+            sqlx::query("SELECT bytes FROM model_block WHERE cid = ?;")
+                .bind(cid.to_bytes())
+                .fetch_one(self.pool.reader())
+                .await?
+                .get::<'_, Vec<u8>, _>(0)
+                .into(),
+            cid.to_owned(),
+        ))
+    }
+
+    async fn has(&self, cid: &Cid) -> Result<bool> {
+        Ok(
+            sqlx::query("SELECT count(1) FROM model_block WHERE cid = ?;")
+                .bind(cid.to_bytes())
+                .fetch_one(self.pool.reader())
+                .await?
+                .get::<'_, i64, _>(0)
+                > 0,
+        )
+    }
+}
+
+/// We intentionally expose the store to the API, separately from the recon::Store trait.
+/// This gives us better control over the API functionality, particularly the CRUD operations
+/// that are related to recon but not explicitly part of the recon protocol. Eventually, it might
+/// be nice to reduce the scope of the recon::Store trait (or remove the &mut self requirement),
+/// but for now we have both.
+/// Anything that implements `ceramic_api::AccessModelStore` should also implement `recon::Store`.
+/// This guarantees that regardless of entry point (api or recon), the data is stored and retrieved in the same way.
+#[async_trait::async_trait]
+impl ceramic_api::AccessModelStore for ModelStore<Sha256a> {
+    type Key = EventId;
+    type Hash = Sha256a;
+
+    async fn insert(&self, key: Self::Key, value: Option<Vec<u8>>) -> Result<(bool, bool)> {
+        self.insert_item(&ReconItem::new(&key, value.as_deref()))
+            .await
+    }
+
+    async fn range_with_values(
+        &self,
+        start: Self::Key,
+        end: Self::Key,
+        offset: usize,
+        limit: usize,
+    ) -> Result<Vec<(Self::Key, Vec<u8>)>> {
+        let res = self
+            .range_with_values_int(&start, &end, offset, limit)
+            .await?;
+        Ok(res.collect())
+    }
+
+    async fn value_for_key(&self, key: Self::Key) -> Result<Option<Vec<u8>>> {
+        self.value_for_key_int(&key).await
+    }
+
+    async fn keys_since_highwater_mark(
+        &self,
+        highwater: i64,
+        limit: i64,
+    ) -> anyhow::Result<(i64, Vec<Self::Key>)> {
+        self.new_keys_since_value_rowid(highwater, limit).await
+    }
+}
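+
+// Illustrative sketch of the two entry points described above. The function
+// and its parameters are hypothetical; it only demonstrates that the API
+// route (AccessModelStore) and the recon route (recon::Store) share the same
+// underlying rows, so a key inserted through one is not new to the other.
+#[allow(dead_code)]
+async fn dual_entry_point_sketch(pool: SqlitePool, key: EventId, car: Vec<u8>) -> Result<()> {
+    let mut store: ModelStore<Sha256a> = ModelStore::new(pool).await?;
+    // API entry point: immutable receiver, returns (new_key, new_value).
+    let (new_key, _new_value) =
+        ceramic_api::AccessModelStore::insert(&store, key.clone(), Some(car)).await?;
+    // Recon entry point: the same key is no longer new here.
+    let new_via_recon = recon::Store::insert(&mut store, ReconItem::new_key(&key)).await?;
+    assert!(new_key && !new_via_recon);
+    Ok(())
+}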
"baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524", + ))), + ) + .await + .unwrap(); + recon::Store::insert( + &mut store, + ReconItem::new_key(&random_event_id(Some( + "baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty", + ))), + ) + .await + .unwrap(); + let ids = recon::Store::range( + &mut store, + &random_event_id_min(), + &random_event_id_max(), + 0, + usize::MAX, + ) + .await + .unwrap(); + expect![[r#" + [ + EventId { + bytes: "ce010502e320708396e92d964f16d8429ae87f86ead3ca3c010012202c22bf5e2d32a77fd71cc93baa3b9c56f3fce454c1ef4f95100febddf31f309e", + network_id: Some( + 2, + ), + separator: Some( + "e320708396e92d96", + ), + controller: Some( + "4f16d8429ae87f86", + ), + stream_id: Some( + "ead3ca3c", + ), + cid: Some( + "baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty", + ), + }, + EventId { + bytes: "ce010502e320708396e92d964f16d8429ae87f86ead3ca3c010012204739d813c902e3011034902f4ca39146c88163cde9d94fe73d3332f2d03f3dd7", + network_id: Some( + 2, + ), + separator: Some( + "e320708396e92d96", + ), + controller: Some( + "4f16d8429ae87f86", + ), + stream_id: Some( + "ead3ca3c", + ), + cid: Some( + "baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524", + ), + }, + ] + "#]] + .assert_debug_eq(&ids.collect::>()); + } + + #[test(tokio::test)] + async fn range_query_with_values() { + let mut store = new_store().await; + // Write three keys, two with values and one without + let one_id = random_event_id(Some( + "baeabeibmek7v4ljsu575ohgjhovdxhcw6p6oivgb55hzkeap5po7ghzqty", + )); + let two_id = random_event_id(Some( + "baeabeichhhmbhsic4maraneqf5gkhekgzcawhtpj3fh6opjtglznapz524", + )); + let (_one_blocks, one_car) = build_car_file(2).await; + let (_two_blocks, two_car) = build_car_file(3).await; + recon::Store::insert(&mut store, ReconItem::new(&one_id, Some(&one_car))) + .await + .unwrap(); + recon::Store::insert(&mut store, ReconItem::new(&two_id, Some(&two_car))) + .await + .unwrap(); + // Insert new event without a value to ensure we skip it in the query + recon::Store::insert( + &mut store, + ReconItem::new( + &random_event_id(Some( + "baeabeicyxeqioadjgy6v6cpy62a3gngylax54sds7rols2b67yetzaw5r4", + )), + None, + ), + ) + .await + .unwrap(); + let values: Vec<(EventId, Vec)> = recon::Store::range_with_values( + &mut store, + &random_event_id_min(), + &random_event_id_max(), + 0, + usize::MAX, + ) + .await + .unwrap() + .collect(); + + assert_eq!(vec![(one_id, one_car), (two_id, two_car)], values); + } + + #[test(tokio::test)] + async fn double_insert() { + let mut store = new_store().await; + let id = random_event_id(None); + + // first insert reports its a new key + expect![ + r#" + Ok( + true, + ) + "# + ] + .assert_debug_eq(&recon::Store::insert(&mut store, ReconItem::new_key(&id)).await); + + // second insert of same key reports it already existed + expect![ + r#" + Ok( + false, + ) + "# + ] + .assert_debug_eq(&recon::Store::insert(&mut store, ReconItem::new_key(&id)).await); + } + + #[test(tokio::test)] + async fn double_insert_with_value() { + let mut store = new_store().await; + let id = random_event_id(None); + let (_, car) = build_car_file(2).await; + + let item = ReconItem::new_with_value(&id, &car); + + // do take the first one + expect![ + r#" + Ok( + true, + ) + "# + ] + .assert_debug_eq(&recon::Store::insert(&mut store, item.clone()).await); + + // the second insert of same key with value reports it already exists. 
+ // Do not override values + expect![[r#" + Ok( + false, + ) + "#]] + .assert_debug_eq(&recon::Store::insert(&mut store, item).await); + } + + #[test(tokio::test)] + async fn update_missing_value() { + let mut store = new_store().await; + let id = random_event_id(None); + let (_, car) = build_car_file(2).await; + + let item_without_value = ReconItem::new_key(&id); + let item_with_value = ReconItem::new_with_value(&id, &car); + + // do take the first one + expect![ + r#" + Ok( + true, + ) + "# + ] + .assert_debug_eq(&recon::Store::insert(&mut store, item_without_value).await); + + // accept the second insert of same key with the value + expect![[r#" + Ok( + false, + ) + "#]] + .assert_debug_eq(&recon::Store::insert(&mut store, item_with_value).await); + } + + #[test(tokio::test)] + async fn first_and_last() { + let mut store = new_store().await; + let a: Cid = "baeabeiaxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy" + .parse() + .unwrap(); + let b: Cid = "baeabeibxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy" + .parse() + .unwrap(); + let c: Cid = "baeabeicxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy" + .parse() + .unwrap(); + let d: Cid = "baeabeidxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy" + .parse() + .unwrap(); + recon::Store::insert( + &mut store, + ReconItem::new_key(&event_id_builder().with_event(&b).build()), + ) + .await + .unwrap(); + recon::Store::insert( + &mut store, + ReconItem::new_key(&event_id_builder().with_event(&c).build()), + ) + .await + .unwrap(); + + // Only one key in range + let ret = recon::Store::first_and_last( + &mut store, + &event_id_builder().with_event(&a).build_fencepost(), + &event_id_builder().with_event(&c).build_fencepost(), + ) + .await + .unwrap(); + expect![[r#" + Some( + ( + EventId { + bytes: "ce010502e320708396e92d964f16d8429ae87f86ead3ca3c0100122037cf02fec8f35eb5ac79a16cb3510249dedc688f4f2a45e9ba2617c6856ceabe", + network_id: Some( + 2, + ), + separator: Some( + "e320708396e92d96", + ), + controller: Some( + "4f16d8429ae87f86", + ), + stream_id: Some( + "ead3ca3c", + ), + cid: Some( + "baeabeibxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy", + ), + }, + EventId { + bytes: "ce010502e320708396e92d964f16d8429ae87f86ead3ca3c0100122037cf02fec8f35eb5ac79a16cb3510249dedc688f4f2a45e9ba2617c6856ceabe", + network_id: Some( + 2, + ), + separator: Some( + "e320708396e92d96", + ), + controller: Some( + "4f16d8429ae87f86", + ), + stream_id: Some( + "ead3ca3c", + ), + cid: Some( + "baeabeibxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy", + ), + }, + ), + ) + "#]] + .assert_debug_eq(&ret); + + // No keys in range + let ret = recon::Store::first_and_last( + &mut store, + &event_id_builder().with_event(&a).build_fencepost(), + &event_id_builder().with_event(&a).build_fencepost(), + ) + .await + .unwrap(); + expect![[r#" + None + "#]] + .assert_debug_eq(&ret); + + // Two keys in range + let ret = recon::Store::first_and_last( + &mut store, + &event_id_builder().with_event(&a).build_fencepost(), + &event_id_builder().with_event(&d).build_fencepost(), + ) + .await + .unwrap(); + expect![[r#" + Some( + ( + EventId { + bytes: "ce010502e320708396e92d964f16d8429ae87f86ead3ca3c0100122037cf02fec8f35eb5ac79a16cb3510249dedc688f4f2a45e9ba2617c6856ceabe", + network_id: Some( + 2, + ), + separator: Some( + "e320708396e92d96", + ), + controller: Some( + "4f16d8429ae87f86", + ), + stream_id: Some( + "ead3ca3c", + ), + cid: Some( + "baeabeibxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy", + ), + }, + EventId { + bytes: 
"ce010502e320708396e92d964f16d8429ae87f86ead3ca3c0100122057cf02fec8f35eb5ac79a16cb3510249dedc688f4f2a45e9ba2617c6856ceabe", + network_id: Some( + 2, + ), + separator: Some( + "e320708396e92d96", + ), + controller: Some( + "4f16d8429ae87f86", + ), + stream_id: Some( + "ead3ca3c", + ), + cid: Some( + "baeabeicxz4bp5shtl222y6nbnszvcasj33ogrd2pfjc6torgc7dik3hkxy", + ), + }, + ), + ) + "#]] + .assert_debug_eq(&ret); + } + + #[test(tokio::test)] + async fn store_value_for_key() { + let mut store = new_store().await; + let key = random_event_id(None); + let (_, store_value) = build_car_file(3).await; + recon::Store::insert( + &mut store, + ReconItem::new_with_value(&key, store_value.as_slice()), + ) + .await + .unwrap(); + let value = recon::Store::value_for_key(&mut store, &key) + .await + .unwrap() + .unwrap(); + assert_eq!(hex::encode(store_value), hex::encode(value)); + } + #[test(tokio::test)] + async fn keys_with_missing_value() { + let mut store = new_store().await; + let key = random_event_id(Some( + "baeabeigc5edwvc47ul6belpxk3lgddipri5hw6f347s6ur4pdzwceprqbu", + )); + recon::Store::insert(&mut store, ReconItem::new(&key, None)) + .await + .unwrap(); + let missing_keys = recon::Store::keys_with_missing_values( + &mut store, + (EventId::min_value(), EventId::max_value()).into(), + ) + .await + .unwrap(); + expect![[r#" + [ + EventId { + bytes: "ce010502e320708396e92d964f16d8429ae87f86ead3ca3c01001220c2e9076a8b9fa2fc122df756d6618d0f8a3a7b78bbe7e5ea478f1e6c223e300d", + network_id: Some( + 2, + ), + separator: Some( + "e320708396e92d96", + ), + controller: Some( + "4f16d8429ae87f86", + ), + stream_id: Some( + "ead3ca3c", + ), + cid: Some( + "baeabeigc5edwvc47ul6belpxk3lgddipri5hw6f347s6ur4pdzwceprqbu", + ), + }, + ] + "#]] + .assert_debug_eq(&missing_keys); + + let (_, value) = build_car_file(2).await; + recon::Store::insert(&mut store, ReconItem::new(&key, Some(&value))) + .await + .unwrap(); + let missing_keys = recon::Store::keys_with_missing_values( + &mut store, + (EventId::min_value(), EventId::max_value()).into(), + ) + .await + .unwrap(); + expect![[r#" + [] + "#]] + .assert_debug_eq(&missing_keys); + } + + #[test(tokio::test)] + async fn read_value_as_block() { + let mut store = new_store().await; + let key = random_event_id(None); + let (blocks, store_value) = build_car_file(3).await; + recon::Store::insert( + &mut store, + ReconItem::new_with_value(&key, store_value.as_slice()), + ) + .await + .unwrap(); + let value = recon::Store::value_for_key(&mut store, &key) + .await + .unwrap() + .unwrap(); + assert_eq!(hex::encode(store_value), hex::encode(value)); + + // Read each block from the CAR + for block in blocks { + let value = iroh_bitswap::Store::get(&store, &block.cid).await.unwrap(); + assert_eq!(block, value); + } + } + + // stores 3 keys with 3,5,10 block long CAR files + // each one takes n+1 blocks as it needs to store the root and all blocks so we expect 3+5+10+3=21 + async fn prep_highwater_tests(store: &mut ModelStore) -> (EventId, EventId, EventId) { + let key_a = random_event_id(None); + let key_b = random_event_id(None); + let key_c = random_event_id(None); + for (x, key) in [3, 5, 10].into_iter().zip([&key_a, &key_b, &key_c]) { + let (_blocks, store_value) = build_car_file(x).await; + assert_eq!(_blocks.len(), x); + recon::Store::insert( + store, + ReconItem::new_with_value(key, store_value.as_slice()), + ) + .await + .unwrap(); + } + + (key_a, key_b, key_c) + } + + #[test(tokio::test)] + async fn keys_since_highwater_mark_all() { + let mut store: ModelStore = 
+
+    #[test(tokio::test)]
+    async fn keys_since_highwater_mark_all() {
+        let mut store: ModelStore<Sha256a> = new_store().await;
+        let (key_a, key_b, key_c) = prep_highwater_tests(&mut store).await;
+
+        let (hw, res) = store.new_keys_since_value_rowid(0, 10).await.unwrap();
+        assert_eq!(3, res.len());
+        assert_eq!(22, hw); // see comment in prep_highwater_tests
+        assert_eq!([key_a, key_b, key_c], res.as_slice());
+    }
+
+    #[test(tokio::test)]
+    async fn keys_since_highwater_mark_limit_1() {
+        let mut store: ModelStore<Sha256a> = new_store().await;
+        let (key_a, _key_b, _key_c) = prep_highwater_tests(&mut store).await;
+
+        let (hw, res) = store.new_keys_since_value_rowid(0, 1).await.unwrap();
+        assert_eq!(1, res.len());
+        assert_eq!(5, hw); // see comment in prep_highwater_tests
+        assert_eq!([key_a], res.as_slice());
+    }
+
+    #[test(tokio::test)]
+    async fn keys_since_highwater_mark_middle_start() {
+        let mut store: ModelStore<Sha256a> = new_store().await;
+        let (key_a, key_b, key_c) = prep_highwater_tests(&mut store).await;
+
+        // starting at rowid 1 which is in the middle of key A should still return key A
+        let (hw, res) = store.new_keys_since_value_rowid(1, 2).await.unwrap();
+        assert_eq!(2, res.len());
+        assert_eq!(11, hw); // see comment in prep_highwater_tests
+        assert_eq!([key_a, key_b], res.as_slice());
+
+        let (hw, res) = store.new_keys_since_value_rowid(hw, 1).await.unwrap();
+        assert_eq!(1, res.len());
+        assert_eq!(22, hw);
+        assert_eq!([key_c], res.as_slice());
+
+        let (hw, res) = store.new_keys_since_value_rowid(hw, 1).await.unwrap();
+        assert_eq!(0, res.len());
+        assert_eq!(22, hw); // no new keys, the highwater mark is unchanged
+    }
+}
diff --git a/store/src/tests/mod.rs b/store/src/tests/mod.rs
index e8e826a31..96e0cacfa 100644
--- a/store/src/tests/mod.rs
+++ b/store/src/tests/mod.rs
@@ -9,12 +9,12 @@ use ceramic_core::{
 };
 use cid::Cid;
+use ipld_core::{codec::Codec, ipld, ipld::Ipld};
 use iroh_bitswap::Block;
 use iroh_car::{CarHeader, CarWriter};
-use libipld::{ipld, prelude::Encode, Ipld};
-use libipld_cbor::DagCborCodec;
-use multihash::{Code, MultihashDigest};
+use multihash_codetable::{Code, MultihashDigest};
 use rand::Rng;
+use serde_ipld_dagcbor::codec::DagCborCodec;
 
 const MODEL_ID: &str = "k2t6wz4yhfp1r5pwi52gw89nzjbu53qk7m32o5iguw42c6knsaj0feuf927agb";
 const CONTROLLER: &str = "did:key:z6Mkqtw7Pj5Lv9xc4PgUYAnwfaVoMC6FRneGWVr5ekTEfKVL";
@@ -63,11 +63,10 @@ pub(crate) async fn build_car_file(count: usize) -> (Vec<Block>, Vec<u8>) {
     let root = ipld!( {
         "links": blocks.iter().map(|block| Ipld::Link(block.cid)).collect::<Vec<Ipld>>(),
     });
-    let mut root_bytes = Vec::new();
-    root.encode(DagCborCodec, &mut root_bytes).unwrap();
+    let root_bytes = serde_ipld_dagcbor::to_vec(&root).unwrap();
     let root_cid = Cid::new_v1(
-        DagCborCodec.into(),
-        MultihashDigest::digest(&Code::Sha2_256, &root_bytes),
+        <DagCborCodec as Codec<Ipld>>::CODE,
+        Code::Sha2_256.digest(&root_bytes),
     );
     let mut car = Vec::new();
     let roots: Vec<Cid> = vec![root_cid];
@@ -83,7 +82,7 @@ pub(crate) async fn build_car_file(count: usize) -> (Vec<Block>, Vec<u8>) {
 pub(crate) fn random_block() -> Block {
     let mut data = [0u8; 1024];
     rand::Rng::fill(&mut ::rand::thread_rng(), &mut data);
-    let hash = ::multihash::MultihashDigest::digest(&::multihash::Code::Sha2_256, &data);
+    let hash = Code::Sha2_256.digest(&data);
     Block {
         cid: Cid::new_v1(0x00, hash),
         data: data.to_vec().into(),

From 696fa3b8f32847680d9a6f19205846f8de18962f Mon Sep 17 00:00:00 2001
From: Nathaniel Cook
Date: Tue, 30 Apr 2024 10:00:32 -0600
Subject: [PATCH 2/2] refactor: remove extra source file

---
 store/src/sqlite/model.rs | 1365 ------------------------------------
 1 file changed, 1365 deletions(-)
 delete mode 100644 store/src/sqlite/model.rs

diff --git a/store/src/sqlite/model.rs b/store/src/sqlite/model.rs
deleted file mode 100644
index 128bf765b..000000000
--- a/store/src/sqlite/model.rs
+++ /dev/null