aboutsummaryrefslogtreecommitdiff
path: root/sip/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-11-19 02:08:52 +0100
committermetamuffin <metamuffin@disroot.org>2024-11-19 02:08:52 +0100
commitcbc111f90b5facc1f2a9dd79ced216279d6260af (patch)
treefa5a1d2d67874413d8e66673825c6789e8cc0945 /sip/src
parent2d9a31244eab6d3a9871369d3148de253e902d36 (diff)
downloadsip-rs-cbc111f90b5facc1f2a9dd79ced216279d6260af.tar
sip-rs-cbc111f90b5facc1f2a9dd79ced216279d6260af.tar.bz2
sip-rs-cbc111f90b5facc1f2a9dd79ced216279d6260af.tar.zst
move files + rtp parser
Diffstat (limited to 'sip/src')
-rw-r--r--sip/src/encoding/headermap.rs60
-rw-r--r--sip/src/encoding/headers.rs182
-rw-r--r--sip/src/encoding/method.rs39
-rw-r--r--sip/src/encoding/mod.rs46
-rw-r--r--sip/src/encoding/request.rs57
-rw-r--r--sip/src/encoding/response.rs53
-rw-r--r--sip/src/encoding/status.rs63
-rw-r--r--sip/src/encoding/uri.rs55
-rw-r--r--sip/src/lib.rs4
-rw-r--r--sip/src/transaction/auth.rs51
-rw-r--r--sip/src/transaction/mod.rs87
-rw-r--r--sip/src/transport/mod.rs11
-rw-r--r--sip/src/transport/tcp.rs48
-rw-r--r--sip/src/transport/udp.rs35
14 files changed, 791 insertions, 0 deletions
diff --git a/sip/src/encoding/headermap.rs b/sip/src/encoding/headermap.rs
new file mode 100644
index 0000000..01e1962
--- /dev/null
+++ b/sip/src/encoding/headermap.rs
@@ -0,0 +1,60 @@
+use super::headers::Header;
+use anyhow::{anyhow, Result};
+use std::fmt::Display;
+
+#[derive(Debug, Clone)]
+pub struct HeaderMap(pub Vec<(String, String)>);
+
+impl HeaderMap {
+ pub fn new() -> Self {
+ Self(vec![])
+ }
+ pub fn add<H: Header>(mut self, h: H) -> Self {
+ self.0.push((H::NAME.to_string(), format!("{h}")));
+ self
+ }
+ pub fn insert<H: Header>(&mut self, h: H) {
+ self.0.push((H::NAME.to_string(), format!("{h}")));
+ }
+ pub fn get_raw(&self, name: &str) -> Option<&str> {
+ self.0
+ .iter()
+ .find(|(k, _)| k.eq_ignore_ascii_case(name))
+ .map(|(_, v)| v.as_str())
+ }
+ pub fn get<H: Header>(&self) -> Option<Result<H>> {
+ self.get_raw(H::NAME).map(H::from_str)
+ }
+ pub fn get_res<H: Header>(&self) -> Result<H> {
+ self.get().ok_or(anyhow!("{} header missing", H::NAME))?
+ }
+ pub fn insert_raw(&mut self, key: String, value: String) {
+ self.0.push((key, value))
+ }
+}
+
+impl Display for HeaderMap {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ for (k, v) in &self.0 {
+ write!(f, "{k}: {v}\r\n")?;
+ }
+ Ok(())
+ }
+}
+impl HeaderMap {
+ pub fn parse<'a>(lines: &mut impl Iterator<Item = &'a str>) -> Result<Self> {
+ let mut headers = HeaderMap::new();
+ for line in lines {
+ // TODO multiline values
+ let (key, value) = line.split_once(":").ok_or(anyhow!("header malformed"))?;
+ headers.insert_raw(key.trim().to_string(), value.trim().to_string())
+ }
+ Ok(headers)
+ }
+}
+
+impl FromIterator<(String, String)> for HeaderMap {
+ fn from_iter<T: IntoIterator<Item = (String, String)>>(iter: T) -> Self {
+ Self(Vec::from_iter(iter))
+ }
+}
diff --git a/sip/src/encoding/headers.rs b/sip/src/encoding/headers.rs
new file mode 100644
index 0000000..e880739
--- /dev/null
+++ b/sip/src/encoding/headers.rs
@@ -0,0 +1,182 @@
+use super::{headermap::HeaderMap, method::Method, uri::Uri};
+use anyhow::{anyhow, bail, Result};
+use std::{fmt::Display, str::FromStr};
+
+macro_rules! header {
+ ($hname:literal, struct $name:ident($type:ty)) => {
+ #[derive(Debug)]
+ pub struct $name(pub $type);
+ impl Header for $name {
+ const NAME: &'static str = $hname;
+ }
+ impl Display for $name {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{}", self.0)
+ }
+ }
+ impl FromStr for $name {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok($name(<$type>::from_str(s)?))
+ }
+ }
+ };
+}
+
+pub trait Header: FromStr<Err = anyhow::Error> + Display {
+ const NAME: &'static str;
+}
+
+header!("Content-Length", struct ContentLength(usize));
+header!("Content-Type", struct ContentType(String));
+header!("Call-ID", struct CallID(String));
+header!("Via", struct Via(String));
+header!("Max-Forwards", struct MaxForwards(usize));
+header!("From", struct From(String));
+header!("To", struct To(String));
+header!("User-Agent", struct UserAgent(String));
+header!("Allow", struct Allow(String));
+
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct CSeq(pub u32, pub Method);
+
+impl Header for CSeq {
+ const NAME: &'static str = "CSeq";
+}
+impl Display for CSeq {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{} {}", self.0, self.1)
+ }
+}
+impl FromStr for CSeq {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let (seq, method) = s.split_once(" ").ok_or(anyhow!("method missing"))?;
+ Ok(CSeq(seq.parse()?, method.parse()?))
+ }
+}
+
+#[derive(Debug)]
+pub struct WWWAuthenticate {
+ pub realm: String,
+ pub nonce: String,
+}
+impl Header for WWWAuthenticate {
+ const NAME: &'static str = "WWW-Authenticate";
+}
+impl Display for WWWAuthenticate {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "Digest realm={:?}, nonce={:?}", self.realm, self.nonce)
+ }
+}
+impl FromStr for WWWAuthenticate {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ // TODO this is totally wrong
+ let kvs = s
+ .strip_prefix("Digest ")
+ .ok_or(anyhow!("type not digest"))?
+ .split(",")
+ .map(|e| {
+ let Some((k, v)) = e.split_once("=") else {
+ bail!("not a KV-pair")
+ };
+ Ok((k.trim().to_string(), v.trim().to_string()))
+ })
+ .try_collect::<HeaderMap>()?;
+ Ok(WWWAuthenticate {
+ realm: unquote(
+ &kvs.get_raw("realm")
+ .ok_or(anyhow!("realm missing"))?
+ .to_string(),
+ )?,
+ nonce: unquote(
+ &kvs.get_raw("nonce")
+ .ok_or(anyhow!("nonce missing"))?
+ .to_string(),
+ )?,
+ })
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct Authorization {
+ pub username: String,
+ pub realm: String,
+ pub uri: String,
+ pub nonce: String,
+ pub response: String,
+}
+impl Header for Authorization {
+ const NAME: &'static str = "Authorization";
+}
+impl Display for Authorization {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let Self {
+ username,
+ realm,
+ nonce,
+ uri,
+ response,
+ } = self;
+ write!(
+ f,
+ "Digest username={username:?},\r\n realm={realm:?},\r\n nonce={nonce:?},\r\n uri={uri:?},\r\n response={response:?},\r\n algorithm=MD5"
+ )
+ }
+}
+impl FromStr for Authorization {
+ type Err = anyhow::Error;
+ fn from_str(_s: &str) -> Result<Self, Self::Err> {
+ todo!()
+ }
+}
+
+pub fn unquote(v: &str) -> Result<String> {
+ Ok(v.strip_prefix("\"")
+ .ok_or(anyhow!("start quote missing"))?
+ .strip_suffix("\"")
+ .ok_or(anyhow!("end quote missing"))?
+ .to_string())
+}
+
+#[derive(Debug)]
+pub struct Contact {
+ pub display_name: Option<String>,
+ pub uri: Uri,
+ pub params: String,
+}
+
+impl Header for Contact {
+ const NAME: &'static str = "Contact";
+}
+impl Display for Contact {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let Self {
+ display_name,
+ uri,
+ params,
+ } = self;
+ if let Some(display_name) = display_name {
+ write!(f, "{display_name} <{uri}>{params}")
+ } else {
+ write!(f, "<{uri}>{params}")
+ }
+ }
+}
+impl FromStr for Contact {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let (display_name, rest) = s.split_once("<").ok_or(anyhow!("malformed contact"))?;
+ let (uri, params) = rest.split_once(">;").ok_or(anyhow!("malformed contact"))?;
+ Ok(Self {
+ display_name: if display_name.is_empty() {
+ None
+ } else {
+ Some(display_name.to_string())
+ },
+ params: params.to_string(),
+ uri: Uri::from_str(uri)?,
+ })
+ }
+}
diff --git a/sip/src/encoding/method.rs b/sip/src/encoding/method.rs
new file mode 100644
index 0000000..6d38cab
--- /dev/null
+++ b/sip/src/encoding/method.rs
@@ -0,0 +1,39 @@
+use anyhow::bail;
+use std::{fmt::Display, str::FromStr};
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum Method {
+ Register,
+ Invite,
+ Ack,
+ Options,
+ Cancel,
+ Bye,
+}
+
+impl Display for Method {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_str(match self {
+ Method::Register => "REGISTER",
+ Method::Invite => "INVITE",
+ Method::Ack => "ACK",
+ Method::Options => "OPTIONS",
+ Method::Cancel => "CANCEL",
+ Method::Bye => "BYE",
+ })
+ }
+}
+impl FromStr for Method {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(match s {
+ "REGISTER" => Method::Register,
+ "INVITE" => Method::Invite,
+ "ACK" => Method::Ack,
+ "OPTIONS" => Method::Options,
+ "CANCEL" => Method::Cancel,
+ "BYE" => Method::Bye,
+ _ => bail!("unknown method"),
+ })
+ }
+}
diff --git a/sip/src/encoding/mod.rs b/sip/src/encoding/mod.rs
new file mode 100644
index 0000000..816aa01
--- /dev/null
+++ b/sip/src/encoding/mod.rs
@@ -0,0 +1,46 @@
+use std::{fmt::Display, str::FromStr};
+
+use request::Request;
+use response::Response;
+
+pub mod headermap;
+pub mod headers;
+pub mod method;
+pub mod request;
+pub mod response;
+pub mod status;
+pub mod uri;
+
+#[derive(Debug, Clone)]
+pub enum Message {
+ Request(Request),
+ Response(Response),
+}
+
+impl Display for Message {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Message::Request(r) => write!(f, "{r}"),
+ Message::Response(r) => write!(f, "{r}"),
+ }
+ }
+}
+impl FromStr for Message {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ if s.starts_with("SIP/") {
+ Response::from_str(s).map(Message::Response)
+ } else {
+ Request::from_str(s).map(Message::Request)
+ }
+ }
+}
+
+impl Message {
+ pub fn body_mut(&mut self) -> &mut String {
+ match self {
+ Message::Request(r) => &mut r.body,
+ Message::Response(r) => &mut r.body,
+ }
+ }
+}
diff --git a/sip/src/encoding/request.rs b/sip/src/encoding/request.rs
new file mode 100644
index 0000000..ab41b7c
--- /dev/null
+++ b/sip/src/encoding/request.rs
@@ -0,0 +1,57 @@
+use super::{headermap::HeaderMap, method::Method, uri::Uri};
+use anyhow::{anyhow, bail};
+use std::{fmt::Display, str::FromStr};
+
+#[derive(Debug, Clone)]
+pub struct Request {
+ pub method: Method,
+ pub uri: Uri,
+ pub headers: HeaderMap,
+ pub body: String,
+}
+
+impl Display for Request {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let Self {
+ headers,
+ method,
+ uri,
+ ..
+ } = self;
+ write!(f, "{method} {uri} SIP/2.0\r\n")?;
+ write!(f, "{headers}\r\n")?;
+ Ok(())
+ }
+}
+impl FromStr for Request {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let mut lines = s.lines();
+ let statusline = lines.next().ok_or(anyhow!("status line missing"))?;
+ let (method, rest) = statusline
+ .split_once(" ")
+ .ok_or(anyhow!("status line malformed"))?;
+ let (uri, sipver) = rest
+ .split_once(" ")
+ .ok_or(anyhow!("status line malformed"))?;
+
+ let Some(ver) = sipver.strip_prefix("SIP/") else {
+ bail!("sip version malformed");
+ };
+ if ver != "2.0" {
+ bail!("sip version {ver:?} is not supported");
+ }
+
+ let uri = Uri::from_str(uri)?;
+
+ let headers = HeaderMap::parse(&mut lines)?;
+ let method = Method::from_str(method)?;
+
+ Ok(Self {
+ body: String::new(),
+ headers,
+ method,
+ uri,
+ })
+ }
+}
diff --git a/sip/src/encoding/response.rs b/sip/src/encoding/response.rs
new file mode 100644
index 0000000..0b7996c
--- /dev/null
+++ b/sip/src/encoding/response.rs
@@ -0,0 +1,53 @@
+use super::{headermap::HeaderMap, status::Status};
+use anyhow::{anyhow, bail, Context};
+use std::{fmt::Display, str::FromStr};
+
+#[derive(Debug, Clone)]
+pub struct Response {
+ pub status: Status,
+ pub headers: HeaderMap,
+ pub body: String,
+}
+
+impl FromStr for Response {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let mut lines = s.lines();
+ let statusline = lines.next().ok_or(anyhow!("status line missing"))?;
+ let (sipver, rest) = statusline
+ .split_once(" ")
+ .ok_or(anyhow!("status line malformed"))?;
+ let (code, _status_str) = rest
+ .split_once(" ")
+ .ok_or(anyhow!("status line malformed"))?;
+ let code = u16::from_str(code).context("status code")?;
+
+ let Some(ver) = sipver.strip_prefix("SIP/") else {
+ bail!("sip version malformed");
+ };
+ if ver != "2.0" {
+ bail!("sip version {ver:?} is not supported");
+ }
+
+ let headers = HeaderMap::parse(&mut lines)?;
+
+ let status = Status::from_code(code);
+ Ok(Self {
+ status,
+ headers,
+ body: String::new(),
+ })
+ }
+}
+impl Display for Response {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let Self {
+ status,
+ headers,
+ body,
+ } = self;
+ write!(f, "SIP/2.0 {} {status:?}\r\n", status.to_code())?;
+ write!(f, "{headers}\r\n{body}")?;
+ Ok(())
+ }
+}
diff --git a/sip/src/encoding/status.rs b/sip/src/encoding/status.rs
new file mode 100644
index 0000000..61b2d2a
--- /dev/null
+++ b/sip/src/encoding/status.rs
@@ -0,0 +1,63 @@
+macro_rules! status_enum {
+ ($v:vis enum $name:ident { $($variant:ident = $value:literal),*, }) => {
+ #[derive(Debug, Clone, Eq, PartialEq, Hash)]
+ $v enum $name { $($variant),*, Other(u16) }
+ impl $name { pub fn from_code(c: u16) -> Self { match c { $($value => Self::$variant),*, x => Self::Other(x) } } }
+ impl $name { pub fn to_code(&self) -> u16 { match self { $(Self::$variant => $value),*, Self::Other(x) => *x } } }
+ };
+}
+
+status_enum!(
+ pub enum Status {
+ Trying = 100,
+ Ringing = 180,
+ CallIsBeingForwarded = 181,
+ Queued = 182,
+ SessionProgress = 183,
+ Ok = 200,
+ MultipleChoices = 300,
+ MovedPermanently = 301,
+ MovedTemporarily = 302,
+ UseProxy = 305,
+ AlternativeService = 380,
+ BadRequest = 400,
+ Unauthorized = 401,
+ PaymentRequired = 402,
+ Forbidden = 403,
+ NotFound = 404,
+ MethodNotAllowed = 405,
+ NotAcceptable = 406,
+ ProxyAuthenticationRequired = 407,
+ RequestTimeout = 408,
+ Gone = 410,
+ RequestEntityTooLarge = 413,
+ RequestURITooLarge = 414,
+ UnsupportedMediaType = 415,
+ UnsupportedURIScheme = 416,
+ BadExtension = 420,
+ ExtensionRequired = 421,
+ IntervalTooBrief = 423,
+ TemporarilyNotAvailable = 480,
+ CallLegTransactionDoesNotExist = 481,
+ LoopDetected = 482,
+ TooManyHops = 483,
+ AddressIncomplete = 484,
+ Ambiguous = 485,
+ BusyHere = 486,
+ RequestTerminated = 487,
+ NotAcceptableHere = 488,
+ RequestPending = 491,
+ Undecipherable = 493,
+ InternalServerError = 500,
+ NotImplemented = 501,
+ BadGateway = 502,
+ ServiceUnavailable = 503,
+ ServerTimeout = 504,
+ SIPVersionNotSupported = 505,
+ MessageTooLarge = 513,
+ BusyEverywhere = 600,
+ Decline = 603,
+ DoesNotExistAnywhere = 604,
+ GlobalNotAcceptable = 606,
+ }
+);
diff --git a/sip/src/encoding/uri.rs b/sip/src/encoding/uri.rs
new file mode 100644
index 0000000..b1a1282
--- /dev/null
+++ b/sip/src/encoding/uri.rs
@@ -0,0 +1,55 @@
+use anyhow::anyhow;
+use std::{fmt::Display, str::FromStr};
+
+#[derive(Debug, Clone)]
+pub struct Uri {
+ pub protocol: String,
+ pub localpart: Option<String>,
+ pub addr: String,
+ pub params: String,
+}
+
+impl Display for Uri {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let Self {
+ protocol,
+ localpart,
+ addr,
+ params,
+ } = self;
+ write!(
+ f,
+ "{protocol}:{}{addr}{}",
+ if let Some(localpart) = localpart {
+ format!("{localpart}@")
+ } else {
+ "".to_string()
+ },
+ if params.is_empty() {
+ "".to_string()
+ } else {
+ format!(";{params}")
+ }
+ )?;
+ Ok(())
+ }
+}
+impl FromStr for Uri {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let (pr, s) = s.split_once(":").ok_or(anyhow!("protocol sep"))?;
+ let (lp, s) = s.split_once("@").unwrap_or(("", s));
+ let (addr, params) = s.split_once(";").unwrap_or((s, ""));
+
+ Ok(Self {
+ addr: addr.to_owned(),
+ localpart: if lp.is_empty() {
+ None
+ } else {
+ Some(lp.to_string())
+ },
+ params: params.to_string(),
+ protocol: pr.to_string(),
+ })
+ }
+}
diff --git a/sip/src/lib.rs b/sip/src/lib.rs
new file mode 100644
index 0000000..6c6cc3b
--- /dev/null
+++ b/sip/src/lib.rs
@@ -0,0 +1,4 @@
+#![feature(iterator_try_collect)]
+pub mod encoding;
+pub mod transport;
+pub mod transaction;
diff --git a/sip/src/transaction/auth.rs b/sip/src/transaction/auth.rs
new file mode 100644
index 0000000..4c46f9b
--- /dev/null
+++ b/sip/src/transaction/auth.rs
@@ -0,0 +1,51 @@
+use crate::encoding::{
+ headers::{Authorization, WWWAuthenticate},
+ method::Method,
+ request::Request,
+ response::Response,
+ uri::Uri,
+};
+use anyhow::Result;
+
+impl Authorization {
+ pub fn construct(
+ request: &Request,
+ failed_response: &Response,
+ username: &str,
+ password: &str,
+ ) -> Result<Authorization> {
+ let challenge = failed_response.headers.get_res::<WWWAuthenticate>()?;
+
+ Ok(Authorization {
+ response: response_digest(
+ username.to_string(),
+ challenge.realm.clone(),
+ password.to_string(),
+ request.method,
+ challenge.nonce.clone(),
+ request.uri.clone(),
+ ),
+ nonce: challenge.nonce,
+ realm: challenge.realm,
+ uri: request.uri.to_string(),
+ username: username.to_string(),
+ })
+ }
+}
+
+fn response_digest(
+ username: String,
+ realm: String,
+ password: String,
+ method: Method,
+ nonce: String,
+ uri: Uri,
+) -> String {
+ let h = |s: String| hex::encode(md5::compute(s.as_bytes()).0);
+ let kd = |secret, data| h(format!("{secret}:{data}"));
+
+ let a1 = format!("{username}:{realm}:{password}");
+ let a2 = format!("{method}:{uri}");
+ let response_digest = kd(h(a1), format!("{nonce}:{}", h(a2)));
+ return response_digest;
+}
diff --git a/sip/src/transaction/mod.rs b/sip/src/transaction/mod.rs
new file mode 100644
index 0000000..3368c47
--- /dev/null
+++ b/sip/src/transaction/mod.rs
@@ -0,0 +1,87 @@
+pub mod auth;
+
+use crate::{
+ encoding::{
+ headers::{CSeq, CallID, ContentLength},
+ request::Request,
+ response::Response,
+ Message,
+ },
+ transport::Transport,
+};
+use anyhow::{anyhow, Result};
+use std::{
+ collections::HashMap,
+ sync::atomic::{AtomicU32, Ordering},
+};
+use tokio::sync::{
+ mpsc::{self, channel},
+ RwLock,
+};
+
+pub struct TransactionUser<T> {
+ transport: T,
+ sequence: AtomicU32,
+ pending_requests: RwLock<HashMap<CSeq, mpsc::Sender<Response>>>,
+}
+
+impl<T: Transport> TransactionUser<T> {
+ pub fn new(transport: T) -> Self {
+ Self {
+ sequence: 0.into(),
+ pending_requests: Default::default(),
+ transport,
+ }
+ }
+
+ pub async fn process_incoming(&self) -> Result<Request> {
+ loop {
+ let mesg = self.transport.recv().await?;
+ match mesg {
+ Message::Request(req) => break Ok(req),
+ Message::Response(resp) => {
+ let cseq = resp
+ .headers
+ .get()
+ .ok_or(anyhow!("response cseq missing"))??;
+ self.pending_requests
+ .write()
+ .await
+ .get_mut(&cseq)
+ .ok_or(anyhow!("message was not requested"))?
+ .send(resp)
+ .await?;
+ }
+ }
+ }
+ }
+ pub async fn respond(&self, req: &Request, mut resp: Response) -> Result<()> {
+ resp.headers.insert(
+ req.headers
+ .get::<CSeq>()
+ .ok_or(anyhow!("cseq is mandatory"))??,
+ );
+ resp.headers.insert(
+ req.headers
+ .get::<CallID>()
+ .ok_or(anyhow!("call-id is mandatory"))??,
+ );
+ resp.headers.insert(ContentLength(resp.body.len()));
+ self.transport.send(Message::Response(resp)).await?;
+ Ok(())
+ }
+
+ pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> {
+ let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
+ let cseq = CSeq(seq, request.method);
+ request.headers.insert(cseq);
+ request.headers.insert(ContentLength(request.body.len()));
+
+ let (tx, rx) = channel(4);
+
+ self.transport.send(Message::Request(request)).await?;
+ self.pending_requests.write().await.insert(cseq, tx);
+
+ Ok(rx)
+ }
+}
diff --git a/sip/src/transport/mod.rs b/sip/src/transport/mod.rs
new file mode 100644
index 0000000..b4c512b
--- /dev/null
+++ b/sip/src/transport/mod.rs
@@ -0,0 +1,11 @@
+use crate::encoding::Message;
+use anyhow::Result;
+
+pub mod tcp;
+pub mod udp;
+
+#[allow(async_fn_in_trait)]
+pub trait Transport {
+ async fn recv(&self) -> Result<Message>;
+ async fn send(&self, message: Message) -> Result<()>;
+}
diff --git a/sip/src/transport/tcp.rs b/sip/src/transport/tcp.rs
new file mode 100644
index 0000000..efe433d
--- /dev/null
+++ b/sip/src/transport/tcp.rs
@@ -0,0 +1,48 @@
+use super::Transport;
+use crate::encoding::Message;
+use anyhow::Result;
+use log::debug;
+use std::str::FromStr;
+use tokio::{
+ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
+ net::{
+ tcp::{OwnedReadHalf, OwnedWriteHalf},
+ TcpStream,
+ },
+ sync::Mutex,
+};
+
+pub struct TcpTransport {
+ write: Mutex<BufWriter<OwnedWriteHalf>>,
+ read: Mutex<BufReader<OwnedReadHalf>>,
+}
+
+impl TcpTransport {
+ pub async fn new(stream: TcpStream) -> Result<Self> {
+ let (read, write) = stream.into_split();
+ Ok(Self {
+ write: BufWriter::new(write).into(),
+ read: BufReader::new(read).into(),
+ })
+ }
+}
+
+impl Transport for TcpTransport {
+ async fn recv(&self) -> Result<Message> {
+ let mut g = self.read.lock().await;
+ let mut message = String::new();
+ while !message.ends_with("\r\n\r\n") {
+ g.read_line(&mut message).await?;
+ }
+ let mesg = Message::from_str(message.trim())?;
+ debug!("<- {mesg}");
+ Ok(mesg)
+ }
+ async fn send(&self, request: Message) -> Result<()> {
+ debug!("-> {request}");
+ let mut g = self.write.lock().await;
+ g.write_all(format!("{request}").as_bytes()).await?;
+ g.flush().await?;
+ Ok(())
+ }
+}
diff --git a/sip/src/transport/udp.rs b/sip/src/transport/udp.rs
new file mode 100644
index 0000000..c0d7829
--- /dev/null
+++ b/sip/src/transport/udp.rs
@@ -0,0 +1,35 @@
+use super::Transport;
+use crate::encoding::Message;
+use anyhow::{anyhow, Result};
+use log::debug;
+use std::str::FromStr;
+use tokio::net::UdpSocket;
+
+pub struct UdpTransport {
+ sock: UdpSocket,
+}
+
+impl UdpTransport {
+ pub async fn new(sock: UdpSocket) -> Result<Self> {
+ Ok(Self { sock })
+ }
+}
+impl Transport for UdpTransport {
+ async fn recv(&self) -> Result<Message> {
+ let mut buf = [0; 1024];
+ let size = self.sock.recv(&mut buf).await?;
+ let message = String::from_utf8(buf[..size].to_vec())?;
+ let (head, body) = message
+ .split_once("\r\n\r\n")
+ .ok_or(anyhow!("header end missing"))?;
+ debug!("<- {head}\n\n{body}");
+ let mut mesg = Message::from_str(head.trim_end())?;
+ *mesg.body_mut() = body.to_string();
+ Ok(mesg)
+ }
+ async fn send(&self, request: Message) -> Result<()> {
+ debug!("-> {request}");
+ self.sock.send(format!("{request}").as_bytes()).await?;
+ Ok(())
+ }
+}