diff options
author | metamuffin <metamuffin@disroot.org> | 2024-07-05 02:27:48 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-07-05 02:27:48 +0200 |
commit | 384ddd782b989218ceb55b7147aa8698425d1464 (patch) | |
tree | 5bfb61317db2e9260129b53b8444f3c79a0bc708 /src | |
parent | 3f80205783bcf6a2ed682f6f21e5b1877d597328 (diff) | |
download | sip-rs-384ddd782b989218ceb55b7147aa8698425d1464.tar sip-rs-384ddd782b989218ceb55b7147aa8698425d1464.tar.bz2 sip-rs-384ddd782b989218ceb55b7147aa8698425d1464.tar.zst |
simpletransaction works
Diffstat (limited to 'src')
-rw-r--r-- | src/encoding/headermap.rs | 18 | ||||
-rw-r--r-- | src/encoding/headers.rs | 71 | ||||
-rw-r--r-- | src/encoding/method.rs | 18 | ||||
-rw-r--r-- | src/encoding/request.rs | 1 | ||||
-rw-r--r-- | src/encoding/uri.rs | 1 | ||||
-rw-r--r-- | src/lib.rs | 2 | ||||
-rw-r--r-- | src/transaction/mod.rs | 58 | ||||
-rw-r--r-- | src/transport/client.rs | 0 | ||||
-rw-r--r-- | src/transport/mod.rs | 11 | ||||
-rw-r--r-- | src/transport/tcp.rs | 58 |
10 files changed, 232 insertions, 6 deletions
diff --git a/src/encoding/headermap.rs b/src/encoding/headermap.rs index 313d8b5..2f9f097 100644 --- a/src/encoding/headermap.rs +++ b/src/encoding/headermap.rs @@ -13,11 +13,17 @@ impl HeaderMap { self.0.push((H::NAME.to_string(), format!("{h}"))); self } - pub fn get<H: Header>(&self) -> Option<Result<H>> { + pub fn insert<H: Header>(&mut self, h: H) { + self.0.push((H::NAME.to_string(), format!("{h}"))); + } + pub fn get_raw(&self, name: &str) -> Option<&str> { self.0 .iter() - .find(|(k, _)| k == H::NAME) - .map(|(_, v)| H::from_str(v)) + .find(|(k, _)| k.eq_ignore_ascii_case(name)) + .map(|(_, v)| v.as_str()) + } + pub fn get<H: Header>(&self) -> Option<Result<H>> { + self.get_raw(H::NAME).map(H::from_str) } pub fn insert_raw(&mut self, key: String, value: String) { self.0.push((key, value)) @@ -32,3 +38,9 @@ impl Display for HeaderMap { Ok(()) } } + +impl FromIterator<(String, String)> for HeaderMap { + fn from_iter<T: IntoIterator<Item = (String, String)>>(iter: T) -> Self { + Self(Vec::from_iter(iter)) + } +} diff --git a/src/encoding/headers.rs b/src/encoding/headers.rs index d837ee5..6bffc7e 100644 --- a/src/encoding/headers.rs +++ b/src/encoding/headers.rs @@ -1,7 +1,10 @@ +use super::{headermap::HeaderMap, method::Method}; +use anyhow::{anyhow, bail}; use std::{fmt::Display, str::FromStr}; macro_rules! header { ($hname:literal, struct $name:ident($type:ty)) => { + #[derive(Debug)] pub struct $name(pub $type); impl Header for $name { const NAME: &'static str = $hname; @@ -26,7 +29,6 @@ pub trait Header: FromStr<Err = anyhow::Error> + Display { header!("Content-Length", struct ContentLength(usize)); header!("Call-ID", struct CallID(String)); -header!("CSeq", struct CSeq(String)); header!("Via", struct Via(String)); header!("Contact", struct Contact(String)); header!("Max-Forwards", struct MaxForwards(usize)); @@ -34,3 +36,70 @@ header!("From", struct From(String)); header!("To", struct To(String)); header!("User-Agent", struct UserAgent(String)); header!("Allow", struct Allow(String)); + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub struct CSeq(pub u32, pub Method); + +impl Header for CSeq { + const NAME: &'static str = "CSeq"; +} +impl Display for CSeq { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {}", self.0, self.1) + } +} +impl FromStr for CSeq { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result<Self, Self::Err> { + let (seq, method) = s.split_once(" ").ok_or(anyhow!("method missing"))?; + Ok(CSeq(seq.parse()?, method.parse()?)) + } +} + +#[derive(Debug)] +pub struct WWWAuthenticate { + pub realm: String, + pub nonce: String, +} +impl Header for WWWAuthenticate { + const NAME: &'static str = "WWW-Authenticate"; +} +impl Display for WWWAuthenticate { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Digest realm={:?}, nonce={:?}", self.realm, self.nonce) + } +} +impl FromStr for WWWAuthenticate { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result<Self, Self::Err> { + // TODO this is totally wrong + let kvs = s + .strip_prefix("Digest ") + .ok_or(anyhow!("type not digest"))? + .split(", ") + .map(|e| { + let Some((k, v)) = e.split_once("=") else { + bail!("not a KV-pair") + }; + Ok(( + k.to_string(), + v.strip_prefix("\"") + .ok_or(anyhow!("start quote missing"))? + .strip_suffix("\"") + .ok_or(anyhow!("end quote missing"))? + .to_string(), + )) + }) + .try_collect::<HeaderMap>()?; + Ok(WWWAuthenticate { + realm: kvs + .get_raw("realm") + .ok_or(anyhow!("realm missing"))? + .to_string(), + nonce: kvs + .get_raw("nonce") + .ok_or(anyhow!("nonce missing"))? + .to_string(), + }) + } +} diff --git a/src/encoding/method.rs b/src/encoding/method.rs index 5f8110a..73829b0 100644 --- a/src/encoding/method.rs +++ b/src/encoding/method.rs @@ -1,5 +1,7 @@ -use std::fmt::Display; +use anyhow::bail; +use std::{fmt::Display, str::FromStr}; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum Method { Register, Invite, @@ -21,3 +23,17 @@ impl Display for Method { }) } } +impl FromStr for Method { + type Err = anyhow::Error; + fn from_str(s: &str) -> Result<Self, Self::Err> { + Ok(match s { + "REGISTER" => Method::Register, + "INVITE" => Method::Invite, + "ACK" => Method::Ack, + "OPTION" => Method::Option, + "CANCEL" => Method::Cancel, + "BYE" => Method::Bye, + _ => bail!("unknown method"), + }) + } +} diff --git a/src/encoding/request.rs b/src/encoding/request.rs index 03b93c9..124522c 100644 --- a/src/encoding/request.rs +++ b/src/encoding/request.rs @@ -1,6 +1,7 @@ use super::{headermap::HeaderMap, method::Method, uri::Uri}; use std::fmt::Display; +#[derive(Debug)] pub struct Request { pub method: Method, pub uri: Uri, diff --git a/src/encoding/uri.rs b/src/encoding/uri.rs index 2d77df2..64572ba 100644 --- a/src/encoding/uri.rs +++ b/src/encoding/uri.rs @@ -1,5 +1,6 @@ use std::fmt::Display; +#[derive(Debug)] pub struct Uri { pub content: String, } @@ -1,2 +1,4 @@ +#![feature(iterator_try_collect)] pub mod encoding; pub mod transport; +pub mod transaction; diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs new file mode 100644 index 0000000..2953cb7 --- /dev/null +++ b/src/transaction/mod.rs @@ -0,0 +1,58 @@ +use crate::{ + encoding::{headers::CSeq, request::Request, response::Response}, + transport::Transport, +}; +use anyhow::{anyhow, Result}; +use std::{ + collections::HashMap, + sync::atomic::{AtomicU32, Ordering}, +}; +use tokio::sync::{ + mpsc::{self, channel}, + RwLock, +}; + +pub struct TransactionUser<T> { + transport: T, + sequence: AtomicU32, + pending_requests: RwLock<HashMap<CSeq, mpsc::Sender<Response>>>, +} + +impl<T: Transport> TransactionUser<T> { + pub fn new(transport: T) -> Self { + Self { + sequence: 0.into(), + pending_requests: Default::default(), + transport, + } + } + + pub async fn process_responses(&self) -> Result<()> { + let resp = self.transport.recv().await?; + let cseq = resp + .headers + .get() + .ok_or(anyhow!("response cseq missing"))??; + self.pending_requests + .write() + .await + .get_mut(&cseq) + .ok_or(anyhow!("message was not requested"))? + .send(resp) + .await?; + Ok(()) + } + + pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> { + let seq = self.sequence.fetch_add(1, Ordering::Relaxed); + let cseq = CSeq(seq, request.method); + request.headers.insert(cseq); + + let (tx, rx) = channel(4); + + self.transport.send(request).await?; + self.pending_requests.write().await.insert(cseq, tx); + + Ok(rx) + } +} diff --git a/src/transport/client.rs b/src/transport/client.rs deleted file mode 100644 index e69de29..0000000 --- a/src/transport/client.rs +++ /dev/null diff --git a/src/transport/mod.rs b/src/transport/mod.rs index b9babe5..3fa82df 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1 +1,10 @@ -pub mod client; +use crate::encoding::{request::Request, response::Response}; +use anyhow::Result; + +pub mod tcp; + +#[allow(async_fn_in_trait)] +pub trait Transport { + async fn recv(&self) -> Result<Response>; + async fn send(&self, request: Request) -> Result<()>; +} diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs new file mode 100644 index 0000000..e196db3 --- /dev/null +++ b/src/transport/tcp.rs @@ -0,0 +1,58 @@ +use crate::encoding::{request::Request, response::Response}; +use anyhow::{anyhow, Result}; +use log::debug; +use std::str::FromStr; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::{tcp::OwnedWriteHalf, TcpStream}, + sync::{ + mpsc::{channel, Receiver}, + Mutex, + }, +}; + +use super::Transport; + +pub struct TcpTransport { + write: Mutex<BufWriter<OwnedWriteHalf>>, + read: Mutex<Receiver<Response>>, +} + +impl TcpTransport { + pub async fn new(stream: TcpStream) -> Result<Self> { + let (read, write) = stream.into_split(); + + let (tx, rx) = channel(16); + + tokio::task::spawn(async move { + let mut sock = BufReader::new(read); + let mut message = String::new(); + loop { + while !message.ends_with("\r\n\r\n") { + sock.read_line(&mut message).await.unwrap(); + } + let mesg = Response::from_str(message.trim()).unwrap(); + debug!("<- {mesg:?}"); + tx.send(mesg).await.unwrap(); + message.clear(); + } + }); + + Ok(Self { + write: BufWriter::new(write).into(), + read: rx.into(), + }) + } +} +impl Transport for TcpTransport { + async fn recv(&self) -> Result<Response> { + self.read.lock().await.recv().await.ok_or(anyhow!("end")) + } + async fn send(&self, request: Request) -> Result<()> { + debug!("-> {request:?}"); + let mut g = self.write.lock().await; + g.write_all(format!("{request}").as_bytes()).await?; + g.flush().await?; + Ok(()) + } +} |