aboutsummaryrefslogtreecommitdiff
path: root/sip/src/transport/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'sip/src/transport/tcp.rs')
-rw-r--r--sip/src/transport/tcp.rs48
1 files changed, 48 insertions, 0 deletions
diff --git a/sip/src/transport/tcp.rs b/sip/src/transport/tcp.rs
new file mode 100644
index 0000000..efe433d
--- /dev/null
+++ b/sip/src/transport/tcp.rs
@@ -0,0 +1,48 @@
+use super::Transport;
+use crate::encoding::Message;
+use anyhow::Result;
+use log::debug;
+use std::str::FromStr;
+use tokio::{
+ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
+ net::{
+ tcp::{OwnedReadHalf, OwnedWriteHalf},
+ TcpStream,
+ },
+ sync::Mutex,
+};
+
+pub struct TcpTransport {
+ write: Mutex<BufWriter<OwnedWriteHalf>>,
+ read: Mutex<BufReader<OwnedReadHalf>>,
+}
+
+impl TcpTransport {
+ pub async fn new(stream: TcpStream) -> Result<Self> {
+ let (read, write) = stream.into_split();
+ Ok(Self {
+ write: BufWriter::new(write).into(),
+ read: BufReader::new(read).into(),
+ })
+ }
+}
+
+impl Transport for TcpTransport {
+ async fn recv(&self) -> Result<Message> {
+ let mut g = self.read.lock().await;
+ let mut message = String::new();
+ while !message.ends_with("\r\n\r\n") {
+ g.read_line(&mut message).await?;
+ }
+ let mesg = Message::from_str(message.trim())?;
+ debug!("<- {mesg}");
+ Ok(mesg)
+ }
+ async fn send(&self, request: Message) -> Result<()> {
+ debug!("-> {request}");
+ let mut g = self.write.lock().await;
+ g.write_all(format!("{request}").as_bytes()).await?;
+ g.flush().await?;
+ Ok(())
+ }
+}