aboutsummaryrefslogtreecommitdiff
path: root/src/transaction/mod.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-11-19 02:08:52 +0100
committermetamuffin <metamuffin@disroot.org>2024-11-19 02:08:52 +0100
commitcbc111f90b5facc1f2a9dd79ced216279d6260af (patch)
treefa5a1d2d67874413d8e66673825c6789e8cc0945 /src/transaction/mod.rs
parent2d9a31244eab6d3a9871369d3148de253e902d36 (diff)
downloadsip-rs-cbc111f90b5facc1f2a9dd79ced216279d6260af.tar
sip-rs-cbc111f90b5facc1f2a9dd79ced216279d6260af.tar.bz2
sip-rs-cbc111f90b5facc1f2a9dd79ced216279d6260af.tar.zst
move files + rtp parser
Diffstat (limited to 'src/transaction/mod.rs')
-rw-r--r--src/transaction/mod.rs87
1 files changed, 0 insertions, 87 deletions
diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs
deleted file mode 100644
index 3368c47..0000000
--- a/src/transaction/mod.rs
+++ /dev/null
@@ -1,87 +0,0 @@
-pub mod auth;
-
-use crate::{
- encoding::{
- headers::{CSeq, CallID, ContentLength},
- request::Request,
- response::Response,
- Message,
- },
- transport::Transport,
-};
-use anyhow::{anyhow, Result};
-use std::{
- collections::HashMap,
- sync::atomic::{AtomicU32, Ordering},
-};
-use tokio::sync::{
- mpsc::{self, channel},
- RwLock,
-};
-
-pub struct TransactionUser<T> {
- transport: T,
- sequence: AtomicU32,
- pending_requests: RwLock<HashMap<CSeq, mpsc::Sender<Response>>>,
-}
-
-impl<T: Transport> TransactionUser<T> {
- pub fn new(transport: T) -> Self {
- Self {
- sequence: 0.into(),
- pending_requests: Default::default(),
- transport,
- }
- }
-
- pub async fn process_incoming(&self) -> Result<Request> {
- loop {
- let mesg = self.transport.recv().await?;
- match mesg {
- Message::Request(req) => break Ok(req),
- Message::Response(resp) => {
- let cseq = resp
- .headers
- .get()
- .ok_or(anyhow!("response cseq missing"))??;
- self.pending_requests
- .write()
- .await
- .get_mut(&cseq)
- .ok_or(anyhow!("message was not requested"))?
- .send(resp)
- .await?;
- }
- }
- }
- }
- pub async fn respond(&self, req: &Request, mut resp: Response) -> Result<()> {
- resp.headers.insert(
- req.headers
- .get::<CSeq>()
- .ok_or(anyhow!("cseq is mandatory"))??,
- );
- resp.headers.insert(
- req.headers
- .get::<CallID>()
- .ok_or(anyhow!("call-id is mandatory"))??,
- );
- resp.headers.insert(ContentLength(resp.body.len()));
- self.transport.send(Message::Response(resp)).await?;
- Ok(())
- }
-
- pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> {
- let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
- let cseq = CSeq(seq, request.method);
- request.headers.insert(cseq);
- request.headers.insert(ContentLength(request.body.len()));
-
- let (tx, rx) = channel(4);
-
- self.transport.send(Message::Request(request)).await?;
- self.pending_requests.write().await.insert(cseq, tx);
-
- Ok(rx)
- }
-}