Skip to content

Commit

Permalink
Merge pull request #75 from embassy-rs/ref-improves
Browse files Browse the repository at this point in the history
Ref improves
  • Loading branch information
lulf authored Aug 8, 2024
2 parents 1c29e48 + a982c7a commit ecc98fc
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 78 deletions.
4 changes: 2 additions & 2 deletions examples/nrf-sdc/src/bin/ble_l2cap_central.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,11 @@ async fn main(spawner: Spawner) {
let conn = unwrap!(ble.connect(&config).await);
info!("Connected, creating l2cap channel");
const PAYLOAD_LEN: usize = 27;
let mut ch1 = unwrap!(L2capChannel::<PAYLOAD_LEN>::create(&ble, &conn, 0x2349, &Default::default(),).await);
let mut ch1 = unwrap!(L2capChannel::create(&ble, &conn, 0x2349, &Default::default(),).await);
info!("New l2cap channel created, sending some data!");
for i in 0..10 {
let tx = [i; PAYLOAD_LEN];
unwrap!(ch1.send(&ble, &tx).await);
unwrap!(ch1.send::<_, PAYLOAD_LEN>(&ble, &tx).await);
}
info!("Sent data, waiting for them to be sent back");
let mut rx = [0; PAYLOAD_LEN];
Expand Down
5 changes: 2 additions & 3 deletions examples/nrf-sdc/src/bin/ble_l2cap_peripheral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,7 @@ async fn main(spawner: Spawner) {

info!("Connection established");

let mut ch1 =
unwrap!(L2capChannel::<PAYLOAD_LEN>::accept(&ble, &conn, &[0x2349], &Default::default(),).await);
let mut ch1 = unwrap!(L2capChannel::accept(&ble, &conn, &[0x2349], &Default::default(),).await);

info!("L2CAP channel accepted");

Expand All @@ -153,7 +152,7 @@ async fn main(spawner: Spawner) {
Timer::after(Duration::from_secs(1)).await;
for i in 0..10 {
let tx = [i; PAYLOAD_LEN];
unwrap!(ch1.send(&ble, &tx).await);
unwrap!(ch1.send::<_, PAYLOAD_LEN>(&ble, &tx).await);
}
info!("L2CAP data echoed");

Expand Down
95 changes: 58 additions & 37 deletions host/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use embassy_sync::waitqueue::WakerRegistration;

use crate::cursor::WriteCursor;
use crate::host::BleHost;
use crate::l2cap::L2capChannel;
use crate::packet_pool::{AllocId, GlobalPacketPool, Packet};
use crate::pdu::Pdu;
use crate::types::l2cap::{
Expand Down Expand Up @@ -72,7 +73,7 @@ impl<'d> State<'d> {
fn print(&self, verbose: bool) {
for (idx, storage) in self.channels.iter().enumerate() {
if verbose || storage.state != ChannelState::Disconnected {
debug!("[l2cap][idx = {}] state = {:?}", idx, storage);
debug!("[l2cap][idx = {}] {:?}", idx, storage);
}
}
}
Expand All @@ -85,6 +86,11 @@ impl<'d> State<'d> {
self.next_req_id = self.next_req_id.wrapping_add(1);
next
}

fn inc_ref(&mut self, index: ChannelIndex) {
let state = &mut self.channels[index.0 as usize];
state.refcount = unwrap!(state.refcount.checked_add(1), "Too many references to the same channel");
}
}

impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
Expand Down Expand Up @@ -158,9 +164,9 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
credit_flow: CreditFlowPolicy,
initial_credits: Option<u16>,
ble: &BleHost<'_, T>,
) -> Result<ChannelIndex, BleHostError<T::Error>> {
) -> Result<L2capChannel<'_>, BleHostError<T::Error>> {
// Wait until we find a channel for our connection in the connecting state matching our PSM.
let (idx, req_id, mps, mtu, cid, credits) = poll_fn(|cx| {
let (channel, req_id, mps, mtu, cid, credits) = poll_fn(|cx| {
let mut state = self.state.borrow_mut();
for (idx, chan) in state.channels.iter_mut().enumerate() {
match chan.state {
Expand All @@ -173,15 +179,15 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
initial_credits.unwrap_or(self.pool.min_available(AllocId::from_channel(chan.cid)) as u16),
);
chan.state = ChannelState::Connected;

return Poll::Ready((
ChannelIndex(idx as u8),
req_id,
chan.mps,
chan.mtu,
chan.cid,
chan.flow_control.available(),
));
let mps = chan.mps;
let mtu = chan.mtu;
let cid = chan.cid;
let available = chan.flow_control.available();
assert_eq!(chan.refcount, 0);
let index = ChannelIndex(idx as u8);

state.inc_ref(index);
return Poll::Ready((L2capChannel::new(index, self), req_id, mps, mtu, cid, available));
}
_ => {}
}
Expand All @@ -206,11 +212,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
&mut tx[..],
)
.await?;

self.with_mut(|state| {
state.channels[idx.0 as usize].refcount = 1;
});
Ok(idx)
Ok(channel)
}

pub(crate) async fn create<T: Controller>(
Expand All @@ -221,7 +223,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
credit_flow: CreditFlowPolicy,
initial_credits: Option<u16>,
ble: &BleHost<'_, T>,
) -> Result<ChannelIndex, BleHostError<T::Error>> {
) -> Result<L2capChannel<'_>, BleHostError<T::Error>> {
let req_id = self.next_request_id();
let mut credits = 0;
let mut cid: u16 = 0;
Expand Down Expand Up @@ -257,18 +259,18 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
let storage = &mut state.channels[idx.0 as usize];
match storage.state {
ChannelState::Disconnecting | ChannelState::PeerDisconnecting => {
return Poll::Ready(Err(Error::Disconnected));
return Poll::Ready(Err(Error::Disconnected.into()));
}
ChannelState::Connected => {
storage.refcount = 1;
return Poll::Ready(Ok(()));
assert_eq!(storage.refcount, 0);
state.inc_ref(idx);
return Poll::Ready(Ok(L2capChannel::new(idx, self)));
}
_ => {}
}
Poll::Pending
})
.await?;
Ok(idx)
.await
}

/// Dispatch an incoming L2CAP packet to the appropriate channel.
Expand All @@ -290,7 +292,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
trace!("[l2cap][cid = {}] no credits available", header.channel);
return Err(Error::OutOfMemory);
}
storage.flow_control.received(1);
storage.flow_control.confirm_received(1);
}
_ => {}
}
Expand Down Expand Up @@ -590,7 +592,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
if chan.state == ChannelState::Connected {
return Ok((chan.conn.unwrap(), chan.cid, chan.flow_control.process()));
}
trace!("[l2cap][flow_control] channel {:?} not found", index);
trace!("[l2cap][flow_control_process] channel {:?} not found", index);
Err(Error::NotFound)
})?;

