Skip to content

Commit

Permalink
rave_rtsp: working towards client impl 2
Browse files Browse the repository at this point in the history
  • Loading branch information
gerwin3 committed Aug 18, 2023
1 parent b1650ba commit 412b4a9
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 21 deletions.
1 change: 0 additions & 1 deletion src/rave_rtp/src/packetization/h264.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
// TODO: use [`Unit`] over raw byte arrays
use crate::error::Error;
use crate::packet::Packet;
use crate::packetization::common::{PacketizationParameters, Packetizer};
Expand Down
82 changes: 68 additions & 14 deletions src/rave_rtsp/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use std::str::FromStr;

use crate::error::Result;
use crate::io::AsClient;
use crate::message::Uri;
use crate::message::{Method, StatusCategory, Uri};
use crate::request::Request;
use crate::response::Response;
use crate::tokio_codec::Codec;
use crate::MaybeInterleaved;

use futures::SinkExt;

Expand All @@ -11,6 +15,17 @@ use tokio_stream::StreamExt;
type FramedRead = tokio_util::codec::FramedRead<tokio::net::tcp::OwnedReadHalf, Codec<AsClient>>;
type FramedWrite = tokio_util::codec::FramedWrite<tokio::net::tcp::OwnedWriteHalf, Codec<AsClient>>;

/// RTSP client.
///
/// Communicate with RTSP servers. The [`Client`] handles request and response,
/// serialization and deserialization, redirection and error handling.
///
/// # Example
///
/// ```
/// let client = Client::connect("rtsp://localhost/stream").await.unwrap();
/// println!("{:?}", client.options().await.unwrap());
/// ```
pub struct Client {
addr: std::net::SocketAddr,
uri: Uri,
Expand All @@ -23,15 +38,22 @@ pub struct Client {
impl Client {
pub async fn connect(uri: &Uri) -> Result<Client> {
let http::uri::Parts {
scheme,
authority,
path_and_query,
..
} = uri.into_parts();
if let Some("rtsp") = scheme {
todo!()
} else {
todo!() // ERR
scheme, authority, ..
} = uri.clone().into_parts();
// TODO: handle error
let authority = authority.unwrap();
match scheme {
Some(scheme) if scheme.as_str() == "rtsp" => {
let host = authority.host();
let port = authority.port_u16().unwrap_or(554);
let mut addrs = tokio::net::lookup_host((host, port)).await?;
let addr = addrs.next().unwrap(); // TODO: handle error
Self::connect_inner(addr, uri.clone()).await
}
_ => {
// TODO: error
todo!()
}
}
}

Expand Down Expand Up @@ -61,17 +83,49 @@ impl Client {
let write = FramedWrite::new(write, Codec::<AsClient>::new());
Ok(Self {
addr,
uri,
read,
write,
sequencer: Sequencer::new(),
session: None,
})
}

pub async fn options(&mut self) {
// TODO: cseq sequencer
// TODO: let request = Request::options(uri, self.sequencer.sequence())
todo!()
pub async fn options(&mut self) -> Result<Vec<Method>> {
let cseq = self.sequencer.sequence();
self.send(Request::options(&self.uri, cseq)).await?;
let response = self.receive().await?;
Ok(response
.headers
.get("Public")
.unwrap_or("")
.split(',')
// parse methods, trimming each method string, and leaving out
// invalid methods that could not be parsed
.filter_map(|method| Method::from_str(method.trim()).ok())
.collect())
}

#[inline]
async fn send(&mut self, request: Request) -> Result<()> {
self.write.send(request.into()).await
}

#[inline]
async fn receive(&mut self) -> Result<Response> {
// TODO: impl is quite easy: just handle unexpected None and interleaved as errors
let response = match self.read.next().await {
Some(Ok(MaybeInterleaved::Message(response))) => Ok(response),
Some(Ok(MaybeInterleaved::Interleaved { .. })) => Err(todo!()),
Some(Err(err)) => Err(err),
None => Err(todo!()),
}?;
// TODO: handle redirection
if response.status() == StatusCategory::Success {
Ok(response)
} else {
Err(todo!())
}
}

// TODO: other client calls
Expand Down
6 changes: 6 additions & 0 deletions src/rave_rtsp/src/interleaved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ pub enum MaybeInterleaved<M: Message> {
Interleaved { channel: ChannelId, payload: Bytes },
}

impl<M: Message> From<M> for MaybeInterleaved<M> {
fn from(message: M) -> Self {
MaybeInterleaved::Message(message)
}
}

impl<M: Message> std::fmt::Display for MaybeInterleaved<M> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Expand Down
14 changes: 8 additions & 6 deletions src/rave_rtsp/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@ impl Headers {
])
}

pub fn from_iter(headers: impl IntoIterator<Item = (String, String)>) -> Self {
Self {
map: BTreeMap::from_iter(headers),
}
}

pub fn insert(&mut self, key: String, value: String) -> Option<String> {
self.map.insert(key, value)
}
Expand Down Expand Up @@ -80,6 +74,14 @@ impl From<BTreeMap<String, String>> for Headers {
}
}

impl std::iter::FromIterator<(String, String)> for Headers {
fn from_iter<I: IntoIterator<Item = (String, String)>>(headers: I) -> Self {
Self {
map: BTreeMap::from_iter(headers),
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
Describe,
Expand Down

0 comments on commit 412b4a9

Please sign in to comment.