From cbc111f90b5facc1f2a9dd79ced216279d6260af Mon Sep 17 00:00:00 2001 From: metamuffin Date: Tue, 19 Nov 2024 02:08:52 +0100 Subject: move files + rtp parser --- sip/src/transaction/mod.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 sip/src/transaction/mod.rs (limited to 'sip/src/transaction/mod.rs') diff --git a/sip/src/transaction/mod.rs b/sip/src/transaction/mod.rs new file mode 100644 index 0000000..3368c47 --- /dev/null +++ b/sip/src/transaction/mod.rs @@ -0,0 +1,87 @@ +pub mod auth; + +use crate::{ + encoding::{ + headers::{CSeq, CallID, ContentLength}, + request::Request, + response::Response, + Message, + }, + transport::Transport, +}; +use anyhow::{anyhow, Result}; +use std::{ + collections::HashMap, + sync::atomic::{AtomicU32, Ordering}, +}; +use tokio::sync::{ + mpsc::{self, channel}, + RwLock, +}; + +pub struct TransactionUser { + transport: T, + sequence: AtomicU32, + pending_requests: RwLock>>, +} + +impl TransactionUser { + pub fn new(transport: T) -> Self { + Self { + sequence: 0.into(), + pending_requests: Default::default(), + transport, + } + } + + pub async fn process_incoming(&self) -> Result { + loop { + let mesg = self.transport.recv().await?; + match mesg { + Message::Request(req) => break Ok(req), + Message::Response(resp) => { + let cseq = resp + .headers + .get() + .ok_or(anyhow!("response cseq missing"))??; + self.pending_requests + .write() + .await + .get_mut(&cseq) + .ok_or(anyhow!("message was not requested"))? + .send(resp) + .await?; + } + } + } + } + pub async fn respond(&self, req: &Request, mut resp: Response) -> Result<()> { + resp.headers.insert( + req.headers + .get::() + .ok_or(anyhow!("cseq is mandatory"))??, + ); + resp.headers.insert( + req.headers + .get::() + .ok_or(anyhow!("call-id is mandatory"))??, + ); + resp.headers.insert(ContentLength(resp.body.len())); + self.transport.send(Message::Response(resp)).await?; + Ok(()) + } + + pub async fn transact(&self, mut request: Request) -> Result> { + let seq = self.sequence.fetch_add(1, Ordering::Relaxed); + let cseq = CSeq(seq, request.method); + request.headers.insert(cseq); + request.headers.insert(ContentLength(request.body.len())); + + let (tx, rx) = channel(4); + + self.transport.send(Message::Request(request)).await?; + self.pending_requests.write().await.insert(cseq, tx); + + Ok(rx) + } +} -- cgit v1.2.3-70-g09d2