diff options
Diffstat (limited to 'sip/src/transport')
-rw-r--r-- | sip/src/transport/mod.rs | 11 | ||||
-rw-r--r-- | sip/src/transport/tcp.rs | 48 | ||||
-rw-r--r-- | sip/src/transport/udp.rs | 35 |
3 files changed, 94 insertions, 0 deletions
diff --git a/sip/src/transport/mod.rs b/sip/src/transport/mod.rs new file mode 100644 index 0000000..b4c512b --- /dev/null +++ b/sip/src/transport/mod.rs @@ -0,0 +1,11 @@ +use crate::encoding::Message; +use anyhow::Result; + +pub mod tcp; +pub mod udp; + +#[allow(async_fn_in_trait)] +pub trait Transport { + async fn recv(&self) -> Result<Message>; + async fn send(&self, message: Message) -> Result<()>; +} diff --git a/sip/src/transport/tcp.rs b/sip/src/transport/tcp.rs new file mode 100644 index 0000000..efe433d --- /dev/null +++ b/sip/src/transport/tcp.rs @@ -0,0 +1,48 @@ +use super::Transport; +use crate::encoding::Message; +use anyhow::Result; +use log::debug; +use std::str::FromStr; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, + }, + sync::Mutex, +}; + +pub struct TcpTransport { + write: Mutex<BufWriter<OwnedWriteHalf>>, + read: Mutex<BufReader<OwnedReadHalf>>, +} + +impl TcpTransport { + pub async fn new(stream: TcpStream) -> Result<Self> { + let (read, write) = stream.into_split(); + Ok(Self { + write: BufWriter::new(write).into(), + read: BufReader::new(read).into(), + }) + } +} + +impl Transport for TcpTransport { + async fn recv(&self) -> Result<Message> { + let mut g = self.read.lock().await; + let mut message = String::new(); + while !message.ends_with("\r\n\r\n") { + g.read_line(&mut message).await?; + } + let mesg = Message::from_str(message.trim())?; + debug!("<- {mesg}"); + Ok(mesg) + } + async fn send(&self, request: Message) -> Result<()> { + debug!("-> {request}"); + let mut g = self.write.lock().await; + g.write_all(format!("{request}").as_bytes()).await?; + g.flush().await?; + Ok(()) + } +} diff --git a/sip/src/transport/udp.rs b/sip/src/transport/udp.rs new file mode 100644 index 0000000..c0d7829 --- /dev/null +++ b/sip/src/transport/udp.rs @@ -0,0 +1,35 @@ +use super::Transport; +use crate::encoding::Message; +use anyhow::{anyhow, Result}; +use log::debug; +use std::str::FromStr; +use tokio::net::UdpSocket; + +pub struct UdpTransport { + sock: UdpSocket, +} + +impl UdpTransport { + pub async fn new(sock: UdpSocket) -> Result<Self> { + Ok(Self { sock }) + } +} +impl Transport for UdpTransport { + async fn recv(&self) -> Result<Message> { + let mut buf = [0; 1024]; + let size = self.sock.recv(&mut buf).await?; + let message = String::from_utf8(buf[..size].to_vec())?; + let (head, body) = message + .split_once("\r\n\r\n") + .ok_or(anyhow!("header end missing"))?; + debug!("<- {head}\n\n{body}"); + let mut mesg = Message::from_str(head.trim_end())?; + *mesg.body_mut() = body.to_string(); + Ok(mesg) + } + async fn send(&self, request: Message) -> Result<()> { + debug!("-> {request}"); + self.sock.send(format!("{request}").as_bytes()).await?; + Ok(()) + } +} |