diff --git a/ethers-providers/src/rpc/pubsub.rs b/ethers-providers/src/rpc/pubsub.rs index d9aed1694..9bce7321f 100644 --- a/ethers-providers/src/rpc/pubsub.rs +++ b/ethers-providers/src/rpc/pubsub.rs @@ -1,4 +1,4 @@ -use crate::{JsonRpcClient, Middleware, Provider}; +use crate::{JsonRpcClient, Provider}; use ethers_core::types::U256; use futures_util::stream::Stream; use pin_project::{pin_project, pinned_drop}; @@ -28,7 +28,8 @@ pub trait PubsubClient: JsonRpcClient { #[pin_project(PinnedDrop)] /// Streams data from an installed filter via `eth_subscribe` pub struct SubscriptionStream<'a, P: PubsubClient, R: DeserializeOwned> { - /// The subscription's installed id on the ethereum node + /// A client-side ID for the subscription. This may not be the same + /// as the server-side ID for this subscription on the ethereum node. pub id: U256, loaded_elements: VecDeque, @@ -61,7 +62,10 @@ where /// Unsubscribes from the subscription. pub async fn unsubscribe(&self) -> Result { - self.provider.unsubscribe(self.id).await + // Make sure to use PubSubClient unsubscribe() rather than Provider unsubscribe() + // Only the former handles mappings between client- and server-side subscription IDs + P::unsubscribe((*self.provider).as_ref(), self.id).map_err(Into::into)?; + Ok(true) } /// Set the loaded elements buffer. This buffer contains logs waiting for diff --git a/ethers-providers/src/rpc/transports/ws/types.rs b/ethers-providers/src/rpc/transports/ws/types.rs index 883380b4d..00119040f 100644 --- a/ethers-providers/src/rpc/transports/ws/types.rs +++ b/ethers-providers/src/rpc/transports/ws/types.rs @@ -11,12 +11,6 @@ pub type Response = Result, JsonRpcError>; #[derive(serde::Deserialize, serde::Serialize)] pub struct SubId(pub U256); -impl SubId { - pub(super) fn serialize_raw(&self) -> Result, serde_json::Error> { - to_raw_value(&self) - } -} - #[derive(Deserialize, Debug, Clone)] pub struct Notification { pub subscription: U256, diff --git a/ethers-providers/tests/it/provider.rs b/ethers-providers/tests/it/provider.rs index 68a6c0b28..e1155cf12 100644 --- a/ethers-providers/tests/it/provider.rs +++ b/ethers-providers/tests/it/provider.rs @@ -95,6 +95,38 @@ mod eth_tests { assert_eq!(&block, blocks.last().unwrap()); } + #[tokio::test] + #[cfg(feature = "ws")] + async fn unsubscribe_blocks_ws() { + let (provider, _anvil) = crate::spawn_anvil_ws().await; + generic_unsubscribe_blocks_test(provider).await; + } + + #[tokio::test] + #[cfg(feature = "ipc")] + async fn unsubscribe_blocks_ipc() { + let (provider, _anvil, _ipc) = crate::spawn_anvil_ipc().await; + generic_unsubscribe_blocks_test(provider).await; + } + + #[cfg(any(feature = "ws", feature = "ipc"))] + async fn generic_unsubscribe_blocks_test(provider: M) + where + M: Middleware, + M::Provider: ethers_providers::PubsubClient, + { + { + let stream = provider.subscribe_blocks().await.unwrap(); + stream.unsubscribe().await.unwrap(); + } + { + let _stream = provider.subscribe_blocks().await.unwrap(); + // stream will be unsubscribed automatically here on drop + } + // Sleep to give the unsubscription messages time to propagate + tokio::time::sleep(crate::Duration::from_millis(200)).await; + } + #[tokio::test] async fn send_tx_http() { let (provider, anvil) = spawn_anvil();