From 93ddbce4dc8ce7cec1189ef9bf26b2214f3f1302 Mon Sep 17 00:00:00 2001 From: "B. Yap" <2826165+b-yap@users.noreply.github.com> Date: Wed, 22 May 2024 19:52:26 +0800 Subject: [PATCH] Allow Tokio Console support (#517) * first iteration: add `tracing` feature of tokio, and the console-subscriber * update on tokio version, based on: https://github.com/tokio-rs/console/issues/345#issuecomment-1817939351 * use `tracing` library, instead of log. * remove unnecessary logs; add necessary logs; add timeout in `read_message_from_stellar` because it gets stuck * cleanup zombie task * Update README.md add documentation of the `tokio-console`. * remove duplicate `Parachain Block Listener` * https://github.com/pendulum-chain/spacewalk/pull/517/files#r1600267305 * https://github.com/pendulum-chain/spacewalk/pull/517#discussion_r1600209035, https://github.com/pendulum-chain/spacewalk/pull/517#discussion_r1600209572, * https://github.com/pendulum-chain/spacewalk/actions/runs/9094123182/job/24994504193?pr=517 * remocehttps://github.com/pendulum-chain/spacewalk/actions/runs/9095519314/job/24998905803?pr=517 * https://github.com/pendulum-chain/spacewalk/actions/runs/9096987912/job/25003758078?pr=517 * https://github.com/pendulum-chain/spacewalk/actions/runs/9098087121/job/25007476694?pr=517 * https://github.com/pendulum-chain/spacewalk/actions/runs/9108826563/job/25040418069?pr=517 --- Cargo.lock | 288 ++++++++++++++++-- clients/README.md | 39 ++- clients/runner/Cargo.toml | 2 +- clients/runtime/Cargo.toml | 2 +- clients/runtime/client/Cargo.toml | 2 +- clients/service/Cargo.toml | 2 +- clients/stellar-relay-lib/Cargo.toml | 7 +- clients/stellar-relay-lib/examples/connect.rs | 2 +- clients/stellar-relay-lib/src/config.rs | 2 +- .../connection/authentication/certificate.rs | 4 +- .../src/connection/connector/connector.rs | 11 +- .../connection/connector/message_handler.rs | 32 +- .../connection/connector/message_reader.rs | 63 ++-- .../connection/connector/message_sender.rs | 5 +- .../stellar-relay-lib/src/connection/error.rs | 2 +- .../src/connection/handshake.rs | 2 +- .../src/connection/helper.rs | 2 +- .../stellar-relay-lib/src/connection/hmac.rs | 5 +- .../src/connection/xdr_converter.rs | 2 +- clients/stellar-relay-lib/src/overlay.rs | 7 +- clients/vault/Cargo.toml | 7 +- clients/vault/src/main.rs | 3 + .../vault/src/oracle/collector/collector.rs | 11 +- clients/vault/src/oracle/collector/handler.rs | 8 +- clients/vault/src/system.rs | 22 +- clients/wallet/Cargo.toml | 2 +- clients/wallet/src/horizon/horizon.rs | 1 - clients/wallet/src/resubmissions.rs | 74 +++-- clients/wallet/src/stellar_wallet.rs | 6 +- 29 files changed, 474 insertions(+), 141 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b8943bbe0..4d6dc52ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -530,6 +530,28 @@ dependencies = [ "wasm-bindgen-futures", ] +[[package]] +name = "async-stream" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite 0.2.14", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "async-task" version = "4.7.0" @@ -583,6 +605,51 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http 0.2.12", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding 2.3.1", + "pin-project-lite 0.2.14", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 0.2.12", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backoff" version = "0.3.0" @@ -1271,6 +1338,43 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "console-api" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd326812b3fd01da5bb1af7d340d0d555fd3d4b641e7f1dfcf5962a902952787" +dependencies = [ + "futures-core", + "prost 0.12.4", + "prost-types 0.12.4", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7481d4c57092cd1c19dd541b92bdce883de840df30aa5d03fd48a3935c01842e" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime 2.1.0", + "prost-types 0.12.4", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber 0.3.18", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1455,6 +1559,15 @@ dependencies = [ "cfg-if 1.0.0", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3db02a9c5b5121e1e42fbdb1aeb65f5e02624cc58c43f2884c6ccac0b82f95" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -3348,6 +3461,19 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hdrhistogram" +version = "7.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d" +dependencies = [ + "base64 0.21.7", + "byteorder", + "flate2", + "nom", + "num-traits", +] + [[package]] name = "headers" version = "0.3.9" @@ -3621,6 +3747,18 @@ dependencies = [ "webpki-roots 0.25.4", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite 0.2.14", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -4320,7 +4458,7 @@ dependencies = [ "once_cell", "parking_lot 0.12.1", "pin-project", - "prost", + "prost 0.11.9", "prost-build", "rand 0.8.5", "rw-stream-sink", @@ -4388,7 +4526,7 @@ dependencies = [ "libp2p-swarm", "log", "lru", - "prost", + "prost 0.11.9", "prost-build", "prost-codec", "smallvec", @@ -4431,7 +4569,7 @@ dependencies = [ "libp2p-core 0.38.0", "libp2p-swarm", "log", - "prost", + "prost 0.11.9", "prost-build", "rand 0.8.5", "sha2 0.10.8", @@ -4506,7 +4644,7 @@ dependencies = [ "libp2p-core 0.38.0", "log", "once_cell", - "prost", + "prost 0.11.9", "prost-build", "rand 0.8.5", "sha2 0.10.8", @@ -4671,7 +4809,7 @@ dependencies = [ "libp2p-noise", "log", "multihash 0.16.3", - "prost", + "prost 0.11.9", "prost-build", "prost-codec", "rand 0.8.5", @@ -4943,12 +5081,27 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "matches" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "matrixmultiply" version = "0.3.8" @@ -6791,7 +6944,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.11.9", +] + +[[package]] +name = "prost" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" +dependencies = [ + "bytes", + "prost-derive 0.12.5", ] [[package]] @@ -6808,8 +6971,8 @@ dependencies = [ "multimap", "petgraph", "prettyplease", - "prost", - "prost-types", + "prost 0.11.9", + "prost-types 0.11.9", "regex", "syn 1.0.109", "tempfile", @@ -6824,7 +6987,7 @@ checksum = "0dc34979ff898b6e141106178981ce2596c387ea6e62533facfc61a37fc879c0" dependencies = [ "asynchronous-codec", "bytes", - "prost", + "prost 0.11.9", "thiserror", "unsigned-varint", ] @@ -6842,13 +7005,35 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "prost-derive" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2", + "quote", + "syn 2.0.60", +] + [[package]] name = "prost-types" version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" dependencies = [ - "prost", + "prost 0.11.9", +] + +[[package]] +name = "prost-types" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3235c33eb02c1f1e212abdbe34c78b264b038fb58ca612664343271e36e55ffe" +dependencies = [ + "prost 0.12.4", ] [[package]] @@ -8314,7 +8499,7 @@ dependencies = [ "futures 0.3.30", "libp2p", "log", - "prost", + "prost 0.11.9", "prost-build", "sc-client-api", "sc-network", @@ -8382,7 +8567,7 @@ dependencies = [ "libp2p", "log", "parity-scale-codec", - "prost", + "prost 0.11.9", "prost-build", "sc-client-api", "sc-network", @@ -8409,7 +8594,7 @@ dependencies = [ "lru", "mockall 0.11.4", "parity-scale-codec", - "prost", + "prost 0.11.9", "prost-build", "sc-client-api", "sc-consensus", @@ -8750,7 +8935,7 @@ dependencies = [ "thiserror", "tracing", "tracing-log", - "tracing-subscriber", + "tracing-subscriber 0.2.25", ] [[package]] @@ -9316,7 +9501,7 @@ dependencies = [ "tokio", "tracing", "tracing-futures", - "tracing-subscriber", + "tracing-subscriber 0.2.25", "wallet", "warp", ] @@ -10118,7 +10303,7 @@ dependencies = [ "sp-std 5.0.0", "tracing", "tracing-core", - "tracing-subscriber", + "tracing-subscriber 0.2.25", ] [[package]] @@ -10592,7 +10777,6 @@ dependencies = [ "err-derive", "hex", "hmac 0.12.1", - "log", "rand 0.8.5", "serde", "serde_json", @@ -10601,6 +10785,7 @@ dependencies = [ "sha2 0.10.8", "substrate-stellar-sdk", "tokio", + "tracing", "tweetnacl", "wallet", ] @@ -11223,9 +11408,20 @@ dependencies = [ "signal-hook-registry", "socket2 0.5.6", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf" +dependencies = [ + "pin-project-lite 0.2.14", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" @@ -11382,12 +11578,48 @@ dependencies = [ "winnow", ] +[[package]] +name = "tonic" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http 0.2.12", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding 2.3.1", + "pin-project", + "prost 0.12.4", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower" version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite 0.2.14", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -11496,7 +11728,7 @@ dependencies = [ "ansi_term", "chrono", "lazy_static", - "matchers", + "matchers 0.0.1", "parking_lot 0.11.2", "regex", "serde", @@ -11510,6 +11742,21 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers 0.1.0", + "once_cell", + "regex", + "sharded-slab", + "thread_local", + "tracing", + "tracing-core", +] + [[package]] name = "trie-db" version = "0.27.1" @@ -11834,6 +12081,7 @@ dependencies = [ "bincode", "cfg-if 1.0.0", "clap 3.2.25", + "console-subscriber", "env_logger 0.9.3", "err-derive", "flate2", @@ -11878,7 +12126,7 @@ dependencies = [ "tokio-stream", "tracing", "tracing-futures", - "tracing-subscriber", + "tracing-subscriber 0.2.25", "wallet", ] @@ -11995,7 +12243,7 @@ dependencies = [ "tokio-stream", "tracing", "tracing-futures", - "tracing-subscriber", + "tracing-subscriber 0.2.25", ] [[package]] diff --git a/clients/README.md b/clients/README.md index da48323db..f0ebc1f2e 100644 --- a/clients/README.md +++ b/clients/README.md @@ -218,6 +218,43 @@ If the transaction submission fails giving a `tx_failed` in the `result_codes` o due to the converted destination account not having trustlines set up for the redeemed asset. The destination account is derived automatically from the account that called the extrinsic on-chain. +### Debugging with `tokio-console` +The vault is `tokio-console` ready, with the feature **_`allow-debugger`_**. _Remember to [set the rustflags](https://github.com/tokio-rs/console?tab=readme-ov-file#instrumenting-your-program)!_ +``` +RUSTFLAGS="--cfg tokio_unstable" cargo run --bin vault --features allow-debugger +``` +[Install tokio-console](https://github.com/tokio-rs/console?tab=readme-ov-file#running-the-console) +and connect to the vault. +If using the [testchain](../testchain) and vault's `standalone-metadata` feature, you can use the command: +``` +tokio-console http://127.0.0.1:6669 +``` +This will display: +Screenshot 2024-05-13 at 6 33 33 PM +The multiple ` tokio::task clients/vault/src/system.rs ` tasks follows the tasks spawned consecutively, in [system.rs](https://github.com/pendulum-chain/spacewalk/blob/main/clients/vault/src/system.rs): + +* The first 4 tasks are from `fn create_initial_tasks(...)` : + * VaultId Registration Listener + * Restart Timer + * Stellar Transaction Listener + * Parachain Block Listener +* Next 5 tasks from `fn create_issue_tasks(...)` : + * Issue Request Listener + * Issue Cancel Listener + * Issue Execute Listener + * Issue Executor + * Issue Cancel Scheduler +* Next 4 tasks from `fn create_replace_tasks(...)` : + * Request Replace Listener + * Accept Replace Listener + * Execute Replace Listener + * Replace Cancellation Scheduler +* Redeem Request Listener +* The last 2 tasks from `create_bridge_metrics_tasks(...)` : + * Bridge Metrics Listener + * Bridge Metrics Poller + + ## Notes on the implementation of subxt This section is supposed to help when encountering issues with communication of vault client and parachain. @@ -257,4 +294,4 @@ found [here](https://docs.rs/subxt-macro/latest/subxt_macro/#adding-derives-for- When the compiler complains about mismatched types although the types seem to be the same, you might have to use type substitutions. This is done by adding the `#[subxt(substitute_type = "some type")]` attribute to the metadata module. -More documentation can be found [here](https://docs.rs/subxt-macro/latest/subxt_macro/#substituting-types). \ No newline at end of file +More documentation can be found [here](https://docs.rs/subxt-macro/latest/subxt_macro/#substituting-types). diff --git a/clients/runner/Cargo.toml b/clients/runner/Cargo.toml index af5011a2f..d849d5c45 100644 --- a/clients/runner/Cargo.toml +++ b/clients/runner/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] clap = { version = "4.0.17", features = ["derive"]} hex = "0.4.3" -tokio = { version = "1.8", features = ["rt-multi-thread", "macros", "time"] } +tokio = { version = "1.37", features = ["rt-multi-thread", "macros", "time"] } codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] } sp-core = { git = "https://github.com/paritytech/substrate", branch = "polkadot-v0.9.42" } thiserror = "1.0.0" diff --git a/clients/runtime/Cargo.toml b/clients/runtime/Cargo.toml index e1e5d4ec9..8de5003ec 100644 --- a/clients/runtime/Cargo.toml +++ b/clients/runtime/Cargo.toml @@ -31,7 +31,7 @@ log = "0.4.0" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.71" thiserror = "1.0" -tokio = { version = "1.0", features = ["full"] } +tokio = { version = "1.37", features = ["full"] } prometheus = { version = "0.12.0", features = ["process"] } url = "2" diff --git a/clients/runtime/client/Cargo.toml b/clients/runtime/client/Cargo.toml index 84f669417..cad60c0b1 100644 --- a/clients/runtime/client/Cargo.toml +++ b/clients/runtime/client/Cargo.toml @@ -12,7 +12,7 @@ description = "Embed a substrate node into your subxt application." keywords = ["parity", "substrate", "blockchain"] [dependencies] -tokio = { version = "1.10", features = ["time", "rt-multi-thread"] } +tokio = { version = "1.37", features = ["time", "rt-multi-thread"] } futures = { version = "0.3.9", features = ["compat"], package = "futures" } futures01 = { package = "futures", version = "0.1.29" } jsonrpsee = "0.16.0" diff --git a/clients/service/Cargo.toml b/clients/service/Cargo.toml index 9ddbb1aea..774ff7ade 100644 --- a/clients/service/Cargo.toml +++ b/clients/service/Cargo.toml @@ -12,7 +12,7 @@ thiserror = "1.0" hyper = { version = "0.14.11" } hyper-tls = "0.5.0" -tokio = { version = "1.0", features = ["full"] } +tokio = { version = "1.37", features = ["full"] } warp = "0.3.2" serde = { version = "1.0.136", features = ["derive"] } diff --git a/clients/stellar-relay-lib/Cargo.toml b/clients/stellar-relay-lib/Cargo.toml index 6d21d15c9..e20ca9bf7 100644 --- a/clients/stellar-relay-lib/Cargo.toml +++ b/clients/stellar-relay-lib/Cargo.toml @@ -16,7 +16,7 @@ wallet = { path = "../wallet", features = ["testing-utils"] } [dependencies] hex = "0.4.3" -log = {version = "0.4.14"} +tracing = { version = "0.1", features = ["log"] } base64 = "0.13.0" rand = "0.8.5" @@ -34,11 +34,12 @@ substrate-stellar-sdk = {git = "https://github.com/pendulum-chain/substrate-stel err-derive = "0.3.1" -tokio = { version = "1.0", features = [ +tokio = { version = "1.37", features = [ "macros", # allows main function to be async "rt-multi-thread", # for multi-thread runtime "sync", # to make channels available - "time" # for timeouts and sleep, when reconnecting + "time", # for timeouts and sleep, when reconnecting + "tracing" # for tokio console ] } async-std = { version = "1.12.0", features = ["attributes"] } diff --git a/clients/stellar-relay-lib/examples/connect.rs b/clients/stellar-relay-lib/examples/connect.rs index 913176637..6e3283dc7 100644 --- a/clients/stellar-relay-lib/examples/connect.rs +++ b/clients/stellar-relay-lib/examples/connect.rs @@ -37,7 +37,7 @@ async fn main() -> Result<(), Box> { ScpStatementPledges::ScpStExternalize(_) => "ScpStExternalize", ScpStatementPledges::ScpStNominate(_) => "ScpStNominate ", }; - log::info!( + tracing::info!( "{} sent StellarMessage of type {} for ledger {}", node_id, stmt_type, diff --git a/clients/stellar-relay-lib/src/config.rs b/clients/stellar-relay-lib/src/config.rs index ce787e143..a00d3953f 100644 --- a/clients/stellar-relay-lib/src/config.rs +++ b/clients/stellar-relay-lib/src/config.rs @@ -45,7 +45,7 @@ impl StellarOverlayConfig { let public_key = secret_key.get_public().to_encoding(); let public_key = std::str::from_utf8(&public_key).unwrap(); - log::info!( + tracing::info!( "connection_info(): Connecting to Stellar overlay network using public key: {public_key}" ); diff --git a/clients/stellar-relay-lib/src/connection/authentication/certificate.rs b/clients/stellar-relay-lib/src/connection/authentication/certificate.rs index 588a3e371..7c457d3e7 100644 --- a/clients/stellar-relay-lib/src/connection/authentication/certificate.rs +++ b/clients/stellar-relay-lib/src/connection/authentication/certificate.rs @@ -53,7 +53,7 @@ pub fn create_auth_cert( let signature: Signature = Signature::new(keypair.create_signature(raw_sig_data).to_vec()) .map_err(|e| { - log::error!("create_auth_cert(): {e:?}"); + tracing::error!("create_auth_cert(): {e:?}"); Error::AuthSignatureFailed })?; @@ -87,7 +87,7 @@ pub fn verify_remote_auth_cert( match auth_cert_sig.try_into() { Ok(raw_sig) => remote_pub_key.verify_signature(raw_data, &raw_sig), Err(_) => { - log::warn!( + tracing::warn!( "failed to convert auth cert signature of size {} to fixed array of 64.", sig_len ); diff --git a/clients/stellar-relay-lib/src/connection/connector/connector.rs b/clients/stellar-relay-lib/src/connection/connector/connector.rs index 5625597ab..deea0bc4e 100644 --- a/clients/stellar-relay-lib/src/connection/connector/connector.rs +++ b/clients/stellar-relay-lib/src/connection/connector/connector.rs @@ -7,6 +7,7 @@ use substrate_stellar_sdk::{ types::{AuthenticatedMessageV0, Curve25519Public, HmacSha256Mac, MessageType}, XdrCodec, }; +use tracing::{error, trace}; use crate::{ connection::{ @@ -86,18 +87,12 @@ impl Connector { body: &[u8], ) -> Result<(), Error> { let remote_info = self.remote_info.as_ref().ok_or(Error::NoRemoteInfo)?; - log::trace!( + trace!( "verify_auth(): remote sequence: {}, auth message sequence: {}", remote_info.sequence(), auth_msg.sequence ); - let auth_msg_xdr = auth_msg.to_base64_xdr(); - let auth_msg_xdr = - String::from_utf8(auth_msg_xdr.clone()).unwrap_or(format!("{:?}", auth_msg_xdr)); - - log::debug!("verify_auth(): received auth message from Stellar Node: {auth_msg_xdr}"); - if remote_info.sequence() != auth_msg.sequence { // must be handled on main thread because workers could mix up order of messages. return Err(Error::InvalidSequenceNumber) @@ -169,7 +164,7 @@ impl Connector { pub fn stop(&mut self) { if let Err(e) = self.tcp_stream.shutdown(Shutdown::Both) { - log::error!("stop(): failed to shutdown tcp stream: {}", e); + error!("stop(): failed to shutdown tcp stream: {}", e); } } } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs index b4101ea71..276372ff1 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_handler.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_handler.rs @@ -1,8 +1,3 @@ -use substrate_stellar_sdk::{ - types::{ErrorCode, Hello, MessageType, StellarMessage}, - XdrCodec, -}; - use crate::connection::{ authentication::verify_remote_auth_cert, helper::{error_to_string, time_now}, @@ -10,6 +5,11 @@ use crate::connection::{ xdr_converter::parse_authenticated_message, Connector, Error, Xdr, }; +use substrate_stellar_sdk::{ + types::{ErrorCode, Hello, MessageType, StellarMessage}, + XdrCodec, +}; +use tracing::{error, info, trace, warn}; use crate::node::RemoteInfo; @@ -33,13 +33,13 @@ impl Connector { MessageType::ErrorMsg => match auth_msg.message { StellarMessage::ErrorMsg(e) => { - log::error!( + error!( "process_raw_message(): Received ErrorMsg during authentication: {}", error_to_string(e.clone()) ); return Err(Error::from(e)) }, - other => log::error!( + other => error!( "process_raw_message(): Received ErrorMsg during authentication: {:?}", other ), @@ -50,9 +50,7 @@ impl Connector { if self.is_handshake_created() { self.verify_auth(&auth_msg, &data[4..(data.len() - 32)])?; self.increment_remote_sequence()?; - log::trace!( - "process_raw_message(): Processing {msg_type:?} message: auth verified" - ); + trace!("process_raw_message(): Processing {msg_type:?} message: auth verified"); } return self.process_stellar_message(auth_msg.message, msg_type).await @@ -80,7 +78,7 @@ impl Connector { } else { self.send_auth_message().await?; } - log::info!("process_stellar_message(): Hello message processed successfully"); + info!("process_stellar_message(): Hello message processed successfully"); }, StellarMessage::Auth(_) => { @@ -88,21 +86,15 @@ impl Connector { }, StellarMessage::ErrorMsg(e) => { - log::error!( - "process_stellar_message(): Received ErrorMsg during authentication: {e:?}" - ); + error!("process_stellar_message(): Received ErrorMsg during authentication: {e:?}"); if e.code == ErrorCode::ErrConf || e.code == ErrorCode::ErrAuth { return Err(Error::from(e)) } return Ok(Some(StellarMessage::ErrorMsg(e))) }, + // we do not handle other messages. Return to caller other => { - log::trace!( - "process_stellar_message(): Processing {} message: received from overlay", - String::from_utf8(other.to_base64_xdr()) - .unwrap_or(format!("{:?}", other.to_base64_xdr())) - ); self.check_to_send_more(msg_type).await?; return Ok(Some(other)) }, @@ -124,7 +116,7 @@ impl Connector { remote.node().overlay_version, ); } else { - log::warn!("process_auth_message(): No remote overlay version after handshake."); + warn!("process_auth_message(): No remote overlay version after handshake."); } self.check_to_send_more(MessageType::Auth).await diff --git a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs index aeee67a47..d04d9bb25 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_reader.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_reader.rs @@ -1,7 +1,15 @@ use crate::connection::{xdr_converter::get_xdr_message_length, Connector, Error, Xdr}; use async_std::io::ReadExt; +use std::time::Duration; use substrate_stellar_sdk::{types::StellarMessage, XdrCodec}; -use tokio::sync::{mpsc, mpsc::error::TryRecvError}; +use tokio::{ + sync::{mpsc, mpsc::error::TryRecvError}, + time::timeout, +}; +use tracing::{debug, error, info, trace, warn}; + +/// The waiting time for reading messages from stream. +static READ_TIMEOUT_IN_SECS: u64 = 60; /// Polls for messages coming from the Stellar Node and communicates it back to the user /// @@ -15,11 +23,11 @@ pub(crate) async fn poll_messages_from_stellar( send_to_user_sender: mpsc::Sender, mut send_to_node_receiver: mpsc::Receiver, ) { - log::info!("poll_messages_from_stellar(): started."); + info!("poll_messages_from_stellar(): started."); loop { if send_to_user_sender.is_closed() { - log::info!("poll_messages_from_stellar(): closing receiver during disconnection"); + info!("poll_messages_from_stellar(): closing receiver during disconnection"); // close this channel as communication to user was closed. break } @@ -28,7 +36,7 @@ pub(crate) async fn poll_messages_from_stellar( match send_to_node_receiver.try_recv() { Ok(msg) => if let Err(e) = connector.send_to_node(msg).await { - log::error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); + error!("poll_messages_from_stellar(): Error occurred during sending message to node: {e:?}"); }, Err(TryRecvError::Disconnected) => break, Err(TryRecvError::Empty) => {}, @@ -37,7 +45,7 @@ pub(crate) async fn poll_messages_from_stellar( // check for messages from Stellar Node. let xdr = match read_message_from_stellar(&mut connector).await { Err(e) => { - log::error!("poll_messages_from_stellar(): {e:?}"); + error!("poll_messages_from_stellar(): {e:?}"); break }, Ok(xdr) => xdr, @@ -47,14 +55,14 @@ pub(crate) async fn poll_messages_from_stellar( Ok(Some(stellar_msg)) => // push message to user if let Err(e) = send_to_user_sender.send(stellar_msg.clone()).await { - log::warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", + warn!("poll_messages_from_stellar(): Error occurred during sending message {} to user: {e:?}", String::from_utf8(stellar_msg.to_base64_xdr()) .unwrap_or_else(|_| format!("{:?}", stellar_msg.to_base64_xdr())) ); }, Ok(None) => {}, Err(e) => { - log::error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); + error!("poll_messages_from_stellar(): Error occurred during processing xdr message: {e:?}"); break }, } @@ -65,7 +73,7 @@ pub(crate) async fn poll_messages_from_stellar( send_to_node_receiver.close(); drop(send_to_user_sender); - log::debug!("poll_messages_from_stellar(): stopped."); + debug!("poll_messages_from_stellar(): stopped."); } /// Returns Xdr format of the `StellarMessage` sent from the Stellar Node @@ -76,11 +84,17 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result continue, + // identify bytes as: + // 1. the length of the next stellar message + // 2. the remaining bytes of the previous stellar message + // return Timeout error if reading time has elapsed. + match timeout( + Duration::from_secs(READ_TIMEOUT_IN_SECS), + connector.tcp_stream.read(&mut buff_for_reading), + ) + .await + { + Ok(Ok(0)) => continue, Ok(_) if lack_bytes_from_prev == 0 => { // if there are no more bytes lacking from the previous message, // then check the size of next stellar message. @@ -89,7 +103,7 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result Result continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { - log::trace!("read_message_from_stellar(): ERROR: {e:?}"); + trace!("read_message_from_stellar(): ERROR: {e:?}"); return Err(e) }, } }, - Ok(size) => { + Ok(Ok(size)) => { // The next few bytes was read. Add it to the readbuf. lack_bytes_from_prev = lack_bytes_from_prev.saturating_sub(size); readbuf.append(&mut buff_for_reading); @@ -126,16 +140,19 @@ async fn read_message_from_stellar(connector: &mut Connector) -> Result continue, Ok(Some(xdr)) => return Ok(xdr), Err(e) => { - log::trace!("read_message_from_stellar(): ERROR: {e:?}"); + trace!("read_message_from_stellar(): ERROR: {e:?}"); return Err(e) }, } }, - - Err(e) => { - log::trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); + Ok(Err(e)) => { + trace!("read_message_from_stellar(): ERROR reading messages: {e:?}"); return Err(Error::ReadFailed(e.to_string())) }, + Err(_) => { + trace!("read_message_from_stellar(): reading time elapsed."); + return Err(Error::Timeout) + }, } } } @@ -170,7 +187,7 @@ async fn read_message( // save it and read it on the next loop. *lack_bytes_from_prev = xpect_msg_len - actual_msg_len; *readbuf = readbuf[0..actual_msg_len].to_owned(); - log::trace!( + trace!( "read_message(): received only partial message. Need {lack_bytes_from_prev} bytes to complete." ); @@ -201,7 +218,7 @@ async fn read_unfinished_message( // this partial message completes the previous message. if actual_msg_len == *lack_bytes_from_prev { - log::trace!("read_unfinished_message(): received continuation from the previous message."); + trace!("read_unfinished_message(): received continuation from the previous message."); readbuf.append(&mut cont_buf); return Ok(Some(readbuf.clone())) @@ -212,7 +229,7 @@ async fn read_unfinished_message( *lack_bytes_from_prev -= actual_msg_len; cont_buf = cont_buf[0..actual_msg_len].to_owned(); readbuf.append(&mut cont_buf); - log::trace!( + trace!( "read_unfinished_message(): not enough bytes to complete the previous message. Need {lack_bytes_from_prev} bytes to complete." ); } diff --git a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs index 1145dddb6..4c13f6efd 100644 --- a/clients/stellar-relay-lib/src/connection/connector/message_sender.rs +++ b/clients/stellar-relay-lib/src/connection/connector/message_sender.rs @@ -2,6 +2,7 @@ use async_std::io::WriteExt; use std::time::Duration; use substrate_stellar_sdk::types::{MessageType, SendMore, StellarMessage}; use tokio::time::timeout; +use tracing::debug; use crate::connection::{ flow_controller::MAX_FLOOD_MSG_CAP, @@ -28,14 +29,14 @@ impl Connector { pub async fn send_hello_message(&mut self) -> Result<(), Error> { let msg = self.create_hello_message(time_now())?; - log::info!("send_hello_message(): Sending Hello Message: {}", to_base64_xdr_string(&msg)); + debug!("send_hello_message(): Sending Hello Message: {}", to_base64_xdr_string(&msg)); self.send_to_node(msg).await } pub(super) async fn send_auth_message(&mut self) -> Result<(), Error> { let msg = create_auth_message(); - log::info!("send_auth_message(): Sending Auth Message: {}", to_base64_xdr_string(&msg)); + debug!("send_auth_message(): Sending Auth Message: {}", to_base64_xdr_string(&msg)); self.send_to_node(create_auth_message()).await } diff --git a/clients/stellar-relay-lib/src/connection/error.rs b/clients/stellar-relay-lib/src/connection/error.rs index 261963356..ac41d213c 100644 --- a/clients/stellar-relay-lib/src/connection/error.rs +++ b/clients/stellar-relay-lib/src/connection/error.rs @@ -92,7 +92,7 @@ impl From for Error { ErrorCode::ErrConf => Self::ConfigError(error_to_string(value)), ErrorCode::ErrAuth => Self::AuthFailed(error_to_string(value)), other => { - log::error!("Stellar Node returned error: {}", error_to_string(value)); + tracing::error!("Stellar Node returned error: {}", error_to_string(value)); Self::OverlayError(other) }, } diff --git a/clients/stellar-relay-lib/src/connection/handshake.rs b/clients/stellar-relay-lib/src/connection/handshake.rs index d9952fdac..f37db7ca7 100644 --- a/clients/stellar-relay-lib/src/connection/handshake.rs +++ b/clients/stellar-relay-lib/src/connection/handshake.rs @@ -34,7 +34,7 @@ pub fn create_hello_message( overlay_min_version: node_info.overlay_min_version, network_id: node_info.network_id, version_str: LimitedString::<100>::new(version_str.clone()).map_err(|e| { - log::error!("create_hello_message(): {e:?}"); + tracing::error!("create_hello_message(): {e:?}"); Error::VersionStrTooLong })?, listening_port: i32::try_from(listening_port).unwrap_or(11625), diff --git a/clients/stellar-relay-lib/src/connection/helper.rs b/clients/stellar-relay-lib/src/connection/helper.rs index 57f9877b7..6c049cd0f 100644 --- a/clients/stellar-relay-lib/src/connection/helper.rs +++ b/clients/stellar-relay-lib/src/connection/helper.rs @@ -24,7 +24,7 @@ pub fn secret_key_binary(key: &str) -> [u8; 32] { pub fn time_now() -> u64 { let valid_at = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis(); u64::try_from(valid_at).unwrap_or_else(|_| { - log::warn!("could not convert time at u128 to u64."); + tracing::warn!("could not convert time at u128 to u64."); u64::MAX }) } diff --git a/clients/stellar-relay-lib/src/connection/hmac.rs b/clients/stellar-relay-lib/src/connection/hmac.rs index 8deb5b89b..6655229ea 100644 --- a/clients/stellar-relay-lib/src/connection/hmac.rs +++ b/clients/stellar-relay-lib/src/connection/hmac.rs @@ -2,6 +2,7 @@ use crate::Error; use hmac::{Hmac, Mac}; use sha2::Sha256; use substrate_stellar_sdk::types::{HmacSha256Mac, Uint256}; +use tracing::warn; pub struct HMacKeys { sending: HmacSha256Mac, @@ -95,13 +96,13 @@ pub fn create_sha256_hmac(data_buffer: &[u8], mac_key_buffer: &Buffer) -> Option return match hmac_vec.try_into() { Ok(mac) => Some(HmacSha256Mac { mac }), Err(_) => { - log::warn!("failed to convert hmac of size {} into an array of 32.", hmac_vec_len); + warn!("failed to convert hmac of size {} into an array of 32.", hmac_vec_len); None }, } } - log::warn!("Invalid length of mac key buffer size {}", mac_key_buffer.len()); + warn!("Invalid length of mac key buffer size {}", mac_key_buffer.len()); None } diff --git a/clients/stellar-relay-lib/src/connection/xdr_converter.rs b/clients/stellar-relay-lib/src/connection/xdr_converter.rs index 9ff390188..97c5698ff 100644 --- a/clients/stellar-relay-lib/src/connection/xdr_converter.rs +++ b/clients/stellar-relay-lib/src/connection/xdr_converter.rs @@ -108,7 +108,7 @@ fn message_to_bytes(message: &T) -> Result, Error> { } pub fn log_decode_error(source: &str, error: T) -> Error { - log::error!("decode error: {:?}", error); + tracing::error!("decode error: {:?}", error); Error::DecodeError(source.to_string()) } diff --git a/clients/stellar-relay-lib/src/overlay.rs b/clients/stellar-relay-lib/src/overlay.rs index c311fa29e..f8a8c844a 100644 --- a/clients/stellar-relay-lib/src/overlay.rs +++ b/clients/stellar-relay-lib/src/overlay.rs @@ -6,6 +6,7 @@ use tokio::sync::{ Sender, }, }; +use tracing::{error, info}; use crate::{ connection::{poll_messages_from_stellar, ConnectionInfo, Connector}, @@ -34,7 +35,7 @@ impl StellarOverlayConnection { local_node_info: NodeInfo, conn_info: ConnectionInfo, ) -> Result { - log::info!("connect(): connecting to {conn_info:?}"); + info!("connect(): connecting to {conn_info:?}"); // this is a channel to communicate with the user/caller. let (send_to_user_sender, send_to_user_receiver) = mpsc::channel::(1024); @@ -63,7 +64,7 @@ impl StellarOverlayConnection { match self.receiver.try_recv() { Ok(StellarMessage::ErrorMsg(e)) => { - log::error!("listen(): received error message: {e:?}"); + error!("listen(): received error message: {e:?}"); if e.code == ErrorCode::ErrConf || e.code == ErrorCode::ErrAuth { return Err(Error::ConnectionFailed(error_to_string(e))) } @@ -88,7 +89,7 @@ impl StellarOverlayConnection { } pub fn stop(&mut self) { - log::info!("stop(): closing connection to overlay network"); + info!("stop(): closing connection to overlay network"); self.receiver.close(); } } diff --git a/clients/vault/Cargo.toml b/clients/vault/Cargo.toml index cf890271f..a8223e584 100644 --- a/clients/vault/Cargo.toml +++ b/clients/vault/Cargo.toml @@ -18,6 +18,8 @@ parachain-metadata-pendulum = ["runtime/parachain-metadata-pendulum"] parachain-metadata-amplitude = ["runtime/parachain-metadata-amplitude"] parachain-metadata-foucoco = ["runtime/parachain-metadata-foucoco"] integration-test = ["integration", "standalone-metadata"] +allow-debugger = ["console-subscriber"] + [dependencies] async-std = "1.12.0" @@ -43,9 +45,12 @@ signal-hook = "0.3.14" signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] } sysinfo = "0.26.1" thiserror = "1.0" -tokio = { version = "1.0", features = ["full"] } + +# tokio related +tokio = { version = "1.37", features = ["full", "tracing"] } tokio-metrics = { version = "0.1.0", default-features = false } tokio-stream = { version = "0.1.9", features = ["sync"] } +console-subscriber = { version = "0.2.0", optional = true } tracing = { version = "0.1", features = ["log"] } tracing-futures = { version = "0.2.5" } diff --git a/clients/vault/src/main.rs b/clients/vault/src/main.rs index 55939190e..dc8840179 100644 --- a/clients/vault/src/main.rs +++ b/clients/vault/src/main.rs @@ -147,6 +147,9 @@ async fn start() -> Result<(), ServiceError> { #[tokio::main] async fn main() { + #[cfg(feature = "allow-debugger")] + console_subscriber::init(); + let exit_code = if let Err(err) = start().await { tracing::error!("Exiting: {}", err); 1 diff --git a/clients/vault/src/oracle/collector/collector.rs b/clients/vault/src/oracle/collector/collector.rs index c17879d4f..52999145c 100644 --- a/clients/vault/src/oracle/collector/collector.rs +++ b/clients/vault/src/oracle/collector/collector.rs @@ -1,6 +1,7 @@ use std::{default::Default, sync::Arc}; use parking_lot::{lock_api::RwLockReadGuard, RawRwLock, RwLock}; +use stellar_relay_lib::helper::to_base64_xdr_string; use stellar_relay_lib::sdk::{ network::{Network, PUBLIC_NETWORK, TEST_NETWORK}, @@ -156,7 +157,8 @@ impl ScpMessageCollector { } else { tracing::debug!("Collecting SCPEnvelopes for slot {slot}: success"); tracing::trace!( - "Collecting SCPEnvelopes for slot {slot}: the scp envelope: {scp_envelope:?}" + "Collecting SCPEnvelopes for slot {slot}: the scp envelope: {}", + to_base64_xdr_string(&scp_envelope.statement) ); envelopes_map.insert(slot, vec![scp_envelope]); } @@ -166,8 +168,10 @@ impl ScpMessageCollector { // save the mapping of the hash of the txset and the slot. let mut m = self.txset_and_slot_map.write(); tracing::debug!("Collecting TxSet for slot {slot}: saving a map of txset_hash..."); - let hash = hex::encode(&txset_hash); - tracing::trace!("Collecting TxSet for slot {slot}: the txset_hash: {hash}"); + tracing::trace!( + "Collecting TxSet for slot {slot}: the txset_hash: {}", + hex::encode(&txset_hash) + ); m.insert(txset_hash, slot); } @@ -187,7 +191,6 @@ impl ScpMessageCollector { let mut map_write = self.txset_and_slot_map.write(); map_write.remove_by_txset_hash(&hash).map(|slot| { tracing::debug!("Collecting TxSet for slot {slot}: txset saved."); - tracing::trace!("Collecting TxSet for slot {slot}: {tx_set:?}"); self.txset_map.write().insert(slot, tx_set); slot }) diff --git a/clients/vault/src/oracle/collector/handler.rs b/clients/vault/src/oracle/collector/handler.rs index 8b788381f..3eb7c3c7f 100644 --- a/clients/vault/src/oracle/collector/handler.rs +++ b/clients/vault/src/oracle/collector/handler.rs @@ -3,7 +3,10 @@ use crate::oracle::{ errors::Error, types::StellarMessageSender, }; -use stellar_relay_lib::sdk::types::{ScpEnvelope, ScpStatementPledges, StellarMessage}; +use stellar_relay_lib::{ + helper::to_base64_xdr_string, + sdk::types::{ScpEnvelope, ScpStatementPledges, StellarMessage}, +}; // Handling SCPEnvelopes impl ScpMessageCollector { @@ -23,7 +26,8 @@ impl ScpMessageCollector { // we are only interested with `ScpStExternalize`. Other messages are ignored. if let ScpStatementPledges::ScpStExternalize(stmt) = &env.statement.pledges { tracing::trace!( - "Handling Incoming ScpEnvelopes for slot {slot}: SCPStExternalize found: {stmt:?}" + "Handling Incoming ScpEnvelopes for slot {slot}: SCPStExternalize found: {}", + to_base64_xdr_string(stmt) ); // set the last scpenvenvelope with ScpStExternalize message self.set_last_slot_index(slot); diff --git a/clients/vault/src/system.rs b/clients/vault/src/system.rs index 8caca0976..465d3ab29 100644 --- a/clients/vault/src/system.rs +++ b/clients/vault/src/system.rs @@ -291,6 +291,9 @@ impl Service for VaultService { async fn start(&mut self) -> Result<(), ServiceError> { let result = self.run_service().await; + + self.try_shutdown_wallet().await; + if let Err(error) = result { let _ = self.shutdown.send(()); Err(error) @@ -679,15 +682,6 @@ impl VaultService { tasks.append(&mut replace_tasks); - tasks.push(( - "Parachain Block Listener", - run(active_block_listener( - self.spacewalk_parachain.clone(), - issue_event_tx, - replace_event_tx, - )), - )); - tasks.push(( "Redeem Request Listener", run(listen_for_redeem_requests( @@ -776,7 +770,7 @@ impl VaultService { // purposefully _after_ register_vault_if_not_present and _before_ other calls self.vault_id_manager.fetch_vault_ids().await?; - let wallet = self.stellar_wallet.write().await; + let mut wallet = self.stellar_wallet.write().await; let vault_public_key = wallet.public_key(); let is_public_network = wallet.is_public_network(); @@ -940,4 +934,12 @@ impl VaultService { tracing::info!("Got new block at height {startup_height}"); Ok(startup_height) } + + /// shuts down the resubmission task running in the background + async fn try_shutdown_wallet(&self) { + tracing::info!("try_shutdown_wallet(): stop the resubmission scheduler"); + let mut wallet = self.stellar_wallet.write().await; + wallet.try_stop_periodic_resubmission_of_transactions().await; + drop(wallet); + } } diff --git a/clients/wallet/Cargo.toml b/clients/wallet/Cargo.toml index cb1354163..ce988d528 100644 --- a/clients/wallet/Cargo.toml +++ b/clients/wallet/Cargo.toml @@ -18,7 +18,7 @@ reqwest = { version = "0.11", features = ["json"] } serde = "1.0.136" serde_json = { version = '1.0.45', default-features = false, features = ['alloc'] } thiserror = "1.0" -tokio = { version = "1.0", features = ["full"] } +tokio = { version = "1.37", features = ["full", "tracing"] } tokio-metrics = { version = "0.1.0", default-features = false } tokio-stream = { version = "0.1.9", features = ["sync"] } tracing = { version = "0.1", features = ["log"] } diff --git a/clients/wallet/src/horizon/horizon.rs b/clients/wallet/src/horizon/horizon.rs index 6643cf552..bd14e9ed8 100644 --- a/clients/wallet/src/horizon/horizon.rs +++ b/clients/wallet/src/horizon/horizon.rs @@ -270,7 +270,6 @@ impl HorizonFetcher { future::join(issue_map.read(), memos_to_issue_ids.read()).await; if issue_map.is_empty() || memos_to_issue_ids.is_empty() { - tracing::debug!("fetch_horizon_and_process_new_transactions(): nothing to traverse"); return Ok(last_cursor) } let mut txs_iter = self.fetch_transactions_iter(last_cursor).await?; diff --git a/clients/wallet/src/resubmissions.rs b/clients/wallet/src/resubmissions.rs index 16606988c..4e44b388e 100644 --- a/clients/wallet/src/resubmissions.rs +++ b/clients/wallet/src/resubmissions.rs @@ -12,7 +12,8 @@ use primitives::{ TransactionEnvelopeExt, }; use std::time::Duration; -use tokio::time::sleep; +use tokio::{sync::mpsc, time::sleep}; +use tracing::{debug, error, info, trace, warn}; use crate::horizon::responses::TransactionsResponseIter; #[cfg(test)] @@ -26,10 +27,26 @@ const MAXIMUM_TX_FEE: u32 = 10_000_000; // 1 XLM #[cfg_attr(test, mockable)] impl StellarWallet { + /// sends a signal to stop the resubmission task + pub async fn try_stop_periodic_resubmission_of_transactions(&mut self) { + match &self.resubmission_end_signal { + None => { + debug!("try_stop_periodic_resubmission_of_transactions(): no schedule to stop"); + }, + Some(sender) => + if let Err(e) = sender.send(()).await { + warn!("try_stop_periodic_resubmission_of_transactions(): failed to send a stop message to scheduler: {e:?}"); + }, + } + } + /// reads in storage the failed (but recoverable) transactions and submit again to Stellar. pub async fn start_periodic_resubmission_of_transactions_from_cache( - &self, + &mut self, interval_in_seconds: u64, ) { + // to make sure we don't leave the thread idle, use this channel to properly shut it down. + let (sender, mut receiver) = mpsc::channel(2); + // Perform the resubmission self._resubmit_transactions_from_cache().await; @@ -40,11 +57,18 @@ impl StellarWallet { tokio::spawn(async move { let me_clone = Arc::clone(&me); loop { - pause_process_in_secs(interval_in_seconds).await; + // a shutdown message was sent. Stop the loop. + if let Some(_) = receiver.recv().await { + debug!("start_periodic_resubmission_of_transactions_from_cache(): scheduler stopped."); + break; + } + pause_process_in_secs(interval_in_seconds).await; me_clone._resubmit_transactions_from_cache().await; } }); + + self.resubmission_end_signal = Some(sender) } #[doc(hidden)] @@ -57,16 +81,12 @@ impl StellarWallet { Ok((envs, errors)) => { // Log those with errors. if !errors.is_empty() { - tracing::warn!( - "_resubmit_transactions_from_cache(): errors from cache: {errors:?}" - ); + warn!("_resubmit_transactions_from_cache(): errors from cache: {errors:?}"); } envs }, Err(errors) => { - tracing::warn!( - "_resubmit_transactions_from_cache(): errors from cache: {errors:?}" - ); + warn!("_resubmit_transactions_from_cache(): errors from cache: {errors:?}"); return }, }; @@ -80,7 +100,7 @@ impl StellarWallet { if envelopes.is_empty() { return } - tracing::info!( + info!( "_resubmit_transactions_from_cache(): resubmitting {:?} envelopes in cache...", envelopes.len() ); @@ -89,7 +109,7 @@ impl StellarWallet { // loop through the envelopes and resubmit each one for envelope in envelopes { if let Err(e) = submit(envelope.clone()).await { - tracing::debug!("_resubmit_transactions_from_cache(): encountered error: {e:?}"); + debug!("_resubmit_transactions_from_cache(): encountered error: {e:?}"); // save the kind of error and the envelope that failed error_collector.push((e, envelope)); } @@ -119,7 +139,7 @@ impl StellarWallet { match self.handle_error(error).await { // a new kind of error occurred. Process it on the next loop. Err(e) => { - tracing::error!("handle_errors(): new error occurred: {e:?}"); + error!("handle_errors(): new error occurred: {e:?}"); // push the transaction that failed, and the corresponding error errors.push((e, env)); @@ -131,7 +151,7 @@ impl StellarWallet { // Resubmission was successful Ok(Some(resp)) => - tracing::debug!("handle_errors(): successfully processed envelope: {resp:?}"), + debug!("handle_errors(): successfully processed envelope: {resp:?}"), } } } @@ -159,9 +179,7 @@ impl StellarWallet { self.remove_tx_envelope_from_cache(&env); }; - tracing::error!( - "handle_error(): Unrecoverable HorizonSubmissionError: {error:?}" - ); + error!("handle_error(): Unrecoverable HorizonSubmissionError: {error:?}"); }, }, Error::CacheError(CacheError { @@ -176,9 +194,9 @@ impl StellarWallet { .map(Some) } - tracing::warn!("handle_error(): SequenceNumberAlreadyUsed error but no envelope"); + warn!("handle_error(): SequenceNumberAlreadyUsed error but no envelope"); }, - _ => tracing::warn!("handle_error(): Unrecoverable error in Stellar wallet: {error:?}"), + _ => warn!("handle_error(): Unrecoverable error in Stellar wallet: {error:?}"), } // the error found is not recoverable, and cannot be resubmitted again. @@ -221,7 +239,7 @@ impl StellarWallet { return self.bump_sequence_number_and_submit(tx).await } - tracing::error!("handle_tx_insufficient_fee_error(): Similar transaction already submitted. Skipping {:?}", tx); + error!("handle_tx_insufficient_fee_error(): Similar transaction already submitted. Skipping {:?}", tx); Err(ResubmissionError("Transaction already submitted".to_string())) } @@ -271,7 +289,7 @@ fn _check_transaction_match( } let Ok(source_account_sequence) = tx_resp.source_account_sequence() else { - tracing::warn!("_check_transaction_match(): cannot extract sequence number of transaction response: {tx_resp:?}"); + warn!("_check_transaction_match(): cannot extract sequence number of transaction response: {tx_resp:?}"); return Err(None) }; @@ -384,7 +402,7 @@ impl StellarWallet { return self.bump_sequence_number_and_submit(tx).await } - tracing::error!("handle_tx_bad_seq_error_with_envelope(): Similar transaction already submitted. Skipping {:?}", tx); + error!("handle_tx_bad_seq_error_with_envelope(): Similar transaction already submitted. Skipping {:?}", tx); Err(ResubmissionError("Transaction already submitted".to_string())) } @@ -399,12 +417,12 @@ impl StellarWallet { let old_tx_xdr = tx.to_base64_xdr(); let old_tx = String::from_utf8(old_tx_xdr.clone()).unwrap_or(format!("{old_tx_xdr:?}")); - tracing::trace!("bump_sequence_number_and_submit(): old transaction: {old_tx}"); + trace!("bump_sequence_number_and_submit(): old transaction: {old_tx}"); let updated_tx_xdr = updated_tx.to_base64_xdr(); let updated_tx_xdr = String::from_utf8(updated_tx_xdr.clone()).unwrap_or(format!("{updated_tx_xdr:?}")); - tracing::trace!("bump_sequence_number_and_submit(): new transaction: {updated_tx_xdr}"); + trace!("bump_sequence_number_and_submit(): new transaction: {updated_tx_xdr}"); let envelope = self.create_and_sign_envelope(updated_tx)?; self.submit_transaction(envelope).await @@ -419,7 +437,7 @@ impl StellarWallet { let mut iter = match self.get_all_transactions_iter().await { Ok(iter) => iter, Err(e) => { - tracing::warn!("is_transaction_already_submitted(): failed to get iterator: {e:?}"); + warn!("is_transaction_already_submitted(): failed to get iterator: {e:?}"); return false }, }; @@ -476,7 +494,7 @@ fn decode_to_envelope( envelope_xdr_as_str_opt: &Option, ) -> Result { let Some(envelope_xdr) = envelope_xdr_as_str_opt else { - tracing::warn!("handle_error(): no envelope_xdr found"); + warn!("handle_error(): no envelope_xdr found"); return Err(ResubmissionError("no envelope_xdr".to_string())) }; @@ -908,7 +926,7 @@ mod test { let wallet = wallet_with_storage("resources/resubmit_transactions_works") .expect("should return a wallet") .clone(); - let wallet = wallet.write().await; + let mut wallet = wallet.write().await; let seq_number = wallet.get_sequence().await.expect("should return a sequence"); @@ -971,7 +989,7 @@ mod test { .mock_safe(move |_, _| MockResult::Return(Box::pin(async move { false }))); // let's resubmit these 3 transactions - let _ = wallet.start_periodic_resubmission_of_transactions_from_cache(60).await; + wallet.start_periodic_resubmission_of_transactions_from_cache(60).await; // We wait until the whole cache is empty because eventually all transactions should be // handled @@ -989,6 +1007,8 @@ mod test { } } + // shutdown the thread properly + wallet.try_stop_periodic_resubmission_of_transactions().await; wallet.remove_cache_dir(); } } diff --git a/clients/wallet/src/stellar_wallet.rs b/clients/wallet/src/stellar_wallet.rs index cffe13d0b..5af7de985 100644 --- a/clients/wallet/src/stellar_wallet.rs +++ b/clients/wallet/src/stellar_wallet.rs @@ -8,7 +8,7 @@ use primitives::stellar::{ Asset as StellarAsset, Operation, PublicKey, SecretKey, StellarTypeToString, Transaction, TransactionEnvelope, }; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use crate::{ cache::WalletStateStorage, @@ -54,6 +54,9 @@ pub struct StellarWallet { /// a client to connect to Horizon pub(crate) client: Client, + + /// a sender to 'stop' a scheduled resubmission task + pub(crate) resubmission_end_signal: Option>, } impl StellarWallet { @@ -121,6 +124,7 @@ impl StellarWallet { max_retry_attempts_before_fallback: Self::DEFAULT_MAX_RETRY_ATTEMPTS_BEFORE_FALLBACK, max_backoff_delay: Self::DEFAULT_MAX_BACKOFF_DELAY_IN_SECS, client, + resubmission_end_signal: None, }) }