diff options
Diffstat (limited to 'src/transaction')
-rw-r--r-- | src/transaction/mod.rs | 38 |
1 files changed, 22 insertions, 16 deletions
diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs index 2953cb7..3be9544 100644 --- a/src/transaction/mod.rs +++ b/src/transaction/mod.rs @@ -1,5 +1,5 @@ use crate::{ - encoding::{headers::CSeq, request::Request, response::Response}, + encoding::{headers::CSeq, request::Request, response::Response, Message}, transport::Transport, }; use anyhow::{anyhow, Result}; @@ -27,20 +27,26 @@ impl<T: Transport> TransactionUser<T> { } } - pub async fn process_responses(&self) -> Result<()> { - let resp = self.transport.recv().await?; - let cseq = resp - .headers - .get() - .ok_or(anyhow!("response cseq missing"))??; - self.pending_requests - .write() - .await - .get_mut(&cseq) - .ok_or(anyhow!("message was not requested"))? - .send(resp) - .await?; - Ok(()) + pub async fn process_incoming(&self) -> Result<Request> { + loop { + let mesg = self.transport.recv().await?; + match mesg { + Message::Request(req) => break Ok(req), + Message::Response(resp) => { + let cseq = resp + .headers + .get() + .ok_or(anyhow!("response cseq missing"))??; + self.pending_requests + .write() + .await + .get_mut(&cseq) + .ok_or(anyhow!("message was not requested"))? + .send(resp) + .await?; + } + } + } } pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> { @@ -50,7 +56,7 @@ impl<T: Transport> TransactionUser<T> { let (tx, rx) = channel(4); - self.transport.send(request).await?; + self.transport.send(Message::Request(request)).await?; self.pending_requests.write().await.insert(cseq, tx); Ok(rx) |