From 79a3214c8e5b8b929cd603cdb2f232a340101611 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 3 Sep 2024 14:42:18 +0200 Subject: [PATCH 1/5] feat(floodsub): Add `max_message_len` option to `FloodsubConfig` for configuring the maximum message length --- protocols/floodsub/src/layer.rs | 17 +++++++++++++++-- protocols/floodsub/src/lib.rs | 18 ++++++++++++++++++ protocols/floodsub/src/protocol.rs | 28 +++++++++++++++++++++------- 3 files changed, 54 insertions(+), 9 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 1a70d2213b2..50d2476fcdb 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -35,6 +35,7 @@ use libp2p_swarm::{ dial_opts::DialOpts, CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; +use libp2p_swarm::{OneShotHandlerConfig, SubstreamProtocol}; use smallvec::SmallVec; use std::collections::hash_map::{DefaultHasher, HashMap}; use std::task::{Context, Poll}; @@ -97,6 +98,7 @@ impl Floodsub { topic, action: FloodsubSubscriptionAction::Subscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -133,6 +135,7 @@ impl Floodsub { topic: topic.clone(), action: FloodsubSubscriptionAction::Subscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -163,6 +166,7 @@ impl Floodsub { topic: topic.clone(), action: FloodsubSubscriptionAction::Unsubscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -263,6 +267,7 @@ impl Floodsub { event: FloodsubRpc { subscriptions: Vec::new(), messages: vec![message.clone()], + max_message_len: self.config.max_message_len, }, }); } @@ -293,6 +298,7 @@ impl Floodsub { topic, action: FloodsubSubscriptionAction::Subscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -338,7 +344,10 @@ impl NetworkBehaviour for Floodsub { _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(Default::default()) + Ok(OneShotHandler::new( + SubstreamProtocol::new(FloodsubProtocol::new(self.config.max_message_len), ()), + OneShotHandlerConfig::default(), + )) } fn handle_established_outbound_connection( @@ -349,7 +358,10 @@ impl NetworkBehaviour for Floodsub { _: Endpoint, _: PortUse, ) -> Result, ConnectionDenied> { - Ok(Default::default()) + Ok(OneShotHandler::new( + SubstreamProtocol::new(FloodsubProtocol::new(self.config.max_message_len), ()), + OneShotHandlerConfig::default(), + )) } fn on_connection_handler_event( @@ -460,6 +472,7 @@ impl NetworkBehaviour for Floodsub { FloodsubRpc { subscriptions: Vec::new(), messages: vec![message.clone()], + max_message_len: self.config.max_message_len, }, )); } diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index 94766d5fdca..2f7308be464 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -23,6 +23,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use libp2p_identity::PeerId; +use protocol::DEFAULT_MAX_MESSAGE_LEN_BYTES; pub mod protocol; @@ -48,6 +49,9 @@ pub struct FloodsubConfig { /// `true` if messages published by local node should be propagated as messages received from /// the network, `false` by default. pub subscribe_local_messages: bool, + + /// Maximum message length in bytes. Defaults to 2KiB. + pub max_message_len: usize, } impl FloodsubConfig { @@ -55,6 +59,20 @@ impl FloodsubConfig { Self { local_peer_id, subscribe_local_messages: false, + max_message_len: DEFAULT_MAX_MESSAGE_LEN_BYTES, } } + + /// Set whether or not messages published by local node should be + /// propagated as messages received from the network. + pub fn with_subscribe_local_messages(mut self, subscribe_local_messages: bool) -> Self { + self.subscribe_local_messages = subscribe_local_messages; + self + } + + /// Set the maximum message length in bytes. + pub fn with_max_message_len(mut self, max_message_len: usize) -> Self { + self.max_message_len = max_message_len; + self + } } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index edc842be8ce..cc5c2a6817a 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -32,18 +32,29 @@ use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; use std::{io, iter, pin::Pin}; -const MAX_MESSAGE_LEN_BYTES: usize = 2048; +pub(crate) const DEFAULT_MAX_MESSAGE_LEN_BYTES: usize = 2048; const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/floodsub/1.0.0"); /// Implementation of `ConnectionUpgrade` for the floodsub protocol. -#[derive(Debug, Clone, Default)] -pub struct FloodsubProtocol {} +#[derive(Debug, Clone)] +pub struct FloodsubProtocol { + /// Maximum message length in bytes. + pub max_message_len: usize, +} impl FloodsubProtocol { /// Builds a new `FloodsubProtocol`. - pub fn new() -> FloodsubProtocol { - FloodsubProtocol {} + pub fn new(max_message_len: usize) -> FloodsubProtocol { + Self { max_message_len } + } +} + +impl Default for FloodsubProtocol { + fn default() -> Self { + Self { + max_message_len: DEFAULT_MAX_MESSAGE_LEN_BYTES, + } } } @@ -68,7 +79,7 @@ where Box::pin(async move { let mut framed = Framed::new( socket, - quick_protobuf_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + quick_protobuf_codec::Codec::::new(self.max_message_len), ); let rpc = framed @@ -102,6 +113,7 @@ where topic: Topic::new(sub.topic_id.unwrap_or_default()), }) .collect(), + max_message_len: self.max_message_len, }) }) } @@ -132,6 +144,8 @@ pub struct FloodsubRpc { pub messages: Vec, /// List of subscriptions. pub subscriptions: Vec, + /// Maximum message length in bytes. + pub max_message_len: usize, } impl UpgradeInfo for FloodsubRpc { @@ -155,7 +169,7 @@ where Box::pin(async move { let mut framed = Framed::new( socket, - quick_protobuf_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + quick_protobuf_codec::Codec::::new(self.max_message_len), ); framed.send(self.into_rpc()).await?; framed.close().await?; From 8ac9a4c1f96549a86820d701e875a2d1e30f2669 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 4 Sep 2024 15:45:43 +0200 Subject: [PATCH 2/5] Add changelog entry --- protocols/floodsub/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 4192e0ea58d..58eecd0502c 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.46.0 + +- Add a `max_message_len` option to `FloodsubConfig` for configuring the maximum message length. + See [PR 5588](https://github.com/libp2p/rust-libp2p/pull/5588) + ## 0.45.0 From 3c11fa0d984fce1666db3d13c5e16db5f3dda7e1 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Wed, 4 Sep 2024 15:49:47 +0200 Subject: [PATCH 3/5] Bump `libp2p-floodsub` version to v0.46.0 --- Cargo.lock | 2 +- Cargo.toml | 2 +- protocols/floodsub/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0e45e9bb1a..1537bd73b6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2818,7 +2818,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" -version = "0.45.0" +version = "0.46.0" dependencies = [ "asynchronous-codec", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 8d63ac3ee1e..45a51e98086 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,7 +82,7 @@ libp2p-connection-limits = { version = "0.4.0", path = "misc/connection-limits" libp2p-core = { version = "0.42.0", path = "core" } libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" } libp2p-dns = { version = "0.42.0", path = "transports/dns" } -libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" } +libp2p-floodsub = { version = "0.46.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.47.0", path = "protocols/gossipsub" } libp2p-identify = { version = "0.45.0", path = "protocols/identify" } libp2p-identity = { version = "0.2.9" } diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 18d77e99e9c..f61811ddb03 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-floodsub" edition = "2021" rust-version = { workspace = true } description = "Floodsub protocol for libp2p" -version = "0.45.0" +version = "0.46.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" From 3bc76e1a9298c6ddb1094d4fd188d9088c845a89 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Fri, 6 Sep 2024 15:57:08 +0200 Subject: [PATCH 4/5] Take `max_message_len` from the config instead of as a parameter to `FloodsubProtocol::new` --- protocols/floodsub/src/layer.rs | 5 +++-- protocols/floodsub/src/protocol.rs | 15 +++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 50d2476fcdb..56be2744e94 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -46,6 +46,7 @@ pub struct Floodsub { /// Events that need to be yielded to the outside when polling. events: VecDeque>, + /// The floodsub configuration config: FloodsubConfig, /// List of peers to send messages to. @@ -345,7 +346,7 @@ impl NetworkBehaviour for Floodsub { _: &Multiaddr, ) -> Result, ConnectionDenied> { Ok(OneShotHandler::new( - SubstreamProtocol::new(FloodsubProtocol::new(self.config.max_message_len), ()), + SubstreamProtocol::new(FloodsubProtocol::from_config(&self.config), ()), OneShotHandlerConfig::default(), )) } @@ -359,7 +360,7 @@ impl NetworkBehaviour for Floodsub { _: PortUse, ) -> Result, ConnectionDenied> { Ok(OneShotHandler::new( - SubstreamProtocol::new(FloodsubProtocol::new(self.config.max_message_len), ()), + SubstreamProtocol::new(FloodsubProtocol::from_config(&self.config), ()), OneShotHandlerConfig::default(), )) } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index cc5c2a6817a..4742f7f664b 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -18,8 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::proto; use crate::topic::Topic; +use crate::{proto, FloodsubConfig}; use asynchronous_codec::Framed; use bytes::Bytes; use futures::{ @@ -44,9 +44,16 @@ pub struct FloodsubProtocol { } impl FloodsubProtocol { - /// Builds a new `FloodsubProtocol`. - pub fn new(max_message_len: usize) -> FloodsubProtocol { - Self { max_message_len } + /// Builds a new `FloodsubProtocol` with default parameters. + pub fn new() -> Self { + Self::default() + } + + /// Builds a new `FloodsubProtocol` taking parameters from the given configuration. + pub fn from_config(config: &FloodsubConfig) -> Self { + Self { + max_message_len: config.max_message_len, + } } } From e7b335daad9b77f2e11c59b49c8af030d4716e4a Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Sat, 7 Sep 2024 11:14:41 +0200 Subject: [PATCH 5/5] Remove `from_config` and add `with_max_message_len` to `FloodsubProtocol` --- protocols/floodsub/src/layer.rs | 10 ++++++++-- protocols/floodsub/src/protocol.rs | 11 +++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 56be2744e94..ad78c3e18b2 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -346,7 +346,10 @@ impl NetworkBehaviour for Floodsub { _: &Multiaddr, ) -> Result, ConnectionDenied> { Ok(OneShotHandler::new( - SubstreamProtocol::new(FloodsubProtocol::from_config(&self.config), ()), + SubstreamProtocol::new( + FloodsubProtocol::new().with_max_message_len(self.config.max_message_len), + (), + ), OneShotHandlerConfig::default(), )) } @@ -360,7 +363,10 @@ impl NetworkBehaviour for Floodsub { _: PortUse, ) -> Result, ConnectionDenied> { Ok(OneShotHandler::new( - SubstreamProtocol::new(FloodsubProtocol::from_config(&self.config), ()), + SubstreamProtocol::new( + FloodsubProtocol::new().with_max_message_len(self.config.max_message_len), + (), + ), OneShotHandlerConfig::default(), )) } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 4742f7f664b..6d842f20d77 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -18,8 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use crate::proto; use crate::topic::Topic; -use crate::{proto, FloodsubConfig}; use asynchronous_codec::Framed; use bytes::Bytes; use futures::{ @@ -49,11 +49,10 @@ impl FloodsubProtocol { Self::default() } - /// Builds a new `FloodsubProtocol` taking parameters from the given configuration. - pub fn from_config(config: &FloodsubConfig) -> Self { - Self { - max_message_len: config.max_message_len, - } + /// Set the maximum message length. + pub fn with_max_message_len(mut self, max_message_len: usize) -> Self { + self.max_message_len = max_message_len; + self } }