From 4238d6ceb00fea3bcf1480b5155cab308ecb003d Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Wed, 16 Aug 2023 11:23:54 +0200 Subject: [PATCH] Remove one-for-all runtime constructs (#3542) * Implement runtime traits directly for TokioRuntimeContext * Use AfoRuntime directly * Remove OfaRuntime constructs * Rename Log to LogEntries * Implement telemetry traits directly * Remove OfaTelemetry * Slightly improve telemetry --- .../src/all_for_one/birelay.rs | 3 +- .../src/all_for_one/builder.rs | 11 +- .../src/all_for_one/runtime.rs | 27 ++- .../src/one_for_all/impls/birelay/runtime.rs | 3 +- .../src/one_for_all/impls/builder/runtime.rs | 3 +- .../src/one_for_all/impls/chain/telemetry.rs | 3 +- .../src/one_for_all/impls/chain/types.rs | 3 +- .../src/one_for_all/impls/mod.rs | 2 - .../src/one_for_all/impls/relay/types.rs | 3 +- .../src/one_for_all/impls/runtime/channel.rs | 126 ------------- .../src/one_for_all/impls/runtime/error.rs | 11 -- .../src/one_for_all/impls/runtime/mutex.rs | 25 --- .../src/one_for_all/impls/runtime/sleep.rs | 18 -- .../src/one_for_all/impls/runtime/spawn.rs | 25 --- .../src/one_for_all/impls/runtime/time.rs | 21 --- .../src/one_for_all/impls/telemetry.rs | 73 -------- .../one_for_all/impls/transaction/runtime.rs | 3 +- .../src/one_for_all/traits/birelay.rs | 10 +- .../src/one_for_all/traits/builder.rs | 13 +- .../src/one_for_all/traits/chain.rs | 17 +- .../src/one_for_all/traits/mod.rs | 1 - .../src/one_for_all/traits/relay.rs | 10 +- .../src/one_for_all/traits/runtime.rs | 3 - .../src/one_for_all/traits/telemetry.rs | 36 ---- .../src/one_for_all/traits/transaction.rs | 13 +- .../src/one_for_all/types/batch/aliases.rs | 16 +- .../src/one_for_all/types/builder.rs | 2 +- .../src/one_for_all/types/mod.rs | 2 - .../src/one_for_all/types/runtime.rs | 29 --- .../src/one_for_all/types/telemetry.rs | 21 --- .../src/builder/components/relay/batch.rs | 19 +- .../src/runtime/traits/channel.rs | 6 + .../relayer-cosmos/src/all_for_one/chain.rs | 7 +- .../relayer-cosmos/src/all_for_one/relay.rs | 7 +- crates/relayer-cosmos/src/contexts/birelay.rs | 7 +- crates/relayer-cosmos/src/contexts/builder.rs | 14 +- crates/relayer-cosmos/src/contexts/chain.rs | 12 +- crates/relayer-cosmos/src/contexts/relay.rs | 7 +- .../src/contexts/transaction.rs | 7 +- crates/relayer-cosmos/src/impls/birelay.rs | 9 +- crates/relayer-cosmos/src/impls/builder.rs | 9 +- crates/relayer-cosmos/src/impls/chain.rs | 14 +- crates/relayer-cosmos/src/impls/mod.rs | 1 + crates/relayer-cosmos/src/impls/relay.rs | 11 +- crates/relayer-cosmos/src/impls/telemetry.rs | 138 ++++++++++++++ .../relayer-cosmos/src/impls/transaction.rs | 11 +- crates/relayer-cosmos/src/methods/runtime.rs | 1 - crates/relayer-cosmos/src/types/error.rs | 2 +- crates/relayer-cosmos/src/types/telemetry.rs | 142 ++------------ .../src/relayer_mock/base/error.rs | 2 +- .../src/relayer_mock/base/impls/chain.rs | 6 +- .../src/relayer_mock/base/impls/relay.rs | 4 +- .../src/relayer_mock/base/types/runtime.rs | 2 +- .../logger/tracing.rs => impls/logger.rs} | 25 ++- crates/relayer-runtime/src/impls/mod.rs | 2 + .../src/impls/runtime/channel.rs | 120 ++++++++++++ .../src/impls/runtime/error.rs | 8 + .../src}/impls/runtime/mod.rs | 0 .../src/impls/runtime/mutex.rs | 21 +++ .../src/impls/runtime/sleep.rs | 13 ++ .../src/impls/runtime/spawn.rs | 39 ++++ .../relayer-runtime/src/impls/runtime/time.rs | 18 ++ crates/relayer-runtime/src/lib.rs | 3 +- crates/relayer-runtime/src/tokio/context.rs | 176 ------------------ .../relayer-runtime/src/tokio/logger/mod.rs | 4 - crates/relayer-runtime/src/tokio/mod.rs | 3 - .../src/{tokio => types}/error.rs | 0 .../logger/log.rs => types/log/entries.rs} | 6 +- .../src/{tokio/logger => types/log}/level.rs | 0 .../relayer-runtime/src/types/log/logger.rs | 1 + crates/relayer-runtime/src/types/log/mod.rs | 4 + .../src/{tokio/logger => types/log}/value.rs | 0 crates/relayer-runtime/src/types/mod.rs | 4 + crates/relayer-runtime/src/types/runtime.rs | 13 ++ crates/relayer-runtime/src/types/task.rs | 3 + .../src/framework/binary/next.rs | 2 +- 76 files changed, 570 insertions(+), 866 deletions(-) delete mode 100644 crates/relayer-all-in-one/src/one_for_all/impls/runtime/channel.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/impls/runtime/error.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/impls/runtime/mutex.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/impls/runtime/sleep.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/impls/runtime/spawn.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/impls/runtime/time.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/impls/telemetry.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/traits/telemetry.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/types/runtime.rs delete mode 100644 crates/relayer-all-in-one/src/one_for_all/types/telemetry.rs create mode 100644 crates/relayer-cosmos/src/impls/telemetry.rs rename crates/relayer-runtime/src/{tokio/logger/tracing.rs => impls/logger.rs} (82%) create mode 100644 crates/relayer-runtime/src/impls/mod.rs create mode 100644 crates/relayer-runtime/src/impls/runtime/channel.rs create mode 100644 crates/relayer-runtime/src/impls/runtime/error.rs rename crates/{relayer-all-in-one/src/one_for_all => relayer-runtime/src}/impls/runtime/mod.rs (100%) create mode 100644 crates/relayer-runtime/src/impls/runtime/mutex.rs create mode 100644 crates/relayer-runtime/src/impls/runtime/sleep.rs create mode 100644 crates/relayer-runtime/src/impls/runtime/spawn.rs create mode 100644 crates/relayer-runtime/src/impls/runtime/time.rs delete mode 100644 crates/relayer-runtime/src/tokio/context.rs delete mode 100644 crates/relayer-runtime/src/tokio/logger/mod.rs delete mode 100644 crates/relayer-runtime/src/tokio/mod.rs rename crates/relayer-runtime/src/{tokio => types}/error.rs (100%) rename crates/relayer-runtime/src/{tokio/logger/log.rs => types/log/entries.rs} (75%) rename crates/relayer-runtime/src/{tokio/logger => types/log}/level.rs (100%) create mode 100644 crates/relayer-runtime/src/types/log/logger.rs create mode 100644 crates/relayer-runtime/src/types/log/mod.rs rename crates/relayer-runtime/src/{tokio/logger => types/log}/value.rs (100%) create mode 100644 crates/relayer-runtime/src/types/mod.rs create mode 100644 crates/relayer-runtime/src/types/runtime.rs create mode 100644 crates/relayer-runtime/src/types/task.rs diff --git a/crates/relayer-all-in-one/src/all_for_one/birelay.rs b/crates/relayer-all-in-one/src/all_for_one/birelay.rs index 8483c5d176..ba68a75623 100644 --- a/crates/relayer-all-in-one/src/all_for_one/birelay.rs +++ b/crates/relayer-all-in-one/src/all_for_one/birelay.rs @@ -6,7 +6,8 @@ use crate::all_for_one::relay::AfoRelay; use crate::all_for_one::runtime::HasAfoRuntime; pub trait AfoBiRelay: - HasAfoRuntime + Clone + + HasAfoRuntime + HasLoggerWithBaseLevels + CanAutoRelay + HasTwoWayRelay diff --git a/crates/relayer-all-in-one/src/all_for_one/builder.rs b/crates/relayer-all-in-one/src/all_for_one/builder.rs index bafb30ece5..128fe43228 100644 --- a/crates/relayer-all-in-one/src/all_for_one/builder.rs +++ b/crates/relayer-all-in-one/src/all_for_one/builder.rs @@ -62,6 +62,7 @@ pub trait CanBuildAfoBiRelayFromOfa: impl CanBuildAfoBiRelayFromOfa for OfaBuilderWrapper where Build: OfaBuilder, + // OfaBuilderWrapper: CanBuildBiRelayFromRelays { type AfoBiRelay = OfaBiRelayWrapper; @@ -72,7 +73,13 @@ where client_id_a: &ClientIdA, client_id_b: &ClientIdB, ) -> Result, Build::Error> { - self.build_afo_birelay(chain_id_a, chain_id_b, client_id_a, client_id_b) - .await + as CanBuildAfoBiRelay>::build_afo_birelay( + self, + chain_id_a, + chain_id_b, + client_id_a, + client_id_b, + ) + .await } } diff --git a/crates/relayer-all-in-one/src/all_for_one/runtime.rs b/crates/relayer-all-in-one/src/all_for_one/runtime.rs index c4fdc63fe4..054c133b5c 100644 --- a/crates/relayer-all-in-one/src/all_for_one/runtime.rs +++ b/crates/relayer-all-in-one/src/all_for_one/runtime.rs @@ -3,23 +3,44 @@ use ibc_relayer_components::runtime::traits::runtime::HasRuntime; use ibc_relayer_components::runtime::traits::sleep::CanSleep; use ibc_relayer_components::runtime::traits::time::HasTime; use ibc_relayer_components_extra::runtime::traits::channel::{ - CanCreateChannels, CanStreamReceiver, CanUseChannels, + CanCloneSender, CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes, +}; +use ibc_relayer_components_extra::runtime::traits::channel_once::{ + CanCreateChannelsOnce, CanUseChannelsOnce, HasChannelOnceTypes, }; use ibc_relayer_components_extra::runtime::traits::spawn::HasSpawner; pub trait AfoRuntime: - HasMutex + CanSleep + HasTime + HasSpawner + CanCreateChannels + CanStreamReceiver + CanUseChannels + Clone + + HasMutex + + CanSleep + + HasTime + + HasSpawner + + HasChannelTypes + + HasChannelOnceTypes + + CanCreateChannels + + CanCreateChannelsOnce + + CanStreamReceiver + + CanCloneSender + + CanUseChannels + + CanUseChannelsOnce { } impl AfoRuntime for Runtime where - Runtime: HasMutex + Runtime: Clone + + HasMutex + CanSleep + HasTime + HasSpawner + + HasChannelTypes + + HasChannelOnceTypes + CanCreateChannels + + CanCreateChannelsOnce + CanStreamReceiver + + CanCloneSender + CanUseChannels + + CanUseChannelsOnce { } diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/birelay/runtime.rs b/crates/relayer-all-in-one/src/one_for_all/impls/birelay/runtime.rs index f684c4cfa6..f6dea570f0 100644 --- a/crates/relayer-all-in-one/src/one_for_all/impls/birelay/runtime.rs +++ b/crates/relayer-all-in-one/src/one_for_all/impls/birelay/runtime.rs @@ -3,13 +3,12 @@ use ibc_relayer_components::runtime::traits::runtime::HasRuntime; use crate::one_for_all::traits::birelay::OfaBiRelay; use crate::one_for_all::types::birelay::OfaBiRelayWrapper; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; impl HasRuntime for OfaBiRelayWrapper where BiRelay: OfaBiRelay, { - type Runtime = OfaRuntimeWrapper; + type Runtime = BiRelay::Runtime; fn runtime(&self) -> &Self::Runtime { self.birelay.runtime() diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/builder/runtime.rs b/crates/relayer-all-in-one/src/one_for_all/impls/builder/runtime.rs index a312d079e3..d3504eb023 100644 --- a/crates/relayer-all-in-one/src/one_for_all/impls/builder/runtime.rs +++ b/crates/relayer-all-in-one/src/one_for_all/impls/builder/runtime.rs @@ -3,13 +3,12 @@ use ibc_relayer_components::runtime::traits::runtime::HasRuntime; use crate::one_for_all::traits::builder::OfaBuilder; use crate::one_for_all::types::builder::OfaBuilderWrapper; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; impl HasRuntime for OfaBuilderWrapper where Builder: OfaBuilder, { - type Runtime = OfaRuntimeWrapper; + type Runtime = Builder::Runtime; fn runtime(&self) -> &Self::Runtime { self.builder.runtime() diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/chain/telemetry.rs b/crates/relayer-all-in-one/src/one_for_all/impls/chain/telemetry.rs index ded3db012b..7b80fc32ae 100644 --- a/crates/relayer-all-in-one/src/one_for_all/impls/chain/telemetry.rs +++ b/crates/relayer-all-in-one/src/one_for_all/impls/chain/telemetry.rs @@ -2,13 +2,12 @@ use ibc_relayer_components_extra::telemetry::traits::telemetry::HasTelemetry; use crate::one_for_all::traits::chain::OfaChain; use crate::one_for_all::types::chain::OfaChainWrapper; -use crate::one_for_all::types::telemetry::OfaTelemetryWrapper; impl HasTelemetry for OfaChainWrapper where Chain: OfaChain, { - type Telemetry = OfaTelemetryWrapper; + type Telemetry = Chain::Telemetry; fn telemetry(&self) -> &Self::Telemetry { self.chain.telemetry() diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/chain/types.rs b/crates/relayer-all-in-one/src/one_for_all/impls/chain/types.rs index 9bb6a9d17e..1ba82c75df 100644 --- a/crates/relayer-all-in-one/src/one_for_all/impls/chain/types.rs +++ b/crates/relayer-all-in-one/src/one_for_all/impls/chain/types.rs @@ -22,7 +22,6 @@ use ibc_relayer_components_extra::components::extra::chain::ExtraChainComponents use crate::one_for_all::traits::chain::{OfaChain, OfaChainTypes, OfaIbcChain}; use crate::one_for_all::types::chain::OfaChainWrapper; use crate::one_for_all::types::component::OfaComponents; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; use crate::std_prelude::*; impl DelegateComponent for OfaChainWrapper @@ -33,7 +32,7 @@ where } impl HasRuntime for OfaChainWrapper { - type Runtime = OfaRuntimeWrapper; + type Runtime = Chain::Runtime; fn runtime(&self) -> &Self::Runtime { self.chain.runtime() diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/mod.rs b/crates/relayer-all-in-one/src/one_for_all/impls/mod.rs index 49f3510015..3427ba7f12 100644 --- a/crates/relayer-all-in-one/src/one_for_all/impls/mod.rs +++ b/crates/relayer-all-in-one/src/one_for_all/impls/mod.rs @@ -2,6 +2,4 @@ pub mod birelay; pub mod builder; pub mod chain; pub mod relay; -pub mod runtime; -pub mod telemetry; pub mod transaction; diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/relay/types.rs b/crates/relayer-all-in-one/src/one_for_all/impls/relay/types.rs index 159779a303..a166bfe51c 100644 --- a/crates/relayer-all-in-one/src/one_for_all/impls/relay/types.rs +++ b/crates/relayer-all-in-one/src/one_for_all/impls/relay/types.rs @@ -10,7 +10,6 @@ use crate::one_for_all::traits::relay::OfaRelay; use crate::one_for_all::types::chain::OfaChainWrapper; use crate::one_for_all::types::component::OfaComponents; use crate::one_for_all::types::relay::OfaRelayWrapper; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; impl DelegateComponent for OfaRelayWrapper where @@ -30,7 +29,7 @@ impl HasRuntime for OfaRelayWrapper where Relay: OfaRelay, { - type Runtime = OfaRuntimeWrapper; + type Runtime = Relay::Runtime; fn runtime(&self) -> &Self::Runtime { self.relay.runtime() diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/channel.rs b/crates/relayer-all-in-one/src/one_for_all/impls/runtime/channel.rs deleted file mode 100644 index c307a3ace9..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/channel.rs +++ /dev/null @@ -1,126 +0,0 @@ -use core::pin::Pin; - -use async_trait::async_trait; -use futures_core::stream::Stream; -use ibc_relayer_components::core::traits::sync::Async; -use ibc_relayer_components_extra::runtime::traits::channel::{ - CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes, -}; -use ibc_relayer_components_extra::runtime::traits::channel_once::{ - CanCreateChannelsOnce, CanUseChannelsOnce, HasChannelOnceTypes, -}; - -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; -use crate::std_prelude::*; - -impl HasChannelTypes for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - type Sender = Runtime::Sender - where - T: Async; - - type Receiver = Runtime::Receiver - where - T: Async; -} - -impl HasChannelOnceTypes for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - type SenderOnce = Runtime::SenderOnce - where - T: Async; - - type ReceiverOnce = Runtime::ReceiverOnce - where - T: Async; -} - -impl CanCreateChannels for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - fn new_channel() -> (Self::Sender, Self::Receiver) - where - T: Async, - { - Runtime::new_channel() - } -} - -impl CanCreateChannelsOnce for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - fn new_channel_once() -> (Self::SenderOnce, Self::ReceiverOnce) - where - T: Async, - { - Runtime::new_channel_once() - } -} - -#[async_trait] -impl CanUseChannelsOnce for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - fn send_once(sender: Self::SenderOnce, value: T) -> Result<(), Self::Error> - where - T: Async, - { - Runtime::send_once(sender, value) - } - - async fn receive_once(receiver: Self::ReceiverOnce) -> Result - where - T: Async, - { - Runtime::receive_once(receiver).await - } -} - -#[async_trait] -impl CanUseChannels for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - fn send(sender: &Self::Sender, value: T) -> Result<(), Self::Error> - where - T: Async, - { - Runtime::send(sender, value) - } - - async fn receive(receiver: &mut Self::Receiver) -> Result - where - T: Async, - { - Runtime::receive(receiver).await - } - - fn try_receive(receiver: &mut Self::Receiver) -> Result, Self::Error> - where - T: Async, - { - Runtime::try_receive(receiver) - } -} - -impl CanStreamReceiver for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - fn receiver_to_stream( - receiver: Self::Receiver, - ) -> Pin + Send + 'static>> - where - T: Async, - { - Runtime::receiver_to_stream(receiver) - } -} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/error.rs b/crates/relayer-all-in-one/src/one_for_all/impls/runtime/error.rs deleted file mode 100644 index 6bccddc19b..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/error.rs +++ /dev/null @@ -1,11 +0,0 @@ -use ibc_relayer_components::core::traits::error::HasErrorType; - -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; - -impl HasErrorType for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - type Error = Runtime::Error; -} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/mutex.rs b/crates/relayer-all-in-one/src/one_for_all/impls/runtime/mutex.rs deleted file mode 100644 index 5370209de3..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/mutex.rs +++ /dev/null @@ -1,25 +0,0 @@ -use async_trait::async_trait; -use ibc_relayer_components::core::traits::sync::Async; -use ibc_relayer_components::runtime::traits::mutex::HasMutex; - -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; -use crate::std_prelude::*; - -#[async_trait] -impl HasMutex for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - type Mutex = Runtime::Mutex; - - type MutexGuard<'a, T: Async> = Runtime::MutexGuard<'a, T>; - - fn new_mutex(item: T) -> Self::Mutex { - Runtime::new_mutex(item) - } - - async fn acquire_mutex<'a, T: Async>(mutex: &'a Self::Mutex) -> Self::MutexGuard<'a, T> { - Runtime::acquire_mutex(mutex).await - } -} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/sleep.rs b/crates/relayer-all-in-one/src/one_for_all/impls/runtime/sleep.rs deleted file mode 100644 index fc431784e3..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/sleep.rs +++ /dev/null @@ -1,18 +0,0 @@ -use core::time::Duration; - -use async_trait::async_trait; -use ibc_relayer_components::runtime::traits::sleep::CanSleep; - -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; -use crate::std_prelude::*; - -#[async_trait] -impl CanSleep for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - async fn sleep(&self, duration: Duration) { - self.runtime.sleep(duration).await - } -} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/spawn.rs b/crates/relayer-all-in-one/src/one_for_all/impls/runtime/spawn.rs deleted file mode 100644 index a7ef85b433..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/spawn.rs +++ /dev/null @@ -1,25 +0,0 @@ -use core::future::Future; - -use ibc_relayer_components_extra::runtime::traits::spawn::{HasSpawner, Spawner, TaskHandle}; - -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; -use crate::std_prelude::*; - -impl HasSpawner for OfaRuntimeWrapper { - type Spawner = Self; - - fn spawner(&self) -> Self::Spawner { - self.clone() - } -} - -impl Spawner for OfaRuntimeWrapper { - fn spawn(&self, task: F) -> Box - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - self.runtime.spawn(task) - } -} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/time.rs b/crates/relayer-all-in-one/src/one_for_all/impls/runtime/time.rs deleted file mode 100644 index 5b42089a59..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/time.rs +++ /dev/null @@ -1,21 +0,0 @@ -use core::time::Duration; - -use ibc_relayer_components::runtime::traits::time::HasTime; - -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; - -impl HasTime for OfaRuntimeWrapper -where - Runtime: OfaRuntime, -{ - type Time = Runtime::Time; - - fn now(&self) -> Self::Time { - self.runtime.now() - } - - fn duration_since(current_time: &Self::Time, other_time: &Self::Time) -> Duration { - Runtime::duration_since(current_time, other_time) - } -} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/telemetry.rs b/crates/relayer-all-in-one/src/one_for_all/impls/telemetry.rs deleted file mode 100644 index 409df38847..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/impls/telemetry.rs +++ /dev/null @@ -1,73 +0,0 @@ -use ibc_relayer_components_extra::telemetry::traits::metrics::{ - HasLabel, HasMetric, TelemetryCounter, TelemetryUpDownCounter, TelemetryValueRecorder, -}; - -use crate::one_for_all::traits::telemetry::OfaTelemetry; -use crate::one_for_all::types::telemetry::OfaTelemetryWrapper; - -impl HasLabel for OfaTelemetryWrapper { - type Label = Telemetry::Label; - fn new_label(key: &str, value: &str) -> Self::Label { - Telemetry::new_label(key, value) - } -} - -impl HasMetric for OfaTelemetryWrapper -where - Telemetry: OfaTelemetry, -{ - type Value = Telemetry::CounterType; - type Unit = Telemetry::Unit; - - fn update_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::Value, - description: Option<&str>, - unit: Option, - ) { - self.telemetry - .update_counter_metric(name, labels, value, description, unit); - } -} - -impl HasMetric for OfaTelemetryWrapper -where - Telemetry: OfaTelemetry, -{ - type Value = Telemetry::ValueRecorderType; - type Unit = Telemetry::Unit; - - fn update_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::Value, - description: Option<&str>, - unit: Option, - ) { - self.telemetry - .update_value_recorder_metric(name, labels, value, description, unit); - } -} - -impl HasMetric for OfaTelemetryWrapper -where - Telemetry: OfaTelemetry, -{ - type Value = Telemetry::UpDownCounterType; - type Unit = Telemetry::Unit; - - fn update_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::Value, - description: Option<&str>, - unit: Option, - ) { - self.telemetry - .update_up_down_counter_metric(name, labels, value, description, unit); - } -} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/transaction/runtime.rs b/crates/relayer-all-in-one/src/one_for_all/impls/transaction/runtime.rs index 576ebae7b1..ad5ccc6652 100644 --- a/crates/relayer-all-in-one/src/one_for_all/impls/transaction/runtime.rs +++ b/crates/relayer-all-in-one/src/one_for_all/impls/transaction/runtime.rs @@ -2,14 +2,13 @@ use ibc_relayer_components::core::traits::error::HasErrorType; use ibc_relayer_components::runtime::traits::runtime::HasRuntime; use crate::one_for_all::traits::transaction::OfaTxContext; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; use crate::one_for_all::types::transaction::OfaTxWrapper; impl HasRuntime for OfaTxWrapper where TxContext: OfaTxContext, { - type Runtime = OfaRuntimeWrapper; + type Runtime = TxContext::Runtime; fn runtime(&self) -> &Self::Runtime { self.tx_context.runtime() diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/birelay.rs b/crates/relayer-all-in-one/src/one_for_all/traits/birelay.rs index 068a1b0ef1..fc39f449be 100644 --- a/crates/relayer-all-in-one/src/one_for_all/traits/birelay.rs +++ b/crates/relayer-all-in-one/src/one_for_all/traits/birelay.rs @@ -1,17 +1,17 @@ use core::fmt::Debug; +use ibc_relayer_components::core::traits::error::HasErrorType; use ibc_relayer_components::core::traits::sync::Async; use ibc_relayer_components::logger::traits::level::HasBaseLogLevels; +use crate::all_for_one::runtime::AfoRuntime; use crate::one_for_all::traits::relay::{OfaHomogeneousRelay, OfaRelay}; -use crate::one_for_all::traits::runtime::OfaRuntime; use crate::one_for_all::types::relay::OfaRelayWrapper; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; pub trait OfaBiRelay: Async { type Error: Debug + Async; - type Runtime: OfaRuntime; + type Runtime: AfoRuntime; type Logger: HasBaseLogLevels; @@ -24,9 +24,9 @@ pub trait OfaBiRelay: Async { Logger = ::Logger, >; - fn runtime(&self) -> &OfaRuntimeWrapper; + fn runtime(&self) -> &Self::Runtime; - fn runtime_error(e: ::Error) -> Self::Error; + fn runtime_error(e: ::Error) -> Self::Error; fn logger(&self) -> &Self::Logger; diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/builder.rs b/crates/relayer-all-in-one/src/one_for_all/traits/builder.rs index 8707d729a1..af90cf5f19 100644 --- a/crates/relayer-all-in-one/src/one_for_all/traits/builder.rs +++ b/crates/relayer-all-in-one/src/one_for_all/traits/builder.rs @@ -1,35 +1,36 @@ use alloc::collections::BTreeMap; use alloc::sync::Arc; use core::fmt::Debug; +use ibc_relayer_components::core::traits::error::HasErrorType; use ibc_relayer_components::logger::traits::level::HasBaseLogLevels; +use ibc_relayer_components::runtime::traits::mutex::HasMutex; use ibc_relayer_components_extra::batch::types::config::BatchConfig; use async_trait::async_trait; use ibc_relayer_components::core::traits::sync::Async; +use crate::all_for_one::runtime::AfoRuntime; use crate::one_for_all::traits::birelay::OfaBiRelay; use crate::one_for_all::traits::chain::OfaChainTypes; use crate::one_for_all::traits::relay::OfaRelay; -use crate::one_for_all::traits::runtime::OfaRuntime; use crate::one_for_all::types::batch::aliases::MessageBatchSender; use crate::one_for_all::types::chain::OfaChainWrapper; use crate::one_for_all::types::relay::OfaRelayWrapper; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; use crate::std_prelude::*; #[async_trait] pub trait OfaBuilder: Async { type Error: Debug + Async; - type Runtime: OfaRuntime; + type Runtime: AfoRuntime; type Logger: HasBaseLogLevels; type BiRelay: OfaBiRelay; - fn runtime(&self) -> &OfaRuntimeWrapper; + fn runtime(&self) -> &Self::Runtime; - fn runtime_error(e: ::Error) -> Self::Error; + fn runtime_error(e: ::Error) -> Self::Error; fn birelay_error(e: ::Error) -> Self::Error; @@ -90,7 +91,7 @@ pub type ClientIdB = as OfaChainTypes>::ClientId; pub type Runtime = ::Runtime; -pub type Mutex = as OfaRuntime>::Mutex; +pub type Mutex = as HasMutex>::Mutex; pub type ChainACache = Arc, OfaChainWrapper>>>>; diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/chain.rs b/crates/relayer-all-in-one/src/one_for_all/traits/chain.rs index 1329b40f39..000012482a 100644 --- a/crates/relayer-all-in-one/src/one_for_all/traits/chain.rs +++ b/crates/relayer-all-in-one/src/one_for_all/traits/chain.rs @@ -4,17 +4,16 @@ use alloc::sync::Arc; use core::fmt::{Debug, Display}; +use ibc_relayer_components::core::traits::error::HasErrorType; use ibc_relayer_components::logger::traits::level::HasBaseLogLevels; use ibc_relayer_components::logger::traits::logger::BaseLogger; +use ibc_relayer_components_extra::telemetry::traits::metrics::HasBasicMetrics; use async_trait::async_trait; use ibc_relayer_components::core::traits::sync::Async; use ibc_relayer_components::runtime::traits::subscription::Subscription; -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::traits::telemetry::OfaTelemetry; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; -use crate::one_for_all::types::telemetry::OfaTelemetryWrapper; +use crate::all_for_one::runtime::AfoRuntime; use crate::std_prelude::*; #[async_trait] @@ -29,11 +28,11 @@ pub trait OfaChainTypes: Async { Corresponds to [`HasRuntime::Runtime`](ibc_relayer_components::runtime::traits::runtime::HasRuntime::Runtime). */ - type Runtime: OfaRuntime; + type Runtime: AfoRuntime; type Logger: HasBaseLogLevels; - type Telemetry: OfaTelemetry; + type Telemetry: HasBasicMetrics; /** Corresponds to @@ -172,13 +171,13 @@ pub trait OfaChainTypes: Async { #[async_trait] pub trait OfaChain: OfaChainTypes { - fn runtime(&self) -> &OfaRuntimeWrapper; + fn runtime(&self) -> &Self::Runtime; - fn runtime_error(e: ::Error) -> Self::Error; + fn runtime_error(e: ::Error) -> Self::Error; fn logger(&self) -> &Self::Logger; - fn telemetry(&self) -> &OfaTelemetryWrapper; + fn telemetry(&self) -> &Self::Telemetry; fn log_event<'a>(event: &'a Self::Event) -> ::LogValue<'a>; diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/mod.rs b/crates/relayer-all-in-one/src/one_for_all/traits/mod.rs index 49f3510015..9caa1fb264 100644 --- a/crates/relayer-all-in-one/src/one_for_all/traits/mod.rs +++ b/crates/relayer-all-in-one/src/one_for_all/traits/mod.rs @@ -3,5 +3,4 @@ pub mod builder; pub mod chain; pub mod relay; pub mod runtime; -pub mod telemetry; pub mod transaction; diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/relay.rs b/crates/relayer-all-in-one/src/one_for_all/traits/relay.rs index 0c616471dd..059eeb0176 100644 --- a/crates/relayer-all-in-one/src/one_for_all/traits/relay.rs +++ b/crates/relayer-all-in-one/src/one_for_all/traits/relay.rs @@ -5,14 +5,14 @@ use core::fmt::Debug; use async_trait::async_trait; +use ibc_relayer_components::core::traits::error::HasErrorType; use ibc_relayer_components::core::traits::sync::Async; use ibc_relayer_components::logger::traits::level::HasBaseLogLevels; +use crate::all_for_one::runtime::AfoRuntime; use crate::one_for_all::traits::chain::{OfaChainTypes, OfaIbcChain}; -use crate::one_for_all::traits::runtime::OfaRuntime; use crate::one_for_all::types::batch::aliases::MessageBatchSender; use crate::one_for_all::types::chain::OfaChainWrapper; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; use crate::std_prelude::*; #[async_trait] @@ -22,7 +22,7 @@ pub trait OfaRelay: Async { */ type Error: Debug + Clone + Async; - type Runtime: OfaRuntime; + type Runtime: AfoRuntime; type Logger: HasBaseLogLevels; @@ -39,7 +39,7 @@ pub trait OfaRelay: Async { type PacketLock<'a>: Send; - fn runtime_error(e: ::Error) -> Self::Error; + fn runtime_error(e: ::Error) -> Self::Error; fn src_chain_error(e: ::Error) -> Self::Error; @@ -73,7 +73,7 @@ pub trait OfaRelay: Async { src_channel_id: &::ChannelId, ) -> Self::Error; - fn runtime(&self) -> &OfaRuntimeWrapper; + fn runtime(&self) -> &Self::Runtime; fn logger(&self) -> &Self::Logger; diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/runtime.rs b/crates/relayer-all-in-one/src/one_for_all/traits/runtime.rs index a21fa188e4..8ee550264e 100644 --- a/crates/relayer-all-in-one/src/one_for_all/traits/runtime.rs +++ b/crates/relayer-all-in-one/src/one_for_all/traits/runtime.rs @@ -8,7 +8,6 @@ use futures_core::{Future, Stream}; use ibc_relayer_components::core::traits::sync::Async; use ibc_relayer_components_extra::runtime::traits::spawn::TaskHandle; -use crate::one_for_all::types::runtime::LogLevel; use crate::std_prelude::*; #[async_trait] @@ -37,8 +36,6 @@ pub trait OfaRuntime: Async { where T: Async; - async fn log(&self, level: LogLevel, message: &str); - async fn sleep(&self, duration: Duration); fn now(&self) -> Self::Time; diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/telemetry.rs b/crates/relayer-all-in-one/src/one_for_all/traits/telemetry.rs deleted file mode 100644 index 8db8edf2a7..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/traits/telemetry.rs +++ /dev/null @@ -1,36 +0,0 @@ -use ibc_relayer_components::core::traits::sync::Async; -use ibc_relayer_components_extra::telemetry::traits::metrics::HasLabel; - -pub trait OfaTelemetry: HasLabel { - type CounterType: Async + From; - type ValueRecorderType: Async + From; - type UpDownCounterType: Async + From; - type Unit: Async; - - fn update_counter_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::CounterType, - description: Option<&str>, - unit: Option, - ); - - fn update_value_recorder_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::ValueRecorderType, - description: Option<&str>, - unit: Option, - ); - - fn update_up_down_counter_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::UpDownCounterType, - description: Option<&str>, - unit: Option, - ); -} diff --git a/crates/relayer-all-in-one/src/one_for_all/traits/transaction.rs b/crates/relayer-all-in-one/src/one_for_all/traits/transaction.rs index 1d59fd70c2..727fccb635 100644 --- a/crates/relayer-all-in-one/src/one_for_all/traits/transaction.rs +++ b/crates/relayer-all-in-one/src/one_for_all/traits/transaction.rs @@ -2,19 +2,20 @@ use core::fmt::{Debug, Display}; use core::time::Duration; use async_trait::async_trait; +use ibc_relayer_components::core::traits::error::HasErrorType; use ibc_relayer_components::core::traits::sync::Async; use ibc_relayer_components::logger::traits::level::HasBaseLogLevels; use ibc_relayer_components::logger::traits::logger::BaseLogger; +use ibc_relayer_components::runtime::traits::mutex::HasMutex; -use crate::one_for_all::traits::runtime::OfaRuntime; -use crate::one_for_all::types::runtime::OfaRuntimeWrapper; +use crate::all_for_one::runtime::AfoRuntime; use crate::std_prelude::*; #[async_trait] pub trait OfaTxContext: Async { type Error: Async + Debug; - type Runtime: OfaRuntime; + type Runtime: AfoRuntime; type Logger: HasBaseLogLevels; @@ -44,9 +45,9 @@ pub trait OfaTxContext: Async { type TxResponse: Async; - fn runtime(&self) -> &OfaRuntimeWrapper; + fn runtime(&self) -> &Self::Runtime; - fn runtime_error(e: ::Error) -> Self::Error; + fn runtime_error(e: ::Error) -> Self::Error; fn logger(&self) -> &Self::Logger; @@ -88,7 +89,7 @@ pub trait OfaTxContext: Async { fn mutex_for_nonce_allocation( &self, signer: &Self::Signer, - ) -> &::Mutex<()>; + ) -> &::Mutex<()>; fn parse_tx_response_as_events( response: Self::TxResponse, diff --git a/crates/relayer-all-in-one/src/one_for_all/types/batch/aliases.rs b/crates/relayer-all-in-one/src/one_for_all/types/batch/aliases.rs index 8a727fa31c..b869d1494b 100644 --- a/crates/relayer-all-in-one/src/one_for_all/types/batch/aliases.rs +++ b/crates/relayer-all-in-one/src/one_for_all/types/batch/aliases.rs @@ -1,18 +1,22 @@ +use ibc_relayer_components::runtime::traits::mutex::HasMutex; +use ibc_relayer_components_extra::runtime::traits::channel::HasChannelTypes; +use ibc_relayer_components_extra::runtime::traits::channel_once::HasChannelOnceTypes; + use crate::one_for_all::traits::chain::OfaChainTypes; -use crate::one_for_all::traits::runtime::OfaRuntime; use crate::std_prelude::*; pub type Runtime = ::Runtime; -pub type Mutex = as OfaRuntime>::Mutex; +pub type Mutex = as HasMutex>::Mutex; -pub type Sender = as OfaRuntime>::Sender; +pub type Sender = as HasChannelTypes>::Sender; -pub type Receiver = as OfaRuntime>::Receiver; +pub type Receiver = as HasChannelTypes>::Receiver; -pub type SenderOnce = as OfaRuntime>::SenderOnce; +pub type SenderOnce = as HasChannelOnceTypes>::SenderOnce; -pub type ReceiverOnce = as OfaRuntime>::ReceiverOnce; +pub type ReceiverOnce = + as HasChannelOnceTypes>::ReceiverOnce; pub type EventResult = Result::Event>>, Error>; diff --git a/crates/relayer-all-in-one/src/one_for_all/types/builder.rs b/crates/relayer-all-in-one/src/one_for_all/types/builder.rs index ecc39014a0..05ca60ef27 100644 --- a/crates/relayer-all-in-one/src/one_for_all/types/builder.rs +++ b/crates/relayer-all-in-one/src/one_for_all/types/builder.rs @@ -1,12 +1,12 @@ use alloc::collections::BTreeMap; use alloc::sync::Arc; +use ibc_relayer_components::runtime::traits::mutex::HasMutex; use crate::one_for_all::traits::birelay::OfaHomogeneousBiRelay; use crate::one_for_all::traits::builder::{ BatchSenderCacheA, BatchSenderCacheB, ChainACache, ChainBCache, OfaBuilder, RelayAToBCache, RelayBToACache, }; -use crate::one_for_all::traits::runtime::OfaRuntime; pub struct OfaBuilderWrapper where diff --git a/crates/relayer-all-in-one/src/one_for_all/types/mod.rs b/crates/relayer-all-in-one/src/one_for_all/types/mod.rs index 2061b7ec84..405078f232 100644 --- a/crates/relayer-all-in-one/src/one_for_all/types/mod.rs +++ b/crates/relayer-all-in-one/src/one_for_all/types/mod.rs @@ -4,6 +4,4 @@ pub mod builder; pub mod chain; pub mod component; pub mod relay; -pub mod runtime; -pub mod telemetry; pub mod transaction; diff --git a/crates/relayer-all-in-one/src/one_for_all/types/runtime.rs b/crates/relayer-all-in-one/src/one_for_all/types/runtime.rs deleted file mode 100644 index a5ae2ba8e4..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/types/runtime.rs +++ /dev/null @@ -1,29 +0,0 @@ -use alloc::sync::Arc; - -pub enum LogLevel { - Error, - Warn, - Info, - Debug, - Trace, -} - -pub struct OfaRuntimeWrapper { - pub runtime: Arc, -} - -impl OfaRuntimeWrapper { - pub fn new(runtime: Runtime) -> Self { - Self { - runtime: Arc::new(runtime), - } - } -} - -impl Clone for OfaRuntimeWrapper { - fn clone(&self) -> Self { - Self { - runtime: self.runtime.clone(), - } - } -} diff --git a/crates/relayer-all-in-one/src/one_for_all/types/telemetry.rs b/crates/relayer-all-in-one/src/one_for_all/types/telemetry.rs deleted file mode 100644 index 680543691d..0000000000 --- a/crates/relayer-all-in-one/src/one_for_all/types/telemetry.rs +++ /dev/null @@ -1,21 +0,0 @@ -use alloc::sync::Arc; - -pub struct OfaTelemetryWrapper { - pub telemetry: Arc, -} - -impl OfaTelemetryWrapper { - pub fn new(telemetry: Telemetry) -> Self { - Self { - telemetry: Arc::new(telemetry), - } - } -} - -impl Clone for OfaTelemetryWrapper { - fn clone(&self) -> Self { - Self { - telemetry: self.telemetry.clone(), - } - } -} diff --git a/crates/relayer-components-extra/src/builder/components/relay/batch.rs b/crates/relayer-components-extra/src/builder/components/relay/batch.rs index 9a8eb197ae..f5ed548fa3 100644 --- a/crates/relayer-components-extra/src/builder/components/relay/batch.rs +++ b/crates/relayer-components-extra/src/builder/components/relay/batch.rs @@ -20,7 +20,7 @@ use crate::batch::types::aliases::{MessageBatchReceiver, MessageBatchSender}; use crate::batch::worker::CanSpawnBatchMessageWorker; use crate::builder::traits::cache::HasBatchSenderCache; use crate::builder::traits::relay::CanBuildRelayWithBatch; -use crate::runtime::traits::channel::{CanCreateChannels, HasChannelTypes}; +use crate::runtime::traits::channel::{CanCloneSender, CanCreateChannels, HasChannelTypes}; use crate::runtime::traits::channel_once::HasChannelOnceTypes; use crate::std_prelude::*; @@ -31,7 +31,7 @@ impl RelayFromChainsBuilder for BuildRelayWithBatchWorker where Build: HasBiRelayType - + HasRuntimeWithMutex + + HasRuntime + HasBatchConfig + HasErrorType + CanBuildRelayWithBatch, @@ -48,8 +48,7 @@ where DstChain: HasRuntime + HasChainId, SrcRuntime: HasChannelTypes + HasChannelOnceTypes, DstRuntime: HasChannelTypes + HasChannelOnceTypes, - MessageBatchSender: Clone, - MessageBatchSender: Clone, + Build::Runtime: HasMutex, { async fn build_relay_from_chains( build: &Build, @@ -114,7 +113,7 @@ where } #[async_trait] -trait CanBuildBatchChannel: HasBiRelayType + HasErrorType +pub trait CanBuildBatchChannel: HasBiRelayType + HasErrorType where Target: ChainBuildTarget, TargetChain: HasRuntime, @@ -143,13 +142,12 @@ where Target: ChainBuildTarget, Chain: HasIbcChainTypes + HasRuntime, Counterparty: HasIbcChainTypes, - Runtime: CanCreateChannels + HasChannelOnceTypes, + Runtime: CanCreateChannels + HasChannelOnceTypes + CanCloneSender, Build: HasBatchSenderCache>, Chain::ChainId: Ord + Clone, Counterparty::ChainId: Ord + Clone, Chain::ClientId: Ord + Clone, Counterparty::ClientId: Ord + Clone, - MessageBatchSender>: Clone, { async fn build_batch_channel( &self, @@ -165,9 +163,6 @@ where ), Self::Error, > { - // let (sender, receiver) = Runtime::new_channel(); - // Ok((sender, Some(receiver))) - let mutex = self.batch_sender_cache(target); let mut sender_cache = Build::Runtime::acquire_mutex(mutex).await; @@ -180,10 +175,10 @@ where ); if let Some(sender) = sender_cache.get(&cache_key) { - Ok(((*sender).clone(), None)) + Ok((Runtime::clone_sender(sender), None)) } else { let (sender, receiver) = Runtime::new_channel(); - sender_cache.insert(cache_key, sender.clone()); + sender_cache.insert(cache_key, Runtime::clone_sender(&sender)); Ok((sender, Some(receiver))) } } diff --git a/crates/relayer-components-extra/src/runtime/traits/channel.rs b/crates/relayer-components-extra/src/runtime/traits/channel.rs index 0d6a0ed8b4..e16a4b6d80 100644 --- a/crates/relayer-components-extra/src/runtime/traits/channel.rs +++ b/crates/relayer-components-extra/src/runtime/traits/channel.rs @@ -151,3 +151,9 @@ pub trait CanStreamReceiver: HasChannelTypes { where T: Async; } + +pub trait CanCloneSender: HasChannelTypes { + fn clone_sender(sender: &Self::Sender) -> Self::Sender + where + T: Async; +} diff --git a/crates/relayer-cosmos/src/all_for_one/chain.rs b/crates/relayer-cosmos/src/all_for_one/chain.rs index f1ac3b3956..fd07c71b55 100644 --- a/crates/relayer-cosmos/src/all_for_one/chain.rs +++ b/crates/relayer-cosmos/src/all_for_one/chain.rs @@ -1,8 +1,7 @@ use alloc::sync::Arc; use ibc_relayer::chain::endpoint::ChainStatus; use ibc_relayer_all_in_one::all_for_one::chain::{AfoChain, AfoCounterpartyChain}; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::clients::ics07_tendermint::consensus_state::ConsensusState; use ibc_relayer_types::core::ics04_channel::events::WriteAcknowledgement; use ibc_relayer_types::core::ics04_channel::packet::Packet; @@ -20,7 +19,7 @@ use crate::types::error::Error; pub trait AfoCosmosChain: AfoChain< Counterparty, - AfoRuntime = OfaRuntimeWrapper, + AfoRuntime = TokioRuntimeContext, Error = Error, Height = Height, Timestamp = Timestamp, @@ -48,7 +47,7 @@ impl AfoCosmosChain for Chain where Chain: AfoChain< Counterparty, - AfoRuntime = OfaRuntimeWrapper, + AfoRuntime = TokioRuntimeContext, Error = Error, Height = Height, Timestamp = Timestamp, diff --git a/crates/relayer-cosmos/src/all_for_one/relay.rs b/crates/relayer-cosmos/src/all_for_one/relay.rs index 24c6a9d55f..1dad8b7bce 100644 --- a/crates/relayer-cosmos/src/all_for_one/relay.rs +++ b/crates/relayer-cosmos/src/all_for_one/relay.rs @@ -1,6 +1,5 @@ use ibc_relayer_all_in_one::all_for_one::relay::AfoRelay; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics04_channel::packet::Packet; use crate::all_for_one::chain::AfoCosmosChain; @@ -10,7 +9,7 @@ pub trait AfoCosmosRelay: AfoSrcChain = Self::CosmosSrcChain, AfoDstChain = Self::CosmosDstChain, Packet = Packet, - AfoRuntime = OfaRuntimeWrapper, + AfoRuntime = TokioRuntimeContext, > { type CosmosSrcChain: AfoCosmosChain; @@ -24,7 +23,7 @@ where AfoSrcChain = SrcChain, AfoDstChain = DstChain, Packet = Packet, - AfoRuntime = OfaRuntimeWrapper, + AfoRuntime = TokioRuntimeContext, >, SrcChain: AfoCosmosChain, DstChain: AfoCosmosChain, diff --git a/crates/relayer-cosmos/src/contexts/birelay.rs b/crates/relayer-cosmos/src/contexts/birelay.rs index 5d632d824d..b176f5d42e 100644 --- a/crates/relayer-cosmos/src/contexts/birelay.rs +++ b/crates/relayer-cosmos/src/contexts/birelay.rs @@ -1,7 +1,6 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer_all_in_one::one_for_all::types::relay::OfaRelayWrapper; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use crate::contexts::relay::CosmosRelay; @@ -10,7 +9,7 @@ where ChainA: ChainHandle, ChainB: ChainHandle, { - pub runtime: OfaRuntimeWrapper, + pub runtime: TokioRuntimeContext, pub relay_a_to_b: OfaRelayWrapper>, pub relay_b_to_a: OfaRelayWrapper>, } @@ -21,7 +20,7 @@ where ChainB: ChainHandle, { pub fn new( - runtime: OfaRuntimeWrapper, + runtime: TokioRuntimeContext, relay_a_to_b: OfaRelayWrapper>, relay_b_to_a: OfaRelayWrapper>, ) -> Self { diff --git a/crates/relayer-cosmos/src/contexts/builder.rs b/crates/relayer-cosmos/src/contexts/builder.rs index d8cccae7a2..1b5354cf22 100644 --- a/crates/relayer-cosmos/src/contexts/builder.rs +++ b/crates/relayer-cosmos/src/contexts/builder.rs @@ -16,10 +16,8 @@ use ibc_relayer::keyring::Secp256k1KeyPair; use ibc_relayer::spawn::spawn_chain_runtime; use ibc_relayer_all_in_one::one_for_all::types::builder::OfaBuilderWrapper; use ibc_relayer_all_in_one::one_for_all::types::chain::OfaChainWrapper; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_all_in_one::one_for_all::types::telemetry::OfaTelemetryWrapper; use ibc_relayer_components_extra::batch::types::config::BatchConfig; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics24_host::identifier::ChainId; use ibc_relayer_types::core::ics24_host::identifier::ClientId; use tokio::runtime::Runtime as TokioRuntime; @@ -33,8 +31,8 @@ use crate::types::telemetry::CosmosTelemetry; pub struct CosmosBuilder { pub config: Config, pub packet_filter: PacketFilter, - pub telemetry: OfaTelemetryWrapper, - pub runtime: OfaRuntimeWrapper, + pub telemetry: CosmosTelemetry, + pub runtime: TokioRuntimeContext, pub batch_config: BatchConfig, pub key_map: HashMap, } @@ -48,9 +46,7 @@ impl CosmosBuilder { batch_config: BatchConfig, key_map: HashMap, ) -> Self { - let telemetry = OfaTelemetryWrapper::new(telemetry); - - let runtime = OfaRuntimeWrapper::new(TokioRuntimeContext::new(runtime)); + let runtime = TokioRuntimeContext::new(runtime); Self { config, @@ -84,7 +80,7 @@ impl CosmosBuilder { &self, chain_id: &ChainId, ) -> Result, Error> { - let runtime = self.runtime.runtime.runtime.clone(); + let runtime = self.runtime.runtime.clone(); let (handle, key, chain_config) = task::block_in_place(|| -> Result<_, Error> { let handle = spawn_chain_runtime::(&self.config, chain_id, runtime) diff --git a/crates/relayer-cosmos/src/contexts/chain.rs b/crates/relayer-cosmos/src/contexts/chain.rs index c7c5fd3481..e0a61aa59a 100644 --- a/crates/relayer-cosmos/src/contexts/chain.rs +++ b/crates/relayer-cosmos/src/contexts/chain.rs @@ -4,12 +4,10 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::config::EventSourceMode; use ibc_relayer::event::source::queries::all as all_queries; use ibc_relayer::keyring::Secp256k1KeyPair; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_all_in_one::one_for_all::types::telemetry::OfaTelemetryWrapper; use ibc_relayer_all_in_one::one_for_all::types::transaction::OfaTxWrapper; use ibc_relayer_components::runtime::impls::subscription::empty::EmptySubscription; use ibc_relayer_components::runtime::traits::subscription::Subscription; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics02_client::height::Height; use ibc_relayer_types::core::ics24_host::identifier::ChainId; use tendermint::abci::Event as AbciEvent; @@ -25,8 +23,8 @@ pub struct CosmosChain { pub handle: Handle, pub chain_id: ChainId, pub compat_mode: CompatMode, - pub runtime: OfaRuntimeWrapper, - pub telemetry: OfaTelemetryWrapper, + pub runtime: TokioRuntimeContext, + pub telemetry: CosmosTelemetry, pub subscription: Arc)>>, pub tx_context: OfaTxWrapper, } @@ -39,8 +37,8 @@ impl CosmosChain { compat_mode: CompatMode, key_entry: Secp256k1KeyPair, event_source_mode: EventSourceMode, - runtime: OfaRuntimeWrapper, - telemetry: OfaTelemetryWrapper, + runtime: TokioRuntimeContext, + telemetry: CosmosTelemetry, ) -> Self { let chain_version = tx_config.chain_id.version(); diff --git a/crates/relayer-cosmos/src/contexts/relay.rs b/crates/relayer-cosmos/src/contexts/relay.rs index 80590abf9b..06667c191d 100644 --- a/crates/relayer-cosmos/src/contexts/relay.rs +++ b/crates/relayer-cosmos/src/contexts/relay.rs @@ -5,8 +5,7 @@ use futures::lock::Mutex; use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer::config::filter::PacketFilter; use ibc_relayer_all_in_one::one_for_all::types::chain::OfaChainWrapper; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics04_channel::packet::Sequence; use ibc_relayer_types::core::ics24_host::identifier::{ChannelId, ClientId, PortId}; @@ -18,7 +17,7 @@ where SrcChain: ChainHandle, DstChain: ChainHandle, { - pub runtime: OfaRuntimeWrapper, + pub runtime: TokioRuntimeContext, pub src_chain: OfaChainWrapper>, pub dst_chain: OfaChainWrapper>, pub src_client_id: ClientId, @@ -35,7 +34,7 @@ where DstChain: ChainHandle, { pub fn new( - runtime: OfaRuntimeWrapper, + runtime: TokioRuntimeContext, src_chain: OfaChainWrapper>, dst_chain: OfaChainWrapper>, src_client_id: ClientId, diff --git a/crates/relayer-cosmos/src/contexts/transaction.rs b/crates/relayer-cosmos/src/contexts/transaction.rs index ff62195f19..caa8b4f0c5 100644 --- a/crates/relayer-cosmos/src/contexts/transaction.rs +++ b/crates/relayer-cosmos/src/contexts/transaction.rs @@ -1,8 +1,7 @@ use futures::lock::Mutex; use ibc_relayer::chain::cosmos::types::config::TxConfig; use ibc_relayer::keyring::Secp256k1KeyPair; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use tendermint_rpc::HttpClient; pub struct CosmosTxContext { @@ -10,7 +9,7 @@ pub struct CosmosTxContext { pub rpc_client: HttpClient, pub key_entry: Secp256k1KeyPair, pub nonce_mutex: Mutex<()>, - pub runtime: OfaRuntimeWrapper, + pub runtime: TokioRuntimeContext, } impl CosmosTxContext { @@ -18,7 +17,7 @@ impl CosmosTxContext { tx_config: TxConfig, rpc_client: HttpClient, key_entry: Secp256k1KeyPair, - runtime: OfaRuntimeWrapper, + runtime: TokioRuntimeContext, ) -> Self { Self { tx_config, diff --git a/crates/relayer-cosmos/src/impls/birelay.rs b/crates/relayer-cosmos/src/impls/birelay.rs index 1d685e2a80..2013f6dd52 100644 --- a/crates/relayer-cosmos/src/impls/birelay.rs +++ b/crates/relayer-cosmos/src/impls/birelay.rs @@ -1,10 +1,9 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer_all_in_one::one_for_all::traits::birelay::OfaBiRelay; use ibc_relayer_all_in_one::one_for_all::types::relay::OfaRelayWrapper; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; -use ibc_relayer_runtime::tokio::error::Error as TokioError; -use ibc_relayer_runtime::tokio::logger::tracing::TracingLogger; +use ibc_relayer_runtime::types::error::Error as TokioError; +use ibc_relayer_runtime::types::log::logger::TracingLogger; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use crate::contexts::birelay::CosmosBiRelay; use crate::contexts::relay::CosmosRelay; @@ -25,7 +24,7 @@ where type RelayBToA = CosmosRelay; - fn runtime(&self) -> &OfaRuntimeWrapper { + fn runtime(&self) -> &Self::Runtime { &self.runtime } diff --git a/crates/relayer-cosmos/src/impls/builder.rs b/crates/relayer-cosmos/src/impls/builder.rs index 6f821c8828..6e8b952a94 100644 --- a/crates/relayer-cosmos/src/impls/builder.rs +++ b/crates/relayer-cosmos/src/impls/builder.rs @@ -3,11 +3,10 @@ use ibc_relayer::chain::handle::BaseChainHandle; use ibc_relayer_all_in_one::one_for_all::traits::builder::OfaBuilder; use ibc_relayer_all_in_one::one_for_all::types::chain::OfaChainWrapper; use ibc_relayer_all_in_one::one_for_all::types::relay::OfaRelayWrapper; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; use ibc_relayer_components_extra::batch::types::config::BatchConfig; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; -use ibc_relayer_runtime::tokio::error::Error as TokioRuntimeError; -use ibc_relayer_runtime::tokio::logger::tracing::TracingLogger; +use ibc_relayer_runtime::types::error::Error as TokioRuntimeError; +use ibc_relayer_runtime::types::log::logger::TracingLogger; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics24_host::identifier::{ChainId, ClientId}; use crate::contexts::birelay::CosmosBiRelay; @@ -27,7 +26,7 @@ impl OfaBuilder for CosmosBuilder { type BiRelay = CosmosBiRelay; - fn runtime(&self) -> &OfaRuntimeWrapper { + fn runtime(&self) -> &TokioRuntimeContext { &self.runtime } diff --git a/crates/relayer-cosmos/src/impls/chain.rs b/crates/relayer-cosmos/src/impls/chain.rs index 9bd25263c5..de474127fe 100644 --- a/crates/relayer-cosmos/src/impls/chain.rs +++ b/crates/relayer-cosmos/src/impls/chain.rs @@ -8,14 +8,12 @@ use ibc_relayer::event::{ connection_open_ack_try_from_abci_event, connection_open_try_try_from_abci_event, }; use ibc_relayer_all_in_one::one_for_all::traits::chain::{OfaChain, OfaChainTypes, OfaIbcChain}; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_all_in_one::one_for_all::types::telemetry::OfaTelemetryWrapper; use ibc_relayer_components::chain::traits::message_sender::CanSendMessages; use ibc_relayer_components::runtime::traits::subscription::Subscription; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; -use ibc_relayer_runtime::tokio::error::Error as TokioError; -use ibc_relayer_runtime::tokio::logger::tracing::TracingLogger; -use ibc_relayer_runtime::tokio::logger::value::LogValue; +use ibc_relayer_runtime::types::error::Error as TokioError; +use ibc_relayer_runtime::types::log::logger::TracingLogger; +use ibc_relayer_runtime::types::log::value::LogValue; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics02_client::events::CLIENT_ID_ATTRIBUTE_KEY; use ibc_relayer_types::core::ics04_channel::events::{SendPacket, WriteAcknowledgement}; use ibc_relayer_types::core::ics04_channel::packet::Packet; @@ -177,7 +175,7 @@ impl OfaChain for CosmosChain where Chain: ChainHandle, { - fn runtime(&self) -> &OfaRuntimeWrapper { + fn runtime(&self) -> &TokioRuntimeContext { &self.runtime } @@ -201,7 +199,7 @@ where LogValue::Display(packet) } - fn telemetry(&self) -> &OfaTelemetryWrapper { + fn telemetry(&self) -> &CosmosTelemetry { &self.telemetry } diff --git a/crates/relayer-cosmos/src/impls/mod.rs b/crates/relayer-cosmos/src/impls/mod.rs index 722da69ca0..037cd4756d 100644 --- a/crates/relayer-cosmos/src/impls/mod.rs +++ b/crates/relayer-cosmos/src/impls/mod.rs @@ -3,4 +3,5 @@ pub mod builder; pub mod chain; pub mod relay; pub mod subscription; +pub mod telemetry; pub mod transaction; diff --git a/crates/relayer-cosmos/src/impls/relay.rs b/crates/relayer-cosmos/src/impls/relay.rs index a31cd2fb38..a723a41755 100644 --- a/crates/relayer-cosmos/src/impls/relay.rs +++ b/crates/relayer-cosmos/src/impls/relay.rs @@ -5,10 +5,9 @@ use ibc_relayer::chain::handle::ChainHandle; use ibc_relayer_all_in_one::one_for_all::traits::chain::OfaChain; use ibc_relayer_all_in_one::one_for_all::traits::relay::OfaRelay; use ibc_relayer_all_in_one::one_for_all::types::chain::OfaChainWrapper; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; -use ibc_relayer_runtime::tokio::error::Error as TokioError; -use ibc_relayer_runtime::tokio::logger::tracing::TracingLogger; +use ibc_relayer_runtime::types::error::Error as TokioError; +use ibc_relayer_runtime::types::log::logger::TracingLogger; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics04_channel::packet::Packet; use ibc_relayer_types::core::ics24_host::identifier::{ChannelId, ClientId, ConnectionId}; @@ -61,7 +60,7 @@ where e } - fn runtime(&self) -> &OfaRuntimeWrapper { + fn runtime(&self) -> &TokioRuntimeContext { &self.runtime } @@ -103,7 +102,7 @@ where } else { lock_table.insert(packet_key.clone()); - let runtime = &self.runtime().runtime.runtime; + let runtime = &self.runtime().runtime; let (sender, receiver) = channel(); diff --git a/crates/relayer-cosmos/src/impls/telemetry.rs b/crates/relayer-cosmos/src/impls/telemetry.rs new file mode 100644 index 0000000000..43e24e1b30 --- /dev/null +++ b/crates/relayer-cosmos/src/impls/telemetry.rs @@ -0,0 +1,138 @@ +use ibc_relayer_components_extra::telemetry::traits::metrics::{ + HasLabel, HasMetric, TelemetryCounter, TelemetryUpDownCounter, TelemetryValueRecorder, +}; +use opentelemetry::metrics::Unit; +use opentelemetry::KeyValue; + +use crate::types::telemetry::CosmosTelemetry; + +impl HasLabel for CosmosTelemetry { + type Label = KeyValue; + + fn new_label(key: &str, value: &str) -> Self::Label { + KeyValue::new(key.to_string(), value.to_string()) + } +} + +impl HasMetric for CosmosTelemetry { + type Value = u64; + + type Unit = Unit; + + fn update_metric( + &self, + name: &str, + labels: &[Self::Label], + value: Self::Value, + description: Option<&str>, + unit: Option, + ) { + let mut counters = self.counters.lock().unwrap(); + + if let Some(metric) = counters.get(name) { + metric.add(value, labels); + } else { + let builder = self.meter.u64_counter(name); + + let builder = if let Some(description) = description { + builder.with_description(description) + } else { + builder + }; + + let builder = if let Some(unit) = unit { + builder.with_unit(unit) + } else { + builder + }; + + let metric = builder.init(); + + metric.add(value, labels); + + counters.insert(name.to_string(), metric); + } + } +} + +impl HasMetric for CosmosTelemetry { + type Value = u64; + + type Unit = Unit; + + fn update_metric( + &self, + name: &str, + labels: &[Self::Label], + value: Self::Value, + description: Option<&str>, + unit: Option, + ) { + let mut value_recorders = self.value_recorders.lock().unwrap(); + + if let Some(metric) = value_recorders.get(name) { + metric.record(value, labels); + } else { + let builder = self.meter.u64_value_recorder(name); + + let builder = if let Some(description) = description { + builder.with_description(description) + } else { + builder + }; + + let builder = if let Some(unit) = unit { + builder.with_unit(unit) + } else { + builder + }; + + let metric = builder.init(); + + metric.record(value, labels); + + value_recorders.insert(name.to_string(), metric); + } + } +} + +impl HasMetric for CosmosTelemetry { + type Value = i64; + + type Unit = Unit; + + fn update_metric( + &self, + name: &str, + labels: &[Self::Label], + value: Self::Value, + description: Option<&str>, + unit: Option, + ) { + let mut updown_counters = self.updown_counters.lock().unwrap(); + + if let Some(metric) = updown_counters.get(name) { + metric.add(value, labels); + } else { + let builder = self.meter.i64_up_down_counter(name); + + let builder = if let Some(description) = description { + builder.with_description(description) + } else { + builder + }; + + let builder = if let Some(unit) = unit { + builder.with_unit(unit) + } else { + builder + }; + + let metric = builder.init(); + + metric.add(value, labels); + + updown_counters.insert(name.to_string(), metric); + } + } +} diff --git a/crates/relayer-cosmos/src/impls/transaction.rs b/crates/relayer-cosmos/src/impls/transaction.rs index d1dc499a70..21077b8a83 100644 --- a/crates/relayer-cosmos/src/impls/transaction.rs +++ b/crates/relayer-cosmos/src/impls/transaction.rs @@ -15,11 +15,10 @@ use ibc_relayer::chain::cosmos::types::tx::SignedTx; use ibc_relayer::config::types::Memo; use ibc_relayer::keyring::{Secp256k1KeyPair, SigningKeyPair}; use ibc_relayer_all_in_one::one_for_all::traits::transaction::OfaTxContext; -use ibc_relayer_all_in_one::one_for_all::types::runtime::OfaRuntimeWrapper; -use ibc_relayer_runtime::tokio::context::TokioRuntimeContext; -use ibc_relayer_runtime::tokio::error::Error as TokioError; -use ibc_relayer_runtime::tokio::logger::tracing::TracingLogger; -use ibc_relayer_runtime::tokio::logger::value::LogValue; +use ibc_relayer_runtime::types::error::Error as TokioError; +use ibc_relayer_runtime::types::log::logger::TracingLogger; +use ibc_relayer_runtime::types::log::value::LogValue; +use ibc_relayer_runtime::types::runtime::TokioRuntimeContext; use ibc_relayer_types::core::ics24_host::identifier::ChainId; use prost::Message as _; use tendermint::abci::Event as AbciEvent; @@ -56,7 +55,7 @@ impl OfaTxContext for CosmosTxContext { type TxResponse = TxResponse; - fn runtime(&self) -> &OfaRuntimeWrapper { + fn runtime(&self) -> &TokioRuntimeContext { &self.runtime } diff --git a/crates/relayer-cosmos/src/methods/runtime.rs b/crates/relayer-cosmos/src/methods/runtime.rs index 22c18fed0c..ab33bf6d0f 100644 --- a/crates/relayer-cosmos/src/methods/runtime.rs +++ b/crates/relayer-cosmos/src/methods/runtime.rs @@ -34,7 +34,6 @@ where let chain_handle = self.handle.clone(); self.runtime - .runtime .runtime .spawn_blocking(move || cont(chain_handle)) .await diff --git a/crates/relayer-cosmos/src/types/error.rs b/crates/relayer-cosmos/src/types/error.rs index 99225f5973..20d1c08b38 100644 --- a/crates/relayer-cosmos/src/types/error.rs +++ b/crates/relayer-cosmos/src/types/error.rs @@ -6,7 +6,7 @@ use ibc_relayer::error::Error as RelayerError; use ibc_relayer::foreign_client::ForeignClientError; use ibc_relayer::spawn::SpawnError; use ibc_relayer::supervisor::error::Error as SupervisorError; -use ibc_relayer_runtime::tokio::error::Error as TokioError; +use ibc_relayer_runtime::types::error::Error as TokioError; use ibc_relayer_types::core::ics02_client::error::Error as ClientError; use ibc_relayer_types::core::ics04_channel::error::Error as ChannelError; use ibc_relayer_types::core::ics23_commitment; diff --git a/crates/relayer-cosmos/src/types/telemetry.rs b/crates/relayer-cosmos/src/types/telemetry.rs index 4ee539b0a5..0f88e62511 100644 --- a/crates/relayer-cosmos/src/types/telemetry.rs +++ b/crates/relayer-cosmos/src/types/telemetry.rs @@ -2,145 +2,29 @@ use alloc::sync::Arc; use std::collections::HashMap; use std::sync::Mutex; -use ibc_relayer_all_in_one::one_for_all::traits::telemetry::OfaTelemetry; -use ibc_relayer_components_extra::telemetry::traits::metrics::HasLabel; use opentelemetry::{ global, - metrics::{Counter, Meter, Unit, UpDownCounter, ValueRecorder}, - KeyValue, + metrics::{Counter, Meter, UpDownCounter, ValueRecorder}, }; -pub struct TelemetryState { - pub meter: Meter, - pub counters: HashMap>, - pub value_recorders: HashMap>, - pub updown_counters: HashMap>, -} - +#[derive(Clone)] pub struct CosmosTelemetry { - pub telemetry_state: Arc>, -} + pub meter: Arc, -impl CosmosTelemetry { - pub fn new(telemetry_state: TelemetryState) -> Self { - Self { - telemetry_state: Arc::new(Mutex::new(telemetry_state)), - } - } -} + pub counters: Arc>>>, -impl Default for CosmosTelemetry { - fn default() -> Self { - Self::new(TelemetryState { - meter: global::meter("hermes"), - counters: HashMap::new(), - value_recorders: HashMap::new(), - updown_counters: HashMap::new(), - }) - } -} + pub value_recorders: Arc>>>, -impl HasLabel for CosmosTelemetry { - type Label = KeyValue; - fn new_label(key: &str, value: &str) -> Self::Label { - KeyValue::new(key.to_string(), value.to_string()) - } + pub updown_counters: Arc>>>, } -impl OfaTelemetry for CosmosTelemetry { - type CounterType = u64; - type ValueRecorderType = u64; - type UpDownCounterType = i64; - type Unit = Unit; - - fn update_counter_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::CounterType, - description: Option<&str>, - unit: Option, - ) { - let mut telemetry_state = self.telemetry_state.lock().unwrap(); - if let Some(metric) = telemetry_state.counters.get(name) { - metric.add(value, labels); - } else { - let builder = telemetry_state.meter.u64_counter(name); - let builder = if let Some(description) = description { - builder.with_description(description) - } else { - builder - }; - let builder = if let Some(unit) = unit { - builder.with_unit(unit) - } else { - builder - }; - let metric = builder.init(); - metric.add(value, labels); - telemetry_state.counters.insert(name.to_string(), metric); - } - } - - fn update_value_recorder_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::ValueRecorderType, - description: Option<&str>, - unit: Option, - ) { - let mut telemetry_state = self.telemetry_state.lock().unwrap(); - if let Some(metric) = telemetry_state.value_recorders.get(name) { - metric.record(value, labels); - } else { - let builder = telemetry_state.meter.u64_value_recorder(name); - let builder = if let Some(description) = description { - builder.with_description(description) - } else { - builder - }; - let builder = if let Some(unit) = unit { - builder.with_unit(unit) - } else { - builder - }; - let metric = builder.init(); - metric.record(value, labels); - telemetry_state - .value_recorders - .insert(name.to_string(), metric); - } - } - - fn update_up_down_counter_metric( - &self, - name: &str, - labels: &[Self::Label], - value: Self::UpDownCounterType, - description: Option<&str>, - unit: Option, - ) { - let mut telemetry_state = self.telemetry_state.lock().unwrap(); - if let Some(metric) = telemetry_state.updown_counters.get(name) { - metric.add(value, labels); - } else { - let builder = telemetry_state.meter.i64_up_down_counter(name); - let builder = if let Some(description) = description { - builder.with_description(description) - } else { - builder - }; - let builder = if let Some(unit) = unit { - builder.with_unit(unit) - } else { - builder - }; - let metric = builder.init(); - metric.add(value, labels); - telemetry_state - .updown_counters - .insert(name.to_string(), metric); +impl Default for CosmosTelemetry { + fn default() -> Self { + Self { + meter: Arc::new(global::meter("hermes")), + counters: Arc::new(Mutex::new(HashMap::new())), + value_recorders: Arc::new(Mutex::new(HashMap::new())), + updown_counters: Arc::new(Mutex::new(HashMap::new())), } } } diff --git a/crates/relayer-mock/src/relayer_mock/base/error.rs b/crates/relayer-mock/src/relayer_mock/base/error.rs index 5ce005933f..2d6f8fcf3c 100644 --- a/crates/relayer-mock/src/relayer_mock/base/error.rs +++ b/crates/relayer-mock/src/relayer_mock/base/error.rs @@ -3,7 +3,7 @@ use alloc::sync::Arc; use eyre::Report; use flex_error::{define_error, TraceError}; -use ibc_relayer_runtime::tokio::error::Error as TokioError; +use ibc_relayer_runtime::types::error::Error as TokioError; pub type Error = Arc; diff --git a/crates/relayer-mock/src/relayer_mock/base/impls/chain.rs b/crates/relayer-mock/src/relayer_mock/base/impls/chain.rs index 254ed03254..d13da124a0 100644 --- a/crates/relayer-mock/src/relayer_mock/base/impls/chain.rs +++ b/crates/relayer-mock/src/relayer_mock/base/impls/chain.rs @@ -49,9 +49,9 @@ use ibc_relayer_components::chain::traits::types::timestamp::HasTimestampType; use ibc_relayer_components::core::traits::error::HasErrorType; use ibc_relayer_components::logger::traits::has_logger::{HasLogger, HasLoggerType}; use ibc_relayer_components::runtime::traits::runtime::HasRuntime; -use ibc_relayer_runtime::tokio::error::Error as TokioError; -use ibc_relayer_runtime::tokio::logger::tracing::TracingLogger; -use ibc_relayer_runtime::tokio::logger::value::LogValue; +use ibc_relayer_runtime::types::error::Error as TokioError; +use ibc_relayer_runtime::types::log::logger::TracingLogger; +use ibc_relayer_runtime::types::log::value::LogValue; use crate::relayer_mock::base::error::{BaseError, Error}; use crate::relayer_mock::base::types::aliases::{ diff --git a/crates/relayer-mock/src/relayer_mock/base/impls/relay.rs b/crates/relayer-mock/src/relayer_mock/base/impls/relay.rs index 4a3bba42f0..221316a07c 100644 --- a/crates/relayer-mock/src/relayer_mock/base/impls/relay.rs +++ b/crates/relayer-mock/src/relayer_mock/base/impls/relay.rs @@ -9,11 +9,11 @@ use ibc_relayer_components::relay::traits::packet_relayers::lock::HasPacketLock; use ibc_relayer_components::relay::traits::target::{DestinationTarget, SourceTarget}; use ibc_relayer_components::relay::traits::update_client::UpdateClientMessageBuilder; use ibc_relayer_components::runtime::traits::runtime::HasRuntime; -use ibc_relayer_runtime::tokio::logger::tracing::TracingLogger; +use ibc_relayer_runtime::types::log::logger::TracingLogger; use std::vec; use async_trait::async_trait; -use ibc_relayer_runtime::tokio::error::Error as TokioError; +use ibc_relayer_runtime::types::error::Error as TokioError; use crate::relayer_mock::base::error::{BaseError, Error}; use crate::relayer_mock::base::types::aliases::ClientId; diff --git a/crates/relayer-mock/src/relayer_mock/base/types/runtime.rs b/crates/relayer-mock/src/relayer_mock/base/types/runtime.rs index 724d932445..38552798e1 100644 --- a/crates/relayer-mock/src/relayer_mock/base/types/runtime.rs +++ b/crates/relayer-mock/src/relayer_mock/base/types/runtime.rs @@ -6,7 +6,7 @@ use ibc_relayer_components::runtime::traits::sleep::CanSleep; use ibc_relayer_components::runtime::traits::time::HasTime; use async_trait::async_trait; -use ibc_relayer_runtime::tokio::error::Error as TokioError; +use ibc_relayer_runtime::types::error::Error as TokioError; use crate::relayer_mock::base::types::aliases::MockTimestamp; use crate::relayer_mock::util::clock::MockClock; diff --git a/crates/relayer-runtime/src/tokio/logger/tracing.rs b/crates/relayer-runtime/src/impls/logger.rs similarity index 82% rename from crates/relayer-runtime/src/tokio/logger/tracing.rs rename to crates/relayer-runtime/src/impls/logger.rs index 76f5165ffd..42e399fa35 100644 --- a/crates/relayer-runtime/src/tokio/logger/tracing.rs +++ b/crates/relayer-runtime/src/impls/logger.rs @@ -5,14 +5,13 @@ use ibc_relayer_components::logger::traits::level::{ use ibc_relayer_components::logger::traits::logger::BaseLogger; use tracing::{debug, error, event_enabled, info, trace, warn, Level}; -use crate::tokio::logger::level::LogLevel; -use crate::tokio::logger::log::Log; -use crate::tokio::logger::value::LogValue; - -pub struct TracingLogger; +use crate::types::log::entries::LogEntries; +use crate::types::log::level::LogLevel; +use crate::types::log::logger::TracingLogger; +use crate::types::log::value::LogValue; impl BaseLogger for TracingLogger { - type Log<'a, 'r> = Log<'a>; + type Log<'a, 'r> = LogEntries<'a>; type LogValue<'a> = LogValue<'a>; @@ -27,35 +26,35 @@ impl BaseLogger for TracingLogger { match level { LogLevel::Trace => { if event_enabled!(Level::TRACE) { - let log = Log::default(); + let log: LogEntries<'_> = LogEntries::default(); build_log(&log); trace!(message = message, details = log.to_string()) } } LogLevel::Debug => { if event_enabled!(Level::DEBUG) { - let log = Log::default(); + let log = LogEntries::default(); build_log(&log); debug!(message = message, details = log.to_string()) } } LogLevel::Info => { if event_enabled!(Level::INFO) { - let log = Log::default(); + let log = LogEntries::default(); build_log(&log); info!(message = message, details = log.to_string()) } } LogLevel::Warn => { if event_enabled!(Level::WARN) { - let log = Log::default(); + let log = LogEntries::default(); build_log(&log); warn!(warning = message, details = log.to_string()) } } LogLevel::Error => { if event_enabled!(Level::ERROR) { - let log = Log::default(); + let log = LogEntries::default(); build_log(&log); error!(message = message, details = log.to_string()) } @@ -63,7 +62,7 @@ impl BaseLogger for TracingLogger { } } - fn log_field<'a, 'b, 'r>(log: &Log<'a>, key: &'b str, value: LogValue<'a>) + fn log_field<'a, 'b, 'r>(log: &LogEntries<'a>, key: &'b str, value: LogValue<'a>) where 'b: 'a, { @@ -89,7 +88,7 @@ impl BaseLogger for TracingLogger { } fn map_values<'a>(build_log: impl for<'s> FnOnce(&'s Self::Log<'a, 's>)) -> LogValue<'a> { - let in_log = Log::default(); + let in_log = LogEntries::default(); build_log(&in_log); let values = in_log.fields.into_inner(); LogValue::Nested(values) diff --git a/crates/relayer-runtime/src/impls/mod.rs b/crates/relayer-runtime/src/impls/mod.rs new file mode 100644 index 0000000000..947c7ea081 --- /dev/null +++ b/crates/relayer-runtime/src/impls/mod.rs @@ -0,0 +1,2 @@ +pub mod logger; +pub mod runtime; diff --git a/crates/relayer-runtime/src/impls/runtime/channel.rs b/crates/relayer-runtime/src/impls/runtime/channel.rs new file mode 100644 index 0000000000..8daf361f60 --- /dev/null +++ b/crates/relayer-runtime/src/impls/runtime/channel.rs @@ -0,0 +1,120 @@ +use core::pin::Pin; + +use async_trait::async_trait; +use futures::Stream; +use ibc_relayer_components::core::traits::sync::Async; +use ibc_relayer_components_extra::runtime::traits::channel::{ + CanCloneSender, CanCreateChannels, CanStreamReceiver, CanUseChannels, HasChannelTypes, +}; +use ibc_relayer_components_extra::runtime::traits::channel_once::{ + CanCreateChannelsOnce, CanUseChannelsOnce, HasChannelOnceTypes, +}; +use tokio::sync::{mpsc, oneshot}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +use crate::types::error::Error; +use crate::types::runtime::TokioRuntimeContext; + +impl HasChannelTypes for TokioRuntimeContext { + type Sender = mpsc::UnboundedSender + where + T: Async; + + type Receiver = mpsc::UnboundedReceiver + where + T: Async; +} + +impl HasChannelOnceTypes for TokioRuntimeContext { + type SenderOnce = oneshot::Sender + where + T: Async; + + type ReceiverOnce = oneshot::Receiver + where + T: Async; +} + +impl CanCreateChannels for TokioRuntimeContext { + fn new_channel() -> (Self::Sender, Self::Receiver) + where + T: Async, + { + mpsc::unbounded_channel() + } +} + +impl CanCreateChannelsOnce for TokioRuntimeContext { + fn new_channel_once() -> (Self::SenderOnce, Self::ReceiverOnce) + where + T: Async, + { + let (sender, receiver) = oneshot::channel(); + (sender, receiver) + } +} + +#[async_trait] +impl CanUseChannels for TokioRuntimeContext { + fn send(sender: &Self::Sender, value: T) -> Result<(), Self::Error> + where + T: Async, + { + sender.send(value).map_err(|_| Error::channel_closed()) + } + + async fn receive(receiver: &mut Self::Receiver) -> Result + where + T: Async, + { + receiver.recv().await.ok_or_else(Error::channel_closed) + } + + fn try_receive(receiver: &mut Self::Receiver) -> Result, Self::Error> + where + T: Async, + { + match receiver.try_recv() { + Ok(batch) => Ok(Some(batch)), + Err(mpsc::error::TryRecvError::Empty) => Ok(None), + Err(mpsc::error::TryRecvError::Disconnected) => Err(Error::channel_closed()), + } + } +} + +#[async_trait] +impl CanUseChannelsOnce for TokioRuntimeContext { + fn send_once(sender: Self::SenderOnce, value: T) -> Result<(), Self::Error> + where + T: Async, + { + sender.send(value).map_err(|_| Error::channel_closed()) + } + + async fn receive_once(receiver: Self::ReceiverOnce) -> Result + where + T: Async, + { + receiver.await.map_err(|_| Error::channel_closed()) + } +} + +impl CanStreamReceiver for TokioRuntimeContext { + fn receiver_to_stream( + receiver: Self::Receiver, + ) -> Pin + Send + 'static>> + where + T: Async, + { + Box::pin(UnboundedReceiverStream::new(receiver)) + } +} + +impl CanCloneSender for TokioRuntimeContext { + fn clone_sender(sender: &Self::Sender) -> Self::Sender + where + T: Async, + { + sender.clone() + } +} diff --git a/crates/relayer-runtime/src/impls/runtime/error.rs b/crates/relayer-runtime/src/impls/runtime/error.rs new file mode 100644 index 0000000000..b1224eb474 --- /dev/null +++ b/crates/relayer-runtime/src/impls/runtime/error.rs @@ -0,0 +1,8 @@ +use ibc_relayer_components::core::traits::error::HasErrorType; + +use crate::types::error::Error; +use crate::types::runtime::TokioRuntimeContext; + +impl HasErrorType for TokioRuntimeContext { + type Error = Error; +} diff --git a/crates/relayer-all-in-one/src/one_for_all/impls/runtime/mod.rs b/crates/relayer-runtime/src/impls/runtime/mod.rs similarity index 100% rename from crates/relayer-all-in-one/src/one_for_all/impls/runtime/mod.rs rename to crates/relayer-runtime/src/impls/runtime/mod.rs diff --git a/crates/relayer-runtime/src/impls/runtime/mutex.rs b/crates/relayer-runtime/src/impls/runtime/mutex.rs new file mode 100644 index 0000000000..6d132784b5 --- /dev/null +++ b/crates/relayer-runtime/src/impls/runtime/mutex.rs @@ -0,0 +1,21 @@ +use async_trait::async_trait; +use futures::lock::{Mutex, MutexGuard}; +use ibc_relayer_components::core::traits::sync::Async; +use ibc_relayer_components::runtime::traits::mutex::HasMutex; + +use crate::types::runtime::TokioRuntimeContext; + +#[async_trait] +impl HasMutex for TokioRuntimeContext { + type Mutex = Mutex; + + type MutexGuard<'a, T: Async> = MutexGuard<'a, T>; + + fn new_mutex(item: T) -> Self::Mutex { + Mutex::new(item) + } + + async fn acquire_mutex<'a, T: Async>(mutex: &'a Self::Mutex) -> Self::MutexGuard<'a, T> { + mutex.lock().await + } +} diff --git a/crates/relayer-runtime/src/impls/runtime/sleep.rs b/crates/relayer-runtime/src/impls/runtime/sleep.rs new file mode 100644 index 0000000000..aa2ebd61de --- /dev/null +++ b/crates/relayer-runtime/src/impls/runtime/sleep.rs @@ -0,0 +1,13 @@ +use async_trait::async_trait; +use core::time::Duration; +use ibc_relayer_components::runtime::traits::sleep::CanSleep; +use tokio::time::sleep; + +use crate::types::runtime::TokioRuntimeContext; + +#[async_trait] +impl CanSleep for TokioRuntimeContext { + async fn sleep(&self, duration: Duration) { + sleep(duration).await; + } +} diff --git a/crates/relayer-runtime/src/impls/runtime/spawn.rs b/crates/relayer-runtime/src/impls/runtime/spawn.rs new file mode 100644 index 0000000000..ef41c8f126 --- /dev/null +++ b/crates/relayer-runtime/src/impls/runtime/spawn.rs @@ -0,0 +1,39 @@ +use core::future::Future; +use core::pin::Pin; +use ibc_relayer_components_extra::runtime::traits::spawn::{HasSpawner, Spawner, TaskHandle}; + +use crate::types::runtime::TokioRuntimeContext; +use crate::types::task::TokioTaskHandle; + +impl HasSpawner for TokioRuntimeContext { + type Spawner = Self; + + fn spawner(&self) -> Self::Spawner { + self.clone() + } +} + +impl Spawner for TokioRuntimeContext { + fn spawn(&self, task: F) -> Box + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + let join_handle = self.runtime.spawn(async move { + task.await; + }); + Box::new(TokioTaskHandle(join_handle)) + } +} + +impl TaskHandle for TokioTaskHandle { + fn abort(self: Box) { + self.0.abort(); + } + + fn into_future(self: Box) -> Pin + Send + 'static>> { + Box::pin(async move { + let _ = self.0.await; + }) + } +} diff --git a/crates/relayer-runtime/src/impls/runtime/time.rs b/crates/relayer-runtime/src/impls/runtime/time.rs new file mode 100644 index 0000000000..ad5bbdb0c4 --- /dev/null +++ b/crates/relayer-runtime/src/impls/runtime/time.rs @@ -0,0 +1,18 @@ +use core::time::Duration; +use std::time::Instant; + +use ibc_relayer_components::runtime::traits::time::HasTime; + +use crate::types::runtime::TokioRuntimeContext; + +impl HasTime for TokioRuntimeContext { + type Time = Instant; + + fn now(&self) -> Instant { + Instant::now() + } + + fn duration_since(time: &Instant, other: &Instant) -> Duration { + time.duration_since(*other) + } +} diff --git a/crates/relayer-runtime/src/lib.rs b/crates/relayer-runtime/src/lib.rs index 9b8bd3c5d0..f7c9d4edbf 100644 --- a/crates/relayer-runtime/src/lib.rs +++ b/crates/relayer-runtime/src/lib.rs @@ -1,3 +1,4 @@ extern crate alloc; -pub mod tokio; +pub mod impls; +pub mod types; diff --git a/crates/relayer-runtime/src/tokio/context.rs b/crates/relayer-runtime/src/tokio/context.rs deleted file mode 100644 index 7860c561ea..0000000000 --- a/crates/relayer-runtime/src/tokio/context.rs +++ /dev/null @@ -1,176 +0,0 @@ -use alloc::sync::Arc; -use core::future::Future; -use core::pin::Pin; -use core::time::Duration; -use std::time::Instant; - -use async_trait::async_trait; -use futures::lock::{Mutex, MutexGuard}; -use futures::stream::Stream; -use ibc_relayer_all_in_one::one_for_all::traits::runtime::OfaRuntime; -use ibc_relayer_all_in_one::one_for_all::types::runtime::LogLevel; -use ibc_relayer_components::core::traits::sync::Async; -use ibc_relayer_components_extra::runtime::traits::spawn::TaskHandle; -use tokio::runtime::Runtime; -use tokio::sync::{mpsc, oneshot}; -use tokio::task::JoinHandle; -use tokio::time::sleep; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tracing; - -use super::error::Error as TokioError; - -pub struct TokioRuntimeContext { - pub runtime: Arc, -} - -pub struct TokioTaskHandle(pub JoinHandle<()>); - -impl TokioRuntimeContext { - pub fn new(runtime: Arc) -> Self { - Self { runtime } - } -} - -#[async_trait] -impl OfaRuntime for TokioRuntimeContext { - type Error = TokioError; - - type Time = Instant; - - type Mutex = Mutex; - - type MutexGuard<'a, T: Async> = MutexGuard<'a, T>; - - type Sender = mpsc::UnboundedSender - where - T: Async; - - type Receiver = mpsc::UnboundedReceiver - where - T: Async; - - type SenderOnce = oneshot::Sender - where - T: Async; - - type ReceiverOnce = oneshot::Receiver - where - T: Async; - - async fn log(&self, level: LogLevel, message: &str) { - match level { - LogLevel::Error => tracing::error!(message), - LogLevel::Warn => tracing::warn!(message), - LogLevel::Info => tracing::info!(message), - LogLevel::Debug => tracing::debug!(message), - LogLevel::Trace => tracing::trace!(message), - } - } - - async fn sleep(&self, duration: Duration) { - sleep(duration).await; - } - - fn now(&self) -> Instant { - Instant::now() - } - - fn duration_since(time: &Instant, other: &Instant) -> Duration { - time.duration_since(*other) - } - - fn new_mutex(item: T) -> Self::Mutex { - Mutex::new(item) - } - - async fn acquire_mutex<'a, T: Async>(mutex: &'a Self::Mutex) -> Self::MutexGuard<'a, T> { - mutex.lock().await - } - - fn spawn(&self, task: F) -> Box - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - let join_handle = self.runtime.spawn(async move { - task.await; - }); - Box::new(TokioTaskHandle(join_handle)) - } - - fn new_channel() -> (Self::Sender, Self::Receiver) - where - T: Async, - { - mpsc::unbounded_channel() - } - - fn send(sender: &Self::Sender, value: T) -> Result<(), Self::Error> - where - T: Async, - { - sender.send(value).map_err(|_| TokioError::channel_closed()) - } - - async fn receive(receiver: &mut Self::Receiver) -> Result - where - T: Async, - { - receiver.recv().await.ok_or_else(TokioError::channel_closed) - } - - fn try_receive(receiver: &mut Self::Receiver) -> Result, Self::Error> - where - T: Async, - { - match receiver.try_recv() { - Ok(batch) => Ok(Some(batch)), - Err(mpsc::error::TryRecvError::Empty) => Ok(None), - Err(mpsc::error::TryRecvError::Disconnected) => Err(TokioError::channel_closed()), - } - } - - fn receiver_to_stream( - receiver: Self::Receiver, - ) -> Pin + Send + 'static>> - where - T: Async, - { - Box::pin(UnboundedReceiverStream::new(receiver)) - } - - fn new_channel_once() -> (Self::SenderOnce, Self::ReceiverOnce) - where - T: Async, - { - let (sender, receiver) = oneshot::channel(); - (sender, receiver) - } - - fn send_once(sender: Self::SenderOnce, value: T) -> Result<(), Self::Error> - where - T: Async, - { - sender.send(value).map_err(|_| TokioError::channel_closed()) - } - - async fn receive_once(receiver: Self::ReceiverOnce) -> Result - where - T: Async, - { - receiver.await.map_err(|_| TokioError::channel_closed()) - } -} - -impl TaskHandle for TokioTaskHandle { - fn abort(self: Box) { - self.0.abort(); - } - - fn into_future(self: Box) -> Pin + Send + 'static>> { - Box::pin(async move { - let _ = self.0.await; - }) - } -} diff --git a/crates/relayer-runtime/src/tokio/logger/mod.rs b/crates/relayer-runtime/src/tokio/logger/mod.rs deleted file mode 100644 index e48396b95a..0000000000 --- a/crates/relayer-runtime/src/tokio/logger/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod level; -pub mod log; -pub mod tracing; -pub mod value; diff --git a/crates/relayer-runtime/src/tokio/mod.rs b/crates/relayer-runtime/src/tokio/mod.rs deleted file mode 100644 index 892ec026b8..0000000000 --- a/crates/relayer-runtime/src/tokio/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod context; -pub mod error; -pub mod logger; diff --git a/crates/relayer-runtime/src/tokio/error.rs b/crates/relayer-runtime/src/types/error.rs similarity index 100% rename from crates/relayer-runtime/src/tokio/error.rs rename to crates/relayer-runtime/src/types/error.rs diff --git a/crates/relayer-runtime/src/tokio/logger/log.rs b/crates/relayer-runtime/src/types/log/entries.rs similarity index 75% rename from crates/relayer-runtime/src/tokio/logger/log.rs rename to crates/relayer-runtime/src/types/log/entries.rs index 762d3650a3..7e5ed547e8 100644 --- a/crates/relayer-runtime/src/tokio/logger/log.rs +++ b/crates/relayer-runtime/src/types/log/entries.rs @@ -1,14 +1,14 @@ use core::cell::RefCell; use core::fmt::{self, Display}; -use crate::tokio::logger::value::LogValue; +use crate::types::log::value::LogValue; #[derive(Default)] -pub struct Log<'a> { +pub struct LogEntries<'a> { pub fields: RefCell)>>, } -impl<'a> Display for Log<'a> { +impl<'a> Display for LogEntries<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { f.debug_map() .entries(self.fields.borrow().iter().map(|&(k, ref v)| (k, v))) diff --git a/crates/relayer-runtime/src/tokio/logger/level.rs b/crates/relayer-runtime/src/types/log/level.rs similarity index 100% rename from crates/relayer-runtime/src/tokio/logger/level.rs rename to crates/relayer-runtime/src/types/log/level.rs diff --git a/crates/relayer-runtime/src/types/log/logger.rs b/crates/relayer-runtime/src/types/log/logger.rs new file mode 100644 index 0000000000..f273ce82d5 --- /dev/null +++ b/crates/relayer-runtime/src/types/log/logger.rs @@ -0,0 +1 @@ +pub struct TracingLogger; diff --git a/crates/relayer-runtime/src/types/log/mod.rs b/crates/relayer-runtime/src/types/log/mod.rs new file mode 100644 index 0000000000..99d681c4d4 --- /dev/null +++ b/crates/relayer-runtime/src/types/log/mod.rs @@ -0,0 +1,4 @@ +pub mod entries; +pub mod level; +pub mod logger; +pub mod value; diff --git a/crates/relayer-runtime/src/tokio/logger/value.rs b/crates/relayer-runtime/src/types/log/value.rs similarity index 100% rename from crates/relayer-runtime/src/tokio/logger/value.rs rename to crates/relayer-runtime/src/types/log/value.rs diff --git a/crates/relayer-runtime/src/types/mod.rs b/crates/relayer-runtime/src/types/mod.rs new file mode 100644 index 0000000000..7ba4bf4763 --- /dev/null +++ b/crates/relayer-runtime/src/types/mod.rs @@ -0,0 +1,4 @@ +pub mod error; +pub mod log; +pub mod runtime; +pub mod task; diff --git a/crates/relayer-runtime/src/types/runtime.rs b/crates/relayer-runtime/src/types/runtime.rs new file mode 100644 index 0000000000..28bbe04a2c --- /dev/null +++ b/crates/relayer-runtime/src/types/runtime.rs @@ -0,0 +1,13 @@ +use alloc::sync::Arc; +use tokio::runtime::Runtime; + +#[derive(Clone)] +pub struct TokioRuntimeContext { + pub runtime: Arc, +} + +impl TokioRuntimeContext { + pub fn new(runtime: Arc) -> Self { + Self { runtime } + } +} diff --git a/crates/relayer-runtime/src/types/task.rs b/crates/relayer-runtime/src/types/task.rs new file mode 100644 index 0000000000..c7c23e3032 --- /dev/null +++ b/crates/relayer-runtime/src/types/task.rs @@ -0,0 +1,3 @@ +use tokio::task::JoinHandle; + +pub struct TokioTaskHandle(pub JoinHandle<()>); diff --git a/tools/test-framework/src/framework/binary/next.rs b/tools/test-framework/src/framework/binary/next.rs index 9b18706c61..68d8a52883 100644 --- a/tools/test-framework/src/framework/binary/next.rs +++ b/tools/test-framework/src/framework/binary/next.rs @@ -282,7 +282,7 @@ impl CanSpawnRelayer for TestContextV2 let runtime = self.relayer.birelay.runtime(); let birelay = self.relayer.clone(); - let handle = runtime.runtime.runtime.spawn(async move { + let handle = runtime.runtime.spawn(async move { let _ = birelay.auto_relay().await; });