Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add allow_multicast flags from mesh config to starcast integration #834

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/LLT-5108
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add allow_multicast flags from meshnet config to Starcast implementation.
8 changes: 4 additions & 4 deletions crates/telio-starcast/src/nat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub enum Error {
pub trait Nat {
/// Translate incoming packet (from the transport socket to the multicast peer)
/// Change the source to the multicast peer's ip and natted port
fn translate_incoming(&mut self, packet: &mut [u8]) -> Result<(), Error>;
fn translate_incoming(&mut self, packet: &mut [u8]) -> Result<IpAddr, Error>;
/// Translate outgoing packet (from the multicast peer to the transport socket)
/// Change the destination to the peer's original ip and port
fn translate_outgoing(&mut self, packet: &mut [u8]) -> Result<IpAddr, Error>;
Expand Down Expand Up @@ -92,7 +92,7 @@ impl StarcastNat {
fn translate_incoming_internal<'a, P: MutableIpPacket<'a>>(
&mut self,
packet: &'a mut [u8],
) -> Result<(), Error> {
) -> Result<IpAddr, Error> {
let mut ip_packet = P::new(packet).ok_or(Error::PacketTooShort)?;
if ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Udp {
return Err(Error::UnexpectedTransportProtocol);
Expand Down Expand Up @@ -139,7 +139,7 @@ impl StarcastNat {
))
}

Ok(())
Ok(old_src_ip.into())
}

fn translate_outgoing_internal<'a, P: MutableIpPacket<'a>>(
Expand Down Expand Up @@ -181,7 +181,7 @@ impl StarcastNat {
}

impl Nat for StarcastNat {
fn translate_incoming(&mut self, packet: &mut [u8]) -> Result<(), Error> {
fn translate_incoming(&mut self, packet: &mut [u8]) -> Result<IpAddr, Error> {
match packet.first().ok_or(Error::PacketTooShort)? >> 4 {
4 => self.translate_incoming_internal::<MutableIpv4Packet>(packet),
6 => self.translate_incoming_internal::<MutableIpv6Packet>(packet),
Expand Down
135 changes: 106 additions & 29 deletions crates/telio-starcast/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,19 @@ pub struct Peer {
pub addr: SocketAddr,
/// The public key of the peer
pub public_key: PublicKey,
/// Whether our node accepts multicast messages from the peer or not.
pub allow_multicast: bool,
/// Whether the peer node accepts multicast messages from our node or not.
pub peer_allows_multicast: bool,
}

/// Config for transport component
/// Contains fields that can change at runtime
pub enum Config {
/// Simple transport config, has IP of peer but not port
Simple(Vec<(PublicKey, IpAddr)>),
Simple(Vec<(PublicKey, IpAddr, bool, bool)>),
/// Full transport config, has full socket address of peer
Full(Vec<(PublicKey, SocketAddr)>),
Full(Vec<(PublicKey, SocketAddr, bool, bool)>),
}

/// The starcast transport component
Expand Down Expand Up @@ -200,14 +204,25 @@ impl State {
self.peers = match config {
Config::Simple(peers) => peers
.into_iter()
.map(|(public_key, addr)| Peer {
public_key,
addr: SocketAddr::new(addr, MULTICAST_TRANSPORT_PORT),
})
.map(
|(public_key, addr, allow_multicast, peer_allows_multicast)| Peer {
public_key,
addr: SocketAddr::new(addr, MULTICAST_TRANSPORT_PORT),
allow_multicast,
peer_allows_multicast,
},
)
.collect(),
Config::Full(peers) => peers
.into_iter()
.map(|(public_key, addr)| Peer { public_key, addr })
.map(
|(public_key, addr, allow_multicast, peer_allows_multicast)| Peer {
public_key,
addr,
allow_multicast,
peer_allows_multicast,
},
)
.collect(),
};
}
Expand All @@ -216,11 +231,15 @@ impl State {
let Some(transport_socket) = self.transport_socket.as_ref() else {
return Err(Error::TransportSocketNotOpen);
};
let failed_peers = join_all(self.peers.iter().map(|peer| {
transport_socket
.send_to(&packet, peer.addr)
.map_err(|_| peer.public_key)
}))
// If peer_allows_multicast is false for a peer, we cannot send multicast packets to that peer,
// but we can still receive multicast packets from that peer.
let failed_peers = join_all(self.peers.iter().filter(|p| p.peer_allows_multicast).map(
|peer| {
transport_socket
.send_to(&packet, peer.addr)
.map_err(|_| peer.public_key)
},
))
.await
.into_iter()
.filter_map(|res| match res {
Expand Down Expand Up @@ -256,6 +275,34 @@ impl State {
.map_err(|_| Error::SocketSendError)
}

/// Separate method for handling starcast packets received on the transport socket from other
/// meshnet nodes and dropping those packets if multicast isn't allowed for those nodes.
async fn handle_incoming_packet(
&mut self,
mut packet: Vec<u8>,
send_permit: tokio::sync::mpsc::OwnedPermit<Vec<u8>>,
) -> Result<(), Error> {
let peer_ip = self
.nat
.translate_incoming(&mut packet)
.map_err(Error::NatError)?;
if self
.peers
.iter()
.find(|peer| peer.addr.ip() == peer_ip)
// If allow_multicast is false for a peer, we drop any multicast packets that were
// received from that peer, but we can still send multicast packets to that peer.
.filter(|peer| peer.allow_multicast)
.is_some()
{
// If a starcast packet is received from a peer which is not present in the peer list,
// we assume that multicast is disallowed for it.
send_permit.send(packet);
};

Ok(())
}

fn has_multicast_dst(&self, packet: &mut [u8]) -> Result<bool, Error> {
let dst = match packet.first().ok_or(Error::InvalidIpPacket)? >> 4 {
4 => Self::get_packet_dst::<MutableIpv4Packet>(packet),
Expand Down Expand Up @@ -315,12 +362,8 @@ impl Runtime for State {
}
Some((permit, Ok(bytes_read))) = wait_for_tx(&self.packet_chan.tx, transport_socket.recv(&mut self.recv_buffer)) => {
#[allow(clippy::expect_used)]
let mut packet = self.recv_buffer.get(..bytes_read).expect("We know bytes_read bytes should be in the buffer at this point").to_vec();
self.nat.translate_incoming(&mut packet)
.map_err(Error::NatError)
.map(|_| {
let _ = permit.send(packet);
})
let packet = self.recv_buffer.get(..bytes_read).expect("We know bytes_read bytes should be in the buffer at this point").to_vec();
self.handle_incoming_packet(packet, permit).await
}
else => {
telio_log_warn!("MutlicastListener: no events to wait on");
Expand Down Expand Up @@ -351,23 +394,49 @@ mod tests {
task: Task<State>,
transport_socket: Arc<UdpSocket>,
channel: Chan<Vec<u8>>,
peers: Vec<(PublicKey, UdpSocket)>,
peers: Vec<(PublicKey, UdpSocket, bool, bool)>,
}

impl Scaffold {
async fn start() -> Self {
let transport_socket = Arc::new(Self::bind_local_socket().await);
let mut peers = Vec::with_capacity(3);
for _ in 0..3 {
peers.push((SecretKey::gen().public(), Self::bind_local_socket().await));
}
// Peers with all the different possible meshnet configurations:
let peers = vec![
(
SecretKey::gen().public(),
Self::bind_local_socket().await,
true,
true,
),
(
SecretKey::gen().public(),
Self::bind_local_socket().await,
false,
true,
),
(
SecretKey::gen().public(),
Self::bind_local_socket().await,
true,
false,
),
(
SecretKey::gen().public(),
Self::bind_local_socket().await,
false,
false,
),
];

let (packet_chan, channel) = Chan::pipe();

let task_peers = peers
.iter()
.map(|(pk, s)| Peer {
.map(|(pk, s, allow_multicast, peer_allows_multicast)| Peer {
public_key: *pk,
addr: s.local_addr().unwrap(),
allow_multicast: *allow_multicast,
peer_allows_multicast: *peer_allows_multicast,
})
.collect();
let multicast_ips = vec![IpNet::new("224.0.0.0".parse().unwrap(), 4).unwrap()];
Expand Down Expand Up @@ -459,11 +528,19 @@ mod tests {

scaffold.channel.tx.send(packet.clone()).await.unwrap();

for (_, socket) in &scaffold.peers {
for (_, socket, _, peer_allows_multicast) in &scaffold.peers {
let mut buffer = vec![0; TEST_MAX_PACKET_SIZE];
let bytes_read = socket.recv(&mut buffer).await.unwrap();
buffer.truncate(bytes_read);
assert_eq!(buffer, packet);
if *peer_allows_multicast {
let bytes_read = socket.recv(&mut buffer).await.unwrap();
buffer.truncate(bytes_read);
assert_eq!(buffer, packet);
} else {
// Using timeout here, because otherwise the socket will just wait forever.
let result =
tokio::time::timeout(Duration::from_millis(100), socket.recv_from(&mut buffer))
.await;
assert!(result.is_err());
}
}

scaffold.stop().await;
Expand Down Expand Up @@ -494,7 +571,7 @@ mod tests {

tokio::task::yield_now().await;

for (_, socket) in scaffold.peers.iter().skip(1) {
for (_, socket, _, _) in scaffold.peers.iter().skip(1) {
let mut buffer = vec![0; TEST_MAX_PACKET_SIZE];
assert!(socket.try_recv(&mut buffer).is_err());
}
Expand Down
4 changes: 2 additions & 2 deletions nat-lab/tests/mesh_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ def to_peer_config_for_node(self, node) -> Peer:
is_local=node.is_local and self.is_local,
allow_incoming_connections=firewall_config.allow_incoming_connections,
allow_peer_send_files=firewall_config.allow_peer_send_files,
allow_multicast=False,
peer_allows_multicast=False,
allow_multicast=True,
peer_allows_multicast=True,
)

def set_peer_firewall_settings(
Expand Down
4 changes: 2 additions & 2 deletions nat-lab/tests/test_mesh_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def test_to_peer_config(self) -> None:
is_local=False,
allow_incoming_connections=False,
allow_peer_send_files=False,
allow_multicast=False,
peer_allows_multicast=False,
allow_multicast=True,
peer_allows_multicast=True,
)
assert expected == node.to_peer_config_for_node(node)

Expand Down
57 changes: 57 additions & 0 deletions nat-lab/tests/test_multicast_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from utils.bindings import default_features, TelioAdapterType
from utils.connection_util import ConnectionTag, Connection, TargetOS
from utils.multicast import MulticastClient, MulticastServer
from utils.process import ProcessExecError


def generate_setup_parameter_pair(
Expand Down Expand Up @@ -99,3 +100,59 @@ async def test_multicast(setup_params: List[SetupParameters], protocol: str) ->
async with MulticastServer(beta_connection, protocol).run() as server:
await server.wait_till_ready()
await MulticastClient(alpha_connection, protocol).execute()


MUILTICAST_DISALLOWED_TEST_PARAMS = [
pytest.param(
generate_setup_parameter_pair([
(ConnectionTag.DOCKER_FULLCONE_CLIENT_1, TelioAdapterType.BORING_TUN),
(ConnectionTag.DOCKER_FULLCONE_CLIENT_2, TelioAdapterType.BORING_TUN),
]),
"ssdp",
),
pytest.param(
generate_setup_parameter_pair([
(ConnectionTag.DOCKER_SYMMETRIC_CLIENT_1, TelioAdapterType.BORING_TUN),
(ConnectionTag.DOCKER_SYMMETRIC_CLIENT_2, TelioAdapterType.BORING_TUN),
]),
"mdns",
),
]


@pytest.mark.asyncio
@pytest.mark.parametrize("setup_params, protocol", MUILTICAST_DISALLOWED_TEST_PARAMS)
async def test_multicast_disallowed(
setup_params: List[SetupParameters], protocol: str
) -> None:
async with AsyncExitStack() as exit_stack:
env = await setup_mesh_nodes(exit_stack, setup_params)

alpha_connection, beta_connection = [
conn.connection for conn in env.connections
]

client_alpha, client_beta = env.clients

alpha, beta = env.nodes
mesh_config_alpha = env.api.get_meshnet_config(alpha.id)
if mesh_config_alpha.peers is not None:
for peer in mesh_config_alpha.peers:
if peer.base.hostname == beta.hostname:
peer.allow_multicast = False
await client_alpha.set_meshnet_config(mesh_config_alpha)

mesh_config_beta = env.api.get_meshnet_config(beta.id)
if mesh_config_beta.peers is not None:
for peer in mesh_config_beta.peers:
if peer.base.hostname == alpha.hostname:
peer.peer_allows_multicast = False
await client_beta.set_meshnet_config(mesh_config_beta)

await add_multicast_route(alpha_connection)
await add_multicast_route(beta_connection)

async with MulticastServer(beta_connection, protocol).run() as server:
with pytest.raises(ProcessExecError):
await server.wait_till_ready()
await MulticastClient(alpha_connection, protocol).execute()
14 changes: 13 additions & 1 deletion src/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1790,14 +1790,26 @@ impl Runtime {
.peers
.unwrap_or_default()
.iter()
.filter(|p| {
// If neither our node nor peer node allow multicast, there's no point in keeping
// that peer in the config.
p.allow_multicast || p.peer_allows_multicast
})
.filter_map(|p| {
p.ip_addresses
.to_owned()
.unwrap_or_default()
.iter()
// While IPV6 support is not added yet for multicast, only using IPV4 IPs
.find(|ip| ip.is_ipv4())
.map(|ip| (p.base.public_key, ip.to_owned()))
.map(|ip| {
(
p.base.public_key,
ip.to_owned(),
p.allow_multicast,
p.peer_allows_multicast,
)
})
})
.collect();
let starcast_transport_config = StarcastTransportConfig::Simple(multicast_peers);
Expand Down
Loading