pub mod auth; use crate::{ encoding::{ headers::{CSeq, CallID, ContentLength}, request::Request, response::Response, Message, }, transport::Transport, }; use anyhow::{anyhow, Result}; use std::{ collections::HashMap, sync::atomic::{AtomicU32, Ordering}, }; use tokio::sync::{ mpsc::{self, channel}, RwLock, }; pub struct TransactionUser { transport: T, sequence: AtomicU32, pending_requests: RwLock>>, } impl TransactionUser { pub fn new(transport: T) -> Self { Self { sequence: 0.into(), pending_requests: Default::default(), transport, } } pub async fn process_incoming(&self) -> Result { loop { let mesg = self.transport.recv().await?; match mesg { Message::Request(req) => break Ok(req), Message::Response(resp) => { let cseq = resp .headers .get() .ok_or(anyhow!("response cseq missing"))??; self.pending_requests .write() .await .get_mut(&cseq) .ok_or(anyhow!("message was not requested"))? .send(resp) .await?; } } } } pub async fn respond(&self, req: &Request, mut resp: Response) -> Result<()> { resp.headers.insert( req.headers .get::() .ok_or(anyhow!("cseq is mandatory"))??, ); resp.headers.insert( req.headers .get::() .ok_or(anyhow!("call-id is mandatory"))??, ); resp.headers.insert(ContentLength(resp.body.len())); self.transport.send(Message::Response(resp)).await?; Ok(()) } pub async fn transact(&self, mut request: Request) -> Result> { let seq = self.sequence.fetch_add(1, Ordering::Relaxed); let cseq = CSeq(seq, request.method); request.headers.insert(cseq); request.headers.insert(ContentLength(request.body.len())); let (tx, rx) = channel(4); self.transport.send(Message::Request(request)).await?; self.pending_requests.write().await.insert(cseq, tx); Ok(rx) } }