Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement packet clearing in relayer next #3488

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0eb4265
Implement CanQueryPacketCommitments for relayer-next
ljoss17 Jul 13, 2023
8806a8a
Implement CanQueryUnreceivedPacketSequences for relayer-next
ljoss17 Jul 14, 2023
24a4104
Implement CanQueryUnreceivedPacketEvents for relayer-next
ljoss17 Jul 14, 2023
86c5476
Implement CanQueryUnreceivedPackets for relayer-next
ljoss17 Jul 14, 2023
42d5c54
Merge branch 'soares/relayer-next' into luca_joss/relayer-next-clear-…
ljoss17 Jul 17, 2023
55399d5
Implement PacketClearerWithTarget for relayer-next
ljoss17 Jul 17, 2023
f300506
Fix clear_packets_with_target
ljoss17 Jul 18, 2023
9779c22
Merge branch 'soares/relayer-next' into luca_joss/relayer-next-clear-…
ljoss17 Jul 18, 2023
4f15ac8
Partial implementation of ClearPacketWorker for relayer-next
ljoss17 Jul 19, 2023
25904ee
Remove PacketClearConfig
ljoss17 Jul 19, 2023
97eeb50
Rename channels to src/dst instead of using counterparty
ljoss17 Jul 19, 2023
0579c59
Improve clear_receive_packets
ljoss17 Jul 20, 2023
b54144e
Add test for packet clearing relayer-next
ljoss17 Jul 20, 2023
071c996
Fix and improve Clear packet implementation and tests for relayer-next
ljoss17 Jul 21, 2023
acf2b7b
Add HasClearInterval trait for relayer-next
ljoss17 Jul 24, 2023
7b69032
Remove Target from ClearPacketWorker in relayer-next
ljoss17 Jul 24, 2023
7ba9308
Merge branch 'soares/relayer-next' into luca_joss/relayer-next-clear-…
ljoss17 Jul 31, 2023
528e034
Update cargo lock
ljoss17 Jul 31, 2023
92c386f
Fix 'query_unreceived_packet_sequences' and 'query_packet_commitments…
ljoss17 Aug 2, 2023
132ae5d
Add documentation to 'query_packet_commitments'
ljoss17 Aug 2, 2023
11da2e4
Rename 'CanQueryUnreceivedPackets' to 'CanQuerySendPacketsFromSequences'
ljoss17 Aug 2, 2023
495bffb
Add documentation for 'height' used in 'query_unreceived_packets'
ljoss17 Aug 2, 2023
e0b5727
Clean 'packet_clear' test for relayer-next
ljoss17 Aug 2, 2023
2f4e771
Split CanQuerySendPacketsFromSequences into two traits
ljoss17 Aug 3, 2023
41ce78e
Add 'try_extract_send_packet_event' to relayer-cosmos methods module
ljoss17 Aug 4, 2023
9ccc656
Moved traits related to send packets in their own mod
ljoss17 Aug 4, 2023
610e571
Moved packet_clear traits to relayer-components
ljoss17 Aug 4, 2023
e0b5f22
Moved send_packet impls to its own mod
ljoss17 Aug 4, 2023
063dc7f
Moved 'try_extract_write_acknowledgement_event' to the methods module
ljoss17 Aug 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 62 additions & 99 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion crates/relayer-all-in-one/src/all_for_one/birelay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ impl<BiRelay, RelayAToB, RelayBToA> AfoBiRelay for BiRelay
where
RelayAToB: AfoRelay,
RelayBToA: AfoRelay<AfoSrcChain = RelayAToB::AfoDstChain, AfoDstChain = RelayAToB::AfoSrcChain>,
BiRelay: HasAfoRuntime
BiRelay: Clone
+ HasAfoRuntime
+ HasLoggerWithBaseLevels
+ CanAutoRelay
+ HasTwoWayRelay<RelayAToB = RelayAToB, RelayBToA = RelayBToA>,
Expand Down
9 changes: 9 additions & 0 deletions crates/relayer-all-in-one/src/all_for_one/chain.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use ibc_relayer_components::chain::traits::client::create::HasCreateClientOptions;
use ibc_relayer_components::chain::traits::queries::consensus_state::CanQueryConsensusState;
use ibc_relayer_components::chain::traits::queries::packet_commitments::CanQueryPacketCommitments;
use ibc_relayer_components::chain::traits::queries::received_packet::CanQueryReceivedPacket;
use ibc_relayer_components::chain::traits::queries::send_packet::CanQuerySendPacketsFromSequences;
use ibc_relayer_components::chain::traits::queries::status::CanQueryChainStatus;
use ibc_relayer_components::chain::traits::queries::unreceived_packets::CanQueryUnreceivedPacketSequences;
use ibc_relayer_components::chain::traits::types::chain::HasChainTypes;
use ibc_relayer_components::chain::traits::types::channel::{
HasChannelHandshakePayloads, HasInitChannelOptionsType,
Expand Down Expand Up @@ -29,6 +32,9 @@ pub trait AfoChain<Counterparty>:
+ HasConsensusStateType<Counterparty>
+ CanQueryConsensusState<Counterparty>
+ CanQueryReceivedPacket<Counterparty>
+ CanQueryPacketCommitments<Counterparty>
+ CanQueryUnreceivedPacketSequences<Counterparty>
+ CanQuerySendPacketsFromSequences<Counterparty>
+ HasCreateClientOptions<Counterparty>
+ HasInitConnectionOptionsType<Counterparty>
+ HasConnectionHandshakePayloads<Counterparty>
Expand Down Expand Up @@ -65,6 +71,9 @@ where
+ HasConsensusStateType<Counterparty>
+ CanQueryConsensusState<Counterparty>
+ CanQueryReceivedPacket<Counterparty>
+ CanQueryPacketCommitments<Counterparty>
+ CanQueryUnreceivedPacketSequences<Counterparty>
+ CanQuerySendPacketsFromSequences<Counterparty>
+ HasCreateClientOptions<Counterparty>
+ HasInitConnectionOptionsType<Counterparty>
+ HasConnectionHandshakePayloads<Counterparty>
Expand Down
5 changes: 4 additions & 1 deletion crates/relayer-all-in-one/src/all_for_one/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use ibc_relayer_components::relay::traits::event_relayer::CanRelayEvent;
use ibc_relayer_components::relay::traits::ibc_message_sender::CanSendIbcMessages;
use ibc_relayer_components::relay::traits::messages::update_client::CanBuildUpdateClientMessage;
use ibc_relayer_components::relay::traits::packet::HasRelayPacket;
use ibc_relayer_components::relay::traits::packet_clear::CanClearReceivePackets;
use ibc_relayer_components::relay::traits::packet_filter::CanFilterPackets;
use ibc_relayer_components::relay::traits::packet_relayer::CanRelayPacket;
use ibc_relayer_components::relay::traits::packet_relayers::ack_packet::CanRelayAckPacket;
Expand Down Expand Up @@ -52,6 +53,7 @@ pub trait AfoRelay:
+ CanInitChannel
+ CanRelayChannelOpenHandshake
+ SupportsPacketRetry
+ CanClearReceivePackets
{
type AfoSrcChain: AfoChain<Self::AfoDstChain>;

Expand Down Expand Up @@ -91,7 +93,8 @@ where
+ CanRelayConnectionOpenHandshake
+ CanInitChannel
+ CanRelayChannelOpenHandshake
+ SupportsPacketRetry,
+ SupportsPacketRetry
+ CanClearReceivePackets,
{
type AfoSrcChain = SrcChain;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
pub mod channel;
pub mod consensus_state;
pub mod packet_commitments;
pub mod received_packet;
pub mod send_packet;
pub mod status;
pub mod unreceived_packets;
pub mod write_ack;
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use async_trait::async_trait;

use ibc_relayer_components::chain::traits::queries::packet_commitments::CanQueryPacketCommitments;

use crate::one_for_all::traits::chain::OfaIbcChain;
use crate::one_for_all::types::chain::OfaChainWrapper;
use crate::std_prelude::*;

#[async_trait]
impl<Chain, Counterparty> CanQueryPacketCommitments<OfaChainWrapper<Counterparty>>
for OfaChainWrapper<Chain>
where
Chain: OfaIbcChain<Counterparty>,
Counterparty: OfaIbcChain<Chain>,
{
/// Query the sequences of the packets that the chain has committed to be
/// sent to the counterparty chain, of which the full packet relaying is not
/// yet completed. Once the chain receives the ack from the counterparty
/// chain, a given sequence should be removed from the packet commitment list.
async fn query_packet_commitments(
&self,
channel_id: &Self::ChannelId,
port_id: &Self::PortId,
) -> Result<(Vec<Self::Sequence>, Self::Height), Self::Error> {
let (sequences, height) = self
.chain
.query_packet_commitments(channel_id, port_id)
.await?;
Ok((sequences, height))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use async_trait::async_trait;

use ibc_relayer_components::chain::traits::queries::send_packet::CanQuerySendPacketsFromSequences;

use crate::one_for_all::traits::chain::{OfaChainTypes, OfaIbcChain};
use crate::one_for_all::types::chain::OfaChainWrapper;
use crate::std_prelude::*;

#[async_trait]
impl<Chain, Counterparty> CanQuerySendPacketsFromSequences<OfaChainWrapper<Counterparty>>
for OfaChainWrapper<Chain>
where
Chain: OfaIbcChain<Counterparty> + OfaChainTypes,
Counterparty: OfaIbcChain<Chain>,
{
/// Given a list of sequences, a channel and port will query a list of outgoing
/// packets which have not been relayed.
async fn query_send_packets_from_sequences(
&self,
channel_id: &Self::ChannelId,
port_id: &Self::PortId,
counterparty_channel_id: &Counterparty::ChannelId,
counterparty_port_id: &Counterparty::PortId,
sequences: &[Self::Sequence],
// The height is given to query the packets from a specific height.
// This height should be the same as the query height from the
// `CanQueryPacketCommitments` made on the same chain.
height: &Self::Height,
) -> Result<Vec<Self::OutgoingPacket>, Self::Error> {
let send_packets = self
.chain
.query_send_packets_from_sequences(
channel_id,
port_id,
counterparty_channel_id,
counterparty_port_id,
sequences,
height,
)
.await?;
Ok(send_packets)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use async_trait::async_trait;

use ibc_relayer_components::chain::traits::queries::unreceived_packets::CanQueryUnreceivedPacketSequences;

use crate::one_for_all::traits::chain::{OfaChainTypes, OfaIbcChain};
use crate::one_for_all::types::chain::OfaChainWrapper;
use crate::std_prelude::*;

#[async_trait]
impl<Chain, Counterparty> CanQueryUnreceivedPacketSequences<OfaChainWrapper<Counterparty>>
for OfaChainWrapper<Chain>
where
Chain: OfaIbcChain<Counterparty> + OfaChainTypes,
Counterparty: OfaIbcChain<Chain>,
{
/// Given a list of counterparty commitment sequences,
/// return a filtered list of sequences which the chain
/// has not received the packet from the counterparty chain.
async fn query_unreceived_packet_sequences(
&self,
channel_id: &Chain::ChannelId,
port_id: &Chain::PortId,
sequences: &[Counterparty::Sequence],
) -> Result<Vec<Counterparty::Sequence>, Self::Error> {
let unreceived_packet_sequences = self
.chain
.query_unreceived_packet_sequences(channel_id, port_id, sequences)
.await?;

Ok(unreceived_packet_sequences)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod receive_packet_clear;
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use async_trait::async_trait;

use ibc_relayer_components::chain::types::aliases::{ChannelId, PortId};
use ibc_relayer_components::relay::impls::packet_clear::receive_packets_clear::ReceivePacketClearRelayer;
use ibc_relayer_components::relay::traits::chains::HasRelayChains;
use ibc_relayer_components::relay::traits::packet_clear::{
CanClearReceivePackets, ReceivePacketClearer,
};

use crate::one_for_all::traits::relay::OfaRelay;
use crate::one_for_all::types::relay::OfaRelayWrapper;
use crate::std_prelude::*;

#[async_trait]
impl<Relay> CanClearReceivePackets for OfaRelayWrapper<Relay>
where
Relay: OfaRelay,
{
async fn clear_receive_packets(
&self,
src_channel_id: &ChannelId<
<OfaRelayWrapper<Relay> as HasRelayChains>::SrcChain,
<OfaRelayWrapper<Relay> as HasRelayChains>::DstChain,
>,
src_port_id: &PortId<
<OfaRelayWrapper<Relay> as HasRelayChains>::SrcChain,
<OfaRelayWrapper<Relay> as HasRelayChains>::DstChain,
>,
dst_channel_id: &ChannelId<
<OfaRelayWrapper<Relay> as HasRelayChains>::DstChain,
<OfaRelayWrapper<Relay> as HasRelayChains>::SrcChain,
>,
dst_port_id: &PortId<
<OfaRelayWrapper<Relay> as HasRelayChains>::DstChain,
<OfaRelayWrapper<Relay> as HasRelayChains>::SrcChain,
>,
) -> Result<(), Relay::Error> {
ReceivePacketClearRelayer::clear_receive_packets(
self,
src_channel_id,
src_port_id,
dst_channel_id,
dst_port_id,
)
.await
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub mod auto_relayer;
pub mod batch;
pub mod channel;
pub mod clear_packet;
pub mod client;
pub mod connection;
pub mod error;
Expand Down
35 changes: 35 additions & 0 deletions crates/relayer-all-in-one/src/one_for_all/traits/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,41 @@ where
sequence: &Counterparty::Sequence,
) -> Result<bool, Self::Error>;

/// Query the sequences of the packets that the chain has committed to be
/// sent to the counterparty chain, of which the full packet relaying is not
/// yet completed. Once the chain receives the ack from the counterparty
/// chain, a given sequence should be removed from the packet commitment list.
async fn query_packet_commitments(
&self,
channel_id: &Self::ChannelId,
port_id: &Self::PortId,
) -> Result<(Vec<Self::Sequence>, Self::Height), Self::Error>;

/// Given a list of counterparty commitment sequences,
/// return a filtered list of sequences which the chain
/// has not received the packet from the counterparty chain.
async fn query_unreceived_packet_sequences(
&self,
channel_id: &Self::ChannelId,
port_id: &Self::PortId,
sequences: &[Counterparty::Sequence],
) -> Result<Vec<Counterparty::Sequence>, Self::Error>;

/// Given a list of sequences, a channel and port will query a list of outgoing
/// packets which have not been relayed.
async fn query_send_packets_from_sequences(
&self,
channel_id: &Self::ChannelId,
port_id: &Self::PortId,
counterparty_channel_id: &Counterparty::ChannelId,
counterparty_port_id: &Counterparty::PortId,
sequences: &[Self::Sequence],
// The height is given to query the packets from a specific height.
// This height should be the same as the query height from the
// `CanQueryPacketCommitments` made on the same chain.
height: &Self::Height,
) -> Result<Vec<Self::OutgoingPacket>, Self::Error>;

async fn build_receive_packet_payload(
&self,
height: &Self::Height,
Expand Down
1 change: 1 addition & 0 deletions crates/relayer-components-extra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ extern crate alloc;

pub mod batch;
pub mod builder;
pub mod packet_clear;
pub mod relay;
pub mod runtime;
pub mod telemetry;
1 change: 1 addition & 0 deletions crates/relayer-components-extra/src/packet_clear/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod worker;
80 changes: 80 additions & 0 deletions crates/relayer-components-extra/src/packet_clear/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use async_trait::async_trait;
use core::time::Duration;

use ibc_relayer_components::chain::types::aliases::{ChannelId, PortId};
use ibc_relayer_components::relay::traits::chains::HasRelayChains;
use ibc_relayer_components::relay::traits::clear_interval::HasClearInterval;
use ibc_relayer_components::relay::traits::packet_clear::CanClearReceivePackets;
use ibc_relayer_components::runtime::traits::runtime::HasRuntime;
use ibc_relayer_components::runtime::traits::sleep::CanSleep;

use crate::runtime::traits::spawn::{HasSpawner, Spawner, TaskHandle};
use crate::std_prelude::*;

#[async_trait]
pub trait CanSpawnPacketClearWorker: HasRelayChains {
fn spawn_packet_clear_worker(
self,
src_channel_id: ChannelId<Self::SrcChain, Self::DstChain>,
src_counterparty_port_id: PortId<Self::SrcChain, Self::DstChain>,
dst_channel_id: ChannelId<Self::DstChain, Self::SrcChain>,
dst_port_id: PortId<Self::DstChain, Self::SrcChain>,
) -> Box<dyn TaskHandle>;
}

impl<Relay> CanSpawnPacketClearWorker for Relay
where
Relay: CanRunLoop + HasRuntime,
Relay::Runtime: HasSpawner,
{
fn spawn_packet_clear_worker(
self,
src_channel_id: ChannelId<Relay::SrcChain, Relay::DstChain>,
src_port_id: PortId<Relay::SrcChain, Relay::DstChain>,
dst_channel_id: ChannelId<Relay::DstChain, Relay::SrcChain>,
dst_port_id: PortId<Relay::DstChain, Relay::SrcChain>,
) -> Box<dyn TaskHandle> {
let spawner = self.runtime().spawner();

spawner.spawn(async move {
self.run_loop(&src_channel_id, &src_port_id, &dst_channel_id, &dst_port_id)
.await;
})
}
}

#[async_trait]
trait CanRunLoop: HasRelayChains {
async fn run_loop(
&self,
src_channel_id: &ChannelId<Self::SrcChain, Self::DstChain>,
src_port_id: &PortId<Self::SrcChain, Self::DstChain>,
dst_channel_id: &ChannelId<Self::DstChain, Self::SrcChain>,
dst_port_id: &PortId<Self::DstChain, Self::SrcChain>,
);
}

#[async_trait]
impl<Relay> CanRunLoop for Relay
where
Relay: HasRuntime + CanClearReceivePackets + HasClearInterval,
Relay::Runtime: CanSleep,
{
async fn run_loop(
&self,
src_channel_id: &ChannelId<Relay::SrcChain, Relay::DstChain>,
src_port_id: &PortId<Relay::SrcChain, Relay::DstChain>,
dst_channel_id: &ChannelId<Relay::DstChain, Relay::SrcChain>,
dst_port_id: &PortId<Relay::DstChain, Relay::SrcChain>,
) {
let runtime = self.runtime();
let clear_interval = self.clear_interval().into();

loop {
let _ = self
.clear_receive_packets(src_channel_id, src_port_id, dst_channel_id, dst_port_id)
.await;
runtime.sleep(Duration::from_secs(clear_interval)).await;
}
}
}
3 changes: 3 additions & 0 deletions crates/relayer-components/src/chain/traits/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
pub mod channel;
pub mod connection;
pub mod consensus_state;
pub mod packet_commitments;
pub mod received_packet;
pub mod send_packet;
pub mod status;
pub mod unreceived_packets;
pub mod write_ack;
Loading
Loading