From ef8278bb145698f8751bf594133636edcdb66dc3 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Mon, 5 Aug 2024 23:05:26 +0900 Subject: [PATCH 01/11] Added id previews --- chain-signatures/contract/src/config/impls.rs | 2 + chain-signatures/contract/src/config/mod.rs | 4 + chain-signatures/node/src/cli.rs | 18 ++-- chain-signatures/node/src/mesh/connection.rs | 28 ++++-- chain-signatures/node/src/mesh/mod.rs | 16 ++- .../node/src/protocol/consensus.rs | 8 +- .../node/src/protocol/cryptography.rs | 34 ++++--- chain-signatures/node/src/protocol/message.rs | 4 +- chain-signatures/node/src/protocol/mod.rs | 47 ++++----- .../node/src/protocol/presignature.rs | 99 ++++++++++++++----- .../node/src/protocol/signature.rs | 85 ++++++++++------ chain-signatures/node/src/protocol/state.rs | 34 ++++++- chain-signatures/node/src/protocol/triple.rs | 24 ++++- chain-signatures/node/src/web/mod.rs | 64 +++++++++++- 14 files changed, 351 insertions(+), 116 deletions(-) diff --git a/chain-signatures/contract/src/config/impls.rs b/chain-signatures/contract/src/config/impls.rs index 36c7d5b58..ad8f9823d 100644 --- a/chain-signatures/contract/src/config/impls.rs +++ b/chain-signatures/contract/src/config/impls.rs @@ -46,6 +46,7 @@ impl Default for TripleConfig { min_triples: 1024, max_triples: 1024 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER, generation_timeout: min_to_ms(10), + preview_limit: 128, other: Default::default(), } @@ -58,6 +59,7 @@ impl Default for PresignatureConfig { min_presignatures: 512, max_presignatures: 512 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER, generation_timeout: secs_to_ms(45), + preview_limit: 128, other: Default::default(), } diff --git a/chain-signatures/contract/src/config/mod.rs b/chain-signatures/contract/src/config/mod.rs index 682e2408f..51ad11cb5 100644 --- a/chain-signatures/contract/src/config/mod.rs +++ b/chain-signatures/contract/src/config/mod.rs @@ -59,6 +59,8 @@ pub struct TripleConfig { pub max_triples: u32, /// Timeout for triple generation in milliseconds. pub generation_timeout: u64, + /// Max amount of Triple IDs allowed to be previewed in state. + pub preview_limit: u32, /// The remaining entries that can be present in future forms of the configuration. #[serde(flatten)] @@ -73,6 +75,8 @@ pub struct PresignatureConfig { pub max_presignatures: u32, /// Timeout for presignature generation in milliseconds. pub generation_timeout: u64, + /// Max amount of Presignature IDs allowed to be previewed in state. + pub preview_limit: u32, /// The remaining entries that can be present in future forms of the configuration. #[serde(flatten)] diff --git a/chain-signatures/node/src/cli.rs b/chain-signatures/node/src/cli.rs index e4445c9c5..0bd35fd91 100644 --- a/chain-signatures/node/src/cli.rs +++ b/chain-signatures/node/src/cli.rs @@ -211,6 +211,14 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { client_headers.insert(http::header::REFERER, referer_param.parse().unwrap()); } + let config = Arc::new(RwLock::new(Config::new(LocalConfig { + over: override_config.unwrap_or_else(Default::default), + network: NetworkConfig { + cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?, + sign_sk, + }, + }))); + tracing::debug!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized"); let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk); let (protocol, protocol_state) = MpcSignProtocol::init( @@ -223,13 +231,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { sign_queue, key_storage, triple_storage, - Config::new(LocalConfig { - over: override_config.unwrap_or_else(Default::default), - network: NetworkConfig { - cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?, - sign_sk, - }, - }), + config.clone(), ); rt.block_on(async { @@ -238,7 +240,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> { tracing::debug!("protocol thread spawned"); let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?; let web_handle = tokio::spawn(async move { - web::run(web_port, sender, cipher_sk, protocol_state, indexer).await + web::run(web_port, sender, cipher_sk, protocol_state, indexer, config).await }); tracing::debug!("protocol http server spawned"); diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 498cb23c6..f47d78713 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -1,4 +1,4 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::time::{Duration, Instant}; use cait_sith::protocol::Participant; @@ -6,8 +6,10 @@ use tokio::sync::RwLock; use url::Url; use crate::protocol::contract::primitives::Participants; +use crate::protocol::presignature::PresignatureId; +use crate::protocol::triple::TripleId; use crate::protocol::ProtocolState; -use crate::web::StateView; +use crate::web::{StateParams, StateView}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1); @@ -19,16 +21,19 @@ pub struct Pool { http: reqwest::Client, connections: RwLock, potential_connections: RwLock, - status: RwLock>, - /// The currently active participants for this epoch. current_active: RwLock>, // Potentially active participants that we can use to establish a connection in the next epoch. potential_active: RwLock>, + + pub status: RwLock>, } impl Pool { - pub async fn ping(&self) -> Participants { + pub async fn ping( + &mut self, + previews: Option<(HashSet, HashSet)>, + ) -> Participants { if let Some((ref active, timestamp)) = *self.current_active.read().await { if timestamp.elapsed() < DEFAULT_TIMEOUT { return active.clone(); @@ -37,6 +42,12 @@ impl Pool { let connections = self.connections.read().await; + let mut params = HashMap::new(); + if let Some((triples, presignatures)) = previews { + params.insert("triple_preview", triples); + params.insert("presignature_preview", presignatures); + } + let mut status = self.status.write().await; let mut participants = Participants::default(); for (participant, info) in connections.iter() { @@ -49,7 +60,7 @@ impl Pool { continue; }; - let Ok(resp) = self.http.get(url.clone()).send().await else { + let Ok(resp) = self.http.get(url.clone()).query(¶ms).send().await else { tracing::warn!( "Pool.ping resp err participant {:?} url {}", participant, @@ -77,7 +88,7 @@ impl Pool { participants } - pub async fn ping_potential(&self) -> Participants { + pub async fn ping_potential(&mut self) -> Participants { if let Some((ref active, timestamp)) = *self.potential_active.read().await { if timestamp.elapsed() < DEFAULT_TIMEOUT { return active.clone(); @@ -102,9 +113,10 @@ impl Pool { }; status.insert(*participant, state); + // self.status.insert(*participant, state); participants.insert(participant, info.clone()); } - drop(status); + // drop(status); let mut potential_active = self.potential_active.write().await; *potential_active = Some((participants.clone(), Instant::now())); diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index ae823face..bc1544d84 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -1,5 +1,12 @@ +use std::collections::{HashMap, HashSet}; + +use cait_sith::protocol::Participant; + use crate::protocol::contract::primitives::Participants; +use crate::protocol::presignature::PresignatureId; +use crate::protocol::triple::TripleId; use crate::protocol::ProtocolState; +use crate::web::StateView; pub mod connection; @@ -64,16 +71,19 @@ impl Mesh { self.connections .establish_participants(contract_state) .await; - self.ping().await; } /// Ping the active participants such that we can see who is alive. - pub async fn ping(&mut self) { - self.active_participants = self.connections.ping().await; + pub async fn ping(&mut self, previews: Option<(HashSet, HashSet)>) { + self.active_participants = self.connections.ping(previews).await; tracing::debug!( "Mesh.ping set active participants to {:?}", self.active_participants.keys_vec() ); self.active_potential_participants = self.connections.ping_potential().await; } + + pub async fn state_views(&self) -> HashMap { + self.connections.status.read().await.clone() + } } diff --git a/chain-signatures/node/src/protocol/consensus.rs b/chain-signatures/node/src/protocol/consensus.rs index 01e6130b1..7818a102d 100644 --- a/chain-signatures/node/src/protocol/consensus.rs +++ b/chain-signatures/node/src/protocol/consensus.rs @@ -31,6 +31,7 @@ use url::Url; use near_account_id::AccountId; use near_crypto::InMemorySigner; +#[async_trait::async_trait] pub trait ConsensusCtx { fn my_account_id(&self) -> &AccountId; fn http_client(&self) -> &reqwest::Client; @@ -41,7 +42,7 @@ pub trait ConsensusCtx { fn sign_queue(&self) -> Arc>; fn secret_storage(&self) -> &SecretNodeStorageBox; fn triple_storage(&self) -> LockTripleNodeStorageBox; - fn cfg(&self) -> &Config; + async fn cfg(&self) -> Config; } #[derive(thiserror::Error, Debug)] @@ -668,12 +669,13 @@ impl ConsensusProtocol for JoiningState { tracing::info!( "joining(running): sending a transaction to join the participant set" ); + let cfg = ctx.cfg().await; ctx.rpc_client() .call(ctx.signer(), ctx.mpc_contract_id(), "join") .args_json(json!({ "url": ctx.my_address(), - "cipher_pk": ctx.cfg().local.network.cipher_pk.to_bytes(), - "sign_pk": ctx.cfg().local.network.sign_sk.public_key(), + "cipher_pk": cfg.local.network.cipher_pk.to_bytes(), + "sign_pk": cfg.local.network.sign_sk.public_key(), })) .max_gas() .retry_exponential(10, 3) diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index 12f08c811..c7a2616e6 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -23,7 +23,7 @@ pub trait CryptographicCtx { fn signer(&self) -> &InMemorySigner; fn mpc_contract_id(&self) -> &AccountId; fn secret_storage(&mut self) -> &mut SecretNodeStorageBox; - fn cfg(&self) -> &Config; + async fn cfg(&self) -> Config; /// Active participants is the active participants at the beginning of each protocol loop. fn mesh(&self) -> &Mesh; @@ -74,6 +74,7 @@ impl CryptographicProtocol for GeneratingState { mut self, mut ctx: C, ) -> Result { + let cfg = ctx.cfg().await; tracing::info!(active = ?ctx.mesh().active_participants().keys_vec(), "generating: progressing key generation"); let mut protocol = self.protocol.write().await; loop { @@ -97,10 +98,10 @@ impl CryptographicProtocol for GeneratingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), ctx.mesh().active_participants(), - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -159,10 +160,10 @@ impl CryptographicProtocol for GeneratingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), ctx.mesh().active_participants(), - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -191,16 +192,17 @@ impl CryptographicProtocol for WaitingForConsensusState { mut self, ctx: C, ) -> Result { + let cfg = ctx.cfg().await; let failures = self .messages .write() .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), ctx.mesh().active_participants(), - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -221,6 +223,8 @@ impl CryptographicProtocol for ResharingState { mut self, mut ctx: C, ) -> Result { + let cfg = ctx.cfg().await; + // TODO: we are not using active potential participants here, but we should in the future. // Currently resharing protocol does not timeout and restart with new set of participants. // So if it picks up a participant that is not active, it will never be able to send a message to it. @@ -253,10 +257,10 @@ impl CryptographicProtocol for ResharingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), &active, - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -321,10 +325,10 @@ impl CryptographicProtocol for ResharingState { .await .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), &active, - &ctx.cfg().protocol, + &cfg.protocol, ) .await; if !failures.is_empty() { @@ -356,8 +360,10 @@ impl CryptographicProtocol for RunningState { mut self, ctx: C, ) -> Result { - let protocol_cfg = &ctx.cfg().protocol; + let cfg = ctx.cfg().await; + let protocol_cfg = &cfg.protocol; let active = ctx.mesh().active_participants(); + let state_views = ctx.mesh().state_views().await; tracing::debug!( "RunningState.progress active participants: {:?} potential participants: {:?} me: {:?}", active.keys_vec(), @@ -403,6 +409,7 @@ impl CryptographicProtocol for RunningState { if let Err(err) = presignature_manager .stockpile( active, + &state_views, &self.public_key, &self.private_share, &mut triple_manager, @@ -450,6 +457,7 @@ impl CryptographicProtocol for RunningState { signature_manager.handle_requests( self.threshold, &stable, + &state_views, my_requests, &mut presignature_manager, protocol_cfg, @@ -473,7 +481,7 @@ impl CryptographicProtocol for RunningState { let failures = messages .send_encrypted( ctx.me().await, - &ctx.cfg().local.network.sign_sk, + &cfg.local.network.sign_sk, ctx.http_client(), active, protocol_cfg, diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index f402dfbe8..363a13b08 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -23,7 +23,7 @@ use tokio::sync::RwLock; pub trait MessageCtx { async fn me(&self) -> Participant; fn mesh(&self) -> &Mesh; - fn cfg(&self) -> &crate::config::Config; + async fn cfg(&self) -> crate::config::Config; } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] @@ -231,7 +231,7 @@ impl MessageHandler for RunningState { ctx: C, queue: &mut MpcMessageQueue, ) -> Result<(), MessageHandleError> { - let protocol_cfg = &ctx.cfg().protocol; + let protocol_cfg = &ctx.cfg().await.protocol; let participants = ctx.mesh().active_participants(); let mut triple_manager = self.triple_manager.write().await; diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 2570ce471..2e88da781 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -50,10 +50,11 @@ struct Ctx { sign_queue: Arc>, secret_storage: SecretNodeStorageBox, triple_storage: LockTripleNodeStorageBox, - cfg: Config, + cfg: Arc>, mesh: Mesh, } +#[async_trait::async_trait] impl ConsensusCtx for &mut MpcSignProtocol { fn my_account_id(&self) -> &AccountId { &self.ctx.account_id @@ -87,8 +88,8 @@ impl ConsensusCtx for &mut MpcSignProtocol { &self.ctx.secret_storage } - fn cfg(&self) -> &Config { - &self.ctx.cfg + async fn cfg(&self) -> Config { + self.ctx.cfg.read().await.clone() } fn triple_storage(&self) -> LockTripleNodeStorageBox { @@ -122,8 +123,8 @@ impl CryptographicCtx for &mut MpcSignProtocol { &mut self.ctx.secret_storage } - fn cfg(&self) -> &Config { - &self.ctx.cfg + async fn cfg(&self) -> Config { + self.ctx.cfg.read().await.clone() } fn mesh(&self) -> &Mesh { @@ -141,8 +142,8 @@ impl MessageCtx for &MpcSignProtocol { &self.ctx.mesh } - fn cfg(&self) -> &Config { - &self.ctx.cfg + async fn cfg(&self) -> Config { + self.ctx.cfg.read().await.clone() } } @@ -164,7 +165,7 @@ impl MpcSignProtocol { sign_queue: Arc>, secret_storage: SecretNodeStorageBox, triple_storage: LockTripleNodeStorageBox, - cfg: Config, + cfg: Arc>, ) -> (Self, Arc>) { let state = Arc::new(RwLock::new(NodeState::Starting)); let ctx = Ctx { @@ -203,13 +204,14 @@ impl MpcSignProtocol { let mut last_pinged = Instant::now(); // Sets the latest configurations from the contract: - if let Err(err) = self - .ctx - .cfg - .fetch_inplace(&self.ctx.rpc_client, &self.ctx.mpc_contract_id) - .await { - tracing::warn!("could not fetch contract's config on startup: {err:?}"); + let mut cfg = self.ctx.cfg.write().await; + if let Err(err) = cfg + .fetch_inplace(&self.ctx.rpc_client, &self.ctx.mpc_contract_id) + .await + { + tracing::warn!("could not fetch contract's config on startup: {err:?}"); + } } loop { @@ -262,9 +264,8 @@ impl MpcSignProtocol { if last_config_update.elapsed() > Duration::from_secs(5 * 60) { // Sets the latest configurations from the contract: - if let Err(err) = self - .ctx - .cfg + let mut cfg = self.ctx.cfg.write().await; + if let Err(err) = cfg .fetch_inplace(&self.ctx.rpc_client, &self.ctx.mpc_contract_id) .await { @@ -273,16 +274,18 @@ impl MpcSignProtocol { last_config_update = Instant::now(); } - if last_pinged.elapsed() > Duration::from_millis(300) { - self.ctx.mesh.ping().await; - last_pinged = Instant::now(); - } - let state = { let guard = self.state.read().await; guard.clone() }; + if last_pinged.elapsed() > Duration::from_millis(300) { + let protocol_cfg = &self.ctx.cfg.read().await.protocol; + let planned_previews = state.plan_preview(protocol_cfg).await; + self.ctx.mesh.ping(planned_previews).await; + last_pinged = Instant::now(); + } + let crypto_time = Instant::now(); tracing::debug!("State progress. Node state: {}", state); let mut state = match state.progress(&mut self).await { diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 852feb798..11570de7c 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -3,6 +3,7 @@ use super::triple::{Triple, TripleId, TripleManager}; use crate::protocol::contract::primitives::Participants; use crate::types::{PresignatureProtocol, SecretKeyShare}; use crate::util::AffinePointExt; +use crate::web::StateView; use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; use cait_sith::{KeygenOutput, PresignArguments, PresignOutput}; @@ -105,7 +106,7 @@ pub struct PresignatureManager { /// Ongoing presignature generation protocols. generators: HashMap, /// List of presignature ids generation of which was initiated by the current node. - mine: VecDeque, + pub mine: VecDeque, /// The set of presignatures that were introduced to the system by the current node. introduced: HashSet, /// Garbage collection for presignatures that have either been taken or failed. This @@ -166,7 +167,7 @@ impl PresignatureManager { #[allow(clippy::too_many_arguments)] fn generate_internal( - participants: &Participants, + participants: &[Participant], me: Participant, threshold: usize, triple0: Triple, @@ -176,13 +177,12 @@ impl PresignatureManager { mine: bool, timeout: u64, ) -> Result { - let participants: Vec<_> = participants.keys().cloned().collect(); let protocol = Box::new(cait_sith::presign( - &participants, + participants, me, // These paramaters appear to be to make it easier to use different indexing schemes for triples // Introduced in this PR https://github.com/LIT-Protocol/cait-sith/pull/7 - &participants, + participants, me, PresignArguments { triple0: (triple0.share, triple0.public), @@ -196,7 +196,7 @@ impl PresignatureManager { )?); Ok(PresignatureGenerator::new( protocol, - participants, + participants.into(), triple0.id, triple1.id, mine, @@ -207,7 +207,7 @@ impl PresignatureManager { /// Starts a new presignature generation protocol. pub fn generate( &mut self, - participants: &Participants, + participants: &[Participant], triple0: Triple, triple1: Triple, public_key: &PublicKey, @@ -252,6 +252,7 @@ impl PresignatureManager { pub async fn stockpile( &mut self, active: &Participants, + state_views: &HashMap, pk: &PublicKey, sk_share: &SecretKeyShare, triple_manager: &mut TripleManager, @@ -274,28 +275,72 @@ impl PresignatureManager { // To ensure there is no contention between different nodes we are only using triples // that we proposed. This way in a non-BFT environment we are guaranteed to never try // to use the same triple as any other node. - if let Some((triple0, triple1)) = triple_manager.take_two_mine().await { + if let Some((triple0, triple1)) = triple_manager.peek_two_mine() { let presig_participants = active .intersection(&[&triple0.public.participants, &triple1.public.participants]); if presig_participants.len() < self.threshold { + tracing::warn!( + id0 = triple0.id, + id1 = triple0.id, + participants = ?presig_participants.keys_vec(), + "running: participants are not above threshold for presignature generation" + ); + return Ok(()); + } + + let state_views = presig_participants + .iter() + .filter_map(|(p, _)| Some((*p, state_views.get(p)?))); + + // Filter out the active participants with the state views that have the triples we want to use. + let active_filtered = state_views + .filter(|(_, state_view)| { + if let StateView::Running { + triple_postview, + .. + } = state_view + { + triple_postview.contains(&triple0.id) && triple_postview.contains(&triple1.id) + } else { + false + } + }) + .map(|(p, _)| p) + .collect::>(); + + if active_filtered.len() < self.threshold { tracing::debug!( + ?triple0, + ?triple1, participants = ?presig_participants.keys_vec(), "running: we don't have enough participants to generate a presignature" ); + return Ok(()); + } - // Insert back the triples to be used later since this active set of - // participants were not able to make use of these triples. - triple_manager.insert_mine(triple0).await; - triple_manager.insert_mine(triple1).await; - } else { - self.generate( - &presig_participants, - triple0, - triple1, - pk, - sk_share, - cfg.presignature.generation_timeout, - )?; + // Actually take the triples now that we have done the necessary checks. + let Some((triple0, triple1)) = triple_manager.take_two_mine().await else { + tracing::warn!("running: popping after peeking should have succeeded"); + return Ok(()); + }; + let id0 = triple0.id; + let id1 = triple1.id; + + if let Err(err @ InitializationError::BadParameters(_)) = self.generate( + &active_filtered, + triple0, + triple1, + pk, + sk_share, + cfg.presignature.generation_timeout, + ) { + tracing::warn!( + id0, + id1, + ?err, + "we had to trash two triples due to bad parameters" + ); + return Err(err); } } else { tracing::debug!("running: we don't have enough triples to generate a presignature"); @@ -370,7 +415,7 @@ impl PresignatureManager { }, }; let generator = Self::generate_internal( - participants, + &participants.keys_vec(), self.me, self.threshold, triple0, @@ -516,6 +561,16 @@ impl PresignatureManager { messages } + + pub fn preview(&self, presignatures: &HashSet) -> HashSet { + let presignatures = presignatures + .into_iter() + .filter(|id| self.presignatures.contains_key(id)) + .cloned() + .collect(); + + presignatures + } } pub fn hash_as_id(triple0: TripleId, triple1: TripleId) -> PresignatureId { diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index f9038a38a..3112a1a64 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -5,6 +5,7 @@ use crate::indexer::ContractSignRequest; use crate::kdf::into_eth_sig; use crate::types::SignatureProtocol; use crate::util::AffinePointExt; +use crate::web::StateView; use cait_sith::protocol::{Action, InitializationError, Participant, ProtocolError}; use cait_sith::{FullSignature, PresignOutput}; @@ -573,6 +574,7 @@ impl SignatureManager { &mut self, threshold: usize, stable: &Participants, + state_views: &HashMap, my_requests: &mut ParticipantRequests, presignature_manager: &mut PresignatureManager, cfg: &ProtocolConfig, @@ -587,7 +589,8 @@ impl SignatureManager { return; } let mut failed_presigs = Vec::new(); - while let Some(mut presignature) = { + let mut alternate = false; + while let Some(presignature) = { if self.failed.is_empty() && my_requests.is_empty() { None } else { @@ -603,13 +606,44 @@ impl SignatureManager { failed_presigs.push(presignature); continue; } - let presig_id = presignature.id; + let state_views = sig_participants + .iter() + .filter_map(|(p, _)| Some((*p, state_views.get(p)?))); + + // Filter out the active participants with the state views that have the triples we want to use. + let stable_filtered = state_views + .filter(|(_, state_view)| { + if let StateView::Running { + presignature_postview, + .. + } = state_view + { + presignature_postview.contains(&presignature.id) + } else { + false + } + }) + .map(|(p, _)| p) + .collect::>(); + + if stable_filtered.len() < threshold { + failed_presigs.push(presignature); + continue; + } // NOTE: this prioritizes old requests first then tries to do new ones if there's enough presignatures. // TODO: we need to decide how to prioritize certain requests over others such as with gas or time of // when the request made it into the NEAR network. // issue: https://github.com/near/mpc-recovery/issues/596 - if let Some((receipt_id, failed_req)) = self.failed.pop_front() { + + let id = presignature.id; + alternate = !alternate; + if alternate && !self.failed.is_empty() { + let Some((receipt_id, failed_req)) = self.failed.pop_front() else { + failed_presigs.push(presignature); + continue; + }; + if let Err((presignature, InitializationError::BadParameters(err))) = self .retry_failed_generation( receipt_id, @@ -619,35 +653,28 @@ impl SignatureManager { cfg, ) { - tracing::warn!(%receipt_id, presig_id, ?err, "failed to retry signature generation: trashing presignature"); failed_presigs.push(presignature); - continue; - } - - if let Some(another_presignature) = presignature_manager.take_mine() { - presignature = another_presignature; - } else { - break; + tracing::warn!(%receipt_id, id, ?err, "failed to retry signature generation: trashing presignature"); } } - - let Some((receipt_id, my_request)) = my_requests.pop_front() else { - failed_presigs.push(presignature); - continue; - }; - if let Err((presignature, InitializationError::BadParameters(err))) = self.generate( - &sig_participants, - receipt_id, - presignature, - my_request.request, - my_request.epsilon, - my_request.delta, - my_request.time_added, - cfg, - ) { - failed_presigs.push(presignature); - tracing::warn!(%receipt_id, presig_id, ?err, "failed to start signature generation: trashing presignature"); - continue; + else { + let Some((receipt_id, my_request)) = my_requests.pop_front() else { + failed_presigs.push(presignature); + continue; + }; + if let Err((presignature, InitializationError::BadParameters(err))) = self.generate( + &sig_participants, + receipt_id, + presignature, + my_request.request, + my_request.epsilon, + my_request.delta, + my_request.time_added, + cfg, + ) { + failed_presigs.push(presignature); + tracing::warn!(%receipt_id, id, ?err, "failed to start signature generation: trashing presignature"); + } } } diff --git a/chain-signatures/node/src/protocol/state.rs b/chain-signatures/node/src/protocol/state.rs index 58fd4a506..c6e4b9c4c 100644 --- a/chain-signatures/node/src/protocol/state.rs +++ b/chain-signatures/node/src/protocol/state.rs @@ -1,9 +1,9 @@ use super::contract::primitives::{ParticipantInfo, Participants}; use super::cryptography::CryptographicError; use super::monitor::StuckMonitor; -use super::presignature::PresignatureManager; +use super::presignature::{PresignatureId, PresignatureManager}; use super::signature::SignatureManager; -use super::triple::TripleManager; +use super::triple::{TripleId, TripleManager}; use super::SignQueue; use crate::http_client::MessageQueue; use crate::storage::triple_storage::TripleData; @@ -11,8 +11,10 @@ use crate::types::{KeygenProtocol, ReshareProtocol, SecretKeyShare}; use cait_sith::protocol::Participant; use crypto_shared::PublicKey; +use mpc_contract::config::ProtocolConfig; use near_account_id::AccountId; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::fmt; use std::fmt::{Display, Formatter}; use std::sync::Arc; @@ -205,6 +207,34 @@ impl NodeState { NodeState::Joining(state) => state.participants.find_participant_info(account_id), } } + + pub async fn plan_preview( + &self, + cfg: &ProtocolConfig, + ) -> Option<(HashSet, HashSet)> { + match self { + NodeState::Running(state) => { + let triple_manager = state.triple_manager.read().await; + let triple_preview = triple_manager + .mine + .iter() + .take(cfg.triple.preview_limit as usize) + .cloned() + .collect(); + + let presignature_manager = state.presignature_manager.read().await; + let presignature_preview = presignature_manager + .mine + .iter() + .take(cfg.presignature.preview_limit as usize) + .cloned() + .collect(); + + Some((triple_preview, presignature_preview)) + } + _ => None, + } + } } fn fetch_participant<'a>( diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index bdd9b1a79..0023c44cc 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -346,8 +346,7 @@ impl TripleManager { let id1 = self.mine.pop_front()?; tracing::info!(id0, id1, me = ?self.me, "trying to take two triples"); - let take_two_result = self.take_two(id0, id1).await; - match take_two_result { + match self.take_two(id0, id1).await { Err(error) if matches!( error, @@ -377,6 +376,17 @@ impl TripleManager { } } + pub fn peek_two_mine(&self) -> Option<(&Triple, &Triple)> { + if self.mine.len() < 2 { + return None; + } + let id0 = self.mine.get(0)?; + let id1 = self.mine.get(1)?; + let triple0 = self.triples.get(id0)?; + let triple1 = self.triples.get(id1)?; + Some((triple0, triple1)) + } + pub async fn insert_mine(&mut self, triple: Triple) { self.mine.push_back(triple.id); self.triples.insert(triple.id, triple.clone()); @@ -591,6 +601,16 @@ impl TripleManager { let _ = tokio_retry::Retry::spawn(retry_strategy, action).await; } } + + pub fn preview(&self, triples: &HashSet) -> HashSet { + let triples = triples + .into_iter() + .filter(|id| self.triples.contains_key(id)) + .cloned() + .collect(); + + triples + } } #[cfg(test)] diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 16d8a922f..be0608584 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -1,11 +1,15 @@ mod error; use self::error::Error; +use crate::config::Config; use crate::indexer::Indexer; use crate::protocol::message::SignedMessage; +use crate::protocol::presignature::PresignatureId; +use crate::protocol::triple::TripleId; use crate::protocol::{MpcMessage, NodeState}; use crate::web::error::Result; use anyhow::Context; +use axum::extract::Query; use axum::http::StatusCode; use axum::routing::{get, post}; use axum::{Extension, Json, Router}; @@ -15,6 +19,7 @@ use mpc_keys::hpke::{self, Ciphered}; use near_primitives::types::BlockHeight; use prometheus::{Encoder, TextEncoder}; use serde::{Deserialize, Serialize}; +use std::collections::{HashMap, HashSet}; use std::{net::SocketAddr, sync::Arc}; use tokio::sync::{mpsc::Sender, RwLock}; @@ -23,6 +28,7 @@ struct AxumState { protocol_state: Arc>, cipher_sk: hpke::SecretKey, indexer: Indexer, + config: Arc>, } pub async fn run( @@ -31,6 +37,7 @@ pub async fn run( cipher_sk: hpke::SecretKey, protocol_state: Arc>, indexer: Indexer, + config: Arc>, ) -> anyhow::Result<()> { tracing::debug!("running a node"); let axum_state = AxumState { @@ -38,6 +45,7 @@ pub async fn run( protocol_state, cipher_sk, indexer, + config, }; let app = Router::new() @@ -98,7 +106,26 @@ async fn msg( Ok(()) } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Deserialize)] +pub struct StateParams { + pub triple_preview: Option>, + pub presignature_preview: Option>, +} + +impl StateParams { + pub fn into_query(self) -> HashMap> { + let mut query = HashMap::new(); + if let Some(triple_preview) = self.triple_preview { + query.insert("triple_preview".to_string(), triple_preview); + } + if let Some(presignature_preview) = self.presignature_preview { + query.insert("presignature_preview".to_string(), presignature_preview); + } + query + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] #[serde(tag = "type")] #[serde(rename_all = "snake_case")] #[non_exhaustive] @@ -108,9 +135,13 @@ pub enum StateView { triple_count: usize, triple_mine_count: usize, triple_potential_count: usize, + #[serde(default, skip_serializing_if = "HashSet::is_empty")] + triple_postview: HashSet, presignature_count: usize, presignature_mine_count: usize, presignature_potential_count: usize, + #[serde(default, skip_serializing_if = "HashSet::is_empty")] + presignature_postview: HashSet, latest_block_height: BlockHeight, is_stable: bool, }, @@ -128,10 +159,14 @@ pub enum StateView { } #[tracing::instrument(level = "debug", skip_all)] -async fn state(Extension(state): Extension>) -> Result> { +async fn state( + Extension(state): Extension>, + Query(params): Query, +) -> Result> { tracing::debug!("fetching state"); let latest_block_height = state.indexer.latest_block_height().await; let is_stable = state.indexer.is_on_track().await; + let config = state.config.read().await; let protocol_state = state.protocol_state.read().await; match &*protocol_state { @@ -146,14 +181,39 @@ async fn state(Extension(state): Extension>) -> Result Date: Tue, 6 Aug 2024 09:33:37 +0900 Subject: [PATCH 02/11] Working previews --- chain-signatures/node/src/mesh/connection.rs | 33 ++++++++---- chain-signatures/node/src/protocol/mod.rs | 5 +- chain-signatures/node/src/web/mod.rs | 54 +++++++------------ integration-tests/chain-signatures/Cargo.lock | 4 +- 4 files changed, 50 insertions(+), 46 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index f47d78713..0ac39802a 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -44,11 +44,17 @@ impl Pool { let mut params = HashMap::new(); if let Some((triples, presignatures)) = previews { - params.insert("triple_preview", triples); - params.insert("presignature_preview", presignatures); + if !triples.is_empty() { + params.insert("triple_preview", triples); + } + if !presignatures.is_empty() { + params.insert("presignature_preview", presignatures); + } } let mut status = self.status.write().await; + status.clear(); + let mut participants = Participants::default(); for (participant, info) in connections.iter() { let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { @@ -60,13 +66,20 @@ impl Pool { continue; }; - let Ok(resp) = self.http.get(url.clone()).query(¶ms).send().await else { - tracing::warn!( - "Pool.ping resp err participant {:?} url {}", - participant, - url - ); - continue; + let resp = match self.http.get(url.clone()) + .header("content-type", "application/json") + .json(¶ms).send().await + { + Ok(resp) => resp, + Err(err) => { + tracing::warn!( + ?err, + "Pool.ping resp err participant {:?} url {}", + participant, + url + ); + continue; + } }; let Ok(state): Result = resp.json().await else { @@ -79,6 +92,8 @@ impl Pool { }; status.insert(*participant, state); + // self.status.insert(*participant, state); + participants.insert(participant, info.clone()); } drop(status); diff --git a/chain-signatures/node/src/protocol/mod.rs b/chain-signatures/node/src/protocol/mod.rs index 2e88da781..bad25372b 100644 --- a/chain-signatures/node/src/protocol/mod.rs +++ b/chain-signatures/node/src/protocol/mod.rs @@ -202,6 +202,7 @@ impl MpcSignProtocol { let mut last_state_update = Instant::now(); let mut last_config_update = Instant::now(); let mut last_pinged = Instant::now(); + let mut require_ping = false; // Sets the latest configurations from the contract: { @@ -255,6 +256,7 @@ impl MpcSignProtocol { // set which participants are currently active in the protocol and determines who will be // receiving messages. self.ctx.mesh.establish_participants(&contract_state).await; + require_ping = true; last_state_update = Instant::now(); Some(contract_state) @@ -279,11 +281,12 @@ impl MpcSignProtocol { guard.clone() }; - if last_pinged.elapsed() > Duration::from_millis(300) { + if require_ping || last_pinged.elapsed() > Duration::from_millis(300) { let protocol_cfg = &self.ctx.cfg.read().await.protocol; let planned_previews = state.plan_preview(protocol_cfg).await; self.ctx.mesh.ping(planned_previews).await; last_pinged = Instant::now(); + require_ping = false; } let crypto_time = Instant::now(); diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index be0608584..76f20fb5d 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -106,23 +106,12 @@ async fn msg( Ok(()) } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct StateParams { - pub triple_preview: Option>, - pub presignature_preview: Option>, -} - -impl StateParams { - pub fn into_query(self) -> HashMap> { - let mut query = HashMap::new(); - if let Some(triple_preview) = self.triple_preview { - query.insert("triple_preview".to_string(), triple_preview); - } - if let Some(presignature_preview) = self.presignature_preview { - query.insert("presignature_preview".to_string(), presignature_preview); - } - query - } + #[serde(default)] + pub triple_preview: Vec, + #[serde(default)] + pub presignature_preview: Vec, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -161,7 +150,7 @@ pub enum StateView { #[tracing::instrument(level = "debug", skip_all)] async fn state( Extension(state): Extension>, - Query(params): Query, + params: Option>, ) -> Result> { tracing::debug!("fetching state"); let latest_block_height = state.indexer.latest_block_height().await; @@ -181,29 +170,26 @@ async fn state( let presignature_potential_count = presignature_read.potential_len(); let participants = state.participants.keys_vec(); - let triple_postview = if let Some(triple_preview) = params.triple_preview { - let triple_preview = triple_preview - .into_iter() + let (triple_postview, presignature_postview) = if let Some(params) = params { + let triple_preview = params.triple_preview + .iter() .take(config.protocol.triple.preview_limit as usize) + .cloned() .collect(); let triple_manager = state.triple_manager.read().await; - triple_manager.preview(&triple_preview) + let triple_postview = triple_manager.preview(&triple_preview); + let presignature_preview = params.presignature_preview + .iter() + .take(config.protocol.presignature.preview_limit as usize) + .cloned() + .collect(); + let presignature_manager = state.presignature_manager.read().await; + let presignature_postview = presignature_manager.preview(&presignature_preview); + (triple_postview, presignature_postview) } else { - HashSet::new() + (HashSet::new(), HashSet::new()) }; - let presignature_postview = - if let Some(presignature_preview) = params.presignature_preview { - let presignature_preview = presignature_preview - .into_iter() - .take(config.protocol.presignature.preview_limit as usize) - .collect(); - let presignature_manager = state.presignature_manager.read().await; - presignature_manager.preview(&presignature_preview) - } else { - HashSet::new() - }; - Ok(Json(StateView::Running { participants, triple_count, diff --git a/integration-tests/chain-signatures/Cargo.lock b/integration-tests/chain-signatures/Cargo.lock index f920c7a93..e45b09846 100644 --- a/integration-tests/chain-signatures/Cargo.lock +++ b/integration-tests/chain-signatures/Cargo.lock @@ -4239,7 +4239,7 @@ dependencies = [ [[package]] name = "mpc-contract" -version = "1.0.0-rc.2" +version = "1.0.0-rc.3" dependencies = [ "borsh", "crypto-shared", @@ -4264,7 +4264,7 @@ dependencies = [ [[package]] name = "mpc-node" -version = "1.0.0-rc.2" +version = "1.0.0-rc.3" dependencies = [ "anyhow", "async-trait", From 4d3b5d7a85bb16cfa1c44af2a3222b21c39b1a0a Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Tue, 6 Aug 2024 09:47:31 +0900 Subject: [PATCH 03/11] Clippy --- chain-signatures/node/src/mesh/connection.rs | 19 ++++++++++++------- chain-signatures/node/src/mesh/mod.rs | 1 + .../node/src/protocol/presignature.rs | 18 +++++++++--------- .../node/src/protocol/signature.rs | 3 +-- chain-signatures/node/src/web/mod.rs | 9 +++++---- 5 files changed, 28 insertions(+), 22 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 0ac39802a..0772fb8dd 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -9,7 +9,7 @@ use crate::protocol::contract::primitives::Participants; use crate::protocol::presignature::PresignatureId; use crate::protocol::triple::TripleId; use crate::protocol::ProtocolState; -use crate::web::{StateParams, StateView}; +use crate::web::StateView; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(1); @@ -53,8 +53,6 @@ impl Pool { } let mut status = self.status.write().await; - status.clear(); - let mut participants = Participants::default(); for (participant, info) in connections.iter() { let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { @@ -66,9 +64,13 @@ impl Pool { continue; }; - let resp = match self.http.get(url.clone()) + let resp = match self + .http + .get(url.clone()) .header("content-type", "application/json") - .json(¶ms).send().await + .json(¶ms) + .send() + .await { Ok(resp) => resp, Err(err) => { @@ -128,10 +130,8 @@ impl Pool { }; status.insert(*participant, state); - // self.status.insert(*participant, state); participants.insert(participant, info.clone()); } - // drop(status); let mut potential_active = self.potential_active.write().await; *potential_active = Some((participants.clone(), Instant::now())); @@ -186,4 +186,9 @@ impl Pool { _ => false, }) } + + pub async fn clear_status(&self) { + let mut status = self.status.write().await; + status.clear(); + } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index bc1544d84..37713d6cf 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -75,6 +75,7 @@ impl Mesh { /// Ping the active participants such that we can see who is alive. pub async fn ping(&mut self, previews: Option<(HashSet, HashSet)>) { + self.connections.clear_status().await; self.active_participants = self.connections.ping(previews).await; tracing::debug!( "Mesh.ping set active participants to {:?}", diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 11570de7c..75630a2c9 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -276,12 +276,14 @@ impl PresignatureManager { // that we proposed. This way in a non-BFT environment we are guaranteed to never try // to use the same triple as any other node. if let Some((triple0, triple1)) = triple_manager.peek_two_mine() { + let id0 = triple0.id; + let id1 = triple1.id; let presig_participants = active .intersection(&[&triple0.public.participants, &triple1.public.participants]); if presig_participants.len() < self.threshold { tracing::warn!( - id0 = triple0.id, - id1 = triple0.id, + id0, + id1, participants = ?presig_participants.keys_vec(), "running: participants are not above threshold for presignature generation" ); @@ -296,11 +298,11 @@ impl PresignatureManager { let active_filtered = state_views .filter(|(_, state_view)| { if let StateView::Running { - triple_postview, - .. + triple_postview, .. } = state_view { - triple_postview.contains(&triple0.id) && triple_postview.contains(&triple1.id) + triple_postview.contains(&triple0.id) + && triple_postview.contains(&triple1.id) } else { false } @@ -310,8 +312,8 @@ impl PresignatureManager { if active_filtered.len() < self.threshold { tracing::debug!( - ?triple0, - ?triple1, + id0, + id1, participants = ?presig_participants.keys_vec(), "running: we don't have enough participants to generate a presignature" ); @@ -323,8 +325,6 @@ impl PresignatureManager { tracing::warn!("running: popping after peeking should have succeeded"); return Ok(()); }; - let id0 = triple0.id; - let id1 = triple1.id; if let Err(err @ InitializationError::BadParameters(_)) = self.generate( &active_filtered, diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 3112a1a64..378889474 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -656,8 +656,7 @@ impl SignatureManager { failed_presigs.push(presignature); tracing::warn!(%receipt_id, id, ?err, "failed to retry signature generation: trashing presignature"); } - } - else { + } else { let Some((receipt_id, my_request)) = my_requests.pop_front() else { failed_presigs.push(presignature); continue; diff --git a/chain-signatures/node/src/web/mod.rs b/chain-signatures/node/src/web/mod.rs index 76f20fb5d..0aa311e85 100644 --- a/chain-signatures/node/src/web/mod.rs +++ b/chain-signatures/node/src/web/mod.rs @@ -9,7 +9,6 @@ use crate::protocol::triple::TripleId; use crate::protocol::{MpcMessage, NodeState}; use crate::web::error::Result; use anyhow::Context; -use axum::extract::Query; use axum::http::StatusCode; use axum::routing::{get, post}; use axum::{Extension, Json, Router}; @@ -19,7 +18,7 @@ use mpc_keys::hpke::{self, Ciphered}; use near_primitives::types::BlockHeight; use prometheus::{Encoder, TextEncoder}; use serde::{Deserialize, Serialize}; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::{net::SocketAddr, sync::Arc}; use tokio::sync::{mpsc::Sender, RwLock}; @@ -171,14 +170,16 @@ async fn state( let participants = state.participants.keys_vec(); let (triple_postview, presignature_postview) = if let Some(params) = params { - let triple_preview = params.triple_preview + let triple_preview = params + .triple_preview .iter() .take(config.protocol.triple.preview_limit as usize) .cloned() .collect(); let triple_manager = state.triple_manager.read().await; let triple_postview = triple_manager.preview(&triple_preview); - let presignature_preview = params.presignature_preview + let presignature_preview = params + .presignature_preview .iter() .take(config.protocol.presignature.preview_limit as usize) .cloned() From bce570a235f3cb0f9073ca383cafa743d392ec7e Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Tue, 6 Aug 2024 15:35:20 +0900 Subject: [PATCH 04/11] Clippy --- chain-signatures/contract/src/config/mod.rs | 6 ++++-- chain-signatures/node/src/protocol/presignature.rs | 8 +++----- chain-signatures/node/src/protocol/triple.rs | 10 ++++------ 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/chain-signatures/contract/src/config/mod.rs b/chain-signatures/contract/src/config/mod.rs index 51ad11cb5..742da8313 100644 --- a/chain-signatures/contract/src/config/mod.rs +++ b/chain-signatures/contract/src/config/mod.rs @@ -114,12 +114,14 @@ mod tests { "triple": { "min_triples": 10, "max_triples": 100, - "generation_timeout": 10000 + "generation_timeout": 10000, + "preview_limit": 128, }, "presignature": { "min_presignatures": 10, "max_presignatures": 100, - "generation_timeout": 10000 + "generation_timeout": 10000, + "preview_limit": 256, }, "signature": { "generation_timeout": 10000, diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 7b27c4465..f4263fbb5 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -565,13 +565,11 @@ impl PresignatureManager { } pub fn preview(&self, presignatures: &HashSet) -> HashSet { - let presignatures = presignatures - .into_iter() + presignatures + .iter() .filter(|id| self.presignatures.contains_key(id)) .cloned() - .collect(); - - presignatures + .collect() } } diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 0023c44cc..d24c61ff5 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -380,7 +380,7 @@ impl TripleManager { if self.mine.len() < 2 { return None; } - let id0 = self.mine.get(0)?; + let id0 = self.mine.front()?; let id1 = self.mine.get(1)?; let triple0 = self.triples.get(id0)?; let triple1 = self.triples.get(id1)?; @@ -603,13 +603,11 @@ impl TripleManager { } pub fn preview(&self, triples: &HashSet) -> HashSet { - let triples = triples - .into_iter() + triples + .iter() .filter(|id| self.triples.contains_key(id)) .cloned() - .collect(); - - triples + .collect() } } From ed19ae8e9bfff2d1a0b107c9c49593ff711c524a Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Wed, 7 Aug 2024 16:38:43 +0900 Subject: [PATCH 05/11] Removed clear_status and made logging match warn --- chain-signatures/node/src/mesh/connection.rs | 12 +++++------- chain-signatures/node/src/mesh/mod.rs | 1 - chain-signatures/node/src/protocol/cryptography.rs | 8 +------- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index dbbb10e87..3c7558506 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -53,6 +53,10 @@ impl Pool { } let mut status = self.status.write().await; + // Clear the status before we overwrite it just so we don't have any stale participant + // statuses that are no longer in the network after a reshare. + status.clear(); + let mut participants = Participants::default(); for (participant, info) in connections.iter() { let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else { @@ -94,8 +98,6 @@ impl Pool { }; status.insert(*participant, state); - // self.status.insert(*participant, state); - participants.insert(participant, info.clone()); } drop(status); @@ -179,12 +181,8 @@ impl Pool { .get(participant) .map_or(false, |state| match state { StateView::Running { is_stable, .. } => *is_stable, + StateView::Resharing { is_stable,.. } => *is_stable, _ => false, }) } - - pub async fn clear_status(&self) { - let mut status = self.status.write().await; - status.clear(); - } } diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 2597248d1..fb87b1dfb 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -75,7 +75,6 @@ impl Mesh { /// Ping the active participants such that we can see who is alive. pub async fn ping(&mut self, previews: Option<(HashSet, HashSet)>) { - self.connections.clear_status().await; self.active_participants = self.connections.ping(previews).await; self.active_potential_participants = self.connections.ping_potential().await; diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index c7a2616e6..edb5a9d1b 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -364,14 +364,8 @@ impl CryptographicProtocol for RunningState { let protocol_cfg = &cfg.protocol; let active = ctx.mesh().active_participants(); let state_views = ctx.mesh().state_views().await; - tracing::debug!( - "RunningState.progress active participants: {:?} potential participants: {:?} me: {:?}", - active.keys_vec(), - ctx.mesh().potential_participants().await.keys_vec(), - ctx.me().await - ); if active.len() < self.threshold { - tracing::info!( + tracing::warn!( active = ?active.keys_vec(), "running: not enough participants to progress" ); From eae642093be3af0686fc94596e805d21e2eb7aa5 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Wed, 7 Aug 2024 16:42:15 +0900 Subject: [PATCH 06/11] Make http request conditional on having params --- chain-signatures/node/src/mesh/connection.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 3c7558506..347fcad8c 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -68,11 +68,17 @@ impl Pool { continue; }; - let resp = match self + let mut req = self .http - .get(url.clone()) - .header("content-type", "application/json") - .json(¶ms) + .get(url.clone()); + + if !params.is_empty() { + req = req + .header("content-type", "application/json") + .json(¶ms); + } + + let resp = match req .send() .await { From e967e834a7385548200e63ca463c1c51e6f620ee Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Wed, 7 Aug 2024 17:55:30 +0900 Subject: [PATCH 07/11] Better loggin and include stable_filtered --- chain-signatures/node/src/mesh/connection.rs | 15 +- .../node/src/protocol/presignature.rs | 166 ++++++++++-------- .../node/src/protocol/signature.rs | 63 ++++--- 3 files changed, 137 insertions(+), 107 deletions(-) diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index 347fcad8c..9f350805e 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -68,20 +68,13 @@ impl Pool { continue; }; - let mut req = self - .http - .get(url.clone()); + let mut req = self.http.get(url.clone()); if !params.is_empty() { - req = req - .header("content-type", "application/json") - .json(¶ms); + req = req.header("content-type", "application/json").json(¶ms); } - let resp = match req - .send() - .await - { + let resp = match req.send().await { Ok(resp) => resp, Err(err) => { tracing::warn!( @@ -187,7 +180,7 @@ impl Pool { .get(participant) .map_or(false, |state| match state { StateView::Running { is_stable, .. } => *is_stable, - StateView::Resharing { is_stable,.. } => *is_stable, + StateView::Resharing { is_stable, .. } => *is_stable, _ => false, }) } diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index f4263fbb5..95e7d27ac 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -260,95 +260,111 @@ impl PresignatureManager { triple_manager: &mut TripleManager, cfg: &ProtocolConfig, ) -> Result<(), InitializationError> { - let not_enough_presignatures = { + let enough_presignatures = { // Stopgap to prevent too many presignatures in the system. This should be around min_presig*nodes*2 // for good measure so that we have enough presignatures to do sig generation while also maintain // the minimum number of presignature where a single node can't flood the system. - if self.potential_len() >= cfg.presignature.max_presignatures as usize { + if self.potential_len() < cfg.presignature.max_presignatures as usize { false } else { // We will always try to generate a new triple if we have less than the minimum - self.my_len() < cfg.presignature.min_presignatures as usize - && self.introduced.len() < cfg.max_concurrent_introduction as usize + self.my_len() >= cfg.presignature.min_presignatures as usize + || self.introduced.len() >= cfg.max_concurrent_introduction as usize } }; - if not_enough_presignatures { - // To ensure there is no contention between different nodes we are only using triples - // that we proposed. This way in a non-BFT environment we are guaranteed to never try - // to use the same triple as any other node. - if let Some((triple0, triple1)) = triple_manager.peek_two_mine() { - let id0 = triple0.id; - let id1 = triple1.id; - let presig_participants = active - .intersection(&[&triple0.public.participants, &triple1.public.participants]); - if presig_participants.len() < self.threshold { - tracing::warn!( - id0, - id1, - participants = ?presig_participants.keys_vec(), - "running: participants are not above threshold for presignature generation" - ); - return Ok(()); - } - - let state_views = presig_participants - .iter() - .filter_map(|(p, _)| Some((*p, state_views.get(p)?))); - - // Filter out the active participants with the state views that have the triples we want to use. - let active_filtered = state_views - .filter(|(_, state_view)| { - if let StateView::Running { - triple_postview, .. - } = state_view - { - triple_postview.contains(&triple0.id) - && triple_postview.contains(&triple1.id) - } else { - false - } - }) - .map(|(p, _)| p) - .collect::>(); - - if active_filtered.len() < self.threshold { - tracing::debug!( - id0, - id1, - participants = ?presig_participants.keys_vec(), - "running: we don't have enough participants to generate a presignature" - ); - return Ok(()); - } + if enough_presignatures { + return Ok(()); + } - // Actually take the triples now that we have done the necessary checks. - let Some((triple0, triple1)) = triple_manager.take_two_mine().await else { - tracing::warn!("running: popping after peeking should have succeeded"); - return Ok(()); - }; + // To ensure there is no contention between different nodes we are only using triples + // that we proposed. This way in a non-BFT environment we are guaranteed to never try + // to use the same triple as any other node. + let Some((triple0, triple1)) = triple_manager.peek_two_mine() else { + tracing::debug!( + triple_mine = triple_manager.my_len(), + triple_potential = triple_manager.potential_len(), + "running: we don't have enough triples to generate a presignature" + ); + return Ok(()); + }; + let id0 = triple0.id; + let id1 = triple1.id; + let presig_participants = + active.intersection(&[&triple0.public.participants, &triple1.public.participants]); + if presig_participants.len() < self.threshold { + tracing::warn!( + id0, + id1, + threshold = self.threshold, + triple0 = ?triple0.public.participants, + triple1 = ?triple1.public.participants, + active = ?active.keys_vec(), + "running: participants are not above threshold for presignature generation" + ); + return Ok(()); + } - if let Err(err @ InitializationError::BadParameters(_)) = self.generate( - &active_filtered, - triple0, - triple1, - pk, - sk_share, - cfg.presignature.generation_timeout, - ) { - tracing::warn!( - id0, - id1, - ?err, - "we had to trash two triples due to bad parameters" - ); - return Err(err); + // Filter out the active participants with the state views that have the triples we want to use. + let active_filtered = presig_participants + .iter() + .filter_map(|(p, _)| Some((*p, state_views.get(p)?))) + .filter(|(_, state_view)| { + if let StateView::Running { + triple_postview, .. + } = state_view + { + triple_postview.contains(&triple0.id) && triple_postview.contains(&triple1.id) + } else { + false } - } else { - tracing::debug!("running: we don't have enough triples to generate a presignature"); - } + }) + .map(|(p, _)| p) + .collect::>(); + + if active_filtered.len() < self.threshold { + tracing::warn!( + id0, + id1, + threshold = self.threshold, + triple0 = ?triple0.public.participants, + triple1 = ?triple1.public.participants, + active = ?active.keys_vec(), + ?active_filtered, + ?state_views, + "running: we don't have enough participants to generate a presignature" + ); + return Ok(()); } + // Actually take the triples now that we have done the necessary checks. + let Some((triple0, triple1)) = triple_manager.take_two_mine().await else { + tracing::warn!( + id0, + id1, + potential = triple_manager.potential_len(), + "running: popping after peeking should have succeeded", + ); + return Ok(()); + }; + + if let Err(err @ InitializationError::BadParameters(_)) = self.generate( + &active_filtered, + triple0, + triple1, + pk, + sk_share, + cfg.presignature.generation_timeout, + ) { + tracing::warn!( + id0, + id1, + ?err, + ?active_filtered, + "we had to trash two triples due to bad parameters" + ); + return Err(err); + } Ok(()) } diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 378889474..68a1cb9b7 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -106,7 +106,7 @@ impl SignQueue { my_account_id: &AccountId, ) { if stable.len() < threshold { - tracing::info!( + tracing::warn!( "Require at least {} stable participants to organize, got {}: {:?}", threshold, stable.len(), @@ -291,14 +291,13 @@ impl SignatureManager { #[allow(clippy::too_many_arguments)] #[allow(clippy::result_large_err)] fn generate_internal( - participants: &Participants, + participants: &[Participant], me: Participant, public_key: PublicKey, presignature: Presignature, req: GenerationRequest, cfg: &ProtocolConfig, ) -> Result { - let participants = participants.keys_vec(); let GenerationRequest { proposer, request, @@ -316,7 +315,7 @@ impl SignatureManager { let presignature_id = presignature.id; let protocol = Box::new( cait_sith::sign( - &participants, + participants, me, derive_key(public_key, epsilon), output, @@ -326,7 +325,7 @@ impl SignatureManager { ); Ok(SignatureGenerator::new( protocol, - participants, + participants.into(), proposer, presignature_id, request, @@ -343,10 +342,10 @@ impl SignatureManager { receipt_id: ReceiptId, req: GenerationRequest, presignature: Presignature, - participants: &Participants, + participants: &[Participant], cfg: &ProtocolConfig, ) -> Result<(), (Presignature, InitializationError)> { - tracing::info!(receipt_id = %receipt_id, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature"); + tracing::info!(%receipt_id, ?participants, "restarting failed protocol to generate signature"); let generator = Self::generate_internal( participants, self.me, @@ -364,7 +363,7 @@ impl SignatureManager { #[allow(clippy::result_large_err)] pub fn generate( &mut self, - participants: &Participants, + participants: &[Participant], receipt_id: ReceiptId, presignature: Presignature, request: ContractSignRequest, @@ -377,7 +376,7 @@ impl SignatureManager { %receipt_id, me = ?self.me, presignature_id = presignature.id, - participants = ?participants.keys_vec(), + ?participants, "starting protocol to generate a new signature", ); let generator = Self::generate_internal( @@ -442,7 +441,7 @@ impl SignatureManager { }; tracing::info!(me = ?self.me, presignature_id, "found presignature: ready to start signature generation"); let generator = match Self::generate_internal( - participants, + &participants.keys_vec(), self.me, self.public_key, presignature, @@ -597,21 +596,24 @@ impl SignatureManager { presignature_manager.take_mine() } } { + let id = presignature.id; let sig_participants = stable.intersection(&[&presignature.participants]); if sig_participants.len() < threshold { - tracing::debug!( - participants = ?sig_participants.keys_vec(), + tracing::warn!( + id, + threshold, + stable = ?stable.keys_vec(), + participants = ?presignature.participants, "we do not have enough participants to generate a signature" ); failed_presigs.push(presignature); continue; } - let state_views = sig_participants - .iter() - .filter_map(|(p, _)| Some((*p, state_views.get(p)?))); // Filter out the active participants with the state views that have the triples we want to use. - let stable_filtered = state_views + let stable_filtered = sig_participants + .iter() + .filter_map(|(p, _)| Some((*p, state_views.get(p)?))) .filter(|(_, state_view)| { if let StateView::Running { presignature_postview, @@ -627,6 +629,14 @@ impl SignatureManager { .collect::>(); if stable_filtered.len() < threshold { + tracing::warn!( + id, + threshold, + stable = ?stable.keys_vec(), + participants = ?presignature.participants, + ?state_views, + "unable to use presignature for signature generation", + ); failed_presigs.push(presignature); continue; } @@ -636,7 +646,6 @@ impl SignatureManager { // when the request made it into the NEAR network. // issue: https://github.com/near/mpc-recovery/issues/596 - let id = presignature.id; alternate = !alternate; if alternate && !self.failed.is_empty() { let Some((receipt_id, failed_req)) = self.failed.pop_front() else { @@ -649,12 +658,18 @@ impl SignatureManager { receipt_id, failed_req, presignature, - &sig_participants, + &stable_filtered, cfg, ) { failed_presigs.push(presignature); - tracing::warn!(%receipt_id, id, ?err, "failed to retry signature generation: trashing presignature"); + tracing::warn!( + %receipt_id, + id, + ?stable_filtered, + ?err, + "failed to retry signature generation: trashing presignature", + ); } } else { let Some((receipt_id, my_request)) = my_requests.pop_front() else { @@ -662,7 +677,7 @@ impl SignatureManager { continue; }; if let Err((presignature, InitializationError::BadParameters(err))) = self.generate( - &sig_participants, + &stable_filtered, receipt_id, presignature, my_request.request, @@ -672,7 +687,13 @@ impl SignatureManager { cfg, ) { failed_presigs.push(presignature); - tracing::warn!(%receipt_id, id, ?err, "failed to start signature generation: trashing presignature"); + tracing::warn!( + %receipt_id, + id, + ?stable_filtered, + ?err, + "failed to start signature generation: trashing presignature", + ); } } } From cf4f3b058cf8dbbfe79ea99d3e3044dbb728aad0 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Wed, 7 Aug 2024 23:17:44 +0900 Subject: [PATCH 08/11] Improve log messages --- chain-signatures/node/src/protocol/presignature.rs | 4 ++-- chain-signatures/node/src/protocol/signature.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/chain-signatures/node/src/protocol/presignature.rs b/chain-signatures/node/src/protocol/presignature.rs index 95e7d27ac..09c442957 100644 --- a/chain-signatures/node/src/protocol/presignature.rs +++ b/chain-signatures/node/src/protocol/presignature.rs @@ -300,7 +300,7 @@ impl PresignatureManager { triple0 = ?triple0.public.participants, triple1 = ?triple1.public.participants, active = ?active.keys_vec(), - "running: participants are not above threshold for presignature generation" + "running: common participants are less than threshold for presignature generation" ); return Ok(()); } @@ -332,7 +332,7 @@ impl PresignatureManager { active = ?active.keys_vec(), ?active_filtered, ?state_views, - "running: we don't have enough participants to generate a presignature" + "running: filtered participants are less than threshold for presignature generation" ); return Ok(()); } diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 68a1cb9b7..0db04edcf 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -579,8 +579,8 @@ impl SignatureManager { cfg: &ProtocolConfig, ) { if stable.len() < threshold { - tracing::info!( - "Require at least {} stable participants to handle_requests, got {}: {:?}", + tracing::warn!( + "require at least {} stable participants to handle_requests, got {}: {:?}", threshold, stable.len(), stable.keys_vec() From de7e447e474dc0cf0e0a36a9f8d07a19e5a33f7f Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Mon, 12 Aug 2024 19:14:58 +0900 Subject: [PATCH 09/11] Get resharing test to pass --- chain-signatures/contract/src/lib.rs | 13 ++++--- chain-signatures/contract/src/primitives.rs | 4 +-- chain-signatures/contract/tests/vote.rs | 10 +++--- chain-signatures/node/src/mesh/connection.rs | 34 ++++++++++++++++--- chain-signatures/node/src/mesh/mod.rs | 4 +-- .../node/src/protocol/cryptography.rs | 2 -- .../node/src/protocol/signature.rs | 6 ---- .../chain-signatures/src/utils.rs | 32 ++++++++++------- .../chain-signatures/tests/lib.rs | 16 ++++----- 9 files changed, 71 insertions(+), 50 deletions(-) diff --git a/chain-signatures/contract/src/lib.rs b/chain-signatures/contract/src/lib.rs index dd8a1e615..09ab6082b 100644 --- a/chain-signatures/contract/src/lib.rs +++ b/chain-signatures/contract/src/lib.rs @@ -332,11 +332,11 @@ impl VersionedMpcContract { } #[handle_result] - pub fn vote_join(&mut self, candidate_account_id: AccountId) -> Result { + pub fn vote_join(&mut self, candidate: AccountId) -> Result { log!( - "vote_join: signer={}, candidate_account_id={}", + "vote_join: signer={}, candidate={}", env::signer_account_id(), - candidate_account_id + candidate ); let voter = self.voter()?; let protocol_state = self.mutable_state(); @@ -351,14 +351,13 @@ impl VersionedMpcContract { .. }) => { let candidate_info = candidates - .get(&candidate_account_id) + .get(&candidate) .ok_or(VoteError::JoinNotCandidate)?; - let voted = join_votes.entry(candidate_account_id.clone()); + let voted = join_votes.entry(candidate.clone()); voted.insert(voter); if voted.len() >= *threshold { let mut new_participants = participants.clone(); - new_participants - .insert(candidate_account_id.clone(), candidate_info.clone().into()); + new_participants.insert(candidate, candidate_info.clone().into()); *protocol_state = ProtocolContractState::Resharing(ResharingContractState { old_epoch: *epoch, old_participants: participants.clone(), diff --git a/chain-signatures/contract/src/primitives.rs b/chain-signatures/contract/src/primitives.rs index fc2639983..2b87e9478 100644 --- a/chain-signatures/contract/src/primitives.rs +++ b/chain-signatures/contract/src/primitives.rs @@ -205,8 +205,8 @@ impl Candidates { self.candidates.insert(account_id, candidate); } - pub fn remove(&mut self, account_id: &AccountId) { - self.candidates.remove(account_id); + pub fn remove(&mut self, account_id: &AccountId) -> Option { + self.candidates.remove(account_id) } pub fn get(&self, account_id: &AccountId) -> Option<&CandidateInfo> { diff --git a/chain-signatures/contract/tests/vote.rs b/chain-signatures/contract/tests/vote.rs index ec052f731..5db958f12 100644 --- a/chain-signatures/contract/tests/vote.rs +++ b/chain-signatures/contract/tests/vote.rs @@ -76,7 +76,7 @@ async fn test_vote_join() -> anyhow::Result<()> { let execution = accounts[0] .call(contract.id(), "vote_join") .args_json(json!({ - "candidate_account_id": alice.id() + "candidate": alice.id() })) .transact() .await?; @@ -88,7 +88,7 @@ async fn test_vote_join() -> anyhow::Result<()> { let execution = alice .call(contract.id(), "vote_join") .args_json(json!({ - "candidate_account_id": alice.id() + "candidate": alice.id() })) .transact() .await?; @@ -98,7 +98,7 @@ async fn test_vote_join() -> anyhow::Result<()> { let execution = accounts[1] .call(contract.id(), "vote_join") .args_json(json!({ - "candidate_account_id": alice.id() + "candidate": alice.id() })) .transact() .await?; @@ -267,7 +267,7 @@ async fn test_vote_reshare() -> anyhow::Result<()> { let execution = accounts[0] .call(contract.id(), "vote_join") .args_json(json!({ - "candidate_account_id": alice.id() + "candidate": alice.id() })) .transact() .await?; @@ -277,7 +277,7 @@ async fn test_vote_reshare() -> anyhow::Result<()> { let execution = accounts[1] .call(contract.id(), "vote_join") .args_json(json!({ - "candidate_account_id": alice.id() + "candidate": alice.id() })) .transact() .await?; diff --git a/chain-signatures/node/src/mesh/connection.rs b/chain-signatures/node/src/mesh/connection.rs index e77715d01..3bc4e89c2 100644 --- a/chain-signatures/node/src/mesh/connection.rs +++ b/chain-signatures/node/src/mesh/connection.rs @@ -69,11 +69,9 @@ impl Pool { }; let mut req = self.http.get(url.clone()); - if !params.is_empty() { req = req.header("content-type", "application/json").json(¶ms); } - let resp = match req.send().await { Ok(resp) => resp, Err(err) => { @@ -106,7 +104,10 @@ impl Pool { participants } - pub async fn ping_potential(&mut self) -> Participants { + pub async fn ping_potential( + &mut self, + previews: Option<(HashSet, HashSet)>, + ) -> Participants { if let Some((ref active, timestamp)) = *self.potential_active.read().await { if timestamp.elapsed() < DEFAULT_TIMEOUT { return active.clone(); @@ -115,6 +116,16 @@ impl Pool { let connections = self.potential_connections.read().await; + let mut params = HashMap::new(); + if let Some((triples, presignatures)) = previews { + if !triples.is_empty() { + params.insert("triple_preview", triples); + } + if !presignatures.is_empty() { + params.insert("presignature_preview", presignatures); + } + } + let mut status = self.status.write().await; let mut participants = Participants::default(); for (participant, info) in connections.iter() { @@ -122,8 +133,21 @@ impl Pool { continue; }; - let Ok(resp) = self.http.get(url).send().await else { - continue; + let mut req = self.http.get(url.clone()); + if !params.is_empty() { + req = req.header("content-type", "application/json").json(¶ms); + } + let resp = match req.send().await { + Ok(resp) => resp, + Err(err) => { + tracing::warn!( + ?err, + "Pool.ping_potential resp err participant {:?} url {}", + participant, + url + ); + continue; + } }; let Ok(state): Result = resp.json().await else { diff --git a/chain-signatures/node/src/mesh/mod.rs b/chain-signatures/node/src/mesh/mod.rs index 76e37ef90..3a7b82dd7 100644 --- a/chain-signatures/node/src/mesh/mod.rs +++ b/chain-signatures/node/src/mesh/mod.rs @@ -75,8 +75,8 @@ impl Mesh { /// Ping the active participants such that we can see who is alive. pub async fn ping(&mut self, previews: Option<(HashSet, HashSet)>) { - self.active_participants = self.connections.ping(previews).await; - self.active_potential_participants = self.connections.ping_potential().await; + self.active_participants = self.connections.ping(previews.clone()).await; + self.active_potential_participants = self.connections.ping_potential(previews).await; tracing::debug!( active = ?self.active_participants.account_ids(), diff --git a/chain-signatures/node/src/protocol/cryptography.rs b/chain-signatures/node/src/protocol/cryptography.rs index b0b90f1b0..77226678f 100644 --- a/chain-signatures/node/src/protocol/cryptography.rs +++ b/chain-signatures/node/src/protocol/cryptography.rs @@ -434,8 +434,6 @@ impl CryptographicProtocol for RunningState { // block height is up to date, such that they too can process signature requests. If they cannot // then they are considered unstable and should not be a part of signature generation this round. let stable = ctx.mesh().stable_participants().await; - tracing::info!(?stable, "stable participants"); - let mut sign_queue = self.sign_queue.write().await; crate::metrics::SIGN_QUEUE_SIZE .with_label_values(&[my_account_id.as_str()]) diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index 516ef8c71..14e74b90f 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -105,12 +105,6 @@ impl SignQueue { my_account_id: &AccountId, ) { if stable.len() < threshold { - tracing::warn!( - "Require at least {} stable participants to organize, got {}: {:?}", - threshold, - stable.len(), - stable.keys_vec() - ); return; } for request in self.unorganized_requests.drain(..) { diff --git a/integration-tests/chain-signatures/src/utils.rs b/integration-tests/chain-signatures/src/utils.rs index ca9d19a08..885956559 100644 --- a/integration-tests/chain-signatures/src/utils.rs +++ b/integration-tests/chain-signatures/src/utils.rs @@ -5,39 +5,47 @@ use near_workspaces::{Account, AccountId}; use std::fs; pub async fn vote_join( - accounts: Vec, + accounts: &[&Account], mpc_contract: &AccountId, - account_id: &AccountId, + candidate: &AccountId, + threshold: usize, ) -> anyhow::Result<()> { let vote_futures = accounts .iter() .map(|account| { - tracing::info!( - "{} voting for new participant: {}", - account.id(), - account_id - ); + tracing::info!("{} voting for new participant: {}", account.id(), candidate); account .call(mpc_contract, "vote_join") .args_json(serde_json::json!({ - "candidate_account_id": account_id + "candidate": candidate })) .transact() }) .collect::>(); - futures::future::join_all(vote_futures) + let successes = futures::future::join_all(vote_futures) .await .iter() - .for_each(|result| { - assert!(result.as_ref().unwrap().failures().is_empty()); + .fold(0, |acc, next| { + let outcome = next.as_ref().unwrap(); + if outcome.is_failure() { + tracing::error!("voting failed: {:?}", outcome); + } + + let val = outcome.is_success() as usize; + acc + val }); + assert!( + successes >= threshold, + "Voting did not pass the threshold={} for new participant to join", + threshold + ); Ok(()) } pub async fn vote_leave( - accounts: Vec, + accounts: &[&Account], mpc_contract: &AccountId, account_id: &AccountId, ) -> Vec> { diff --git a/integration-tests/chain-signatures/tests/lib.rs b/integration-tests/chain-signatures/tests/lib.rs index f37ccaaf9..3cb851d5e 100644 --- a/integration-tests/chain-signatures/tests/lib.rs +++ b/integration-tests/chain-signatures/tests/lib.rs @@ -71,22 +71,21 @@ impl MultichainTestContext<'_> { self.nodes.start_node(&account_id, &sk, &self.cfg).await?; // Wait for new node to add itself as a candidate - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // T number of participants should vote let participants = self.participant_accounts().await?; let voting_participants = participants .iter() .take(state.threshold) - .cloned() .collect::>(); - assert!(vote_join( - voting_participants, + vote_join( + &voting_participants, self.nodes.ctx().mpc_contract.id(), &account_id, + self.cfg.threshold, ) - .await - .is_ok()); + .await?; let new_state = wait_for::running_mpc(self, Some(state.epoch + 1)).await?; assert_eq!(new_state.participants.len(), state.participants.len() + 1); @@ -112,12 +111,11 @@ impl MultichainTestContext<'_> { .iter() .filter(|account| account.id() != leaving_account_id) .take(state.threshold) - .cloned() - .collect::>(); + .collect::>(); tracing::info!("Removing vote from: {:?}", voting_accounts); let results = vote_leave( - voting_accounts.clone(), + &voting_accounts, self.nodes.ctx().mpc_contract.id(), leaving_account_id, ) From 82de0988b59bd5c5da47ab69cfeff3e7b850b505 Mon Sep 17 00:00:00 2001 From: Phuong Nguyen Date: Fri, 4 Oct 2024 17:25:09 +0900 Subject: [PATCH 10/11] Fix reshare test --- chain-signatures/contract/src/lib.rs | 1 + chain-signatures/node/src/indexer.rs | 5 ++--- integration-tests/chain-signatures/tests/cases/mod.rs | 6 +++--- integration-tests/chain-signatures/tests/lib.rs | 5 +++-- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/chain-signatures/contract/src/lib.rs b/chain-signatures/contract/src/lib.rs index c75fccb90..41d8eecc9 100644 --- a/chain-signatures/contract/src/lib.rs +++ b/chain-signatures/contract/src/lib.rs @@ -352,6 +352,7 @@ impl VersionedMpcContract { join_votes, .. }) => { + log!("voting for {candidate:?} in {candidates:?}"); let candidate_info = candidates .get(&candidate) .ok_or(VoteError::JoinNotCandidate)?; diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 2ecdb958f..1a8ea06a0 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -336,9 +336,8 @@ pub fn run( let Ok(lake) = rt.block_on(async { let latest = context.indexer.latest_block_height().await; - if i > 0 { - tracing::warn!("indexer latest height {latest}, restart count={i}"); - } + tracing::info!("indexer latest height {latest}"); + let mut lake_builder = LakeBuilder::default() .s3_bucket_name(&options.s3_bucket) .s3_region_name(&options.s3_region) diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index 2cb2cad70..2476b9743 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -28,7 +28,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add participant 3"); - assert!(ctx.add_participant(None).await.is_ok()); + ctx.add_participant(None).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; @@ -39,7 +39,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { state.participants.keys().nth(2).unwrap().clone().as_ref(), ) .unwrap(); - assert!(ctx.remove_participant(Some(&account_2)).await.is_ok()); + ctx.remove_participant(Some(&account_2)).await.unwrap(); let account_0 = near_workspaces::types::AccountId::from_str( state.participants.keys().next().unwrap().clone().as_ref(), ) @@ -63,7 +63,7 @@ async fn test_multichain_reshare() -> anyhow::Result<()> { actions::single_signature_production(&ctx, &state).await?; tracing::info!("!!! Add back participant 0"); - assert!(ctx.add_participant(Some(node_cfg_0)).await.is_ok()); + ctx.add_participant(Some(node_cfg_0)).await.unwrap(); let state = wait_for::running_mpc(&ctx, None).await?; wait_for::has_at_least_triples(&ctx, 2).await?; wait_for::has_at_least_presignatures(&ctx, 2).await?; diff --git a/integration-tests/chain-signatures/tests/lib.rs b/integration-tests/chain-signatures/tests/lib.rs index a258e6f9f..d8ea324f8 100644 --- a/integration-tests/chain-signatures/tests/lib.rs +++ b/integration-tests/chain-signatures/tests/lib.rs @@ -61,7 +61,7 @@ impl MultichainTestContext<'_> { self.nodes.start_node(&self.cfg, &node_account).await?; // Wait for new node to add itself as a candidate - tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(20)).await; // T number of participants should vote let participants = self.participant_accounts().await?; @@ -76,7 +76,8 @@ impl MultichainTestContext<'_> { node_account.id(), self.cfg.threshold, ) - .await?; + .await + .unwrap(); let new_state = wait_for::running_mpc(self, Some(state.epoch + 1)).await?; assert_eq!(new_state.participants.len(), state.participants.len() + 1); From 536d7f929968d2aa71bb48fb2fbf42d65a2852e5 Mon Sep 17 00:00:00 2001 From: Phuong N Date: Wed, 9 Oct 2024 07:57:09 +0000 Subject: [PATCH 11/11] Bump wait for new node startup to 30secs --- integration-tests/chain-signatures/tests/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/chain-signatures/tests/lib.rs b/integration-tests/chain-signatures/tests/lib.rs index d8ea324f8..9323a6e72 100644 --- a/integration-tests/chain-signatures/tests/lib.rs +++ b/integration-tests/chain-signatures/tests/lib.rs @@ -61,7 +61,7 @@ impl MultichainTestContext<'_> { self.nodes.start_node(&self.cfg, &node_account).await?; // Wait for new node to add itself as a candidate - tokio::time::sleep(tokio::time::Duration::from_secs(20)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(30)).await; // T number of participants should vote let participants = self.participant_accounts().await?;