aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/encoding/headermap.rs18
-rw-r--r--src/encoding/headers.rs71
-rw-r--r--src/encoding/method.rs18
-rw-r--r--src/encoding/request.rs1
-rw-r--r--src/encoding/uri.rs1
-rw-r--r--src/lib.rs2
-rw-r--r--src/transaction/mod.rs58
-rw-r--r--src/transport/client.rs0
-rw-r--r--src/transport/mod.rs11
-rw-r--r--src/transport/tcp.rs58
10 files changed, 232 insertions, 6 deletions
diff --git a/src/encoding/headermap.rs b/src/encoding/headermap.rs
index 313d8b5..2f9f097 100644
--- a/src/encoding/headermap.rs
+++ b/src/encoding/headermap.rs
@@ -13,11 +13,17 @@ impl HeaderMap {
self.0.push((H::NAME.to_string(), format!("{h}")));
self
}
- pub fn get<H: Header>(&self) -> Option<Result<H>> {
+ pub fn insert<H: Header>(&mut self, h: H) {
+ self.0.push((H::NAME.to_string(), format!("{h}")));
+ }
+ pub fn get_raw(&self, name: &str) -> Option<&str> {
self.0
.iter()
- .find(|(k, _)| k == H::NAME)
- .map(|(_, v)| H::from_str(v))
+ .find(|(k, _)| k.eq_ignore_ascii_case(name))
+ .map(|(_, v)| v.as_str())
+ }
+ pub fn get<H: Header>(&self) -> Option<Result<H>> {
+ self.get_raw(H::NAME).map(H::from_str)
}
pub fn insert_raw(&mut self, key: String, value: String) {
self.0.push((key, value))
@@ -32,3 +38,9 @@ impl Display for HeaderMap {
Ok(())
}
}
+
+impl FromIterator<(String, String)> for HeaderMap {
+ fn from_iter<T: IntoIterator<Item = (String, String)>>(iter: T) -> Self {
+ Self(Vec::from_iter(iter))
+ }
+}
diff --git a/src/encoding/headers.rs b/src/encoding/headers.rs
index d837ee5..6bffc7e 100644
--- a/src/encoding/headers.rs
+++ b/src/encoding/headers.rs
@@ -1,7 +1,10 @@
+use super::{headermap::HeaderMap, method::Method};
+use anyhow::{anyhow, bail};
use std::{fmt::Display, str::FromStr};
macro_rules! header {
($hname:literal, struct $name:ident($type:ty)) => {
+ #[derive(Debug)]
pub struct $name(pub $type);
impl Header for $name {
const NAME: &'static str = $hname;
@@ -26,7 +29,6 @@ pub trait Header: FromStr<Err = anyhow::Error> + Display {
header!("Content-Length", struct ContentLength(usize));
header!("Call-ID", struct CallID(String));
-header!("CSeq", struct CSeq(String));
header!("Via", struct Via(String));
header!("Contact", struct Contact(String));
header!("Max-Forwards", struct MaxForwards(usize));
@@ -34,3 +36,70 @@ header!("From", struct From(String));
header!("To", struct To(String));
header!("User-Agent", struct UserAgent(String));
header!("Allow", struct Allow(String));
+
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct CSeq(pub u32, pub Method);
+
+impl Header for CSeq {
+ const NAME: &'static str = "CSeq";
+}
+impl Display for CSeq {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "{} {}", self.0, self.1)
+ }
+}
+impl FromStr for CSeq {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ let (seq, method) = s.split_once(" ").ok_or(anyhow!("method missing"))?;
+ Ok(CSeq(seq.parse()?, method.parse()?))
+ }
+}
+
+#[derive(Debug)]
+pub struct WWWAuthenticate {
+ pub realm: String,
+ pub nonce: String,
+}
+impl Header for WWWAuthenticate {
+ const NAME: &'static str = "WWW-Authenticate";
+}
+impl Display for WWWAuthenticate {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "Digest realm={:?}, nonce={:?}", self.realm, self.nonce)
+ }
+}
+impl FromStr for WWWAuthenticate {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ // TODO this is totally wrong
+ let kvs = s
+ .strip_prefix("Digest ")
+ .ok_or(anyhow!("type not digest"))?
+ .split(", ")
+ .map(|e| {
+ let Some((k, v)) = e.split_once("=") else {
+ bail!("not a KV-pair")
+ };
+ Ok((
+ k.to_string(),
+ v.strip_prefix("\"")
+ .ok_or(anyhow!("start quote missing"))?
+ .strip_suffix("\"")
+ .ok_or(anyhow!("end quote missing"))?
+ .to_string(),
+ ))
+ })
+ .try_collect::<HeaderMap>()?;
+ Ok(WWWAuthenticate {
+ realm: kvs
+ .get_raw("realm")
+ .ok_or(anyhow!("realm missing"))?
+ .to_string(),
+ nonce: kvs
+ .get_raw("nonce")
+ .ok_or(anyhow!("nonce missing"))?
+ .to_string(),
+ })
+ }
+}
diff --git a/src/encoding/method.rs b/src/encoding/method.rs
index 5f8110a..73829b0 100644
--- a/src/encoding/method.rs
+++ b/src/encoding/method.rs
@@ -1,5 +1,7 @@
-use std::fmt::Display;
+use anyhow::bail;
+use std::{fmt::Display, str::FromStr};
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Method {
Register,
Invite,
@@ -21,3 +23,17 @@ impl Display for Method {
})
}
}
+impl FromStr for Method {
+ type Err = anyhow::Error;
+ fn from_str(s: &str) -> Result<Self, Self::Err> {
+ Ok(match s {
+ "REGISTER" => Method::Register,
+ "INVITE" => Method::Invite,
+ "ACK" => Method::Ack,
+ "OPTION" => Method::Option,
+ "CANCEL" => Method::Cancel,
+ "BYE" => Method::Bye,
+ _ => bail!("unknown method"),
+ })
+ }
+}
diff --git a/src/encoding/request.rs b/src/encoding/request.rs
index 03b93c9..124522c 100644
--- a/src/encoding/request.rs
+++ b/src/encoding/request.rs
@@ -1,6 +1,7 @@
use super::{headermap::HeaderMap, method::Method, uri::Uri};
use std::fmt::Display;
+#[derive(Debug)]
pub struct Request {
pub method: Method,
pub uri: Uri,
diff --git a/src/encoding/uri.rs b/src/encoding/uri.rs
index 2d77df2..64572ba 100644
--- a/src/encoding/uri.rs
+++ b/src/encoding/uri.rs
@@ -1,5 +1,6 @@
use std::fmt::Display;
+#[derive(Debug)]
pub struct Uri {
pub content: String,
}
diff --git a/src/lib.rs b/src/lib.rs
index d9935d3..6c6cc3b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,2 +1,4 @@
+#![feature(iterator_try_collect)]
pub mod encoding;
pub mod transport;
+pub mod transaction;
diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs
new file mode 100644
index 0000000..2953cb7
--- /dev/null
+++ b/src/transaction/mod.rs
@@ -0,0 +1,58 @@
+use crate::{
+ encoding::{headers::CSeq, request::Request, response::Response},
+ transport::Transport,
+};
+use anyhow::{anyhow, Result};
+use std::{
+ collections::HashMap,
+ sync::atomic::{AtomicU32, Ordering},
+};
+use tokio::sync::{
+ mpsc::{self, channel},
+ RwLock,
+};
+
+pub struct TransactionUser<T> {
+ transport: T,
+ sequence: AtomicU32,
+ pending_requests: RwLock<HashMap<CSeq, mpsc::Sender<Response>>>,
+}
+
+impl<T: Transport> TransactionUser<T> {
+ pub fn new(transport: T) -> Self {
+ Self {
+ sequence: 0.into(),
+ pending_requests: Default::default(),
+ transport,
+ }
+ }
+
+ pub async fn process_responses(&self) -> Result<()> {
+ let resp = self.transport.recv().await?;
+ let cseq = resp
+ .headers
+ .get()
+ .ok_or(anyhow!("response cseq missing"))??;
+ self.pending_requests
+ .write()
+ .await
+ .get_mut(&cseq)
+ .ok_or(anyhow!("message was not requested"))?
+ .send(resp)
+ .await?;
+ Ok(())
+ }
+
+ pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> {
+ let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
+ let cseq = CSeq(seq, request.method);
+ request.headers.insert(cseq);
+
+ let (tx, rx) = channel(4);
+
+ self.transport.send(request).await?;
+ self.pending_requests.write().await.insert(cseq, tx);
+
+ Ok(rx)
+ }
+}
diff --git a/src/transport/client.rs b/src/transport/client.rs
deleted file mode 100644
index e69de29..0000000
--- a/src/transport/client.rs
+++ /dev/null
diff --git a/src/transport/mod.rs b/src/transport/mod.rs
index b9babe5..3fa82df 100644
--- a/src/transport/mod.rs
+++ b/src/transport/mod.rs
@@ -1 +1,10 @@
-pub mod client;
+use crate::encoding::{request::Request, response::Response};
+use anyhow::Result;
+
+pub mod tcp;
+
+#[allow(async_fn_in_trait)]
+pub trait Transport {
+ async fn recv(&self) -> Result<Response>;
+ async fn send(&self, request: Request) -> Result<()>;
+}
diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs
new file mode 100644
index 0000000..e196db3
--- /dev/null
+++ b/src/transport/tcp.rs
@@ -0,0 +1,58 @@
+use crate::encoding::{request::Request, response::Response};
+use anyhow::{anyhow, Result};
+use log::debug;
+use std::str::FromStr;
+use tokio::{
+ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
+ net::{tcp::OwnedWriteHalf, TcpStream},
+ sync::{
+ mpsc::{channel, Receiver},
+ Mutex,
+ },
+};
+
+use super::Transport;
+
+pub struct TcpTransport {
+ write: Mutex<BufWriter<OwnedWriteHalf>>,
+ read: Mutex<Receiver<Response>>,
+}
+
+impl TcpTransport {
+ pub async fn new(stream: TcpStream) -> Result<Self> {
+ let (read, write) = stream.into_split();
+
+ let (tx, rx) = channel(16);
+
+ tokio::task::spawn(async move {
+ let mut sock = BufReader::new(read);
+ let mut message = String::new();
+ loop {
+ while !message.ends_with("\r\n\r\n") {
+ sock.read_line(&mut message).await.unwrap();
+ }
+ let mesg = Response::from_str(message.trim()).unwrap();
+ debug!("<- {mesg:?}");
+ tx.send(mesg).await.unwrap();
+ message.clear();
+ }
+ });
+
+ Ok(Self {
+ write: BufWriter::new(write).into(),
+ read: rx.into(),
+ })
+ }
+}
+impl Transport for TcpTransport {
+ async fn recv(&self) -> Result<Response> {
+ self.read.lock().await.recv().await.ok_or(anyhow!("end"))
+ }
+ async fn send(&self, request: Request) -> Result<()> {
+ debug!("-> {request:?}");
+ let mut g = self.write.lock().await;
+ g.write_all(format!("{request}").as_bytes()).await?;
+ g.flush().await?;
+ Ok(())
+ }
+}