use crate::{ encoding::{headers::CSeq, request::Request, response::Response}, transport::Transport, }; use anyhow::{anyhow, Result}; use std::{ collections::HashMap, sync::atomic::{AtomicU32, Ordering}, }; use tokio::sync::{ mpsc::{self, channel}, RwLock, }; pub struct TransactionUser { transport: T, sequence: AtomicU32, pending_requests: RwLock>>, } impl TransactionUser { pub fn new(transport: T) -> Self { Self { sequence: 0.into(), pending_requests: Default::default(), transport, } } pub async fn process_responses(&self) -> Result<()> { let resp = self.transport.recv().await?; let cseq = resp .headers .get() .ok_or(anyhow!("response cseq missing"))??; self.pending_requests .write() .await .get_mut(&cseq) .ok_or(anyhow!("message was not requested"))? .send(resp) .await?; Ok(()) } pub async fn transact(&self, mut request: Request) -> Result> { let seq = self.sequence.fetch_add(1, Ordering::Relaxed); let cseq = CSeq(seq, request.method); request.headers.insert(cseq); let (tx, rx) = channel(4); self.transport.send(request).await?; self.pending_requests.write().await.insert(cseq, tx); Ok(rx) } }