Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(node): integrate topdown observation #1158

Draft
wants to merge 2 commits into
base: refactor-syncer
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions fendermint/vm/topdown/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::time::Duration;
pub use crate::cache::{SequentialAppendError, SequentialKeyCache, ValueIter};
pub use crate::error::Error;
pub use crate::finality::CachedFinalityProvider;
use crate::observation::Observation;
pub use crate::toggle::Toggle;

pub type BlockHeight = u64;
Expand Down Expand Up @@ -112,20 +113,8 @@ impl Config {
/// On-chain data structure representing a topdown checkpoint agreed to by a
/// majority of subnet validators. DAG-CBOR encoded, embedded in CertifiedCheckpoint.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Checkpoint {
/// Checkpoint version, expected to increment with schema changes.
version: u8,
/// The parent height we are forwarding our parent crosslink to.
target_height: u64,
/// The hash of the chain unit at that height. Usually a block hash, but could
/// be a different entity (e.g. tipset CID), depending on the parent chain
/// and our interface to it (e.g. if the parent is a Filecoin network, this
/// would be a tipset CID coerced into a block hash if we use the Eth API,
/// or the tipset CID as-is if we use the Filecoin API.
target_hash: BlockHash,
/// The commitment is an accumulated hash of all topdown effects since the genesis epoch
/// in the parent till the current parent block height(inclusive).
effects_commitment: Bytes,
pub enum Checkpoint {
V1(Observation),
}

/// The finality view for IPC parent at certain height.
Expand Down Expand Up @@ -213,3 +202,23 @@ pub(crate) fn is_null_round_error(err: &anyhow::Error) -> bool {
pub(crate) fn is_null_round_str(s: &str) -> bool {
s.contains(NULL_ROUND_ERR_MSG)
}

impl Checkpoint {
pub fn target_height(&self) -> BlockHeight {
match self {
Checkpoint::V1(b) => b.parent_height,
}
}

pub fn target_hash(&self) -> &Bytes {
match self {
Checkpoint::V1(b) => &b.parent_hash,
}
}

pub fn cumulative_effects_comm(&self) -> &Bytes {
match self {
Checkpoint::V1(b) => &b.cumulative_effects_comm,
}
}
}
130 changes: 125 additions & 5 deletions fendermint/vm/topdown/src/observation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,32 @@

use crate::syncer::error::Error;
use crate::syncer::store::ParentViewStore;
use crate::{BlockHeight, Bytes, Checkpoint};
use crate::{BlockHash, BlockHeight, Bytes, Checkpoint};
use anyhow::anyhow;
use arbitrary::Arbitrary;
use cid::Cid;
use fendermint_crypto::secp::RecoverableECDSASignature;
use fendermint_crypto::SecretKey;
use fendermint_vm_genesis::ValidatorKey;
use fvm_ipld_encoding::DAG_CBOR;
use multihash::Code;
use multihash::MultihashDigest;
use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::fmt::{Display, Formatter};

use crate::syncer::payload::ParentBlockView;

/// Default topdown observation height range
const DEFAULT_MAX_OBSERVATION_RANGE: BlockHeight = 100;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ObservationConfig {
/// The max number of blocks one should make the topdown observation from the previous
/// committed checkpoint
pub max_observation_range: Option<BlockHeight>,
}

/// The content that validators gossip among each other.
#[derive(Serialize, Deserialize, Hash, Debug, Clone, Eq, PartialEq, Arbitrary)]
pub struct Observation {
Expand Down Expand Up @@ -43,12 +60,51 @@ pub struct CertifiedObservation {
signature: RecoverableECDSASignature,
}

/// check in the store to see if there is a new observation available
/// Check in the store to see if there is a new observation available.
/// Caller should make sure:
/// - the store has votes since the last committed checkpoint
/// - the votes have at least 1 non-null block
pub fn deduce_new_observation<S: ParentViewStore>(
_store: &S,
_checkpoint: &Checkpoint,
store: &S,
checkpoint: &Checkpoint,
config: &ObservationConfig,
) -> Result<Observation, Error> {
todo!()
let Some(latest_height) = store.max_parent_view_height()? else {
tracing::info!("no observation yet as height not available");
return Err(Error::BlockStoreEmpty);
};

if latest_height < checkpoint.target_height() {
tracing::info!("committed vote height more than latest parent view");
return Err(Error::CommittedParentHeightNotPurged);
}

let max_observation_height = checkpoint.target_height() + config.max_observation_range();
let candidate_height = min(max_observation_height, latest_height);
tracing::debug!(
max_observation_height,
candidate_height,
"propose observation height"
);

// aggregate commitment for the observation
let mut agg = LinearizedParentBlockView::from(checkpoint);
for h in checkpoint.target_height() + 1..=candidate_height {
let Some(p) = store.get(h)? else {
tracing::debug!(height = h, "not parent block view");
return Err(Error::MissingBlockView(h, candidate_height));
};

agg.append(p)?;
}

let observation = agg.into_observation()?;
tracing::info!(
height = observation.parent_height,
"new observation derived"
);

Ok(observation)
}

impl TryFrom<&[u8]> for CertifiedObservation {
Expand Down Expand Up @@ -131,3 +187,67 @@ impl Observation {
self.parent_height
}
}

impl ObservationConfig {
pub fn max_observation_range(&self) -> BlockHeight {
self.max_observation_range
.unwrap_or(DEFAULT_MAX_OBSERVATION_RANGE)
}
}

struct LinearizedParentBlockView {
parent_height: u64,
parent_hash: Option<BlockHash>,
cumulative_effects_comm: Bytes,
}

impl From<&Checkpoint> for LinearizedParentBlockView {
fn from(value: &Checkpoint) -> Self {
LinearizedParentBlockView {
parent_height: value.target_height(),
parent_hash: Some(value.target_hash().clone()),
cumulative_effects_comm: value.cumulative_effects_comm().clone(),
}
}
}

impl LinearizedParentBlockView {
fn new_commitment(&mut self, to_append: Bytes) {
let bytes = [
self.cumulative_effects_comm.as_slice(),
to_append.as_slice(),
]
.concat();
let cid = Cid::new_v1(DAG_CBOR, Code::Blake2b256.digest(&bytes));
self.cumulative_effects_comm = cid.to_bytes();
}

pub fn append(&mut self, view: ParentBlockView) -> Result<(), Error> {
if self.parent_height + 1 != view.parent_height {
return Err(Error::NotSequential);
}

self.parent_height += 1;

self.new_commitment(view.effects_commitment()?);

if let Some(p) = view.payload {
self.parent_hash = Some(p.parent_hash);
}

Ok(())
}

fn into_observation(self) -> Result<Observation, Error> {
let Some(hash) = self.parent_hash else {
return Err(Error::CannotCommitObservationAtNullBlock(
self.parent_height,
));
};
Ok(Observation::new(
self.parent_height,
hash,
self.cumulative_effects_comm,
))
}
}
10 changes: 10 additions & 0 deletions fendermint/vm/topdown/src/syncer/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,14 @@ pub enum Error {
ParentChainReorgDetected,
#[error("Cannot query parent at height {1}: {0}")]
CannotQueryParent(String, BlockHeight),
#[error("Parent block view store is empty")]
BlockStoreEmpty,
#[error("Committed block height not purged yet")]
CommittedParentHeightNotPurged,
#[error("Cannot serialize parent block view payload to bytes")]
CannotSerializeParentBlockView,
#[error("Cannot create commitment at null parent block {0}")]
CannotCommitObservationAtNullBlock(BlockHeight),
#[error("Missing block view at height {0} for target observation height {0}")]
MissingBlockView(BlockHeight, BlockHeight),
}
6 changes: 4 additions & 2 deletions fendermint/vm/topdown/src/syncer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::observation::Observation;
use crate::observation::{Observation, ObservationConfig};
use crate::proxy::ParentQueryProxy;
use crate::syncer::poll::ParentPoll;
use crate::syncer::store::ParentViewStore;
Expand Down Expand Up @@ -41,6 +41,8 @@ pub struct ParentSyncerConfig {
pub max_store_blocks: BlockHeight,
/// Attempts to sync as many block as possible till the finalized chain head
pub sync_many: bool,

pub observation: ObservationConfig,
}

#[derive(Clone)]
Expand Down Expand Up @@ -104,7 +106,7 @@ where
{
match req {
ParentSyncerRequest::Finalized(c) => {
let height = c.target_height;
let height = c.target_height();
if let Err(e) = poller.finalize(c) {
tracing::error!(height, err = e.to_string(), "cannot finalize parent viewer");
}
Expand Down
21 changes: 20 additions & 1 deletion fendermint/vm/topdown/src/syncer/payload.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
// Copyright 2022-2024 Protocol Labs
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::{BlockHash, BlockHeight};
use crate::syncer::error::Error;
use crate::{BlockHash, BlockHeight, Bytes};
use cid::Cid;
use fvm_ipld_encoding::DAG_CBOR;
use ipc_api::cross::IpcEnvelope;
use ipc_api::staking::StakingChangeRequest;
use multihash::Code;
use multihash::MultihashDigest;

#[derive(Clone, Debug)]
pub struct ParentBlockViewPayload {
Expand Down Expand Up @@ -44,4 +49,18 @@ impl ParentBlockView {
}),
}
}

pub fn effects_commitment(&self) -> Result<Bytes, Error> {
let Some(ref p) = self.payload else {
return Ok(Cid::default().to_bytes());
};

let bytes =
fvm_ipld_encoding::to_vec(&(&p.xnet_msgs, &p.validator_changes)).map_err(|e| {
tracing::error!(err = e.to_string(), "cannot serialize parent block view");
Error::CannotSerializeParentBlockView
})?;
let cid = Cid::new_v1(DAG_CBOR, Code::Blake2b256.digest(&bytes));
Ok(cid.to_bytes())
}
}
17 changes: 9 additions & 8 deletions fendermint/vm/topdown/src/syncer/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
let Some(min_height) = self.store.min_parent_view_height()? else {
return Ok(());
};
for h in min_height..=checkpoint.target_height {
for h in min_height..=checkpoint.target_height() {
self.store.purge(h)?;
}
Ok(())
Expand All @@ -55,12 +55,12 @@ where
async fn latest_nonnull_data(&self) -> anyhow::Result<(BlockHeight, BlockHash)> {
let Some(latest_height) = self.store.max_parent_view_height()? else {
return Ok((
self.last_finalized.target_height,
self.last_finalized.target_hash.clone(),
self.last_finalized.target_height(),
self.last_finalized.target_hash().clone(),
));
};

let start = self.last_finalized.target_height + 1;
let start = self.last_finalized.target_height() + 1;
for h in (start..=latest_height).rev() {
let Some(p) = self.store.get(h)? else {
continue;
Expand All @@ -76,8 +76,8 @@ where

// this means the votes stored are all null blocks, return last committed finality
Ok((
self.last_finalized.target_height,
self.last_finalized.target_hash.clone(),
self.last_finalized.target_height(),
self.last_finalized.target_hash().clone(),
))
}

Expand Down Expand Up @@ -145,7 +145,7 @@ where
let Some(h) = self.store.max_parent_view_height()? else {
return Ok(false);
};
Ok(h - self.last_finalized.target_height > self.config.max_store_blocks)
Ok(h - self.last_finalized.target_height() > self.config.max_store_blocks)
}

async fn finalized_chain_head(&self) -> anyhow::Result<Option<BlockHeight>> {
Expand Down Expand Up @@ -222,7 +222,8 @@ where
let view = fetch_data(&self.parent_proxy, height, block_hash_res.block_hash).await?;

self.store.store(view.clone())?;
let commitment = deduce_new_observation(&self.store, &self.last_finalized)?;
let commitment =
deduce_new_observation(&self.store, &self.last_finalized, &self.config.observation)?;
// if there is an error, ignore, we can always try next loop
let _ = self
.event_broadcast
Expand Down
31 changes: 30 additions & 1 deletion fendermint/vm/topdown/src/syncer/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::syncer::error::Error;
use crate::syncer::payload::ParentBlockView;
use crate::BlockHeight;
use crate::{BlockHeight, SequentialKeyCache};

/// Stores the parent view observed of the current node
pub trait ParentViewStore {
Expand All @@ -20,3 +20,32 @@ pub trait ParentViewStore {

fn max_parent_view_height(&self) -> Result<Option<BlockHeight>, Error>;
}

pub struct InMemoryParentViewStore {
inner: SequentialKeyCache<BlockHeight, ParentBlockView>,
}

impl ParentViewStore for InMemoryParentViewStore {
fn store(&mut self, view: ParentBlockView) -> Result<(), Error> {
self.inner
.append(view.parent_height, view)
.map_err(|_| Error::NonSequentialParentViewInsert)
}

fn get(&self, height: BlockHeight) -> Result<Option<ParentBlockView>, Error> {
Ok(self.inner.get_value(height).cloned())
}

fn purge(&mut self, height: BlockHeight) -> Result<(), Error> {
self.inner.remove_key_below(height + 1);
Ok(())
}

fn min_parent_view_height(&self) -> Result<Option<BlockHeight>, Error> {
Ok(self.inner.lower_bound())
}

fn max_parent_view_height(&self) -> Result<Option<BlockHeight>, Error> {
Ok(self.inner.upper_bound())
}
}
Loading