aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-07-06 01:19:02 +0200
committermetamuffin <metamuffin@disroot.org>2024-07-06 01:19:02 +0200
commit5dd0fafce20ed37fdc97dc96539391ebdebffaff (patch)
treead93b9e8d0e9c9c7dbe5a858902c2ba0114a47cf
parenta4c52bedef04cfb927f3d7809680fed0425a5125 (diff)
downloadsip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar
sip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar.bz2
sip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar.zst
generalize to support requests. untested
-rw-r--r--examples/server.rs20
-rw-r--r--src/encoding/headermap.rs13
-rw-r--r--src/encoding/headers.rs1
-rw-r--r--src/encoding/mod.rs30
-rw-r--r--src/encoding/request.rs34
-rw-r--r--src/encoding/response.rs9
-rw-r--r--src/encoding/status.rs3
-rw-r--r--src/encoding/uri.rs12
-rw-r--r--src/transaction/mod.rs38
-rw-r--r--src/transport/mod.rs6
-rw-r--r--src/transport/tcp.rs47
-rw-r--r--src/transport/udp.rs8
12 files changed, 157 insertions, 64 deletions
diff --git a/examples/server.rs b/examples/server.rs
new file mode 100644
index 0000000..5e294b0
--- /dev/null
+++ b/examples/server.rs
@@ -0,0 +1,20 @@
+use anyhow::Result;
+use log::info;
+use sip::{transaction::TransactionUser, transport::tcp::TcpTransport};
+use tokio::net::TcpListener;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let listener = TcpListener::bind("0.0.0.0:5060").await?;
+
+ loop {
+ let (stream, addr) = listener.accept().await?;
+ info!("connection from {addr}");
+ let transport = TcpTransport::new(stream).await?;
+ let tu = TransactionUser::new(transport);
+
+ let req = tu.process_incoming().await?;
+ }
+
+ Ok(())
+}
diff --git a/src/encoding/headermap.rs b/src/encoding/headermap.rs
index 65ef8d2..5d1fa0a 100644
--- a/src/encoding/headermap.rs
+++ b/src/encoding/headermap.rs
@@ -1,5 +1,5 @@
use super::headers::Header;
-use anyhow::Result;
+use anyhow::{anyhow, Result};
use std::fmt::Display;
#[derive(Debug, Clone)]
@@ -38,6 +38,17 @@ impl Display for HeaderMap {
Ok(())
}
}
+impl HeaderMap {
+ pub fn parse<'a>(lines: &mut impl Iterator<Item = &'a str>) -> Result<Self> {
+ let mut headers = HeaderMap::new();
+ for line in lines {
+ // TODO multiline values
+ let (key, value) = line.split_once(":").ok_or(anyhow!("header malformed"))?;
+ headers.insert_raw(key.trim().to_string(), value.trim().to_string())
+ }
+ Ok(headers)
+ }
+}
impl FromIterator<(String, String)> for HeaderMap {
fn from_iter<T: IntoIterator<Item = (String, String)>>(iter: T) -> Self {
diff --git a/src/encoding/headers.rs b/src/encoding/headers.rs
index 60dca7f..9e785f0 100644
--- a/src/encoding/headers.rs
+++ b/src/encoding/headers.rs
@@ -28,6 +28,7 @@ pub trait Header: FromStr<Err = anyhow::Error> + Display {
}
header!("Content-Length", struct ContentLength(usize));
+header!("Content-Type", struct ContentType(String));
header!("Call-ID", struct CallID(String));
header!("Via", struct Via(String));
header!("Contact", struct Contact(String));
diff --git a/src/encoding/mod.rs b/src/encoding/mod.rs
index a7dd227..f0796c8 100644
--- a/src/encoding/mod.rs
+++ b/src/encoding/mod.rs
@@ -1,3 +1,8 @@
+use std::{fmt::Display, str::FromStr};
+
+use request::Request;
+use response::Response;
+
pub mod headermap;
pub mod headers;
pub mod method;
@@ -5,3 +10,28 @@ pub mod request;
pub mod response;
pub mod status;
pub mod uri;
+
+#[derive(Debug, Clone)]
+pub enum Message {
+ Request(Request),
+ Response(Response),
+}
+
+impl Display for Message {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Message::Request(r) => write!(f, "{r}"),
+ Message::Response(r) => write!(f, "{r}"),
+ }
+ }
+}
+impl FromStr for Message {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ if s.starts_with("SIP/") {
+ Response::from_str(s).map(Message::Response)
+ } else {
+ Request::from_str(s).map(Message::Request)
+ }
+ }
+}
diff --git a/src/encoding/request.rs b/src/encoding/request.rs
index aecd006..c62bab3 100644
--- a/src/encoding/request.rs
+++ b/src/encoding/request.rs
@@ -1,5 +1,6 @@
use super::{headermap::HeaderMap, method::Method, uri::Uri};
-use std::fmt::Display;
+use anyhow::{anyhow, bail};
+use std::{fmt::Display, str::FromStr};
#[derive(Debug, Clone)]
pub struct Request {
@@ -20,3 +21,34 @@ impl Display for Request {
Ok(())
}
}
+impl FromStr for Request {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let mut lines = s.lines();
+ let statusline = lines.next().ok_or(anyhow!("status line missing"))?;
+ let (method, rest) = statusline
+ .split_once(" ")
+ .ok_or(anyhow!("status line malformed"))?;
+ let (uri, sipver) = rest
+ .split_once(" ")
+ .ok_or(anyhow!("status line malformed"))?;
+
+ let Some(ver) = sipver.strip_prefix("SIP/") else {
+ bail!("sip version malformed");
+ };
+ if ver != "2.0" {
+ bail!("sip version {ver:?} is not supported");
+ }
+
+ let uri = Uri::from_str(uri)?;
+
+ let headers = HeaderMap::parse(&mut lines)?;
+ let method = Method::from_str(method)?;
+
+ Ok(Self {
+ headers,
+ method,
+ uri,
+ })
+ }
+}
diff --git a/src/encoding/response.rs b/src/encoding/response.rs
index 7ee124f..ffd2878 100644
--- a/src/encoding/response.rs
+++ b/src/encoding/response.rs
@@ -2,7 +2,7 @@ use super::{headermap::HeaderMap, status::Status};
use anyhow::{anyhow, bail, Context};
use std::{fmt::Display, str::FromStr};
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct Response {
pub status: Status,
pub headers: HeaderMap,
@@ -28,12 +28,7 @@ impl FromStr for Response {
bail!("sip version {ver:?} is not supported");
}
- let mut headers = HeaderMap::new();
- for line in lines {
- // TODO multiline values
- let (key, value) = line.split_once(":").ok_or(anyhow!("header malformed"))?;
- headers.insert_raw(key.trim().to_string(), value.trim().to_string())
- }
+ let headers = HeaderMap::parse(&mut lines)?;
let status = Status::from_code(code);
Ok(Self { status, headers })
diff --git a/src/encoding/status.rs b/src/encoding/status.rs
index f06110d..9fe03d7 100644
--- a/src/encoding/status.rs
+++ b/src/encoding/status.rs
@@ -1,7 +1,6 @@
-
macro_rules! status_enum {
($v:vis enum $name:ident { $($variant:ident = $value:literal),*, }) => {
- #[derive(Debug)]
+ #[derive(Debug, Clone)]
$v enum $name { $($variant),*, Other(u16) }
impl $name { pub fn from_code(c: u16) -> Self { match c { $($value => Self::$variant),*, x => Self::Other(x) } } }
impl $name { pub fn to_code(&self) -> u16 { match self { $(Self::$variant => $value),*, Self::Other(x) => *x } } }
diff --git a/src/encoding/uri.rs b/src/encoding/uri.rs
index 7aeefdb..a56cad4 100644
--- a/src/encoding/uri.rs
+++ b/src/encoding/uri.rs
@@ -1,6 +1,6 @@
-use std::fmt::Display;
+use std::{fmt::Display, str::FromStr};
-#[derive(Debug,Clone)]
+#[derive(Debug, Clone)]
pub struct Uri {
pub content: String,
}
@@ -11,3 +11,11 @@ impl Display for Uri {
Ok(())
}
}
+impl FromStr for Uri {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(Self {
+ content: s.to_string(),
+ })
+ }
+}
diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs
index 2953cb7..3be9544 100644
--- a/src/transaction/mod.rs
+++ b/src/transaction/mod.rs
@@ -1,5 +1,5 @@
use crate::{
- encoding::{headers::CSeq, request::Request, response::Response},
+ encoding::{headers::CSeq, request::Request, response::Response, Message},
transport::Transport,
};
use anyhow::{anyhow, Result};
@@ -27,20 +27,26 @@ impl<T: Transport> TransactionUser<T> {
}
}
- pub async fn process_responses(&self) -> Result<()> {
- let resp = self.transport.recv().await?;
- let cseq = resp
- .headers
- .get()
- .ok_or(anyhow!("response cseq missing"))??;
- self.pending_requests
- .write()
- .await
- .get_mut(&cseq)
- .ok_or(anyhow!("message was not requested"))?
- .send(resp)
- .await?;
- Ok(())
+ pub async fn process_incoming(&self) -> Result<Request> {
+ loop {
+ let mesg = self.transport.recv().await?;
+ match mesg {
+ Message::Request(req) => break Ok(req),
+ Message::Response(resp) => {
+ let cseq = resp
+ .headers
+ .get()
+ .ok_or(anyhow!("response cseq missing"))??;
+ self.pending_requests
+ .write()
+ .await
+ .get_mut(&cseq)
+ .ok_or(anyhow!("message was not requested"))?
+ .send(resp)
+ .await?;
+ }
+ }
+ }
}
pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> {
@@ -50,7 +56,7 @@ impl<T: Transport> TransactionUser<T> {
let (tx, rx) = channel(4);
- self.transport.send(request).await?;
+ self.transport.send(Message::Request(request)).await?;
self.pending_requests.write().await.insert(cseq, tx);
Ok(rx)
diff --git a/src/transport/mod.rs b/src/transport/mod.rs
index cf49d31..b4c512b 100644
--- a/src/transport/mod.rs
+++ b/src/transport/mod.rs
@@ -1,4 +1,4 @@
-use crate::encoding::{request::Request, response::Response};
+use crate::encoding::Message;
use anyhow::Result;
pub mod tcp;
@@ -6,6 +6,6 @@ pub mod udp;
#[allow(async_fn_in_trait)]
pub trait Transport {
- async fn recv(&self) -> Result<Response>;
- async fn send(&self, request: Request) -> Result<()>;
+ async fn recv(&self) -> Result<Message>;
+ async fn send(&self, message: Message) -> Result<()>;
}
diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs
index 8c0a024..efe433d 100644
--- a/src/transport/tcp.rs
+++ b/src/transport/tcp.rs
@@ -1,53 +1,44 @@
use super::Transport;
-use crate::encoding::{request::Request, response::Response};
-use anyhow::{anyhow, Result};
+use crate::encoding::Message;
+use anyhow::Result;
use log::debug;
use std::str::FromStr;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
- net::{tcp::OwnedWriteHalf, TcpStream},
- sync::{
- mpsc::{channel, Receiver},
- Mutex,
+ net::{
+ tcp::{OwnedReadHalf, OwnedWriteHalf},
+ TcpStream,
},
+ sync::Mutex,
};
pub struct TcpTransport {
write: Mutex<BufWriter<OwnedWriteHalf>>,
- read: Mutex<Receiver<Response>>,
+ read: Mutex<BufReader<OwnedReadHalf>>,
}
impl TcpTransport {
pub async fn new(stream: TcpStream) -> Result<Self> {
let (read, write) = stream.into_split();
-
- let (tx, rx) = channel(16);
-
- tokio::task::spawn(async move {
- let mut sock = BufReader::new(read);
- let mut message = String::new();
- loop {
- while !message.ends_with("\r\n\r\n") {
- sock.read_line(&mut message).await.unwrap();
- }
- let mesg = Response::from_str(message.trim()).unwrap();
- debug!("<- {mesg}");
- tx.send(mesg).await.unwrap();
- message.clear();
- }
- });
-
Ok(Self {
write: BufWriter::new(write).into(),
- read: rx.into(),
+ read: BufReader::new(read).into(),
})
}
}
+
impl Transport for TcpTransport {
- async fn recv(&self) -> Result<Response> {
- self.read.lock().await.recv().await.ok_or(anyhow!("end"))
+ async fn recv(&self) -> Result<Message> {
+ let mut g = self.read.lock().await;
+ let mut message = String::new();
+ while !message.ends_with("\r\n\r\n") {
+ g.read_line(&mut message).await?;
+ }
+ let mesg = Message::from_str(message.trim())?;
+ debug!("<- {mesg}");
+ Ok(mesg)
}
- async fn send(&self, request: Request) -> Result<()> {
+ async fn send(&self, request: Message) -> Result<()> {
debug!("-> {request}");
let mut g = self.write.lock().await;
g.write_all(format!("{request}").as_bytes()).await?;
diff --git a/src/transport/udp.rs b/src/transport/udp.rs
index c86ce50..391e2b3 100644
--- a/src/transport/udp.rs
+++ b/src/transport/udp.rs
@@ -1,5 +1,5 @@
use super::Transport;
-use crate::encoding::{request::Request, response::Response};
+use crate::encoding::Message;
use anyhow::Result;
use log::debug;
use std::str::FromStr;
@@ -15,14 +15,14 @@ impl UdpTransport {
}
}
impl Transport for UdpTransport {
- async fn recv(&self) -> Result<Response> {
+ async fn recv(&self) -> Result<Message> {
let mut buf = [0; 1024];
let size = self.sock.recv(&mut buf).await?;
let message = String::from_utf8(buf[..size].to_vec())?;
debug!("{message}");
- Response::from_str(message.trim_end())
+ Message::from_str(message.trim_end())
}
- async fn send(&self, request: Request) -> Result<()> {
+ async fn send(&self, request: Message) -> Result<()> {
debug!("-> {request}");
self.sock.send(format!("{request}").as_bytes()).await?;
Ok(())