diff options
author | metamuffin <metamuffin@disroot.org> | 2024-07-05 02:27:48 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-07-05 02:27:48 +0200 |
commit | 384ddd782b989218ceb55b7147aa8698425d1464 (patch) | |
tree | 5bfb61317db2e9260129b53b8444f3c79a0bc708 /src/transaction | |
parent | 3f80205783bcf6a2ed682f6f21e5b1877d597328 (diff) | |
download | sip-rs-384ddd782b989218ceb55b7147aa8698425d1464.tar sip-rs-384ddd782b989218ceb55b7147aa8698425d1464.tar.bz2 sip-rs-384ddd782b989218ceb55b7147aa8698425d1464.tar.zst |
simpletransaction works
Diffstat (limited to 'src/transaction')
-rw-r--r-- | src/transaction/mod.rs | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs new file mode 100644 index 0000000..2953cb7 --- /dev/null +++ b/src/transaction/mod.rs @@ -0,0 +1,58 @@ +use crate::{ + encoding::{headers::CSeq, request::Request, response::Response}, + transport::Transport, +}; +use anyhow::{anyhow, Result}; +use std::{ + collections::HashMap, + sync::atomic::{AtomicU32, Ordering}, +}; +use tokio::sync::{ + mpsc::{self, channel}, + RwLock, +}; + +pub struct TransactionUser<T> { + transport: T, + sequence: AtomicU32, + pending_requests: RwLock<HashMap<CSeq, mpsc::Sender<Response>>>, +} + +impl<T: Transport> TransactionUser<T> { + pub fn new(transport: T) -> Self { + Self { + sequence: 0.into(), + pending_requests: Default::default(), + transport, + } + } + + pub async fn process_responses(&self) -> Result<()> { + let resp = self.transport.recv().await?; + let cseq = resp + .headers + .get() + .ok_or(anyhow!("response cseq missing"))??; + self.pending_requests + .write() + .await + .get_mut(&cseq) + .ok_or(anyhow!("message was not requested"))? + .send(resp) + .await?; + Ok(()) + } + + pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> { + let seq = self.sequence.fetch_add(1, Ordering::Relaxed); + let cseq = CSeq(seq, request.method); + request.headers.insert(cseq); + + let (tx, rx) = channel(4); + + self.transport.send(request).await?; + self.pending_requests.write().await.insert(cseq, tx); + + Ok(rx) + } +} |