Expand All @@ -601,6 +603,15 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {
// Reuse packet buffer for signalling data to save the extra TX buffer
let mut hci = ble.acl(conn, 1).await?;
hci.signal(identifier, &signal, packet.as_mut()).await?;
self.with_mut(|state| {
let chan = &mut state.channels[index.0 as usize];
if chan.state == ChannelState::Connected {
chan.flow_control.confirm_granted(credits);
return Ok(());
}
trace!("[l2cap][flow_control_grant] channel {:?} not found", index);
Err(Error::NotFound)
})?;
}
Ok(())
}
Expand Down Expand Up @@ -659,8 +670,7 @@ impl<'d, const RXQ: usize> ChannelManager<'d, RXQ> {

pub(crate) fn inc_ref(&self, index: ChannelIndex) {
self.with_mut(|state| {
let state = &mut state.channels[index.0 as usize];
state.refcount = unwrap!(state.refcount.checked_add(1), "too many references to the same channel");
state.inc_ref(index);
});
}

Expand Down Expand Up @@ -746,6 +756,8 @@ pub(crate) trait DynamicChannelManager {
fn inc_ref(&self, index: ChannelIndex);
fn dec_ref(&self, index: ChannelIndex);
fn disconnect(&self, index: ChannelIndex);
#[cfg(feature = "defmt")]
fn print(&self, index: ChannelIndex, f: defmt::Formatter);
}

impl<'d, const RXQ: usize> DynamicChannelManager for ChannelManager<'d, RXQ> {
Expand All @@ -758,6 +770,14 @@ impl<'d, const RXQ: usize> DynamicChannelManager for ChannelManager<'d, RXQ> {
fn disconnect(&self, index: ChannelIndex) {
ChannelManager::disconnect(self, index)
}
#[cfg(feature = "defmt")]
fn print(&self, index: ChannelIndex, f: defmt::Formatter) {
use defmt::Format;
self.with_mut(|state| {
let chan = &mut state.channels[index.0 as usize];
chan.format(f);
})
}
}

#[derive(Debug)]
Expand All @@ -781,7 +801,7 @@ impl defmt::Format for ChannelStorage {
fn format(&self, f: defmt::Formatter<'_>) {
defmt::write!(
f,
"state = {}, conn = {}, cid = {}, peer = {}, mps = {}, mtu = {}, our credits {}, their credits = {}, refs = {}",
"state = {}, c = {}, cid = {}, peer = {}, mps = {}, mtu = {}, cred out {}, cred in = {}, ref = {}",
self.state,
self.conn,
self.cid,
Expand Down Expand Up @@ -871,29 +891,30 @@ impl CreditFlowControl {
self.credits
}

fn received(&mut self, n: u16) {
fn confirm_received(&mut self, n: u16) {
self.credits = self.credits.saturating_sub(n);
self.received = self.received.saturating_add(n);
}

// Confirm that we've granted amount credits
fn confirm_granted(&mut self, amount: u16) {
self.received = self.received.saturating_sub(amount);
self.credits = self.credits.saturating_add(amount);
}

// Check if policy says we should grant more credits
fn process(&mut self) -> Option<u16> {
match self.policy {
CreditFlowPolicy::Every(count) => {
if self.received >= count {
let amount = self.received;
self.received = 0;
self.credits += amount;
Some(amount)
Some(self.received)
} else {
None
}
}
CreditFlowPolicy::MinThreshold(threshold) => {
if self.credits < threshold {
let amount = self.received;
self.received = 0;
self.credits += amount;
Some(amount)
Some(self.received)
} else {
None
}
Expand Down
5 changes: 1 addition & 4 deletions host/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ pub struct Connection<'d> {
impl<'d> Clone for Connection<'d> {
fn clone(&self) -> Self {
self.manager.inc_ref(self.index);
Self {
index: self.index,
manager: self.manager,
}
Connection::new(self.index, self.manager)
}
}

Expand Down
9 changes: 6 additions & 3 deletions host/src/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,24 +197,27 @@ impl<'d> ConnectionManager<'d> {
for peer in peers.iter() {
if storage.peer_addr_kind.unwrap() == peer.0 && &storage.peer_addr.unwrap() == peer.1 {
storage.state = ConnectionState::Connected;
storage.refcount = 1;
trace!(
"[link][poll_accept] connection handle {:?} in role {:?} accepted",
handle,
role
);
assert_eq!(storage.refcount, 0);
state.inc_ref(idx as u8);
return Poll::Ready(Connection::new(idx as u8, self));
}
}
} else {
storage.state = ConnectionState::Connected;
storage.refcount = 1;
assert_eq!(storage.refcount, 0);
trace!(
"[link][poll_accept] connection handle {:?} in role {:?} accepted",
handle,
role
);

assert_eq!(storage.refcount, 0);
state.inc_ref(idx as u8);
return Poll::Ready(Connection::new(idx as u8, self));
}
}
Expand Down Expand Up @@ -433,7 +436,7 @@ impl defmt::Format for ConnectionStorage {
fn format(&self, f: defmt::Formatter<'_>) {
defmt::write!(
f,
"state = {}, conn = {}, credits = {}, role = {:?}, peer = {:02x}, refs = {}",
"state = {}, conn = {}, flow = {}, role = {}, peer = {:02x}, ref = {}",
self.state,
self.handle,
self.link_credits,
Expand Down
1 change: 0 additions & 1 deletion host/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,6 @@ where
) -> bool {
match status.to_result() {
Ok(_) => {
trace!("[host] connected event on handle {:?}", handle);
if let Err(err) = self.connections.connect(handle, peer_addr_kind, peer_addr, role) {
warn!("Error establishing connection: {:?}", err);
return false;
Expand Down
Loading

0 comments on commit ecc98fc

Please sign in to comment.