diff --git a/examples/nrf-sdc/src/bin/ble_l2cap_central.rs b/examples/nrf-sdc/src/bin/ble_l2cap_central.rs index c39ef2c..736e03a 100644 --- a/examples/nrf-sdc/src/bin/ble_l2cap_central.rs +++ b/examples/nrf-sdc/src/bin/ble_l2cap_central.rs @@ -134,11 +134,11 @@ async fn main(spawner: Spawner) { let conn = unwrap!(ble.connect(&config).await); info!("Connected, creating l2cap channel"); const PAYLOAD_LEN: usize = 27; - let mut ch1 = unwrap!(L2capChannel::::create(&ble, &conn, 0x2349, &Default::default(),).await); + let mut ch1 = unwrap!(L2capChannel::create(&ble, &conn, 0x2349, &Default::default(),).await); info!("New l2cap channel created, sending some data!"); for i in 0..10 { let tx = [i; PAYLOAD_LEN]; - unwrap!(ch1.send(&ble, &tx).await); + unwrap!(ch1.send::<_, PAYLOAD_LEN>(&ble, &tx).await); } info!("Sent data, waiting for them to be sent back"); let mut rx = [0; PAYLOAD_LEN]; diff --git a/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs b/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs index d007720..0850aed 100644 --- a/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs +++ b/examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs @@ -135,8 +135,7 @@ async fn main(spawner: Spawner) { info!("Connection established"); - let mut ch1 = - unwrap!(L2capChannel::::accept(&ble, &conn, &[0x2349], &Default::default(),).await); + let mut ch1 = unwrap!(L2capChannel::accept(&ble, &conn, &[0x2349], &Default::default(),).await); info!("L2CAP channel accepted"); @@ -153,7 +152,7 @@ async fn main(spawner: Spawner) { Timer::after(Duration::from_secs(1)).await; for i in 0..10 { let tx = [i; PAYLOAD_LEN]; - unwrap!(ch1.send(&ble, &tx).await); + unwrap!(ch1.send::<_, PAYLOAD_LEN>(&ble, &tx).await); } info!("L2CAP data echoed"); diff --git a/host/src/channel_manager.rs b/host/src/channel_manager.rs index 4b93eec..e23900f 100644 --- a/host/src/channel_manager.rs +++ b/host/src/channel_manager.rs @@ -11,6 +11,7 @@ use embassy_sync::waitqueue::WakerRegistration; use crate::cursor::WriteCursor; use crate::host::BleHost; +use crate::l2cap::L2capChannel; use crate::packet_pool::{AllocId, GlobalPacketPool, Packet}; use crate::pdu::Pdu; use crate::types::l2cap::{ @@ -72,7 +73,7 @@ impl<'d> State<'d> { fn print(&self, verbose: bool) { for (idx, storage) in self.channels.iter().enumerate() { if verbose || storage.state != ChannelState::Disconnected { - debug!("[l2cap][idx = {}] state = {:?}", idx, storage); + debug!("[l2cap][idx = {}] {:?}", idx, storage); } } } @@ -85,6 +86,11 @@ impl<'d> State<'d> { self.next_req_id = self.next_req_id.wrapping_add(1); next } + + fn inc_ref(&mut self, index: ChannelIndex) { + let state = &mut self.channels[index.0 as usize]; + state.refcount = unwrap!(state.refcount.checked_add(1), "Too many references to the same channel"); + } } impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { @@ -158,9 +164,9 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { credit_flow: CreditFlowPolicy, initial_credits: Option, ble: &BleHost<'_, T>, - ) -> Result> { + ) -> Result, BleHostError> { // Wait until we find a channel for our connection in the connecting state matching our PSM. - let (idx, req_id, mps, mtu, cid, credits) = poll_fn(|cx| { + let (channel, req_id, mps, mtu, cid, credits) = poll_fn(|cx| { let mut state = self.state.borrow_mut(); for (idx, chan) in state.channels.iter_mut().enumerate() { match chan.state { @@ -173,15 +179,15 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { initial_credits.unwrap_or(self.pool.min_available(AllocId::from_channel(chan.cid)) as u16), ); chan.state = ChannelState::Connected; - - return Poll::Ready(( - ChannelIndex(idx as u8), - req_id, - chan.mps, - chan.mtu, - chan.cid, - chan.flow_control.available(), - )); + let mps = chan.mps; + let mtu = chan.mtu; + let cid = chan.cid; + let available = chan.flow_control.available(); + assert_eq!(chan.refcount, 0); + let index = ChannelIndex(idx as u8); + + state.inc_ref(index); + return Poll::Ready((L2capChannel::new(index, self), req_id, mps, mtu, cid, available)); } _ => {} } @@ -206,11 +212,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { &mut tx[..], ) .await?; - - self.with_mut(|state| { - state.channels[idx.0 as usize].refcount = 1; - }); - Ok(idx) + Ok(channel) } pub(crate) async fn create( @@ -221,7 +223,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { credit_flow: CreditFlowPolicy, initial_credits: Option, ble: &BleHost<'_, T>, - ) -> Result> { + ) -> Result, BleHostError> { let req_id = self.next_request_id(); let mut credits = 0; let mut cid: u16 = 0; @@ -257,18 +259,18 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { let storage = &mut state.channels[idx.0 as usize]; match storage.state { ChannelState::Disconnecting | ChannelState::PeerDisconnecting => { - return Poll::Ready(Err(Error::Disconnected)); + return Poll::Ready(Err(Error::Disconnected.into())); } ChannelState::Connected => { - storage.refcount = 1; - return Poll::Ready(Ok(())); + assert_eq!(storage.refcount, 0); + state.inc_ref(idx); + return Poll::Ready(Ok(L2capChannel::new(idx, self))); } _ => {} } Poll::Pending }) - .await?; - Ok(idx) + .await } /// Dispatch an incoming L2CAP packet to the appropriate channel. @@ -290,7 +292,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { trace!("[l2cap][cid = {}] no credits available", header.channel); return Err(Error::OutOfMemory); } - storage.flow_control.received(1); + storage.flow_control.confirm_received(1); } _ => {} } @@ -590,7 +592,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { if chan.state == ChannelState::Connected { return Ok((chan.conn.unwrap(), chan.cid, chan.flow_control.process())); } - trace!("[l2cap][flow_control] channel {:?} not found", index); + trace!("[l2cap][flow_control_process] channel {:?} not found", index); Err(Error::NotFound) })?; @@ -601,6 +603,15 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { // Reuse packet buffer for signalling data to save the extra TX buffer let mut hci = ble.acl(conn, 1).await?; hci.signal(identifier, &signal, packet.as_mut()).await?; + self.with_mut(|state| { + let chan = &mut state.channels[index.0 as usize]; + if chan.state == ChannelState::Connected { + chan.flow_control.confirm_granted(credits); + return Ok(()); + } + trace!("[l2cap][flow_control_grant] channel {:?} not found", index); + Err(Error::NotFound) + })?; } Ok(()) } @@ -659,8 +670,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> { pub(crate) fn inc_ref(&self, index: ChannelIndex) { self.with_mut(|state| { - let state = &mut state.channels[index.0 as usize]; - state.refcount = unwrap!(state.refcount.checked_add(1), "too many references to the same channel"); + state.inc_ref(index); }); } @@ -746,6 +756,8 @@ pub(crate) trait DynamicChannelManager { fn inc_ref(&self, index: ChannelIndex); fn dec_ref(&self, index: ChannelIndex); fn disconnect(&self, index: ChannelIndex); + #[cfg(feature = "defmt")] + fn print(&self, index: ChannelIndex, f: defmt::Formatter); } impl<'d, const RXQ: usize> DynamicChannelManager for ChannelManager<'d, RXQ> { @@ -758,6 +770,14 @@ impl<'d, const RXQ: usize> DynamicChannelManager for ChannelManager<'d, RXQ> { fn disconnect(&self, index: ChannelIndex) { ChannelManager::disconnect(self, index) } + #[cfg(feature = "defmt")] + fn print(&self, index: ChannelIndex, f: defmt::Formatter) { + use defmt::Format; + self.with_mut(|state| { + let chan = &mut state.channels[index.0 as usize]; + chan.format(f); + }) + } } #[derive(Debug)] @@ -781,7 +801,7 @@ impl defmt::Format for ChannelStorage { fn format(&self, f: defmt::Formatter<'_>) { defmt::write!( f, - "state = {}, conn = {}, cid = {}, peer = {}, mps = {}, mtu = {}, our credits {}, their credits = {}, refs = {}", + "state = {}, c = {}, cid = {}, peer = {}, mps = {}, mtu = {}, cred out {}, cred in = {}, ref = {}", self.state, self.conn, self.cid, @@ -871,29 +891,30 @@ impl CreditFlowControl { self.credits } - fn received(&mut self, n: u16) { + fn confirm_received(&mut self, n: u16) { self.credits = self.credits.saturating_sub(n); self.received = self.received.saturating_add(n); } + // Confirm that we've granted amount credits + fn confirm_granted(&mut self, amount: u16) { + self.received = self.received.saturating_sub(amount); + self.credits = self.credits.saturating_add(amount); + } + + // Check if policy says we should grant more credits fn process(&mut self) -> Option { match self.policy { CreditFlowPolicy::Every(count) => { if self.received >= count { - let amount = self.received; - self.received = 0; - self.credits += amount; - Some(amount) + Some(self.received) } else { None } } CreditFlowPolicy::MinThreshold(threshold) => { if self.credits < threshold { - let amount = self.received; - self.received = 0; - self.credits += amount; - Some(amount) + Some(self.received) } else { None } diff --git a/host/src/connection.rs b/host/src/connection.rs index 7a57a8c..3ce2fc9 100644 --- a/host/src/connection.rs +++ b/host/src/connection.rs @@ -43,10 +43,7 @@ pub struct Connection<'d> { impl<'d> Clone for Connection<'d> { fn clone(&self) -> Self { self.manager.inc_ref(self.index); - Self { - index: self.index, - manager: self.manager, - } + Connection::new(self.index, self.manager) } } diff --git a/host/src/connection_manager.rs b/host/src/connection_manager.rs index 084659e..60d2e88 100644 --- a/host/src/connection_manager.rs +++ b/host/src/connection_manager.rs @@ -197,24 +197,27 @@ impl<'d> ConnectionManager<'d> { for peer in peers.iter() { if storage.peer_addr_kind.unwrap() == peer.0 && &storage.peer_addr.unwrap() == peer.1 { storage.state = ConnectionState::Connected; - storage.refcount = 1; trace!( "[link][poll_accept] connection handle {:?} in role {:?} accepted", handle, role ); + assert_eq!(storage.refcount, 0); + state.inc_ref(idx as u8); return Poll::Ready(Connection::new(idx as u8, self)); } } } else { storage.state = ConnectionState::Connected; - storage.refcount = 1; + assert_eq!(storage.refcount, 0); trace!( "[link][poll_accept] connection handle {:?} in role {:?} accepted", handle, role ); + assert_eq!(storage.refcount, 0); + state.inc_ref(idx as u8); return Poll::Ready(Connection::new(idx as u8, self)); } } @@ -433,7 +436,7 @@ impl defmt::Format for ConnectionStorage { fn format(&self, f: defmt::Formatter<'_>) { defmt::write!( f, - "state = {}, conn = {}, credits = {}, role = {:?}, peer = {:02x}, refs = {}", + "state = {}, conn = {}, flow = {}, role = {}, peer = {:02x}, ref = {}", self.state, self.handle, self.link_credits, diff --git a/host/src/host.rs b/host/src/host.rs index f39c269..0c4a98c 100644 --- a/host/src/host.rs +++ b/host/src/host.rs @@ -772,7 +772,6 @@ where ) -> bool { match status.to_result() { Ok(_) => { - trace!("[host] connected event on handle {:?}", handle); if let Err(err) = self.connections.connect(handle, peer_addr_kind, peer_addr, role) { warn!("Error establishing connection: {:?}", err); return false; diff --git a/host/src/l2cap.rs b/host/src/l2cap.rs index 350ba68..82cdd0a 100644 --- a/host/src/l2cap.rs +++ b/host/src/l2cap.rs @@ -10,22 +10,27 @@ use crate::BleHostError; pub(crate) mod sar; /// Handle representing an L2CAP channel. -pub struct L2capChannel<'d, const TX_MTU: usize> { +pub struct L2capChannel<'d> { index: ChannelIndex, manager: &'d dyn DynamicChannelManager, } -impl<'d, const TX_MTU: usize> Clone for L2capChannel<'d, TX_MTU> { +impl<'d> Clone for L2capChannel<'d> { fn clone(&self) -> Self { self.manager.inc_ref(self.index); - Self { - index: self.index, - manager: self.manager, - } + L2capChannel::new(self.index, self.manager) } } -impl<'d, const TX_MTU: usize> Drop for L2capChannel<'d, TX_MTU> { +#[cfg(feature = "defmt")] +impl defmt::Format for L2capChannel<'_> { + fn format(&self, f: defmt::Formatter<'_>) { + defmt::write!(f, "{}, ", self.index); + self.manager.print(self.index, f); + } +} + +impl<'d> Drop for L2capChannel<'d> { fn drop(&mut self) { self.manager.dec_ref(self.index); } @@ -51,7 +56,11 @@ impl Default for L2capChannelConfig { } } -impl<'d, const TX_MTU: usize> L2capChannel<'d, TX_MTU> { +impl<'d> L2capChannel<'d> { + pub(crate) fn new(index: ChannelIndex, manager: &'d dyn DynamicChannelManager) -> Self { + Self { index, manager } + } + /// Disconnect this channel. pub fn disconnect(&mut self) { self.manager.disconnect(self.index); @@ -63,7 +72,7 @@ impl<'d, const TX_MTU: usize> L2capChannel<'d, TX_MTU> { /// /// If the channel has been closed or the channel id is not valid, an error is returned. /// If there are no available credits to send, waits until more credits are available. - pub async fn send( + pub async fn send( &mut self, ble: &BleHost<'_, T>, buf: &[u8], @@ -78,7 +87,7 @@ impl<'d, const TX_MTU: usize> L2capChannel<'d, TX_MTU> { /// /// If the channel has been closed or the channel id is not valid, an error is returned. /// If there are no available credits to send, returns Error::Busy. - pub fn try_send( + pub fn try_send( &mut self, ble: &BleHost<'_, T>, buf: &[u8], @@ -106,14 +115,9 @@ impl<'d, const TX_MTU: usize> L2capChannel<'d, TX_MTU> { config: &L2capChannelConfig, ) -> Result> { let handle = connection.handle(); - let index = ble - .channels + ble.channels .accept(handle, psm, config.mtu, config.flow_policy, config.initial_credits, ble) - .await?; - Ok(Self { - index, - manager: &ble.channels, - }) + .await } /// Create a new connection request with the provided PSM. @@ -125,8 +129,7 @@ impl<'d, const TX_MTU: usize> L2capChannel<'d, TX_MTU> { ) -> Result> where { let handle = connection.handle(); - let index = ble - .channels + ble.channels .create( connection.handle(), psm, @@ -135,10 +138,6 @@ where { config.initial_credits, ble, ) - .await?; - Ok(Self { - index, - manager: &ble.channels, - }) + .await } } diff --git a/host/tests/l2cap.rs b/host/tests/l2cap.rs index 4955a89..dccf5a9 100644 --- a/host/tests/l2cap.rs +++ b/host/tests/l2cap.rs @@ -62,7 +62,7 @@ async fn l2cap_connection_oriented_channels() { let conn = acceptor.accept().await?; println!("[peripheral] connected"); - let mut ch1 = L2capChannel::::accept(&adapter, &conn, &[0x2349], &Default::default()).await?; + let mut ch1 = L2capChannel::accept(&adapter, &conn, &[0x2349], &Default::default()).await?; println!("[peripheral] channel created"); @@ -77,7 +77,7 @@ async fn l2cap_connection_oriented_channels() { for i in 0..10 { let tx = [i; PAYLOAD_LEN]; - ch1.send(&adapter, &tx).await?; + ch1.send::<_, MTU>(&adapter, &tx).await?; } println!("[peripheral] data sent"); break; @@ -115,11 +115,11 @@ async fn l2cap_connection_oriented_channels() { loop { let conn = adapter.connect(&config).await.unwrap(); println!("[central] connected"); - let mut ch1 = L2capChannel::::create(&adapter, &conn, 0x2349, &Default::default()).await?; + let mut ch1 = L2capChannel::create(&adapter, &conn, 0x2349, &Default::default()).await?; println!("[central] channel created"); for i in 0..10 { let tx = [i; PAYLOAD_LEN]; - ch1.send(&adapter, &tx).await?; + ch1.send::<_, MTU>(&adapter, &tx).await?; } println!("[central] data sent"); let mut rx = [0; PAYLOAD_LEN];