diff --git a/.gitignore b/.gitignore index cef4277..cad44ec 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /target .idea *.db* +*.redb* *.skey *.vkey pooltool.json diff --git a/Cargo.lock b/Cargo.lock index cfb1b7a..c28965e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -362,6 +362,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -500,6 +509,7 @@ dependencies = [ "autotools", "bech32 0.11.0", "bigdecimal", + "bincode", "blake2b_simd", "built", "byteorder", @@ -514,13 +524,14 @@ dependencies = [ "minicbor 0.24.2", "num-bigint", "num-rational", - "pallas-crypto 0.30.0 (git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756)", + "pallas-crypto 0.30.0 (git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315)", "pallas-network", "pallas-traverse", "pkg-config", "pretty_env_logger", "rand", "rayon", + "redb", "regex", "reqwest", "rusqlite", @@ -534,6 +545,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -1438,7 +1450,7 @@ dependencies = [ [[package]] name = "pallas-codec" version = "0.30.0" -source = "git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756#d3084d6e3209ee0f507a413b1a2491e07abf3756" +source = "git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315#f82bc469264a7b209a97edef83f20b6849bf8315" dependencies = [ "hex", "minicbor 0.20.0", @@ -1463,11 +1475,11 @@ dependencies = [ [[package]] name = "pallas-crypto" version = "0.30.0" -source = "git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756#d3084d6e3209ee0f507a413b1a2491e07abf3756" +source = "git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315#f82bc469264a7b209a97edef83f20b6849bf8315" dependencies = [ "cryptoxide", "hex", - "pallas-codec 0.30.0 (git+https://github.com/txpipe/pallas?rev=d3084d6e3209ee0f507a413b1a2491e07abf3756)", + "pallas-codec 0.30.0 (git+https://github.com/txpipe/pallas?rev=f82bc469264a7b209a97edef83f20b6849bf8315)", "rand_core", "serde", "thiserror", @@ -1826,6 +1838,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redb" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6dd20d3cdeb9c7d2366a0b16b93b35b75aec15309fbeb7ce477138c9f68c8c0" +dependencies = [ + "libc", +] + [[package]] name = "regex" version = "1.10.6" @@ -2460,6 +2481,15 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + [[package]] name = "valuable" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 16cd656..ad3e449 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,7 @@ links = "libsodium" [dependencies] async-std = "1.12" bech32 = "0.11" +bincode = "1.3.3" bigdecimal = "0.4" num-bigint = "0.4" num-rational = "0.4" @@ -18,7 +19,7 @@ blake2b_simd = "1.0" byteorder = "1.5" #pallas-network = { git = "https://github.com/AndrewWestberg/pallas", rev="35f693c57eec5f70c4f8e2f6a24445b14c6104b9" } #pallas-traverse = { git = "https://github.com/AndrewWestberg/pallas", rev="35f693c57eec5f70c4f8e2f6a24445b14c6104b9" } -pallas-crypto = { git = "https://github.com/txpipe/pallas", rev = "d3084d6e3209ee0f507a413b1a2491e07abf3756" } +pallas-crypto = { git = "https://github.com/txpipe/pallas", rev = "f82bc469264a7b209a97edef83f20b6849bf8315" } pallas-network = "0.30" pallas-traverse = "0.30" #pallas-crypto = "0.30.0" @@ -28,6 +29,7 @@ futures = "0.3" hex = "0.4" libc = "0.2" minicbor = { version = "0.24", features = ["std"] } +redb = "2.1.1" regex = "1.10" reqwest = { version = "0.12", default-features = false, features = ["blocking", "rustls-tls-webpki-roots", "rustls-tls", "json", "gzip", "deflate"] } rusqlite = { version = "0.32", features = ["bundled"] } @@ -44,6 +46,7 @@ tokio = { version = "1", features = ["rt", "rt-multi-thread", "net", "io-util", thiserror = "1.0" tracing = "0.1" tracing-subscriber = "0.3" +uuid = { version = "1", features = ["v7"] } # logging diff --git a/src/lib.rs b/src/lib.rs index f70fa12..0328825 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,544 +1,516 @@ -pub mod nodeclient { - use std::fs::File; - use std::io::{stdout, BufReader}; - use std::path::{Path, PathBuf}; - use std::str::FromStr; - use std::string::ParseError; - use std::thread; - use std::thread::JoinHandle; +use std::io::stdout; +use std::path::PathBuf; +use std::str::FromStr; +use std::string::ParseError; +use std::thread; +use std::thread::JoinHandle; - use serde::Deserialize; - use structopt::StructOpt; +use structopt::StructOpt; - use crate::nodeclient::leaderlog::handle_error; +use crate::nodeclient::leaderlog::handle_error; +use crate::nodeclient::sync::pooltool; +use crate::nodeclient::sync::pooltool::PooltoolConfig; +use crate::nodeclient::{leaderlog, ping, sign, snapshot, sync, validate}; - pub mod leaderlog; - pub mod math; - pub mod ping; - pub mod pooltool; - pub mod signing; - pub mod snapshot; - pub mod sqlite; - pub mod sync; - mod validate; +pub(crate) mod nodeclient; - pub static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); +pub static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),); - #[derive(Debug)] - pub enum LedgerSet { - Mark, - Set, - Go, - } +#[derive(Debug)] +pub enum LedgerSet { + Mark, + Set, + Go, +} - impl FromStr for LedgerSet { - type Err = ParseError; - fn from_str(ledger_set: &str) -> Result { - match ledger_set { - "next" => Ok(LedgerSet::Mark), - "current" => Ok(LedgerSet::Set), - "prev" => Ok(LedgerSet::Go), - _ => Ok(LedgerSet::Set), - } +impl FromStr for LedgerSet { + type Err = ParseError; + fn from_str(ledger_set: &str) -> Result { + match ledger_set { + "next" => Ok(LedgerSet::Mark), + "current" => Ok(LedgerSet::Set), + "prev" => Ok(LedgerSet::Go), + _ => Ok(LedgerSet::Set), } } +} - #[derive(Debug, StructOpt)] - pub enum Command { - Ping { - #[structopt(short, long, help = "cardano-node hostname to connect to")] - host: String, - #[structopt(short, long, default_value = "3001", help = "cardano-node port")] - port: u16, - #[structopt(long, default_value = "764824073", help = "network magic.")] - network_magic: u64, - #[structopt(short, long, default_value = "2", help = "connect timeout in seconds")] - timeout_seconds: u64, - }, - Validate { - #[structopt(long, help = "full or partial block hash to validate")] - hash: String, - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - }, - Sync { - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(short, long, help = "cardano-node hostname to connect to")] - host: String, - #[structopt(short, long, default_value = "3001", help = "cardano-node port")] - port: u16, - #[structopt(long, default_value = "764824073", help = "network magic.")] - network_magic: u64, - #[structopt(long, help = "Exit at 100% sync'd.")] - no_service: bool, - #[structopt( - short, - long, - default_value = "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81", - help = "shelley genesis hash value" - )] - shelley_genesis_hash: String, - }, - Leaderlog { - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt(long, help = "pool active stake snapshot value in lovelace")] - pool_stake: u64, - #[structopt(long, help = "total active stake snapshot value in lovelace")] - active_stake: u64, - #[structopt(long = "d", default_value = "0", help = "decentralization parameter")] - d: f64, - #[structopt(long, help = "hex string of the extra entropy value")] - extra_entropy: Option, - #[structopt( - long, - default_value = "current", - help = "Which ledger data to use. prev - previous epoch, current - current epoch, next - future epoch" - )] - ledger_set: LedgerSet, - #[structopt(long, help = "lower-case hex pool id")] - pool_id: String, - #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] - pool_vrf_skey: PathBuf, - #[structopt( - long = "tz", - default_value = "America/Los_Angeles", - help = "TimeZone string from the IANA database - https://en.wikipedia.org/wiki/List_of_tz_database_time_zones" - )] - timezone: String, - #[structopt( - short, - long, - default_value = "praos", - help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" - )] - consensus: String, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - #[structopt( - long, - help = "Provide a nonce value in lower-case hex instead of calculating from the db" - )] - nonce: Option, - #[structopt( - long, - help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" - )] - epoch: Option, - }, - Sendtip { - #[structopt( - parse(from_os_str), - long, - default_value = "./pooltool.json", - help = "pooltool config file for sending tips" - )] - config: PathBuf, - #[structopt( - parse(from_os_str), - long, - help = "path to cardano-node executable for gathering version info" - )] - cardano_node: PathBuf, - }, - Sendslots { - #[structopt( - parse(from_os_str), - long, - default_value = "./pooltool.json", - help = "pooltool config file for sending slots" - )] - config: PathBuf, - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - #[structopt(long, env = "OVERRIDE_TIME", hide_env_values = true, hidden = true)] - override_time: Option, - }, - Status { - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - }, - Nonce { - #[structopt( - parse(from_os_str), - short, - long, - default_value = "./cncli.db", - help = "sqlite database file" - )] - db: PathBuf, - #[structopt(parse(from_os_str), long, help = "byron genesis json file")] - byron_genesis: PathBuf, - #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] - shelley_genesis: PathBuf, - #[structopt(long, help = "hex string of the extra entropy value")] - extra_entropy: Option, - #[structopt( - long, - default_value = "current", - help = "Which ledger data to use. prev - previous epoch, current - current epoch, next - future epoch" - )] - ledger_set: LedgerSet, - #[structopt( - long, - env = "SHELLEY_TRANS_EPOCH", - default_value = "-1", - help = "Epoch number where we transition from Byron to Shelley. -1 means guess based on genesis files" - )] - shelley_transition_epoch: i64, - #[structopt( - short, - long, - default_value = "praos", - help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" - )] - consensus: String, - #[structopt( - long, - help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" - )] - epoch: Option, - }, - Challenge { - #[structopt(long, help = "validating domain e.g. pooltool.io")] - domain: String, - }, - Sign { - #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] - pool_vrf_skey: PathBuf, - #[structopt(long, help = "validating domain e.g. pooltool.io")] - domain: String, - #[structopt(long, help = "nonce value in lower-case hex")] - nonce: String, - }, - Verify { - #[structopt(parse(from_os_str), long, help = "pool's vrf.vkey file")] - pool_vrf_vkey: PathBuf, - #[structopt( - long, - help = "pool's vrf hash in hex retrieved from 'cardano-cli query pool-params...'" - )] - pool_vrf_vkey_hash: String, - #[structopt(long, help = "validating domain e.g. pooltool.io")] - domain: String, - #[structopt(long, help = "nonce value in lower-case hex")] - nonce: String, - #[structopt(long, help = "signature to verify in hex")] - signature: String, - }, - Snapshot { - #[structopt(parse(from_os_str), long, help = "cardano-node socket path")] - socket_path: PathBuf, - #[structopt(long, default_value = "764824073", help = "network magic.")] - network_magic: u64, - #[structopt(long, default_value = "mark", help = "Snapshot name to retrieve (mark, set, go)")] - name: String, - #[structopt( - long, - default_value = "1", - help = "The network identifier, (1 for mainnet, 0 for testnet)" - )] - network_id: u8, - #[structopt( - long, - default_value = "stake", - help = "The prefix for stake addresses, (stake for mainnet, stake_test for testnet)" - )] - stake_prefix: String, - #[structopt(long, default_value = "mark.csv", help = "The name of the output file (CSV format)")] - output_file: String, - }, - } +#[derive(Debug, StructOpt)] +pub enum Command { + Ping { + #[structopt(short, long, help = "cardano-node hostname to connect to")] + host: String, + #[structopt(short, long, default_value = "3001", help = "cardano-node port")] + port: u16, + #[structopt(long, default_value = "764824073", help = "network magic.")] + network_magic: u64, + #[structopt(short, long, default_value = "2", help = "connect timeout in seconds")] + timeout_seconds: u64, + }, + Validate { + #[structopt(long, help = "full or partial block hash to validate")] + hash: String, + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + }, + Sync { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(short, long, help = "cardano-node hostname to connect to")] + host: String, + #[structopt(short, long, default_value = "3001", help = "cardano-node port")] + port: u16, + #[structopt(long, default_value = "764824073", help = "network magic.")] + network_magic: u64, + #[structopt(long, help = "Exit at 100% sync'd.")] + no_service: bool, + #[structopt( + short, + long, + default_value = "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81", + help = "shelley genesis hash value" + )] + shelley_genesis_hash: String, + #[structopt(long, help="Use the redb database instead of sqlite")] + use_redb: bool, + }, + Leaderlog { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt(long, help = "pool active stake snapshot value in lovelace")] + pool_stake: u64, + #[structopt(long, help = "total active stake snapshot value in lovelace")] + active_stake: u64, + #[structopt(long = "d", default_value = "0", help = "decentralization parameter")] + d: f64, + #[structopt(long, help = "hex string of the extra entropy value")] + extra_entropy: Option, + #[structopt( + long, + default_value = "current", + help = "Which ledger data to use. prev - previous epoch, current - current epoch, next - future epoch" + )] + ledger_set: LedgerSet, + #[structopt(long, help = "lower-case hex pool id")] + pool_id: String, + #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] + pool_vrf_skey: PathBuf, + #[structopt( + long = "tz", + default_value = "America/Los_Angeles", + help = "TimeZone string from the IANA database - https://en.wikipedia.org/wiki/List_of_tz_database_time_zones" + )] + timezone: String, + #[structopt( + short, + long, + default_value = "praos", + help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" + )] + consensus: String, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + help = "Epoch number where we transition from Byron to Shelley. Omitted means guess based on genesis files" + )] + shelley_transition_epoch: Option, + #[structopt( + long, + help = "Provide a nonce value in lower-case hex instead of calculating from the db" + )] + nonce: Option, + #[structopt( + long, + help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" + )] + epoch: Option, + }, + Sendtip { + #[structopt( + parse(from_os_str), + long, + default_value = "./pooltool.json", + help = "pooltool config file for sending tips" + )] + config: PathBuf, + #[structopt( + parse(from_os_str), + long, + help = "path to cardano-node executable for gathering version info" + )] + cardano_node: PathBuf, + }, + Sendslots { + #[structopt( + parse(from_os_str), + long, + default_value = "./pooltool.json", + help = "pooltool config file for sending slots" + )] + config: PathBuf, + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + help = "Epoch number where we transition from Byron to Shelley. Omitted means guess based on genesis files" + )] + shelley_transition_epoch: Option, + #[structopt(long, env = "OVERRIDE_TIME", hide_env_values = true, hidden = true)] + override_time: Option, + }, + Status { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite or redb database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + help = "Epoch number where we transition from Byron to Shelley. Omitted means guess based on genesis files" + )] + shelley_transition_epoch: Option, + }, + Nonce { + #[structopt( + parse(from_os_str), + short, + long, + default_value = "./cncli.db", + help = "sqlite or redb database file" + )] + db: PathBuf, + #[structopt(parse(from_os_str), long, help = "byron genesis json file")] + byron_genesis: PathBuf, + #[structopt(parse(from_os_str), long, help = "shelley genesis json file")] + shelley_genesis: PathBuf, + #[structopt(long, help = "hex string of the extra entropy value")] + extra_entropy: Option, + #[structopt( + long, + default_value = "current", + help = "Which ledger data to use. prev - previous epoch, current - current epoch, next - future epoch" + )] + ledger_set: LedgerSet, + #[structopt( + long, + env = "SHELLEY_TRANS_EPOCH", + help = "Epoch number where we transition from Byron to Shelley. Omitted means guess based on genesis files" + )] + shelley_transition_epoch: Option, + #[structopt( + short, + long, + default_value = "praos", + help = "Consensus algorithm - Alonzo and earlier uses tpraos, Babbage uses praos, Conway uses cpraos" + )] + consensus: String, + #[structopt( + long, + help = "Provide a specific epoch number to calculate for and ignore --ledger-set option" + )] + epoch: Option, + }, + Challenge { + #[structopt(long, help = "validating domain e.g. pooltool.io")] + domain: String, + }, + Sign { + #[structopt(parse(from_os_str), long, help = "pool's vrf.skey file")] + pool_vrf_skey: PathBuf, + #[structopt(long, help = "validating domain e.g. pooltool.io")] + domain: String, + #[structopt(long, help = "nonce value in lower-case hex")] + nonce: String, + }, + Verify { + #[structopt(parse(from_os_str), long, help = "pool's vrf.vkey file")] + pool_vrf_vkey: PathBuf, + #[structopt( + long, + help = "pool's vrf hash in hex retrieved from 'cardano-cli query pool-params...'" + )] + pool_vrf_vkey_hash: String, + #[structopt(long, help = "validating domain e.g. pooltool.io")] + domain: String, + #[structopt(long, help = "nonce value in lower-case hex")] + nonce: String, + #[structopt(long, help = "signature to verify in hex")] + signature: String, + }, + Snapshot { + #[structopt(parse(from_os_str), long, help = "cardano-node socket path")] + socket_path: PathBuf, + #[structopt(long, default_value = "764824073", help = "network magic.")] + network_magic: u64, + #[structopt(long, default_value = "mark", help = "Snapshot name to retrieve (mark, set, go)")] + name: String, + #[structopt( + long, + default_value = "1", + help = "The network identifier, (1 for mainnet, 0 for testnet)" + )] + network_id: u8, + #[structopt( + long, + default_value = "stake", + help = "The prefix for stake addresses, (stake for mainnet, stake_test for testnet)" + )] + stake_prefix: String, + #[structopt(long, default_value = "mark.csv", help = "The name of the output file (CSV format)")] + output_file: String, + }, +} - pub async fn start(cmd: Command) { - match cmd { - Command::Ping { - ref host, - ref port, - ref network_magic, - ref timeout_seconds, - } => { - ping::ping(&mut stdout(), host.as_str(), *port, *network_magic, *timeout_seconds).await; - } - Command::Validate { ref db, ref hash } => { - validate::validate_block(db, hash.as_str()); +pub async fn start(cmd: Command) { + match cmd { + Command::Ping { + ref host, + ref port, + ref network_magic, + ref timeout_seconds, + } => { + ping::ping(&mut stdout(), host.as_str(), *port, *network_magic, *timeout_seconds).await; + } + Command::Validate { ref db, ref hash } => { + validate::validate_block(db, hash.as_str()); + } + Command::Sync { + ref db, + ref host, + ref port, + ref network_magic, + ref no_service, + ref shelley_genesis_hash, + ref use_redb, + } => { + sync::sync( + db, + host.as_str(), + *port, + *network_magic, + shelley_genesis_hash.as_str(), + *no_service, + *use_redb, + ) + .await; + } + Command::Leaderlog { + ref db, + ref byron_genesis, + ref shelley_genesis, + ref pool_stake, + ref active_stake, + ref d, + ref extra_entropy, + ref ledger_set, + ref pool_id, + ref pool_vrf_skey, + ref timezone, + ref consensus, + ref shelley_transition_epoch, + ref nonce, + ref epoch, + } => { + if let Err(error) = leaderlog::calculate_leader_logs( + db, + byron_genesis, + shelley_genesis, + pool_stake, + active_stake, + d, + extra_entropy, + ledger_set, + pool_id, + pool_vrf_skey, + timezone, + false, + consensus, + shelley_transition_epoch, + nonce, + epoch, + ) { + handle_error(error); } - Command::Sync { - ref db, - ref host, - ref port, - ref network_magic, - ref no_service, - ref shelley_genesis_hash, - } => { - sync::sync( - db, - host.as_str(), - *port, - *network_magic, - shelley_genesis_hash.as_str(), - *no_service, - ) - .await; + } + Command::Nonce { + ref db, + ref byron_genesis, + ref shelley_genesis, + ref extra_entropy, + ref ledger_set, + ref shelley_transition_epoch, + ref consensus, + ref epoch, + } => { + if let Err(error) = leaderlog::calculate_leader_logs( + db, + byron_genesis, + shelley_genesis, + &0u64, + &0u64, + &0f64, + extra_entropy, + ledger_set, + "nonce", + &PathBuf::new(), + "America/Los_Angeles", + true, + consensus, + shelley_transition_epoch, + &None, + epoch, + ) { + handle_error(error); } - Command::Leaderlog { - ref db, - ref byron_genesis, - ref shelley_genesis, - ref pool_stake, - ref active_stake, - ref d, - ref extra_entropy, - ref ledger_set, - ref pool_id, - ref pool_vrf_skey, - ref timezone, - ref consensus, - ref shelley_transition_epoch, - ref nonce, - ref epoch, - } => { - if let Err(error) = leaderlog::calculate_leader_logs( - db, - byron_genesis, - shelley_genesis, - pool_stake, - active_stake, - d, - extra_entropy, - ledger_set, - pool_id, - pool_vrf_skey, - timezone, - false, - consensus, - shelley_transition_epoch, - nonce, - epoch, - ) { - handle_error(error); - } + } + Command::Sendtip { + ref config, + ref cardano_node, + } => { + if !config.exists() { + handle_error("config not found!"); + return; } - Command::Nonce { - ref db, - ref byron_genesis, - ref shelley_genesis, - ref extra_entropy, - ref ledger_set, - ref shelley_transition_epoch, - ref consensus, - ref epoch, - } => { - if let Err(error) = leaderlog::calculate_leader_logs( - db, - byron_genesis, - shelley_genesis, - &0u64, - &0u64, - &0f64, - extra_entropy, - ledger_set, - "nonce", - &PathBuf::new(), - "America/Los_Angeles", - true, - consensus, - shelley_transition_epoch, - &None, - epoch, - ) { - handle_error(error); - } + if !cardano_node.exists() { + handle_error("cardano-node not found!"); + return; } - Command::Sendtip { - ref config, - ref cardano_node, - } => { - if !config.exists() { - handle_error("config not found!"); - return; - } - if !cardano_node.exists() { - handle_error("cardano-node not found!"); - return; - } - - let pooltool_config: PooltoolConfig = get_pooltool_config(config); - let mut handles: Vec> = vec![]; - for pool in pooltool_config.pools.into_iter() { - let api_key = pooltool_config.api_key.clone(); - let cardano_node_path = cardano_node.clone(); - handles.push(thread::spawn(move || { - tokio::runtime::Runtime::new().unwrap().block_on(sync::sendtip( - pool.name, - pool.pool_id, - pool.host, - pool.port, - api_key, - &cardano_node_path, - )); - })); - } - for handle in handles { - handle.join().unwrap() - } + let pooltool_config: PooltoolConfig = pooltool::get_pooltool_config(config); + let mut handles: Vec> = vec![]; + for pool in pooltool_config.pools.into_iter() { + let api_key = pooltool_config.api_key.clone(); + let cardano_node_path = cardano_node.clone(); + handles.push(thread::spawn(move || { + tokio::runtime::Runtime::new().unwrap().block_on(sync::sendtip( + pool.name, + pool.pool_id, + pool.host, + pool.port, + api_key, + &cardano_node_path, + )); + })); } - Command::Sendslots { - ref config, - ref db, - ref byron_genesis, - ref shelley_genesis, - ref shelley_transition_epoch, - ref override_time, - } => { - if !config.exists() { - handle_error("config not found!"); - return; - } - let pooltool_config: PooltoolConfig = get_pooltool_config(config); - leaderlog::send_slots( - db, - byron_genesis, - shelley_genesis, - pooltool_config, - shelley_transition_epoch, - override_time, - ); - } - Command::Status { - ref db, - ref byron_genesis, - ref shelley_genesis, - ref shelley_transition_epoch, - } => { - leaderlog::status(db, byron_genesis, shelley_genesis, shelley_transition_epoch); - } - Command::Challenge { ref domain } => { - signing::create_challenge(domain); + + for handle in handles { + handle.join().unwrap() } - Command::Sign { - ref pool_vrf_skey, - ref domain, - ref nonce, - } => { - if !pool_vrf_skey.exists() { - handle_error("vrf.skey not found!"); - return; - } - signing::sign_challenge(pool_vrf_skey, domain, nonce); + } + Command::Sendslots { + ref config, + ref db, + ref byron_genesis, + ref shelley_genesis, + ref shelley_transition_epoch, + ref override_time, + } => { + if !config.exists() { + handle_error("config not found!"); + return; } - Command::Verify { - ref pool_vrf_vkey, - ref pool_vrf_vkey_hash, - ref domain, - ref nonce, - ref signature, - } => { - signing::verify_challenge(pool_vrf_vkey, pool_vrf_vkey_hash, domain, nonce, signature); + let pooltool_config: PooltoolConfig = pooltool::get_pooltool_config(config); + leaderlog::send_slots( + db, + byron_genesis, + shelley_genesis, + pooltool_config, + shelley_transition_epoch, + override_time, + ); + } + Command::Status { + ref db, + ref byron_genesis, + ref shelley_genesis, + ref shelley_transition_epoch, + } => { + leaderlog::status(db, byron_genesis, shelley_genesis, shelley_transition_epoch); + } + Command::Challenge { ref domain } => { + sign::create_challenge(domain); + } + Command::Sign { + ref pool_vrf_skey, + ref domain, + ref nonce, + } => { + if !pool_vrf_skey.exists() { + handle_error("vrf.skey not found!"); + return; } - Command::Snapshot { - ref socket_path, - ref network_magic, - ref name, - ref network_id, - ref stake_prefix, - ref output_file, - } => { - if let Err(error) = snapshot::dump( - socket_path, - *network_magic, - name.as_str(), - *network_id, - stake_prefix.as_str(), - output_file.as_str(), - ) - .await - { - handle_error(error); - } + sign::sign_challenge(pool_vrf_skey, domain, nonce); + } + Command::Verify { + ref pool_vrf_vkey, + ref pool_vrf_vkey_hash, + ref domain, + ref nonce, + ref signature, + } => { + sign::verify_challenge(pool_vrf_vkey, pool_vrf_vkey_hash, domain, nonce, signature); + } + Command::Snapshot { + ref socket_path, + ref network_magic, + ref name, + ref network_id, + ref stake_prefix, + ref output_file, + } => { + if let Err(error) = snapshot::dump( + socket_path, + *network_magic, + name.as_str(), + *network_id, + stake_prefix.as_str(), + output_file.as_str(), + ) + .await + { + handle_error(error); } } } - - fn get_pooltool_config(config: &Path) -> PooltoolConfig { - let buf = BufReader::new(File::open(config).unwrap()); - serde_json::from_reader(buf).unwrap() - } - - #[derive(Debug, Deserialize)] - pub struct PooltoolConfig { - api_key: String, - pools: Vec, - } - - #[derive(Debug, Deserialize)] - struct Pool { - name: String, - pool_id: String, - host: String, - port: u16, - } } diff --git a/src/main.rs b/src/main.rs index 9b255e6..e50d41f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use std::{panic, process}; use structopt::StructOpt; -use cncli::nodeclient::{self, Command}; +use cncli::Command; pub mod built_info { include!(concat!(env!("OUT_DIR"), "/built.rs")); @@ -21,7 +21,9 @@ pub mod built_info { } #[derive(Debug, StructOpt)] -#[structopt(name = "cncli", about = "A community-built cardano-node CLI", version = built_info::version())] +#[structopt( + name = "cncli", about = "A community-built cardano-node CLI", version = built_info::version() +)] struct Cli { #[structopt(subcommand)] cmd: Command, @@ -66,7 +68,7 @@ async fn main() { })); let args = Cli::from_args(); - nodeclient::start(args.cmd).await; + cncli::start(args.cmd).await; } #[cfg(test)] diff --git a/src/nodeclient/blockstore/mod.rs b/src/nodeclient/blockstore/mod.rs new file mode 100644 index 0000000..1617401 --- /dev/null +++ b/src/nodeclient/blockstore/mod.rs @@ -0,0 +1,38 @@ +use thiserror::Error; + +use crate::nodeclient::sync::BlockHeader; + +pub(crate) mod redb; +pub(crate) mod sqlite; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Redb error: {0}")] + Redb(#[from] redb::Error), + + #[error("Sqlite error: {0}")] + Sqlite(#[from] sqlite::Error), + + #[error("rusqlite error: {0}")] + Rusqlite(#[from] rusqlite::Error), + + #[error("Blockstore error: {0}")] + Blockstore(String), +} + +pub(crate) struct Block { + pub(crate) block_number: u64, + pub(crate) slot_number: u64, + pub(crate) hash: String, + pub(crate) prev_hash: String, + pub(crate) pool_id: String, + pub(crate) leader_vrf: String, + pub(crate) orphaned: bool, +} + +pub(crate) trait BlockStore { + fn save_block(&mut self, pending_blocks: &mut Vec, shelley_genesis_hash: &str) -> Result<(), Error>; + fn load_blocks(&mut self) -> Result)>, Error>; + fn find_block_by_hash(&mut self, hash_start: &str) -> Result, Error>; + fn get_tip_slot_number(&mut self) -> Result; +} diff --git a/src/nodeclient/blockstore/redb.rs b/src/nodeclient/blockstore/redb.rs new file mode 100644 index 0000000..cfa9fdd --- /dev/null +++ b/src/nodeclient/blockstore/redb.rs @@ -0,0 +1,346 @@ +use std::io::Read; +use std::path::Path; + +use itertools::Itertools; +use pallas_crypto::hash::{Hash, Hasher}; +use pallas_crypto::nonce::NonceGenerator; +use pallas_crypto::nonce::rolling_nonce::RollingNonceGenerator; +use redb::{Builder, Database, MultimapTableDefinition, ReadableMultimapTable, ReadableTable, RepairSession, TableDefinition, TypeName, Value}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tracing::info; +use uuid::Uuid; + +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::{Block, BlockStore}; +use crate::nodeclient::sync::BlockHeader; + +#[derive(Error, Debug)] +pub enum Error { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Redb error: {0}")] + Redb(#[from] redb::Error), + + #[error("Redb db error: {0}")] + RedbDb(#[from] redb::DatabaseError), + + #[error("Redb commit error: {0}")] + RedbCommit(#[from] redb::CommitError), + + #[error("Redb transaction error: {0}")] + RedbTransaction(#[from] redb::TransactionError), + + #[error("Redb table error: {0}")] + RedbTable(#[from] redb::TableError), + + #[error("Redb storage error: {0}")] + RedbStorage(#[from] redb::StorageError), + + #[error("Nonce error: {0}")] + Nonce(#[from] pallas_crypto::nonce::Error), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct ChainRecord { + block_number: u64, + slot_number: u64, + hash: Vec, + prev_hash: Vec, + pool_id: Vec, + eta_v: Vec, + node_vkey: Vec, + node_vrf_vkey: Vec, + block_vrf_0: Vec, + block_vrf_1: Vec, + eta_vrf_0: Vec, + eta_vrf_1: Vec, + leader_vrf_0: Vec, + leader_vrf_1: Vec, + block_size: u64, + block_body_hash: Vec, + pool_opcert: Vec, + unknown_0: u64, + unknown_1: u64, + unknown_2: Vec, + protocol_major_version: u64, + protocol_minor_version: u64, + orphaned: bool, +} + +impl Value for ChainRecord { + type SelfType<'a> = Self; + type AsBytes<'a> = Vec + where + Self: 'a; + + fn fixed_width() -> Option { + // dynamic sized object. not fixed width + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + bincode::deserialize(data).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + bincode::serialize(value).unwrap() + } + + fn type_name() -> TypeName { + TypeName::new(stringify!(ChainRecord)) + } +} + +// magic number must be set to the ASCII letters 'redb' followed by 0x1A, 0x0A, 0xA9, 0x0D, 0x0A. +// This sequence is inspired by the PNG magic number. +const MAGIC_NUMBER: &[u8; 9] = b"redb\x1A\x0A\xA9\x0D\x0A"; + +const VERSION_TABLE: TableDefinition<&str, u16> = TableDefinition::new("version"); +const CHAIN_TABLE: TableDefinition = TableDefinition::new("chain"); +const CHAIN_TABLE_SLOT_INDEX: MultimapTableDefinition = MultimapTableDefinition::new("chain_slot_index"); +const CHAIN_TABLE_HASH_INDEX: MultimapTableDefinition<&[u8], u128> = MultimapTableDefinition::new("chain_hash_index"); + + +pub(crate) fn is_redb_database(db_path: &Path) -> Result { + let mut file = std::fs::File::open(db_path).unwrap(); + let mut magic_number = [0u8; 9]; + file.read_exact(&mut magic_number)?; + Ok(&magic_number == MAGIC_NUMBER) +} + +pub struct RedbBlockStore { + db: Database, +} + +impl RedbBlockStore { + const DB_VERSION: u16 = 1; + + pub fn new(db_path: &Path) -> Result { + let db = Builder::new() + .set_repair_callback(Self::repair_callback) + .create(db_path)?; + Self::migrate(&db)?; + Ok(Self { db }) + } + + pub fn repair_callback(session: &mut RepairSession) { + let progress = session.progress(); + info!("Redb Repair progress: {:?}", progress); + } + + fn migrate(db: &Database) -> Result<(), Error> { + let read_tx = db.begin_read()?; + let current_version = match read_tx.open_table(VERSION_TABLE) { + Ok(version_table) => match version_table.get("version")? { + Some(version) => version.value(), + None => 0, + }, + Err(_) => 0, + }; + + if current_version < Self::DB_VERSION { + // Do migration + let write_tx = db.begin_write()?; + { + let mut version_table = write_tx.open_table(VERSION_TABLE)?; + info!("Migrating database from version 0 to 1"); + version_table.insert("version", Self::DB_VERSION)?; + // create the chain table if it doesn't exist + write_tx.open_table(CHAIN_TABLE)?; + write_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; + write_tx.open_multimap_table(CHAIN_TABLE_HASH_INDEX)?; + } + write_tx.commit()?; + } + + Ok(()) + } + + fn redb_save_block( + &mut self, + pending_blocks: &mut Vec, + shelley_genesis_hash: &str, + ) -> Result<(), Error> { + let first_pending_block_number = pending_blocks.first().unwrap().block_number; + + let write_tx = self.db.begin_write()?; + { + // get the last block eta_v (nonce) in the db + let mut chain_table = write_tx.open_table(CHAIN_TABLE)?; + let mut chain_table_slot_index = write_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; + let mut chain_table_hash_index = write_tx.open_multimap_table(CHAIN_TABLE_HASH_INDEX)?; + let mut chain_iter = chain_table.iter()?; + let mut prev_eta_v: Hash<32> = Hash::from(hex::decode(shelley_genesis_hash).unwrap().as_slice()); + let mut to_update: Vec<(u128, ChainRecord)> = Vec::new(); + + while let Some(chain_record) = chain_iter.next_back() { + let (key, chain_record) = chain_record?; + let chain_record: ChainRecord = chain_record.value(); + if chain_record.orphaned { + continue; + } + if chain_record.block_number >= first_pending_block_number && !chain_record.orphaned { + // set it to orphaned + to_update.push(( + key.value(), + ChainRecord { + orphaned: true, + ..chain_record.clone() + }, + )); + continue; + } + prev_eta_v = Hash::from(chain_record.eta_v.as_slice()); + // sanity check + assert_eq!( + chain_record.block_number, + first_pending_block_number - 1, + "block_number: {}, first_pending_block_number: {}", + chain_record.block_number, + first_pending_block_number + ); + break; + } + for (key, chain_record) in to_update { + chain_table.insert(key, chain_record)?; + } + + // save the pending blocks + for block in pending_blocks.drain(..) { + let key = Uuid::now_v7().as_u128(); + + // blake2b 224 of node_vkey is the pool_id + let pool_id = Hasher::<224>::hash(block.node_vkey.as_slice()); + + // calculate rolling nonce (eta_v) + let mut rolling_nonce_generator = RollingNonceGenerator::new(prev_eta_v); + rolling_nonce_generator.apply_block(&block.eta_vrf_0)?; + let eta_v = rolling_nonce_generator.finalize()?; + + let chain_record = ChainRecord { + block_number: block.block_number, + slot_number: block.slot_number, + hash: block.hash.clone(), + prev_hash: block.prev_hash.clone(), + pool_id: pool_id.to_vec(), + eta_v: eta_v.to_vec(), + node_vkey: block.node_vkey.clone(), + node_vrf_vkey: block.node_vrf_vkey.clone(), + block_vrf_0: block.block_vrf_0.clone(), + block_vrf_1: block.block_vrf_1.clone(), + eta_vrf_0: block.eta_vrf_0.clone(), + eta_vrf_1: block.eta_vrf_1.clone(), + leader_vrf_0: block.leader_vrf_0.clone(), + leader_vrf_1: block.leader_vrf_1.clone(), + block_size: block.block_size, + block_body_hash: block.block_body_hash.clone(), + pool_opcert: block.pool_opcert.clone(), + unknown_0: block.unknown_0, + unknown_1: block.unknown_1, + unknown_2: block.unknown_2.clone(), + protocol_major_version: block.protocol_major_version, + protocol_minor_version: block.protocol_minor_version, + orphaned: false, + }; + chain_table.insert(key, chain_record)?; + chain_table_slot_index.insert(block.slot_number, key)?; + chain_table_hash_index.insert(block.hash.as_slice(), key)?; + + prev_eta_v = eta_v; + } + } + write_tx.commit()?; + + Ok(()) + } + + fn redb_load_blocks(&mut self) -> Result)>, Error> { + let read_tx = self.db.begin_read()?; + // get slot_number and hash from chain table ordering by slot_number descending where orphaned is false + // limit the result to 33 records + let chain_table = read_tx.open_table(CHAIN_TABLE)?; + let mut chain_iter = chain_table.iter()?; + let mut blocks: Vec<(u64, Vec)> = Vec::new(); + while let Some(record) = chain_iter.next_back() { + let (_, chain_record) = record?; + let chain_record: ChainRecord = chain_record.value(); + if chain_record.orphaned { + continue; + } + let slot_number = chain_record.slot_number; + let hash = chain_record.hash.clone(); + blocks.push((slot_number, hash)); + if blocks.len() >= 33 { + break; + } + } + + Ok(blocks) + } + + fn redb_find_block_by_hash(&mut self, hash_start: &str) -> Result, Error> { + let read_tx = self.db.begin_read()?; + let chain_table = read_tx.open_table(CHAIN_TABLE)?; + let mut chain_iter = chain_table.iter()?; + while let Some(record) = chain_iter.next_back() { + let (_, chain_record) = record?; + let chain_record: ChainRecord = chain_record.value(); + if hex::encode(&chain_record.hash).starts_with(hash_start) { + let block = Block { + block_number: chain_record.block_number, + slot_number: chain_record.slot_number, + hash: hex::encode(&chain_record.hash), + prev_hash: hex::encode(&chain_record.prev_hash), + pool_id: hex::encode(&chain_record.pool_id), + leader_vrf: hex::encode(&chain_record.leader_vrf_0), + orphaned: chain_record.orphaned, + }; + return Ok(Some(block)); + } + } + + Ok(None) + } + + fn redb_get_tip_slot_number(&mut self) -> Result { + let read_tx = self.db.begin_read()?; + let chain_table_slot_index = read_tx.open_multimap_table(CHAIN_TABLE_SLOT_INDEX)?; + let mut iter = chain_table_slot_index.iter()?; + while let Some(result) = iter.next_back() { + let (slot_number, _) = result?; + return Ok(slot_number.value()); + } + Ok(0) + } +} + +impl BlockStore for RedbBlockStore { + fn save_block( + &mut self, + pending_blocks: &mut Vec, + shelley_genesis_hash: &str, + ) -> Result<(), blockstore::Error> { + Ok(self.redb_save_block(pending_blocks, shelley_genesis_hash)?) + } + + fn load_blocks(&mut self) -> Result)>, blockstore::Error> { + Ok(self.redb_load_blocks()?) + } + + fn find_block_by_hash(&mut self, hash_start: &str) -> Result, blockstore::Error> { + Ok(self.redb_find_block_by_hash(hash_start)?) + } + + fn get_tip_slot_number(&mut self) -> Result { + Ok(self.redb_get_tip_slot_number()?) + } +} diff --git a/src/nodeclient/sqlite.rs b/src/nodeclient/blockstore/sqlite.rs similarity index 85% rename from src/nodeclient/sqlite.rs rename to src/nodeclient/blockstore/sqlite.rs index af0418d..6c3d505 100644 --- a/src/nodeclient/sqlite.rs +++ b/src/nodeclient/blockstore/sqlite.rs @@ -1,4 +1,3 @@ -use std::io; use std::path::Path; use blake2b_simd::Params; @@ -9,6 +8,8 @@ use pallas_crypto::nonce::rolling_nonce::RollingNonceGenerator; use rusqlite::{Connection, named_params}; use thiserror::Error; +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::{Block, BlockStore}; use crate::nodeclient::sync::BlockHeader; #[derive(Error, Debug)] @@ -20,11 +21,6 @@ pub enum Error { Nonce(#[from] pallas_crypto::nonce::Error), } -pub trait BlockStore { - fn save_block(&mut self, pending_blocks: &mut Vec, shelley_genesis_hash: &str) -> io::Result<()>; - fn load_blocks(&mut self) -> Option)>>; -} - pub struct SqLiteBlockStore { pub db: Connection, } @@ -202,7 +198,9 @@ impl SqLiteBlockStore { shelley_genesis_hash.to_string() } }, - ).unwrap().as_slice() + ) + .unwrap() + .as_slice(), ); let tx = db.transaction()?; @@ -277,13 +275,15 @@ impl SqLiteBlockStore { shelley_genesis_hash.to_string() } }, - ).unwrap().as_slice() + ) + .unwrap() + .as_slice(), ); } // calculate rolling nonce (eta_v) let mut rolling_nonce_generator = RollingNonceGenerator::new(prev_eta_v); rolling_nonce_generator.apply_block(&block.eta_vrf_0)?; - prev_eta_v = rolling_nonce_generator.finalize()?; + let eta_v = rolling_nonce_generator.finalize()?; // blake2b 224 of node_vkey is the pool_id let pool_id = Params::new() @@ -300,7 +300,7 @@ impl SqLiteBlockStore { ":hash" : hex::encode(block.hash), ":prev_hash" : hex::encode(block.prev_hash), ":pool_id" : hex::encode(pool_id), - ":eta_v" : hex::encode(prev_eta_v), + ":eta_v" : hex::encode(eta_v), ":node_vkey" : hex::encode(block.node_vkey), ":node_vrf_vkey" : hex::encode(block.node_vrf_vkey), ":block_vrf_0": hex::encode(block.block_vrf_0), @@ -318,36 +318,76 @@ impl SqLiteBlockStore { ":protocol_major_version" : block.protocol_major_version, ":protocol_minor_version" : block.protocol_minor_version, })?; + + prev_eta_v = eta_v; } } tx.commit()?; Ok(()) } -} - -impl BlockStore for SqLiteBlockStore { - fn save_block(&mut self, pending_blocks: &mut Vec, shelley_genesis_hash: &str) -> io::Result<()> { - match self.sql_save_block(pending_blocks, shelley_genesis_hash) { - Ok(_) => Ok(()), - Err(error) => Err(io::Error::new(io::ErrorKind::Other, format!("Database error!: {:?}", error))), - } - } - fn load_blocks(&mut self) -> Option)>> { + fn sql_load_blocks(&mut self) -> Result)>, Error> { let db = &self.db; let mut stmt = db .prepare("SELECT slot_number, hash FROM (SELECT slot_number, hash, orphaned FROM chain ORDER BY slot_number DESC LIMIT 100) WHERE orphaned = 0 ORDER BY slot_number DESC LIMIT 33;") .unwrap(); let blocks = stmt .query_map([], |row| { - let slot_result: Result = row.get(0); + let slot_result: Result = row.get(0); let hash_result: Result = row.get(1); let slot = slot_result?; let hash = hash_result?; Ok((slot, hex::decode(hash).unwrap())) - }) - .ok()?; - Some(blocks.map(|item| item.unwrap()).collect()) + })?; + Ok(blocks.map(|item| item.unwrap()).collect()) + } + + fn sql_find_block_by_hash(&mut self, hash_start: &str) -> Result, Error> { + let db = &self.db; + let like = format!("{hash_start}%"); + Ok(db.query_row( + "SELECT block_number,slot_number,hash,prev_hash,pool_id,leader_vrf_0,orphaned FROM chain WHERE hash LIKE ? ORDER BY orphaned ASC", + [&like], + |row| { + Ok(Some(Block { + block_number: row.get(0)?, + slot_number: row.get(1)?, + hash: row.get(2)?, + prev_hash: row.get(3)?, + pool_id: row.get(4)?, + leader_vrf: row.get(5)?, + orphaned: row.get(6)?, + })) + }, + )?) + } + + fn sql_get_tip_slot_number(&mut self) -> Result { + let db = &self.db; + let tip_slot_number: u64 = db.query_row("SELECT MAX(slot_number) FROM chain", [], |row| row.get(0))?; + Ok(tip_slot_number) + } +} + +impl BlockStore for SqLiteBlockStore { + fn save_block( + &mut self, + pending_blocks: &mut Vec, + shelley_genesis_hash: &str, + ) -> Result<(), blockstore::Error> { + Ok(self.sql_save_block(pending_blocks, shelley_genesis_hash)?) + } + + fn load_blocks(&mut self) -> Result)>, blockstore::Error> { + Ok(self.sql_load_blocks()?) + } + + fn find_block_by_hash(&mut self, hash_start: &str) -> Result, blockstore::Error> { + Ok(self.sql_find_block_by_hash(hash_start)?) + } + + fn get_tip_slot_number(&mut self) -> Result { + Ok(self.sql_get_tip_slot_number()?) } } diff --git a/src/nodeclient/math.rs b/src/nodeclient/leaderlog/math.rs similarity index 100% rename from src/nodeclient/math.rs rename to src/nodeclient/leaderlog/math.rs diff --git a/src/nodeclient/leaderlog.rs b/src/nodeclient/leaderlog/mod.rs similarity index 90% rename from src/nodeclient/leaderlog.rs rename to src/nodeclient/leaderlog/mod.rs index 30fe5ba..ad0f864 100644 --- a/src/nodeclient/leaderlog.rs +++ b/src/nodeclient/leaderlog/mod.rs @@ -1,6 +1,6 @@ use std::fmt::Display; use std::fs::File; -use std::io::{stdout, BufReader}; +use std::io::{BufReader, stdout}; use std::path::Path; use std::str::FromStr; @@ -14,20 +14,25 @@ use log::{debug, error, info, trace}; use num_bigint::{BigInt, Sign}; use num_rational::BigRational; use rayon::prelude::*; -use rusqlite::{named_params, Connection, OptionalExtension}; +// use rusqlite::{named_params, Connection, OptionalExtension}; use serde::{Deserialize, Serialize}; use serde_aux::prelude::deserialize_number_from_string; use thiserror::Error; +use crate::{LedgerSet, PooltoolConfig}; +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::BlockStore; +use crate::nodeclient::blockstore::redb::{is_redb_database, RedbBlockStore}; +use crate::nodeclient::blockstore::sqlite::SqLiteBlockStore; use crate::nodeclient::leaderlog::deserialize::cbor_hex; use crate::nodeclient::leaderlog::ledgerstate::calculate_ledger_state_sigma_d_and_extra_entropy; use crate::nodeclient::leaderlog::libsodium::{sodium_crypto_vrf_proof_to_hash, sodium_crypto_vrf_prove}; -use crate::nodeclient::math::{ln, normalize, round, taylor_exp_cmp, TaylorCmp}; -use crate::nodeclient::{LedgerSet, PooltoolConfig}; +use crate::nodeclient::leaderlog::math::{ln, normalize, round, taylor_exp_cmp, TaylorCmp}; mod deserialize; -pub mod ledgerstate; +mod ledgerstate; pub(crate) mod libsodium; +mod math; #[derive(Error, Debug)] pub enum Error { @@ -37,8 +42,8 @@ pub enum Error { #[error("JSON error: {0}")] Json(#[from] serde_json::Error), - #[error("SQLite error: {0}")] - Sqlite(#[from] rusqlite::Error), + #[error("Rusqlite error: {0}")] + Rusqlite(#[from] rusqlite::Error), #[error("Libsodium error: {0}")] Libsodium(#[from] libsodium::Error), @@ -51,6 +56,15 @@ pub enum Error { #[error("Leaderlog error: {0}")] Leaderlog(String), + + #[error("Blockstore error: {0}")] + Blockstore(#[from] blockstore::Error), + + #[error("Redb error: {0}")] + Redb(#[from] blockstore::redb::Error), + + #[error("Sqlite error: {0}")] + Sqlite(#[from] blockstore::sqlite::Error), } #[derive(Debug, Serialize)] @@ -63,7 +77,7 @@ struct LeaderLogError { #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct ByronGenesis { - start_time: i64, + start_time: u64, protocol_consts: ProtocolConsts, block_version_data: BlockVersionData, } @@ -71,14 +85,14 @@ struct ByronGenesis { #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct ProtocolConsts { - k: i64, + k: u64, } #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] struct BlockVersionData { #[serde(deserialize_with = "deserialize_number_from_string")] - slot_duration: i64, + slot_duration: u64, } #[derive(Debug, Deserialize)] @@ -86,8 +100,8 @@ struct BlockVersionData { struct ShelleyGenesis { active_slots_coeff: f64, network_magic: u32, - slot_length: i64, - epoch_length: i64, + slot_length: u64, + epoch_length: u64, } #[derive(Debug, Deserialize)] @@ -103,10 +117,10 @@ pub(crate) struct VrfKey { #[serde(rename_all = "camelCase")] struct LeaderLog { status: String, - epoch: i64, + epoch: u64, epoch_nonce: String, consensus: String, - epoch_slots: i64, + epoch_slots: u64, epoch_slots_ideal: f64, max_performance: f64, pool_id: String, @@ -121,9 +135,9 @@ struct LeaderLog { #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] struct Slot { - no: i64, - slot: i64, - slot_in_epoch: i64, + no: u64, + slot: u64, + slot_in_epoch: u64, at: String, } @@ -156,10 +170,6 @@ pub(crate) fn read_vrf_key(vrf_key_path: &Path) -> Result { Ok(serde_json::from_reader(buf)?) } -fn get_tip_slot_number(db: &Connection) -> Result { - db.query_row("SELECT MAX(slot_number) FROM chain", [], |row| row.get(0)) -} - fn get_eta_v_before_slot(db: &Connection, slot_number: i64) -> Result { db.query_row( "SELECT eta_v FROM chain WHERE orphaned = 0 AND slot_number < ?1 ORDER BY slot_number DESC LIMIT 1", @@ -196,10 +206,10 @@ fn get_prev_slots(db: &Connection, epoch: i64, pool_id: &str) -> Result i64 { +fn guess_shelley_transition_epoch(network_magic: u32) -> u64 { match network_magic { 764824073 => { // mainnet @@ -232,16 +242,13 @@ fn guess_shelley_transition_epoch(network_magic: u32) -> i64 { } } +/// Calculate the first slot of the epoch and the epoch number for the given slot fn get_first_slot_of_epoch( byron: &ByronGenesis, shelley: &ShelleyGenesis, - current_slot: i64, - shelley_trans_epoch: i64, -) -> (i64, i64) { - let shelley_transition_epoch = match shelley_trans_epoch { - -1 => guess_shelley_transition_epoch(shelley.network_magic), - _ => shelley_trans_epoch, - }; + current_slot: u64, + shelley_transition_epoch: u64, +) -> (u64, u64) { let byron_epoch_length = 10 * byron.protocol_consts.k; let byron_slots = byron_epoch_length * shelley_transition_epoch; let shelley_slots = current_slot - byron_slots; @@ -255,14 +262,10 @@ fn get_first_slot_of_epoch( fn slot_to_naivedatetime( byron: &ByronGenesis, shelley: &ShelleyGenesis, - slot: i64, - shelley_trans_epoch: i64, + slot: u64, + shelley_transition_epoch: u64, ) -> NaiveDateTime { - let shelley_transition_epoch = match shelley_trans_epoch { - -1 => guess_shelley_transition_epoch(shelley.network_magic), - _ => shelley_trans_epoch, - }; - let network_start_time = DateTime::from_timestamp(byron.start_time, 0).unwrap().naive_utc(); + let network_start_time = DateTime::from_timestamp(byron.start_time as i64, 0).unwrap().naive_utc(); let byron_epoch_length = 10 * byron.protocol_consts.k; let byron_slots = byron_epoch_length * shelley_transition_epoch; let shelley_slots = slot - byron_slots; @@ -270,17 +273,17 @@ fn slot_to_naivedatetime( let byron_secs = (byron.block_version_data.slot_duration * byron_slots) / 1000; let shelley_secs = shelley_slots * shelley.slot_length; - network_start_time + TimeDelta::try_seconds(byron_secs).unwrap() + TimeDelta::try_seconds(shelley_secs).unwrap() + network_start_time + TimeDelta::try_seconds(byron_secs as i64).unwrap() + TimeDelta::try_seconds(shelley_secs as i64).unwrap() } fn slot_to_timestamp( byron: &ByronGenesis, shelley: &ShelleyGenesis, - slot: i64, + slot: u64, tz: &Tz, - shelley_trans_epoch: i64, + shelley_transition_epoch: u64, ) -> String { - let slot_time = slot_to_naivedatetime(byron, shelley, slot, shelley_trans_epoch); + let slot_time = slot_to_naivedatetime(byron, shelley, slot, shelley_transition_epoch); tz.from_utc_datetime(&slot_time).to_rfc3339() } @@ -447,12 +450,7 @@ fn is_slot_leader_tpraos( } } -fn get_current_slot(byron: &ByronGenesis, shelley: &ShelleyGenesis, shelley_trans_epoch: &i64) -> Result { - let shelley_transition_epoch = match shelley_trans_epoch { - -1 => guess_shelley_transition_epoch(shelley.network_magic), - _ => *shelley_trans_epoch, - }; - +fn get_current_slot(byron: &ByronGenesis, shelley: &ShelleyGenesis, shelley_transition_epoch: u64) -> Result { // read byron genesis values let byron_slot_length = byron.block_version_data.slot_duration; let byron_k = byron.protocol_consts.k; @@ -464,7 +462,7 @@ fn get_current_slot(byron: &ByronGenesis, shelley: &ShelleyGenesis, shelley_tran // read shelley genesis values let slot_length = shelley.slot_length; - let current_time_sec = Utc::now().timestamp(); + let current_time_sec = Utc::now().timestamp() as u64; // Calculate current slot let byron_slots = shelley_transition_epoch * byron_epoch_length; @@ -472,12 +470,7 @@ fn get_current_slot(byron: &ByronGenesis, shelley: &ShelleyGenesis, shelley_tran Ok(byron_slots + shelley_slots) } -fn get_current_epoch(byron: &ByronGenesis, shelley: &ShelleyGenesis, shelley_trans_epoch: &i64) -> i64 { - let shelley_transition_epoch = match shelley_trans_epoch { - -1 => guess_shelley_transition_epoch(shelley.network_magic), - _ => *shelley_trans_epoch, - }; - +fn get_current_epoch(byron: &ByronGenesis, shelley: &ShelleyGenesis, shelley_transition_epoch: u64) -> u64 { // read byron genesis values let byron_slot_length = byron.block_version_data.slot_duration; let byron_k = byron.protocol_consts.k; @@ -491,7 +484,7 @@ fn get_current_epoch(byron: &ByronGenesis, shelley: &ShelleyGenesis, shelley_tra let slot_length = shelley.slot_length; let epoch_length = shelley.epoch_length; - let current_time_sec = Utc::now().timestamp(); + let current_time_sec = Utc::now().timestamp() as u64; shelley_transition_epoch + ((current_time_sec - byron_end_time_sec) / slot_length / epoch_length) } @@ -511,9 +504,9 @@ pub(crate) fn calculate_leader_logs( timezone: &str, is_just_nonce: bool, consensus: &str, - shelley_transition_epoch: &i64, + shelley_transition_epoch: &Option, nonce: &Option, - epoch: &Option, + epoch: &Option, ) -> Result<(), Error> { let tz: Tz = timezone.parse::().unwrap(); @@ -549,7 +542,14 @@ pub(crate) fn calculate_leader_logs( return Err(Error::Leaderlog(format!("Invalid Consensus: --consensus {consensus}"))); } - let db = Connection::open(db_path)?; + // check if db_path is a redb database based on magic number + let use_redb = is_redb_database(db_path)?; + + let mut block_store: Box = if use_redb { + Box::new(RedbBlockStore::new(db_path)?) + } else { + Box::new(SqLiteBlockStore::new(db_path)?) + }; let byron = read_byron_genesis(byron_genesis)?; debug!("{:?}", byron); @@ -557,6 +557,11 @@ pub(crate) fn calculate_leader_logs( let shelley = read_shelley_genesis(shelley_genesis)?; debug!("{:?}", shelley); + let shelley_transition_epoch = match *shelley_transition_epoch { + None => guess_shelley_transition_epoch(shelley.network_magic), + Some(value) => value, + }; + let ledger_info = calculate_ledger_state_sigma_d_and_extra_entropy(pool_stake, active_stake, d, extra_entropy)?; let tip_slot_number = match nonce { @@ -567,7 +572,7 @@ pub(crate) fn calculate_leader_logs( now_slot_number } None => { - let tip_slot_number = get_tip_slot_number(&db)?; + let tip_slot_number = block_store.get_tip_slot_number()?; debug!("tip_slot_number: {}", tip_slot_number); tip_slot_number } @@ -577,7 +582,7 @@ pub(crate) fn calculate_leader_logs( let epoch_offset = match epoch { Some(epoch) => { - if *epoch > current_epoch || *epoch <= *shelley_transition_epoch { + if *epoch > current_epoch || *epoch <= shelley_transition_epoch { return Err(Error::Leaderlog(format!("Invalid Epoch: --epoch {epoch}, current_epoch: {current_epoch}, shelley_transition_epoch: {shelley_transition_epoch}"))); } current_epoch - *epoch @@ -589,18 +594,18 @@ pub(crate) fn calculate_leader_logs( // pretend we're on a different slot number if we want to calculate past or future epochs. let additional_slots: i64 = match epoch_offset { 0 => match ledger_set { - LedgerSet::Mark => shelley.epoch_length, + LedgerSet::Mark => shelley.epoch_length as i64, LedgerSet::Set => 0, - LedgerSet::Go => -shelley.epoch_length, + LedgerSet::Go => -(shelley.epoch_length as i64), }, - _ => -shelley.epoch_length * epoch_offset, + _ => -((shelley.epoch_length * epoch_offset) as i64), }; let (epoch, first_slot_of_epoch) = get_first_slot_of_epoch( &byron, &shelley, - tip_slot_number + additional_slots, - *shelley_transition_epoch, + (tip_slot_number as i64 + additional_slots) as u64, + shelley_transition_epoch, ); debug!("epoch: {}", epoch); @@ -618,7 +623,7 @@ pub(crate) fn calculate_leader_logs( }, None => { // Make sure we're fully sync'd - let tip_time = slot_to_naivedatetime(&byron, &shelley, tip_slot_number, *shelley_transition_epoch) + let tip_time = slot_to_naivedatetime(&byron, &shelley, tip_slot_number, shelley_transition_epoch) .and_utc() .timestamp(); let system_time = Utc::now().timestamp(); @@ -632,18 +637,18 @@ pub(crate) fn calculate_leader_logs( debug!("first_slot_of_epoch: {}", first_slot_of_epoch); debug!("first_slot_of_prev_epoch: {}", first_slot_of_prev_epoch); let stability_window_multiplier = match consensus { - "cpraos" => 4i64, - _ => 3i64, + "cpraos" => 4u64, + _ => 3u64, }; - let stability_window: i64 = ((stability_window_multiplier * byron.protocol_consts.k) as f64 + let stability_window = ((stability_window_multiplier * byron.protocol_consts.k) as f64 / shelley.active_slots_coeff) - .ceil() as i64; + .ceil() as u64; let stability_window_start = first_slot_of_epoch - stability_window; debug!("stability_window: {}", stability_window); debug!("stability_window_start: {}", stability_window_start); let stability_window_start_plus_1_min = stability_window_start + 60; - let tip_slot_number = get_tip_slot_number(&db)?; + let tip_slot_number = block_store.get_tip_slot_number()?; if tip_slot_number < stability_window_start_plus_1_min { return Err(Error::Leaderlog(format!( "Not enough blocks sync'd to calculate! Try again later after slot {stability_window_start_plus_1_min} is sync'd." @@ -777,7 +782,7 @@ pub(crate) fn calculate_leader_logs( no, slot: *slot, slot_in_epoch: slot - first_slot_of_epoch, - at: slot_to_timestamp(&byron, &shelley, *slot, &tz, *shelley_transition_epoch), + at: slot_to_timestamp(&byron, &shelley, *slot, &tz, shelley_transition_epoch), }; debug!("Found assigned slot: {:?}", &slot); @@ -808,14 +813,14 @@ pub(crate) fn calculate_leader_logs( .as_bytes(), ); - db.execute("INSERT INTO slots (epoch,pool_id,slot_qty,slots,hash) VALUES (:epoch,:pool_id,:slot_qty,:slots,:hash) ON CONFLICT (epoch,pool_id) DO UPDATE SET slot_qty=excluded.slot_qty, slots=excluded.slots, hash=excluded.hash", - named_params! { + db.execute("INSERT INTO slots (epoch,pool_id,slot_qty,slots,hash) VALUES (:epoch,:pool_id,:slot_qty,:slots,:hash) ON CONFLICT (epoch,pool_id) DO UPDATE SET slot_qty=excluded.slot_qty, slots=excluded.slots, hash=excluded.hash", + named_params! { ":epoch" : epoch, ":pool_id" : pool_id, ":slot_qty" : assigned_slots.len() as i64, ":slots" : slots, ":hash" : hash - } + }, )?; println!("{}", serde_json::to_string_pretty(&leader_log)?); @@ -825,7 +830,7 @@ pub(crate) fn calculate_leader_logs( Ok(()) } -pub(crate) fn status(db_path: &Path, byron_genesis: &Path, shelley_genesis: &Path, shelley_trans_epoch: &i64) { +pub(crate) fn status(db_path: &Path, byron_genesis: &Path, shelley_genesis: &Path, shelley_trans_epoch: &Option) { if !db_path.exists() { handle_error("database not found!"); return; @@ -871,7 +876,7 @@ pub(crate) fn send_slots( byron_genesis: &Path, shelley_genesis: &Path, pooltool_config: PooltoolConfig, - shelley_trans_epoch: &i64, + shelley_trans_epoch: &Option, override_time: &Option, ) { if !db_path.exists() { @@ -914,7 +919,7 @@ pub(crate) fn send_slots( override_time: override_time.clone(), prev_slots, }) - .unwrap(); + .unwrap(); info!("Sending: {}", &request); match reqwest::blocking::Client::builder().build() { Ok(client) => { @@ -986,5 +991,5 @@ pub fn handle_error(error_message: T) { error_message: format!("{error_message}"), }, ) - .unwrap(); + .unwrap(); } diff --git a/src/nodeclient/mod.rs b/src/nodeclient/mod.rs new file mode 100644 index 0000000..c02e238 --- /dev/null +++ b/src/nodeclient/mod.rs @@ -0,0 +1,7 @@ +pub(crate) mod blockstore; +pub(crate) mod leaderlog; +pub(crate) mod ping; +pub(crate) mod sign; +pub(crate) mod snapshot; +pub(crate) mod sync; +pub(crate) mod validate; diff --git a/src/nodeclient/ping.rs b/src/nodeclient/ping/mod.rs similarity index 100% rename from src/nodeclient/ping.rs rename to src/nodeclient/ping/mod.rs diff --git a/src/nodeclient/signing.rs b/src/nodeclient/sign/mod.rs similarity index 100% rename from src/nodeclient/signing.rs rename to src/nodeclient/sign/mod.rs diff --git a/src/nodeclient/snapshot.rs b/src/nodeclient/snapshot/mod.rs similarity index 91% rename from src/nodeclient/snapshot.rs rename to src/nodeclient/snapshot/mod.rs index a57e85b..3b8a70c 100644 --- a/src/nodeclient/snapshot.rs +++ b/src/nodeclient/snapshot/mod.rs @@ -22,23 +22,20 @@ pub enum Error { #[error("Unexpected array length: expected {expected}, got {actual}")] UnexpectedArrayLength { expected: u64, actual: u64 }, - #[error("Unexpected map length: expected {expected}, got {actual}")] - UnexpectedMapLength { expected: u64, actual: u64 }, - #[error("Unexpected Cbor Type: {value:?}")] UnexpectedCborType { value: Type }, #[error(transparent)] - Bech32Error(#[from] bech32::primitives::hrp::Error), + Bech32(#[from] bech32::primitives::hrp::Error), #[error(transparent)] - Bech32EncodingError(#[from] bech32::EncodeError), + Bech32Encoding(#[from] bech32::EncodeError), #[error(transparent)] - IoError(#[from] std::io::Error), + Io(#[from] std::io::Error), #[error("Snapshot error: {0}")] - SnapshotError(String), + Snapshot(String), } #[derive(Debug)] @@ -63,7 +60,7 @@ pub(crate) async fn dump( "mark" => Snapshot::Mark, "set" => Snapshot::Set, "go" => Snapshot::Go, - _ => return Err(Error::SnapshotError(format!("Unknown snapshot name: {}", name))), + _ => return Err(Error::Snapshot(format!("Unknown snapshot name: {}", name))), }; let client = client.statequery(); @@ -160,7 +157,7 @@ pub(crate) async fn dump( let stake_key_prefix = [match address_type { 0 => 0xe0u8, // key-based stake address 1 => 0xf0u8, // script-based stake address - _ => return Err(Error::SnapshotError(format!("Unknown address type: {}", address_type))), + _ => return Err(Error::Snapshot(format!("Unknown address type: {}", address_type))), } | network_id]; let stake_key_bytes = decoder.bytes()?; let stake_key_bytes = [&stake_key_prefix, stake_key_bytes].concat(); diff --git a/src/nodeclient/sync.rs b/src/nodeclient/sync/mod.rs similarity index 89% rename from src/nodeclient/sync.rs rename to src/nodeclient/sync/mod.rs index bfd99d3..68aa9d5 100644 --- a/src/nodeclient/sync.rs +++ b/src/nodeclient/sync/mod.rs @@ -4,52 +4,49 @@ use std::ops::Sub; use std::path::Path; use std::time::{Duration, Instant}; -use thiserror::Error; - use log::{debug, error, info, warn}; -use pallas_network::facades::{KeepAliveLoop, PeerClient, DEFAULT_KEEP_ALIVE_INTERVAL_SEC}; -use pallas_network::miniprotocols::chainsync::{HeaderContent, NextResponse, Tip}; -use pallas_network::miniprotocols::handshake::Confirmation; +use pallas_network::facades::{DEFAULT_KEEP_ALIVE_INTERVAL_SEC, KeepAliveLoop, PeerClient}; use pallas_network::miniprotocols::{ - blockfetch, chainsync, handshake, keepalive, txsubmission, Point, MAINNET_MAGIC, PROTOCOL_N2N_BLOCK_FETCH, - PROTOCOL_N2N_CHAIN_SYNC, PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE, PROTOCOL_N2N_TX_SUBMISSION, + blockfetch, chainsync, handshake, keepalive, MAINNET_MAGIC, Point, PROTOCOL_N2N_BLOCK_FETCH, PROTOCOL_N2N_CHAIN_SYNC, + PROTOCOL_N2N_HANDSHAKE, PROTOCOL_N2N_KEEP_ALIVE, PROTOCOL_N2N_TX_SUBMISSION, txsubmission, }; +use pallas_network::miniprotocols::chainsync::{HeaderContent, NextResponse, Tip}; +use pallas_network::miniprotocols::handshake::Confirmation; use pallas_network::multiplexer::{Bearer, Plexer}; use pallas_traverse::MultiEraHeader; +use thiserror::Error; -use crate::nodeclient::pooltool; -use crate::nodeclient::sqlite; -use crate::nodeclient::sqlite::BlockStore; +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::BlockStore; +use crate::nodeclient::blockstore::redb::RedbBlockStore; +use crate::nodeclient::blockstore::sqlite::SqLiteBlockStore; -use super::sqlite::SqLiteBlockStore; +pub(crate) mod pooltool; const FIVE_SECS: Duration = Duration::from_secs(5); #[derive(Error, Debug)] pub enum Error { - #[error("loggingobserver error occurred")] - LoggingObserverError(String), - #[error("pallas_traverse error occurred: {0}")] - PallasTraverseError(#[from] pallas_traverse::Error), + PallasTraverse(#[from] pallas_traverse::Error), #[error("io error occurred: {0}")] - IoError(#[from] std::io::Error), + Io(#[from] std::io::Error), #[error("keepalive error occurred: {0}")] - KeepAliveError(#[from] keepalive::ClientError), + KeepAlive(#[from] keepalive::ClientError), #[error("chainsync error occurred: {0}")] - ChainSyncError(#[from] chainsync::ClientError), + ChainSync(#[from] chainsync::ClientError), - #[error("chainsync canceled")] - ChainSyncCanceled, + #[error("blockstore error occurred: {0}")] + BlockStore(#[from] blockstore::Error), } #[derive(Debug, Clone)] -pub struct BlockHeader { - pub block_number: i64, - pub slot_number: i64, +pub(crate) struct BlockHeader { + pub block_number: u64, + pub slot_number: u64, pub hash: Vec, pub prev_hash: Vec, pub node_vkey: Vec, @@ -60,14 +57,14 @@ pub struct BlockHeader { pub eta_vrf_1: Vec, pub leader_vrf_0: Vec, pub leader_vrf_1: Vec, - pub block_size: i64, + pub block_size: u64, pub block_body_hash: Vec, pub pool_opcert: Vec, - pub unknown_0: i64, - pub unknown_1: i64, + pub unknown_0: u64, + pub unknown_1: u64, pub unknown_2: Vec, - pub protocol_major_version: i64, - pub protocol_minor_version: i64, + pub protocol_major_version: u64, + pub protocol_minor_version: u64, } struct LoggingObserver { @@ -123,8 +120,8 @@ impl Observer for LoggingObserver { MultiEraHeader::ShelleyCompatible(header) => { //sqlite only handles signed values so some casting is done here self.pending_blocks.push(BlockHeader { - block_number: header.header_body.block_number as i64, - slot_number: slot as i64, + block_number: header.header_body.block_number, + slot_number: slot, hash: hash.to_vec(), prev_hash: match header.header_body.prev_hash { None => vec![], @@ -138,14 +135,14 @@ impl Observer for LoggingObserver { eta_vrf_1: header.header_body.nonce_vrf.1.to_vec(), leader_vrf_0: leader_vrf_output, leader_vrf_1: header.header_body.leader_vrf.1.to_vec(), - block_size: header.header_body.block_body_size as i64, + block_size: header.header_body.block_body_size, block_body_hash: header.header_body.block_body_hash.to_vec(), pool_opcert: header.header_body.operational_cert_hot_vkey.to_vec(), - unknown_0: header.header_body.operational_cert_sequence_number as i64, - unknown_1: header.header_body.operational_cert_kes_period as i64, + unknown_0: header.header_body.operational_cert_sequence_number, + unknown_1: header.header_body.operational_cert_kes_period, unknown_2: header.header_body.operational_cert_sigma.to_vec(), - protocol_major_version: header.header_body.protocol_major as i64, - protocol_minor_version: header.header_body.protocol_minor as i64, + protocol_major_version: header.header_body.protocol_major, + protocol_minor_version: header.header_body.protocol_minor, }); let block_number: f64 = header.header_body.block_number as f64; let tip_block_number: f64 = tip.1 as f64; @@ -174,8 +171,8 @@ impl Observer for LoggingObserver { MultiEraHeader::BabbageCompatible(header) => { //sqlite only handles signed values so some casting is done here self.pending_blocks.push(BlockHeader { - block_number: header.header_body.block_number as i64, - slot_number: slot as i64, + block_number: header.header_body.block_number, + slot_number: slot, hash: hash.to_vec(), prev_hash: match header.header_body.prev_hash { None => vec![], @@ -189,15 +186,14 @@ impl Observer for LoggingObserver { eta_vrf_1: vec![], leader_vrf_0: leader_vrf_output, leader_vrf_1: vec![], - block_size: header.header_body.block_body_size as i64, + block_size: header.header_body.block_body_size, block_body_hash: header.header_body.block_body_hash.to_vec(), pool_opcert: header.header_body.operational_cert.operational_cert_hot_vkey.to_vec(), - unknown_0: header.header_body.operational_cert.operational_cert_sequence_number - as i64, - unknown_1: header.header_body.operational_cert.operational_cert_kes_period as i64, + unknown_0: header.header_body.operational_cert.operational_cert_sequence_number, + unknown_1: header.header_body.operational_cert.operational_cert_kes_period, unknown_2: header.header_body.operational_cert.operational_cert_sigma.to_vec(), - protocol_major_version: header.header_body.protocol_version.0 as i64, - protocol_minor_version: header.header_body.protocol_version.1 as i64, + protocol_major_version: header.header_body.protocol_version.0, + protocol_minor_version: header.header_body.protocol_version.1, }); let block_number: f64 = header.header_body.block_number as f64; let tip_block_number = max(header.header_body.block_number, tip.1); @@ -257,24 +253,20 @@ impl Observer for LoggingObserver { } } -fn get_intersect_blocks(block_store: &mut SqLiteBlockStore) -> Result, Error> { +fn get_intersect_blocks(block_store: &mut Box) -> Result, Error> { let start = Instant::now(); debug!("get_intersect_blocks"); let mut chain_blocks: Vec = vec![]; /* Classic sync: Use blocks from store if available. */ - match block_store.load_blocks() { - None => {} - Some(blocks) => { - for (i, block) in blocks.iter().enumerate() { - // all powers of 2 including 0th element 0, 2, 4, 8, 16, 32 - if (i == 0) || ((i > 1) && (i & (i - 1) == 0)) { - chain_blocks.push(Point::Specific(block.0 as u64, block.1.clone())); - } - } + let blocks = block_store.load_blocks()?; + for (i, (slot, hash)) in blocks.iter().enumerate() { + // all powers of 2 including 0th element 0, 2, 4, 8, 16, 32 + if (i == 0) || ((i > 1) && (i & (i - 1) == 0)) { + chain_blocks.push(Point::Specific(*slot, hash.clone())); } - }; + } // add known points chain_blocks.push( @@ -363,16 +355,21 @@ pub(crate) async fn sync( network_magic: u64, shelley_genesis_hash: &str, no_service: bool, + use_redb: bool, ) { loop { // Retry to establish connection forever - let mut block_store = sqlite::SqLiteBlockStore::new(db).unwrap(); + let mut block_store: Box = if use_redb { + Box::new(RedbBlockStore::new(db).unwrap()) + } else { + Box::new(SqLiteBlockStore::new(db).unwrap()) + }; let chain_blocks = get_intersect_blocks(&mut block_store).unwrap(); match Bearer::connect_tcp_timeout( &format!("{host}:{port}").to_socket_addrs().unwrap().next().unwrap(), FIVE_SECS, ) - .await + .await { Ok(bearer) => { let mut plexer = Plexer::new(bearer); @@ -421,11 +418,11 @@ pub(crate) async fn sync( false, no_service, Some(chain_blocks), - Some(Box::new(block_store)), + Some(block_store), shelley_genesis_hash, ) - .await - .unwrap(); + .await + .unwrap(); plexer.abort().await; } @@ -475,7 +472,7 @@ pub(crate) async fn sendtip( &format!("{host}:{port}").to_socket_addrs().unwrap().next().unwrap(), FIVE_SECS, ) - .await + .await { Ok(bearer) => { let mut plexer = Plexer::new(bearer); @@ -526,8 +523,8 @@ pub(crate) async fn sendtip( Some(Box::new(pooltool_notifier)), "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81".to_string(), ) - .await - .unwrap(); + .await + .unwrap(); plexer.abort().await; } diff --git a/src/nodeclient/pooltool.rs b/src/nodeclient/sync/pooltool.rs similarity index 83% rename from src/nodeclient/pooltool.rs rename to src/nodeclient/sync/pooltool.rs index e63bfde..2571f59 100644 --- a/src/nodeclient/pooltool.rs +++ b/src/nodeclient/sync/pooltool.rs @@ -1,16 +1,38 @@ +use std::fs::File; +use std::io::BufReader; use std::ops::Sub; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::process::{Command, Stdio}; use std::time::{Duration, Instant}; use chrono::{SecondsFormat, Utc}; use log::{error, info}; use regex::Regex; -use serde::Serialize; +use serde::{Deserialize, Serialize}; -use crate::nodeclient::sqlite::BlockStore; +use crate::APP_USER_AGENT; +use crate::nodeclient::blockstore; +use crate::nodeclient::blockstore::{Block, BlockStore, Error}; use crate::nodeclient::sync::BlockHeader; -use crate::nodeclient::APP_USER_AGENT; + +pub(crate) fn get_pooltool_config(config: &Path) -> PooltoolConfig { + let buf = BufReader::new(File::open(config).unwrap()); + serde_json::from_reader(buf).unwrap() +} + +#[derive(Debug, Deserialize)] +pub(crate) struct PooltoolConfig { + pub(crate) api_key: String, + pub(crate) pools: Vec, +} + +#[derive(Debug, Deserialize)] +pub(crate) struct Pool { + pub(crate) name: String, + pub(crate) pool_id: String, + pub(crate) host: String, + pub(crate) port: u16, +} #[derive(Debug, Serialize)] #[serde(rename_all = "camelCase")] @@ -26,15 +48,15 @@ struct PooltoolData0 { node_id: String, version: String, at: String, - block_no: i64, - slot_no: i64, + block_no: u64, + slot_no: u64, block_hash: String, parent_hash: String, leader_vrf: String, leader_vrf_proof: String, node_v_key: String, - protocol_major_version: i64, - protocol_minor_version: i64, + protocol_major_version: u64, + protocol_minor_version: u64, platform: String, } @@ -52,16 +74,16 @@ struct PooltoolData1 { node_id: String, version: String, at: String, - block_no: i64, - slot_no: i64, + block_no: u64, + slot_no: u64, block_hash: String, parent_hash: String, leader_vrf: String, block_vrf: String, block_vrf_proof: String, node_v_key: String, - protocol_major_version: i64, - protocol_minor_version: i64, + protocol_major_version: u64, + protocol_minor_version: u64, platform: String, } @@ -142,7 +164,7 @@ impl PoolToolNotifier { platform: "cncli".to_string(), }, }) - .unwrap(), + .unwrap(), ) .send() } else { @@ -169,7 +191,7 @@ impl PoolToolNotifier { platform: "cncli".to_string(), }, }) - .unwrap(), + .unwrap(), ) .send() }; @@ -207,12 +229,20 @@ impl BlockStore for PoolToolNotifier { &mut self, pending_blocks: &mut Vec, _shelley_genesis_hash: &str, - ) -> std::io::Result<()> { + ) -> Result<(), blockstore::Error> { self.send_to_pooltool(pending_blocks.last().unwrap()); Ok(()) } - fn load_blocks(&mut self) -> Option)>> { - None + fn load_blocks(&mut self) -> Result)>, Error> { + Err(Error::Blockstore("Not implemented".to_string())) + } + + fn find_block_by_hash(&mut self, _hash_start: &str) -> Result, Error> { + Err(Error::Blockstore("Not implemented".to_string())) + } + + fn get_tip_slot_number(&mut self) -> Result { + Err(Error::Blockstore("Not implemented".to_string())) } } diff --git a/src/nodeclient/validate.rs b/src/nodeclient/validate.rs deleted file mode 100644 index f8cdb24..0000000 --- a/src/nodeclient/validate.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::path::Path; - -use rusqlite::{Connection, Error}; - -struct Block { - block_number: i64, - slot_number: i64, - hash: String, - prev_hash: String, - pool_id: String, - leader_vrf: String, - orphaned: bool, -} - -pub fn validate_block(db_path: &Path, hash: &str) { - let like = format!("{hash}%"); - match query_block(db_path, like) { - Ok(block) => { - println!( - "{{\n\ - \x20\"status\": \"{}\",\n\ - \x20\"block_number\": \"{}\",\n\ - \x20\"slot_number\": \"{}\",\n\ - \x20\"pool_id\": \"{}\",\n\ - \x20\"hash\": \"{}\",\n\ - \x20\"prev_hash\": \"{}\",\n\ - \x20\"leader_vrf\": \"{}\"\n\ - }}", - if block.orphaned { "orphaned" } else { "ok" }, - block.block_number, - block.slot_number, - block.pool_id, - block.hash, - block.prev_hash, - block.leader_vrf - ); - } - Err(error) => { - println!( - "{{\n\ - \x20\"status\": \"error\",\n\ - \x20\"errorMessage\": \"{error}\"\n\ - }}" - ); - } - } -} - -fn query_block(db_path: &Path, like: String) -> Result { - if !db_path.exists() { - return Err(Error::InvalidPath(db_path.to_path_buf())); - } - - let db = Connection::open(db_path)?; - let query_result = db.query_row( - "SELECT block_number,slot_number,hash,prev_hash,pool_id,leader_vrf_0,orphaned FROM chain WHERE hash LIKE ? ORDER BY orphaned ASC", - [&like], - |row| { - Ok(Block { - block_number: row.get(0)?, - slot_number: row.get(1)?, - hash: row.get(2)?, - prev_hash: row.get(3)?, - pool_id: row.get(4)?, - leader_vrf: row.get(5)?, - orphaned: row.get(6)?, - }) - }, - ); - - if let Err(error) = db.close() { - return Err(error.1); - } - - query_result -} diff --git a/src/nodeclient/validate/mod.rs b/src/nodeclient/validate/mod.rs new file mode 100644 index 0000000..18585c5 --- /dev/null +++ b/src/nodeclient/validate/mod.rs @@ -0,0 +1,84 @@ +use std::path::Path; + +use thiserror::Error; + +use crate::nodeclient::blockstore::{Block, BlockStore}; +use crate::nodeclient::blockstore::redb::{is_redb_database, RedbBlockStore}; +use crate::nodeclient::blockstore::sqlite::SqLiteBlockStore; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Invalid path: {0}")] + InvalidPath(std::path::PathBuf), + + #[error("Redb error: {0}")] + Redb(#[from] crate::nodeclient::blockstore::redb::Error), + + #[error("Sqlite error: {0}")] + Sqlite(#[from] crate::nodeclient::blockstore::sqlite::Error), + + #[error("Blockstore error: {0}")] + Blockstore(#[from] crate::nodeclient::blockstore::Error), +} + +pub fn validate_block(db_path: &Path, hash: &str) { + let like = format!("{hash}%"); + match query_block(db_path, like) { + Ok(block) => { + match block { + Some(block) => { + println!( + "{{\n\ + \x20\"status\": \"{}\",\n\ + \x20\"block_number\": \"{}\",\n\ + \x20\"slot_number\": \"{}\",\n\ + \x20\"pool_id\": \"{}\",\n\ + \x20\"hash\": \"{}\",\n\ + \x20\"prev_hash\": \"{}\",\n\ + \x20\"leader_vrf\": \"{}\"\n\ + }}", + if block.orphaned { "orphaned" } else { "ok" }, + block.block_number, + block.slot_number, + block.pool_id, + block.hash, + block.prev_hash, + block.leader_vrf, + ); + } + None => { + println!( + "{{\n\ + \x20\"status\": \"error\",\n\ + \x20\"errorMessage\": \"Block not found\"\n\ + }}" + ); + } + } + } + Err(error) => { + println!( + "{{\n\ + \x20\"status\": \"error\",\n\ + \x20\"errorMessage\": \"{error}\"\n\ + }}" + ); + } + } +} + +fn query_block(db_path: &Path, hash_start: String) -> Result, Error> { + if !db_path.exists() { + return Err(Error::InvalidPath(db_path.to_path_buf())); + } + // check if db_path is a redb database based on magic number + let use_redb = is_redb_database(db_path)?; + + let mut block_store: Box = if use_redb { + Box::new(RedbBlockStore::new(db_path)?) + } else { + Box::new(SqLiteBlockStore::new(db_path)?) + }; + + Ok(block_store.find_block_by_hash(&hash_start)?) +} diff --git a/src/test.rs b/src/test.rs index 83a7f74..3c48e79 100644 --- a/src/test.rs +++ b/src/test.rs @@ -5,7 +5,7 @@ use chrono::{NaiveDateTime, Utc}; use num_rational::BigRational; use regex::Regex; -use cncli::nodeclient::math::{ceiling, exp, find_e, ln, round, split_ln}; +use cncli::nodeclient::leaderlog::math::{ceiling, exp, find_e, ln, round, split_ln}; use cncli::nodeclient::ping; use nodeclient::leaderlog::is_overlay_slot; use nodeclient::math::ipow;