aboutsummaryrefslogtreecommitdiff
path: root/sip/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'sip/src/transport')
-rw-r--r--sip/src/transport/mod.rs11
-rw-r--r--sip/src/transport/tcp.rs48
-rw-r--r--sip/src/transport/udp.rs35
3 files changed, 94 insertions, 0 deletions
diff --git a/sip/src/transport/mod.rs b/sip/src/transport/mod.rs
new file mode 100644
index 0000000..b4c512b
--- /dev/null
+++ b/sip/src/transport/mod.rs
@@ -0,0 +1,11 @@
+use crate::encoding::Message;
+use anyhow::Result;
+
+pub mod tcp;
+pub mod udp;
+
+#[allow(async_fn_in_trait)]
+pub trait Transport {
+ async fn recv(&self) -> Result<Message>;
+ async fn send(&self, message: Message) -> Result<()>;
+}
diff --git a/sip/src/transport/tcp.rs b/sip/src/transport/tcp.rs
new file mode 100644
index 0000000..efe433d
--- /dev/null
+++ b/sip/src/transport/tcp.rs
@@ -0,0 +1,48 @@
+use super::Transport;
+use crate::encoding::Message;
+use anyhow::Result;
+use log::debug;
+use std::str::FromStr;
+use tokio::{
+ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
+ net::{
+ tcp::{OwnedReadHalf, OwnedWriteHalf},
+ TcpStream,
+ },
+ sync::Mutex,
+};
+
+pub struct TcpTransport {
+ write: Mutex<BufWriter<OwnedWriteHalf>>,
+ read: Mutex<BufReader<OwnedReadHalf>>,
+}
+
+impl TcpTransport {
+ pub async fn new(stream: TcpStream) -> Result<Self> {
+ let (read, write) = stream.into_split();
+ Ok(Self {
+ write: BufWriter::new(write).into(),
+ read: BufReader::new(read).into(),
+ })
+ }
+}
+
+impl Transport for TcpTransport {
+ async fn recv(&self) -> Result<Message> {
+ let mut g = self.read.lock().await;
+ let mut message = String::new();
+ while !message.ends_with("\r\n\r\n") {
+ g.read_line(&mut message).await?;
+ }
+ let mesg = Message::from_str(message.trim())?;
+ debug!("<- {mesg}");
+ Ok(mesg)
+ }
+ async fn send(&self, request: Message) -> Result<()> {
+ debug!("-> {request}");
+ let mut g = self.write.lock().await;
+ g.write_all(format!("{request}").as_bytes()).await?;
+ g.flush().await?;
+ Ok(())
+ }
+}
diff --git a/sip/src/transport/udp.rs b/sip/src/transport/udp.rs
new file mode 100644
index 0000000..c0d7829
--- /dev/null
+++ b/sip/src/transport/udp.rs
@@ -0,0 +1,35 @@
+use super::Transport;
+use crate::encoding::Message;
+use anyhow::{anyhow, Result};
+use log::debug;
+use std::str::FromStr;
+use tokio::net::UdpSocket;
+
+pub struct UdpTransport {
+ sock: UdpSocket,
+}
+
+impl UdpTransport {
+ pub async fn new(sock: UdpSocket) -> Result<Self> {
+ Ok(Self { sock })
+ }
+}
+impl Transport for UdpTransport {
+ async fn recv(&self) -> Result<Message> {
+ let mut buf = [0; 1024];
+ let size = self.sock.recv(&mut buf).await?;
+ let message = String::from_utf8(buf[..size].to_vec())?;
+ let (head, body) = message
+ .split_once("\r\n\r\n")
+ .ok_or(anyhow!("header end missing"))?;
+ debug!("<- {head}\n\n{body}");
+ let mut mesg = Message::from_str(head.trim_end())?;
+ *mesg.body_mut() = body.to_string();
+ Ok(mesg)
+ }
+ async fn send(&self, request: Message) -> Result<()> {
+ debug!("-> {request}");
+ self.sock.send(format!("{request}").as_bytes()).await?;
+ Ok(())
+ }
+}