diff --git a/Cargo.lock b/Cargo.lock index c01ffbeb9ed9..1df0481b25ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1619,6 +1619,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encoding_rs" +version = "0.8.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "enum-as-inner" version = "0.3.3" @@ -1876,6 +1885,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "fork-tree" version = "3.0.0" @@ -2684,6 +2708,19 @@ dependencies = [ "webpki", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes 1.1.0", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "idna" version = "0.1.5" @@ -2856,7 +2893,7 @@ dependencies = [ "socket2 0.3.19", "widestring", "winapi 0.3.9", - "winreg", + "winreg 0.6.2", ] [[package]] @@ -4130,6 +4167,12 @@ dependencies = [ "thrift", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -4348,6 +4391,24 @@ dependencies = [ "rand 0.8.4", ] +[[package]] +name = "native-tls" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48ba9f7719b5a0f42f338907614285fb5fd70e53858141f69898a1fb7203b24d" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "net2" version = "0.2.37" @@ -4539,12 +4600,39 @@ dependencies = [ "syn", ] +[[package]] +name = "openssl" +version = "0.10.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7ae222234c30df141154f159066c5093ff73b63204dcda7121eb082fc56a95" +dependencies = [ + "bitflags", + "cfg-if 1.0.0", + "foreign-types", + "libc", + "once_cell", + "openssl-sys", +] + [[package]] name = "openssl-probe" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" +[[package]] +name = "openssl-sys" +version = "0.9.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "ordered-float" version = "1.1.1" @@ -7854,6 +7942,41 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "reqwest" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c4e0a76dc12a116108933f6301b95e83634e0c47b0afbed6abbaa0601e99258" +dependencies = [ + "base64", + "bytes 1.1.0", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "native-tls", + "percent-encoding 2.1.0", + "pin-project-lite 0.2.7", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "url 2.2.2", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.7.0", +] + [[package]] name = "resolv-conf" version = "0.7.0" @@ -9253,6 +9376,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edfa57a7f8d9c1d260a549e7224100f6c43d43f9103e06dd8b4095a9b2b43ce9" +dependencies = [ + "form_urlencoded", + "itoa 0.4.8", + "ryu", + "serde", +] + [[package]] name = "sha-1" version = "0.8.2" @@ -10811,6 +10946,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.22.0" @@ -10833,6 +10978,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.9" @@ -11066,6 +11223,25 @@ version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e66dcbec4290c69dd03c57e76c2469ea5c7ce109c6dd4351c13055cf71ea055" +[[package]] +name = "tungstenite" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1" +dependencies = [ + "base64", + "byteorder", + "bytes 1.1.0", + "http", + "httparse", + "log", + "rand 0.8.4", + "sha-1 0.9.8", + "thiserror", + "url 2.2.2", + "utf-8", +] + [[package]] name = "twox-hash" version = "1.6.1" @@ -11212,6 +11388,12 @@ dependencies = [ "percent-encoding 2.1.0", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "value-bag" version = "1.0.0-alpha.8" @@ -11782,6 +11964,15 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" @@ -11989,6 +12180,23 @@ dependencies = [ "synstructure", ] +[[package]] +name = "zombienet-backchannel" +version = "0.9.13" +dependencies = [ + "futures-util", + "lazy_static", + "parity-scale-codec", + "reqwest", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-tungstenite", + "tracing", + "url 2.2.2", +] + [[package]] name = "zstd" version = "0.9.1+zstd.1.5.1" diff --git a/Cargo.toml b/Cargo.toml index b46812cc1d16..ee1e6cd3c363 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,6 +91,7 @@ members = [ "node/test/polkadot-simnet/common", "node/test/polkadot-simnet/node", "node/test/polkadot-simnet/test", + "node/zombienet-backchannel", "parachain/test-parachains", "parachain/test-parachains/adder", "parachain/test-parachains/adder/collator", diff --git a/node/zombienet-backchannel/Cargo.toml b/node/zombienet-backchannel/Cargo.toml new file mode 100644 index 000000000000..eeff7b4e9835 --- /dev/null +++ b/node/zombienet-backchannel/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "zombienet-backchannel" +description = "Zombienet backchannel to notify test runner and coordinate with malus actors." +license = "GPL-3.0-only" +version = "0.9.13" +authors = ["Parity Technologies "] +edition = "2021" +readme = "README.md" +publish = false + +[dependencies] +tokio = { version = "1.0.0", default-features = false, features = ["macros", "net", "rt-multi-thread", "sync"] } +url = "2.0.0" +tokio-tungstenite = "0.16" +futures-util = "0.3.18" +lazy_static = "1.4.0" +parity-scale-codec = { version = "2.3.1", features = ["derive"] } +reqwest = "0.11" +thiserror = "1.0.30" +tracing = "0.1.26" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1" diff --git a/node/zombienet-backchannel/src/errors.rs b/node/zombienet-backchannel/src/errors.rs new file mode 100644 index 000000000000..d6ce76c9347c --- /dev/null +++ b/node/zombienet-backchannel/src/errors.rs @@ -0,0 +1,39 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Polkadot Zombienet Backchannel error definitions. + +#[derive(Debug, thiserror::Error)] +#[allow(missing_docs)] +pub enum BackchannelError { + #[error("Error connecting websocket server")] + CantConnectToWS, + + #[error("Backchannel not initialized yet")] + Uninitialized, + + #[error("Backchannel already initialized")] + AlreadyInitialized, + + #[error("Error sending new value to backchannel")] + SendItemFail, + + #[error("Invalid host for connection backchannel")] + InvalidHost, + + #[error("Invalid port for connection backchannel")] + InvalidPort, +} diff --git a/node/zombienet-backchannel/src/lib.rs b/node/zombienet-backchannel/src/lib.rs new file mode 100644 index 000000000000..c489ee0e7f96 --- /dev/null +++ b/node/zombienet-backchannel/src/lib.rs @@ -0,0 +1,159 @@ +// Copyright 2017-2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Provides the possibility to coordination between malicious actors and +//! the zombienet test-runner, allowing to reference runtime's generated +//! values in the test specifications, through a bidirectional message passing +//! implemented as a `backchannel`. + +use futures_util::{SinkExt, StreamExt}; +use lazy_static::lazy_static; +use parity_scale_codec as codec; +use serde::{Deserialize, Serialize}; +use std::{env, sync::Mutex}; +use tokio::sync::broadcast; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; + +mod errors; +use errors::BackchannelError; + +lazy_static! { + pub static ref ZOMBIENET_BACKCHANNEL: Mutex> = Mutex::new(None); +} + +#[derive(Debug)] +pub struct ZombienetBackchannel { + broadcast_tx: broadcast::Sender, + ws_tx: broadcast::Sender, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct BackchannelItem { + key: String, + value: String, +} + +pub struct Broadcaster; + +pub const ZOMBIENET: &str = "🧟ZOMBIENET🧟"; + +impl Broadcaster { + /// Return a subscriber that will receive all message broadcasted by the zombienet backchannel + /// websocket server. + pub fn subscribe(&self) -> Result, BackchannelError> { + let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap(); + let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?; + let sender = zombienet_bkc.broadcast_tx.clone(); + Ok(sender.subscribe()) + } + + /// Provides a simple API to send a key/value to the zombienet websocket server. + pub async fn send( + &mut self, + key: &'static str, + val: impl codec::Encode, + ) -> Result<(), BackchannelError> { + let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap(); + let zombienet_bkc = zombienet_bkc.as_mut().ok_or(BackchannelError::Uninitialized)?; + + let encoded = val.encode(); + let backchannel_item = BackchannelItem { + key: key.to_string(), + value: String::from_utf8_lossy(&encoded).to_string(), + }; + + let sender = zombienet_bkc.ws_tx.clone(); + sender.send(backchannel_item).map_err(|e| { + tracing::error!(target = ZOMBIENET, "Error sending new item: {}", e); + BackchannelError::SendItemFail + })?; + + Ok(()) + } +} + +impl ZombienetBackchannel { + pub async fn init() -> Result<(), BackchannelError> { + let mut zombienet_bkc = ZOMBIENET_BACKCHANNEL.lock().unwrap(); + if zombienet_bkc.is_none() { + let backchannel_host = + env::var("BACKCHANNEL_HOST").unwrap_or_else(|_| "backchannel".to_string()); + let backchannel_port = + env::var("BACKCHANNEL_PORT").unwrap_or_else(|_| "3000".to_string()); + + // validate port + backchannel_port.parse::().map_err(|_| BackchannelError::InvalidPort)?; + // validate non empty string for host + if backchannel_host.trim().is_empty() { + return Err(BackchannelError::InvalidHost) + }; + + let ws_url = format!("ws://{}:{}/ws", backchannel_host, backchannel_port); + tracing::debug!(target = ZOMBIENET, "Connecting to : {}", &ws_url); + let (ws_stream, _) = + connect_async(ws_url).await.map_err(|_| BackchannelError::CantConnectToWS)?; + let (mut write, mut read) = ws_stream.split(); + + let (tx, _rx) = broadcast::channel(256); + let (tx_relay, mut rx_relay) = broadcast::channel::(256); + + // receive from the ws and send to all subcribers + let tx1 = tx.clone(); + tokio::spawn(async move { + while let Some(Ok(Message::Text(text))) = read.next().await { + match serde_json::from_str::(&text) { + Ok(backchannel_item) => + if tx1.send(backchannel_item).is_err() { + tracing::error!("Error sending through the channel"); + return + }, + Err(_) => { + tracing::error!("Invalid payload received"); + }, + } + } + }); + + // receive from subscribers and relay to ws + tokio::spawn(async move { + while let Ok(item) = rx_relay.recv().await { + if write + .send(Message::Text(serde_json::to_string(&item).unwrap())) + .await + .is_err() + { + tracing::error!("Error sending through ws"); + } + } + }); + + *zombienet_bkc = Some(ZombienetBackchannel { broadcast_tx: tx, ws_tx: tx_relay }); + return Ok(()) + } + + Err(BackchannelError::AlreadyInitialized) + } + + /// Ensure that the backchannel is initialized and return a broadcaster instance + /// allowing to subscribe or send new items. + pub fn broadcaster() -> Result { + if ZOMBIENET_BACKCHANNEL.lock().unwrap().is_some() { + Ok(Broadcaster {}) + } else { + Err(BackchannelError::Uninitialized) + } + } +}