Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: precheck triple and presigs #802

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions chain-signatures/contract/src/config/impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl Default for TripleConfig {
min_triples: 1024,
max_triples: 1024 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER,
generation_timeout: min_to_ms(10),
preview_limit: 128,

other: Default::default(),
}
Expand All @@ -58,6 +59,7 @@ impl Default for PresignatureConfig {
min_presignatures: 512,
max_presignatures: 512 * MAX_EXPECTED_PARTICIPANTS * NETWORK_MULTIPLIER,
generation_timeout: secs_to_ms(45),
preview_limit: 128,

other: Default::default(),
}
Expand Down
10 changes: 8 additions & 2 deletions chain-signatures/contract/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub struct TripleConfig {
pub max_triples: u32,
/// Timeout for triple generation in milliseconds.
pub generation_timeout: u64,
/// Max amount of Triple IDs allowed to be previewed in state.
pub preview_limit: u32,

/// The remaining entries that can be present in future forms of the configuration.
#[serde(flatten)]
Expand All @@ -73,6 +75,8 @@ pub struct PresignatureConfig {
pub max_presignatures: u32,
/// Timeout for presignature generation in milliseconds.
pub generation_timeout: u64,
/// Max amount of Presignature IDs allowed to be previewed in state.
pub preview_limit: u32,

/// The remaining entries that can be present in future forms of the configuration.
#[serde(flatten)]
Expand Down Expand Up @@ -110,12 +114,14 @@ mod tests {
"triple": {
"min_triples": 10,
"max_triples": 100,
"generation_timeout": 10000
"generation_timeout": 10000,
"preview_limit": 128,
},
"presignature": {
"min_presignatures": 10,
"max_presignatures": 100,
"generation_timeout": 10000
"generation_timeout": 10000,
"preview_limit": 256,
},
"signature": {
"generation_timeout": 10000,
Expand Down
1 change: 1 addition & 0 deletions chain-signatures/contract/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ impl VersionedMpcContract {
join_votes,
..
}) => {
log!("voting for {candidate:?} in {candidates:?}");
let candidate_info = candidates
.get(&candidate)
.ok_or(VoteError::JoinNotCandidate)?;
Expand Down
4 changes: 2 additions & 2 deletions chain-signatures/contract/src/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ impl Candidates {
self.candidates.insert(account_id, candidate);
}

pub fn remove(&mut self, account_id: &AccountId) {
self.candidates.remove(account_id);
pub fn remove(&mut self, account_id: &AccountId) -> Option<CandidateInfo> {
self.candidates.remove(account_id)
}

pub fn get(&self, account_id: &AccountId) -> Option<&CandidateInfo> {
Expand Down
18 changes: 10 additions & 8 deletions chain-signatures/node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
client_headers.insert(http::header::REFERER, referer_param.parse().unwrap());
}

let config = Arc::new(RwLock::new(Config::new(LocalConfig {
over: override_config.unwrap_or_else(Default::default),
network: NetworkConfig {
cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?,
sign_sk,
},
})));

tracing::debug!(rpc_addr = rpc_client.rpc_addr(), "rpc client initialized");
let signer = InMemorySigner::from_secret_key(account_id.clone(), account_sk);
let (protocol, protocol_state) = MpcSignProtocol::init(
Expand All @@ -228,13 +236,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
sign_queue,
key_storage,
triple_storage,
Config::new(LocalConfig {
over: override_config.unwrap_or_else(Default::default),
network: NetworkConfig {
cipher_pk: hpke::PublicKey::try_from_bytes(&hex::decode(cipher_pk)?)?,
sign_sk,
},
}),
config.clone(),
);

rt.block_on(async {
Expand All @@ -243,7 +245,7 @@ pub fn run(cmd: Cli) -> anyhow::Result<()> {
tracing::debug!("protocol thread spawned");
let cipher_sk = hpke::SecretKey::try_from_bytes(&hex::decode(cipher_sk)?)?;
let web_handle = tokio::spawn(async move {
web::run(web_port, sender, cipher_sk, protocol_state, indexer).await
web::run(web_port, sender, cipher_sk, protocol_state, indexer, config).await
});
tracing::debug!("protocol http server spawned");

Expand Down
5 changes: 2 additions & 3 deletions chain-signatures/node/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,8 @@ pub fn run(

let Ok(lake) = rt.block_on(async {
let latest = context.indexer.latest_block_height().await;
if i > 0 {
tracing::warn!("indexer latest height {latest}, restart count={i}");
}
tracing::info!("indexer latest height {latest}");

let mut lake_builder = LakeBuilder::default()
.s3_bucket_name(&options.s3_bucket)
.s3_region_name(&options.s3_region)
Expand Down
83 changes: 68 additions & 15 deletions chain-signatures/node/src/mesh/connection.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

use cait_sith::protocol::Participant;
use tokio::sync::RwLock;
use url::Url;

use crate::protocol::contract::primitives::Participants;
use crate::protocol::presignature::PresignatureId;
use crate::protocol::triple::TripleId;
use crate::protocol::ProtocolState;
use crate::web::StateView;

Expand All @@ -19,16 +21,19 @@ pub struct Pool {
http: reqwest::Client,
connections: RwLock<Participants>,
potential_connections: RwLock<Participants>,
status: RwLock<HashMap<Participant, StateView>>,

/// The currently active participants for this epoch.
current_active: RwLock<Option<(Participants, Instant)>>,
// Potentially active participants that we can use to establish a connection in the next epoch.
potential_active: RwLock<Option<(Participants, Instant)>>,

pub status: RwLock<HashMap<Participant, StateView>>,
}

impl Pool {
pub async fn ping(&self) -> Participants {
pub async fn ping(
&mut self,
previews: Option<(HashSet<TripleId>, HashSet<PresignatureId>)>,
) -> Participants {
if let Some((ref active, timestamp)) = *self.current_active.read().await {
if timestamp.elapsed() < DEFAULT_TIMEOUT {
return active.clone();
Expand All @@ -37,7 +42,21 @@ impl Pool {

let connections = self.connections.read().await;

let mut params = HashMap::new();
if let Some((triples, presignatures)) = previews {
if !triples.is_empty() {
params.insert("triple_preview", triples);
}
if !presignatures.is_empty() {
params.insert("presignature_preview", presignatures);
}
}

let mut status = self.status.write().await;
// Clear the status before we overwrite it just so we don't have any stale participant
// statuses that are no longer in the network after a reshare.
status.clear();

let mut participants = Participants::default();
for (participant, info) in connections.iter() {
let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else {
Expand All @@ -49,13 +68,21 @@ impl Pool {
continue;
};

let Ok(resp) = self.http.get(url.clone()).send().await else {
tracing::warn!(
"Pool.ping resp err participant {:?} url {}",
participant,
url
);
continue;
let mut req = self.http.get(url.clone());
if !params.is_empty() {
req = req.header("content-type", "application/json").json(&params);
}
let resp = match req.send().await {
Ok(resp) => resp,
Err(err) => {
tracing::warn!(
?err,
"Pool.ping resp err participant {:?} url {}",
participant,
url
);
continue;
}
};

let Ok(state): Result<StateView, _> = resp.json().await else {
Expand All @@ -77,7 +104,10 @@ impl Pool {
participants
}

pub async fn ping_potential(&self) -> Participants {
pub async fn ping_potential(
&mut self,
previews: Option<(HashSet<TripleId>, HashSet<PresignatureId>)>,
) -> Participants {
if let Some((ref active, timestamp)) = *self.potential_active.read().await {
if timestamp.elapsed() < DEFAULT_TIMEOUT {
return active.clone();
Expand All @@ -86,15 +116,38 @@ impl Pool {

let connections = self.potential_connections.read().await;

let mut params = HashMap::new();
if let Some((triples, presignatures)) = previews {
if !triples.is_empty() {
params.insert("triple_preview", triples);
}
if !presignatures.is_empty() {
params.insert("presignature_preview", presignatures);
}
}

let mut status = self.status.write().await;
let mut participants = Participants::default();
for (participant, info) in connections.iter() {
let Ok(Ok(url)) = Url::parse(&info.url).map(|url| url.join("/state")) else {
continue;
};

let Ok(resp) = self.http.get(url).send().await else {
continue;
let mut req = self.http.get(url.clone());
if !params.is_empty() {
req = req.header("content-type", "application/json").json(&params);
}
let resp = match req.send().await {
Ok(resp) => resp,
Err(err) => {
tracing::warn!(
?err,
"Pool.ping_potential resp err participant {:?} url {}",
participant,
url
);
continue;
}
};

let Ok(state): Result<StateView, _> = resp.json().await else {
Expand All @@ -104,7 +157,6 @@ impl Pool {
status.insert(*participant, state);
participants.insert(participant, info.clone());
}
drop(status);

let mut potential_active = self.potential_active.write().await;
*potential_active = Some((participants.clone(), Instant::now()));
Expand Down Expand Up @@ -156,6 +208,7 @@ impl Pool {
.get(participant)
.map_or(false, |state| match state {
StateView::Running { is_stable, .. } => *is_stable,
StateView::Resharing { is_stable, .. } => *is_stable,
_ => false,
})
}
Expand Down
20 changes: 15 additions & 5 deletions chain-signatures/node/src/mesh/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
use std::collections::{HashMap, HashSet};

use cait_sith::protocol::Participant;

use crate::protocol::contract::primitives::Participants;
use crate::protocol::presignature::PresignatureId;
use crate::protocol::triple::TripleId;
use crate::protocol::ProtocolState;
use crate::web::StateView;

pub mod connection;

Expand Down Expand Up @@ -64,7 +71,12 @@ impl Mesh {
self.connections
.establish_participants(contract_state)
.await;
self.ping().await;
}

/// Ping the active participants such that we can see who is alive.
pub async fn ping(&mut self, previews: Option<(HashSet<TripleId>, HashSet<PresignatureId>)>) {
self.active_participants = self.connections.ping(previews.clone()).await;
self.active_potential_participants = self.connections.ping_potential(previews).await;

tracing::debug!(
active = ?self.active_participants.account_ids(),
Expand All @@ -73,9 +85,7 @@ impl Mesh {
);
}

/// Ping the active participants such that we can see who is alive.
pub async fn ping(&mut self) {
self.active_participants = self.connections.ping().await;
self.active_potential_participants = self.connections.ping_potential().await;
pub async fn state_views(&self) -> HashMap<Participant, StateView> {
self.connections.status.read().await.clone()
}
}
8 changes: 5 additions & 3 deletions chain-signatures/node/src/protocol/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use url::Url;
use near_account_id::AccountId;
use near_crypto::InMemorySigner;

#[async_trait::async_trait]
pub trait ConsensusCtx {
fn my_account_id(&self) -> &AccountId;
fn http_client(&self) -> &reqwest::Client;
Expand All @@ -41,7 +42,7 @@ pub trait ConsensusCtx {
fn sign_queue(&self) -> Arc<RwLock<SignQueue>>;
fn secret_storage(&self) -> &SecretNodeStorageBox;
fn triple_storage(&self) -> LockTripleNodeStorageBox;
fn cfg(&self) -> &Config;
async fn cfg(&self) -> Config;
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -658,12 +659,13 @@ impl ConsensusProtocol for JoiningState {
tracing::info!(
"joining(running): sending a transaction to join the participant set"
);
let cfg = ctx.cfg().await;
ctx.rpc_client()
.call(ctx.signer(), ctx.mpc_contract_id(), "join")
.args_json(json!({
"url": ctx.my_address(),
"cipher_pk": ctx.cfg().local.network.cipher_pk.to_bytes(),
"sign_pk": ctx.cfg().local.network.sign_sk.public_key(),
"cipher_pk": cfg.local.network.cipher_pk.to_bytes(),
"sign_pk": cfg.local.network.sign_sk.public_key(),
}))
.max_gas()
.retry_exponential(10, 3)
Expand Down
Loading
Loading