diff options
author | metamuffin <metamuffin@disroot.org> | 2024-07-06 01:19:02 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-07-06 01:19:02 +0200 |
commit | 5dd0fafce20ed37fdc97dc96539391ebdebffaff (patch) | |
tree | ad93b9e8d0e9c9c7dbe5a858902c2ba0114a47cf | |
parent | a4c52bedef04cfb927f3d7809680fed0425a5125 (diff) | |
download | sip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar sip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar.bz2 sip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar.zst |
generalize to support requests. untested
-rw-r--r-- | examples/server.rs | 20 | ||||
-rw-r--r-- | src/encoding/headermap.rs | 13 | ||||
-rw-r--r-- | src/encoding/headers.rs | 1 | ||||
-rw-r--r-- | src/encoding/mod.rs | 30 | ||||
-rw-r--r-- | src/encoding/request.rs | 34 | ||||
-rw-r--r-- | src/encoding/response.rs | 9 | ||||
-rw-r--r-- | src/encoding/status.rs | 3 | ||||
-rw-r--r-- | src/encoding/uri.rs | 12 | ||||
-rw-r--r-- | src/transaction/mod.rs | 38 | ||||
-rw-r--r-- | src/transport/mod.rs | 6 | ||||
-rw-r--r-- | src/transport/tcp.rs | 47 | ||||
-rw-r--r-- | src/transport/udp.rs | 8 |
12 files changed, 157 insertions, 64 deletions
diff --git a/examples/server.rs b/examples/server.rs new file mode 100644 index 0000000..5e294b0 --- /dev/null +++ b/examples/server.rs @@ -0,0 +1,20 @@ +use anyhow::Result; +use log::info; +use sip::{transaction::TransactionUser, transport::tcp::TcpTransport}; +use tokio::net::TcpListener; + +#[tokio::main] +async fn main() -> Result<()> { + let listener = TcpListener::bind("0.0.0.0:5060").await?; + + loop { + let (stream, addr) = listener.accept().await?; + info!("connection from {addr}"); + let transport = TcpTransport::new(stream).await?; + let tu = TransactionUser::new(transport); + + let req = tu.process_incoming().await?; + } + + Ok(()) +} diff --git a/src/encoding/headermap.rs b/src/encoding/headermap.rs index 65ef8d2..5d1fa0a 100644 --- a/src/encoding/headermap.rs +++ b/src/encoding/headermap.rs @@ -1,5 +1,5 @@ use super::headers::Header; -use anyhow::Result; +use anyhow::{anyhow, Result}; use std::fmt::Display; #[derive(Debug, Clone)] @@ -38,6 +38,17 @@ impl Display for HeaderMap { Ok(()) } } +impl HeaderMap { + pub fn parse<'a>(lines: &mut impl Iterator<Item = &'a str>) -> Result<Self> { + let mut headers = HeaderMap::new(); + for line in lines { + // TODO multiline values + let (key, value) = line.split_once(":").ok_or(anyhow!("header malformed"))?; + headers.insert_raw(key.trim().to_string(), value.trim().to_string()) + } + Ok(headers) + } +} impl FromIterator<(String, String)> for HeaderMap { fn from_iter<T: IntoIterator<Item = (String, String)>>(iter: T) -> Self { diff --git a/src/encoding/headers.rs b/src/encoding/headers.rs index 60dca7f..9e785f0 100644 --- a/src/encoding/headers.rs +++ b/src/encoding/headers.rs @@ -28,6 +28,7 @@ pub trait Header: FromStr<Err = anyhow::Error> + Display { } header!("Content-Length", struct ContentLength(usize)); +header!("Content-Type", struct ContentType(String)); header!("Call-ID", struct CallID(String)); header!("Via", struct Via(String)); header!("Contact", struct Contact(String)); diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs index a7dd227..f0796c8 100644 --- a/src/encoding/mod.rs +++ b/src/encoding/mod.rs @@ -1,3 +1,8 @@ +use std::{fmt::Display, str::FromStr}; + +use request::Request; +use response::Response; + pub mod headermap; pub mod headers; pub mod method; @@ -5,3 +10,28 @@ pub mod request; pub mod response; pub mod status; pub mod uri; + +#[derive(Debug, Clone)] +pub enum Message { + Request(Request), + Response(Response), +} + +impl Display for Message { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Message::Request(r) => write!(f, "{r}"), + Message::Response(r) => write!(f, "{r}"), + } + } +} +impl FromStr for Message { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result<Self, Self::Err> { + if s.starts_with("SIP/") { + Response::from_str(s).map(Message::Response) + } else { + Request::from_str(s).map(Message::Request) + } + } +} diff --git a/src/encoding/request.rs b/src/encoding/request.rs index aecd006..c62bab3 100644 --- a/src/encoding/request.rs +++ b/src/encoding/request.rs @@ -1,5 +1,6 @@ use super::{headermap::HeaderMap, method::Method, uri::Uri}; -use std::fmt::Display; +use anyhow::{anyhow, bail}; +use std::{fmt::Display, str::FromStr}; #[derive(Debug, Clone)] pub struct Request { @@ -20,3 +21,34 @@ impl Display for Request { Ok(()) } } +impl FromStr for Request { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result<Self, Self::Err> { + let mut lines = s.lines(); + let statusline = lines.next().ok_or(anyhow!("status line missing"))?; + let (method, rest) = statusline + .split_once(" ") + .ok_or(anyhow!("status line malformed"))?; + let (uri, sipver) = rest + .split_once(" ") + .ok_or(anyhow!("status line malformed"))?; + + let Some(ver) = sipver.strip_prefix("SIP/") else { + bail!("sip version malformed"); + }; + if ver != "2.0" { + bail!("sip version {ver:?} is not supported"); + } + + let uri = Uri::from_str(uri)?; + + let headers = HeaderMap::parse(&mut lines)?; + let method = Method::from_str(method)?; + + Ok(Self { + headers, + method, + uri, + }) + } +} diff --git a/src/encoding/response.rs b/src/encoding/response.rs index 7ee124f..ffd2878 100644 --- a/src/encoding/response.rs +++ b/src/encoding/response.rs @@ -2,7 +2,7 @@ use super::{headermap::HeaderMap, status::Status}; use anyhow::{anyhow, bail, Context}; use std::{fmt::Display, str::FromStr}; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Response { pub status: Status, pub headers: HeaderMap, @@ -28,12 +28,7 @@ impl FromStr for Response { bail!("sip version {ver:?} is not supported"); } - let mut headers = HeaderMap::new(); - for line in lines { - // TODO multiline values - let (key, value) = line.split_once(":").ok_or(anyhow!("header malformed"))?; - headers.insert_raw(key.trim().to_string(), value.trim().to_string()) - } + let headers = HeaderMap::parse(&mut lines)?; let status = Status::from_code(code); Ok(Self { status, headers }) diff --git a/src/encoding/status.rs b/src/encoding/status.rs index f06110d..9fe03d7 100644 --- a/src/encoding/status.rs +++ b/src/encoding/status.rs @@ -1,7 +1,6 @@ - macro_rules! status_enum { ($v:vis enum $name:ident { $($variant:ident = $value:literal),*, }) => { - #[derive(Debug)] + #[derive(Debug, Clone)] $v enum $name { $($variant),*, Other(u16) } impl $name { pub fn from_code(c: u16) -> Self { match c { $($value => Self::$variant),*, x => Self::Other(x) } } } impl $name { pub fn to_code(&self) -> u16 { match self { $(Self::$variant => $value),*, Self::Other(x) => *x } } } diff --git a/src/encoding/uri.rs b/src/encoding/uri.rs index 7aeefdb..a56cad4 100644 --- a/src/encoding/uri.rs +++ b/src/encoding/uri.rs @@ -1,6 +1,6 @@ -use std::fmt::Display; +use std::{fmt::Display, str::FromStr}; -#[derive(Debug,Clone)] +#[derive(Debug, Clone)] pub struct Uri { pub content: String, } @@ -11,3 +11,11 @@ impl Display for Uri { Ok(()) } } +impl FromStr for Uri { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result<Self, Self::Err> { + Ok(Self { + content: s.to_string(), + }) + } +} diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 2953cb7..3be9544 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -1,5 +1,5 @@ use crate::{ - encoding::{headers::CSeq, request::Request, response::Response}, + encoding::{headers::CSeq, request::Request, response::Response, Message}, transport::Transport, }; use anyhow::{anyhow, Result}; @@ -27,20 +27,26 @@ impl<T: Transport> TransactionUser<T> { } } - pub async fn process_responses(&self) -> Result<()> { - let resp = self.transport.recv().await?; - let cseq = resp - .headers - .get() - .ok_or(anyhow!("response cseq missing"))??; - self.pending_requests - .write() - .await - .get_mut(&cseq) - .ok_or(anyhow!("message was not requested"))? - .send(resp) - .await?; - Ok(()) + pub async fn process_incoming(&self) -> Result<Request> { + loop { + let mesg = self.transport.recv().await?; + match mesg { + Message::Request(req) => break Ok(req), + Message::Response(resp) => { + let cseq = resp + .headers + .get() + .ok_or(anyhow!("response cseq missing"))??; + self.pending_requests + .write() + .await + .get_mut(&cseq) + .ok_or(anyhow!("message was not requested"))? + .send(resp) + .await?; + } + } + } } pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> { @@ -50,7 +56,7 @@ impl<T: Transport> TransactionUser<T> { let (tx, rx) = channel(4); - self.transport.send(request).await?; + self.transport.send(Message::Request(request)).await?; self.pending_requests.write().await.insert(cseq, tx); Ok(rx) diff --git a/src/transport/mod.rs b/src/transport/mod.rs index cf49d31..b4c512b 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1,4 +1,4 @@ -use crate::encoding::{request::Request, response::Response}; +use crate::encoding::Message; use anyhow::Result; pub mod tcp; @@ -6,6 +6,6 @@ pub mod udp; #[allow(async_fn_in_trait)] pub trait Transport { - async fn recv(&self) -> Result<Response>; - async fn send(&self, request: Request) -> Result<()>; + async fn recv(&self) -> Result<Message>; + async fn send(&self, message: Message) -> Result<()>; } diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 8c0a024..efe433d 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -1,53 +1,44 @@ use super::Transport; -use crate::encoding::{request::Request, response::Response}; -use anyhow::{anyhow, Result}; +use crate::encoding::Message; +use anyhow::Result; use log::debug; use std::str::FromStr; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::{tcp::OwnedWriteHalf, TcpStream}, - sync::{ - mpsc::{channel, Receiver}, - Mutex, + net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, }, + sync::Mutex, }; pub struct TcpTransport { write: Mutex<BufWriter<OwnedWriteHalf>>, - read: Mutex<Receiver<Response>>, + read: Mutex<BufReader<OwnedReadHalf>>, } impl TcpTransport { pub async fn new(stream: TcpStream) -> Result<Self> { let (read, write) = stream.into_split(); - - let (tx, rx) = channel(16); - - tokio::task::spawn(async move { - let mut sock = BufReader::new(read); - let mut message = String::new(); - loop { - while !message.ends_with("\r\n\r\n") { - sock.read_line(&mut message).await.unwrap(); - } - let mesg = Response::from_str(message.trim()).unwrap(); - debug!("<- {mesg}"); - tx.send(mesg).await.unwrap(); - message.clear(); - } - }); - Ok(Self { write: BufWriter::new(write).into(), - read: rx.into(), + read: BufReader::new(read).into(), }) } } + impl Transport for TcpTransport { - async fn recv(&self) -> Result<Response> { - self.read.lock().await.recv().await.ok_or(anyhow!("end")) + async fn recv(&self) -> Result<Message> { + let mut g = self.read.lock().await; + let mut message = String::new(); + while !message.ends_with("\r\n\r\n") { + g.read_line(&mut message).await?; + } + let mesg = Message::from_str(message.trim())?; + debug!("<- {mesg}"); + Ok(mesg) } - async fn send(&self, request: Request) -> Result<()> { + async fn send(&self, request: Message) -> Result<()> { debug!("-> {request}"); let mut g = self.write.lock().await; g.write_all(format!("{request}").as_bytes()).await?; diff --git a/src/transport/udp.rs b/src/transport/udp.rs index c86ce50..391e2b3 100644 --- a/src/transport/udp.rs +++ b/src/transport/udp.rs @@ -1,5 +1,5 @@ use super::Transport; -use crate::encoding::{request::Request, response::Response}; +use crate::encoding::Message; use anyhow::Result; use log::debug; use std::str::FromStr; @@ -15,14 +15,14 @@ impl UdpTransport { } } impl Transport for UdpTransport { - async fn recv(&self) -> Result<Response> { + async fn recv(&self) -> Result<Message> { let mut buf = [0; 1024]; let size = self.sock.recv(&mut buf).await?; let message = String::from_utf8(buf[..size].to_vec())?; debug!("{message}"); - Response::from_str(message.trim_end()) + Message::from_str(message.trim_end()) } - async fn send(&self, request: Request) -> Result<()> { + async fn send(&self, request: Message) -> Result<()> { debug!("-> {request}"); self.sock.send(format!("{request}").as_bytes()).await?; Ok(()) |