diff --git a/Cargo.lock b/Cargo.lock index 4c12e6fb984..6f18028ccb8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2817,7 +2817,7 @@ dependencies = [ [[package]] name = "libp2p-floodsub" -version = "0.45.0" +version = "0.46.0" dependencies = [ "asynchronous-codec", "bytes", diff --git a/Cargo.toml b/Cargo.toml index da8d32e1a4a..41031be7e41 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,7 +82,7 @@ libp2p-connection-limits = { version = "0.4.0", path = "misc/connection-limits" libp2p-core = { version = "0.42.0", path = "core" } libp2p-dcutr = { version = "0.12.0", path = "protocols/dcutr" } libp2p-dns = { version = "0.42.0", path = "transports/dns" } -libp2p-floodsub = { version = "0.45.0", path = "protocols/floodsub" } +libp2p-floodsub = { version = "0.46.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.47.1", path = "protocols/gossipsub" } libp2p-identify = { version = "0.45.1", path = "protocols/identify" } libp2p-identity = { version = "0.2.9" } diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 4192e0ea58d..58eecd0502c 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.46.0 + +- Add a `max_message_len` option to `FloodsubConfig` for configuring the maximum message length. + See [PR 5588](https://github.com/libp2p/rust-libp2p/pull/5588) + ## 0.45.0 diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 18d77e99e9c..f61811ddb03 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-floodsub" edition = "2021" rust-version = { workspace = true } description = "Floodsub protocol for libp2p" -version = "0.45.0" +version = "0.46.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 1a70d2213b2..ad78c3e18b2 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -35,6 +35,7 @@ use libp2p_swarm::{ dial_opts::DialOpts, CloseConnection, ConnectionDenied, ConnectionId, NetworkBehaviour, NotifyHandler, OneShotHandler, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }; +use libp2p_swarm::{OneShotHandlerConfig, SubstreamProtocol}; use smallvec::SmallVec; use std::collections::hash_map::{DefaultHasher, HashMap}; use std::task::{Context, Poll}; @@ -45,6 +46,7 @@ pub struct Floodsub { /// Events that need to be yielded to the outside when polling. events: VecDeque>, + /// The floodsub configuration config: FloodsubConfig, /// List of peers to send messages to. @@ -97,6 +99,7 @@ impl Floodsub { topic, action: FloodsubSubscriptionAction::Subscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -133,6 +136,7 @@ impl Floodsub { topic: topic.clone(), action: FloodsubSubscriptionAction::Subscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -163,6 +167,7 @@ impl Floodsub { topic: topic.clone(), action: FloodsubSubscriptionAction::Unsubscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -263,6 +268,7 @@ impl Floodsub { event: FloodsubRpc { subscriptions: Vec::new(), messages: vec![message.clone()], + max_message_len: self.config.max_message_len, }, }); } @@ -293,6 +299,7 @@ impl Floodsub { topic, action: FloodsubSubscriptionAction::Subscribe, }], + max_message_len: self.config.max_message_len, }, }); } @@ -338,7 +345,13 @@ impl NetworkBehaviour for Floodsub { _: &Multiaddr, _: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(Default::default()) + Ok(OneShotHandler::new( + SubstreamProtocol::new( + FloodsubProtocol::new().with_max_message_len(self.config.max_message_len), + (), + ), + OneShotHandlerConfig::default(), + )) } fn handle_established_outbound_connection( @@ -349,7 +362,13 @@ impl NetworkBehaviour for Floodsub { _: Endpoint, _: PortUse, ) -> Result, ConnectionDenied> { - Ok(Default::default()) + Ok(OneShotHandler::new( + SubstreamProtocol::new( + FloodsubProtocol::new().with_max_message_len(self.config.max_message_len), + (), + ), + OneShotHandlerConfig::default(), + )) } fn on_connection_handler_event( @@ -460,6 +479,7 @@ impl NetworkBehaviour for Floodsub { FloodsubRpc { subscriptions: Vec::new(), messages: vec![message.clone()], + max_message_len: self.config.max_message_len, }, )); } diff --git a/protocols/floodsub/src/lib.rs b/protocols/floodsub/src/lib.rs index 94766d5fdca..2f7308be464 100644 --- a/protocols/floodsub/src/lib.rs +++ b/protocols/floodsub/src/lib.rs @@ -23,6 +23,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use libp2p_identity::PeerId; +use protocol::DEFAULT_MAX_MESSAGE_LEN_BYTES; pub mod protocol; @@ -48,6 +49,9 @@ pub struct FloodsubConfig { /// `true` if messages published by local node should be propagated as messages received from /// the network, `false` by default. pub subscribe_local_messages: bool, + + /// Maximum message length in bytes. Defaults to 2KiB. + pub max_message_len: usize, } impl FloodsubConfig { @@ -55,6 +59,20 @@ impl FloodsubConfig { Self { local_peer_id, subscribe_local_messages: false, + max_message_len: DEFAULT_MAX_MESSAGE_LEN_BYTES, } } + + /// Set whether or not messages published by local node should be + /// propagated as messages received from the network. + pub fn with_subscribe_local_messages(mut self, subscribe_local_messages: bool) -> Self { + self.subscribe_local_messages = subscribe_local_messages; + self + } + + /// Set the maximum message length in bytes. + pub fn with_max_message_len(mut self, max_message_len: usize) -> Self { + self.max_message_len = max_message_len; + self + } } diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index edc842be8ce..6d842f20d77 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -32,18 +32,35 @@ use libp2p_identity::PeerId; use libp2p_swarm::StreamProtocol; use std::{io, iter, pin::Pin}; -const MAX_MESSAGE_LEN_BYTES: usize = 2048; +pub(crate) const DEFAULT_MAX_MESSAGE_LEN_BYTES: usize = 2048; const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/floodsub/1.0.0"); /// Implementation of `ConnectionUpgrade` for the floodsub protocol. -#[derive(Debug, Clone, Default)] -pub struct FloodsubProtocol {} +#[derive(Debug, Clone)] +pub struct FloodsubProtocol { + /// Maximum message length in bytes. + pub max_message_len: usize, +} impl FloodsubProtocol { - /// Builds a new `FloodsubProtocol`. - pub fn new() -> FloodsubProtocol { - FloodsubProtocol {} + /// Builds a new `FloodsubProtocol` with default parameters. + pub fn new() -> Self { + Self::default() + } + + /// Set the maximum message length. + pub fn with_max_message_len(mut self, max_message_len: usize) -> Self { + self.max_message_len = max_message_len; + self + } +} + +impl Default for FloodsubProtocol { + fn default() -> Self { + Self { + max_message_len: DEFAULT_MAX_MESSAGE_LEN_BYTES, + } } } @@ -68,7 +85,7 @@ where Box::pin(async move { let mut framed = Framed::new( socket, - quick_protobuf_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + quick_protobuf_codec::Codec::::new(self.max_message_len), ); let rpc = framed @@ -102,6 +119,7 @@ where topic: Topic::new(sub.topic_id.unwrap_or_default()), }) .collect(), + max_message_len: self.max_message_len, }) }) } @@ -132,6 +150,8 @@ pub struct FloodsubRpc { pub messages: Vec, /// List of subscriptions. pub subscriptions: Vec, + /// Maximum message length in bytes. + pub max_message_len: usize, } impl UpgradeInfo for FloodsubRpc { @@ -155,7 +175,7 @@ where Box::pin(async move { let mut framed = Framed::new( socket, - quick_protobuf_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + quick_protobuf_codec::Codec::::new(self.max_message_len), ); framed.send(self.into_rpc()).await?; framed.close().await?;