aboutsummaryrefslogtreecommitdiff
path: root/src/transaction
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2024-07-06 01:19:02 +0200
committermetamuffin <metamuffin@disroot.org>2024-07-06 01:19:02 +0200
commit5dd0fafce20ed37fdc97dc96539391ebdebffaff (patch)
treead93b9e8d0e9c9c7dbe5a858902c2ba0114a47cf /src/transaction
parenta4c52bedef04cfb927f3d7809680fed0425a5125 (diff)
downloadsip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar
sip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar.bz2
sip-rs-5dd0fafce20ed37fdc97dc96539391ebdebffaff.tar.zst
generalize to support requests. untested
Diffstat (limited to 'src/transaction')
-rw-r--r--src/transaction/mod.rs38
1 files changed, 22 insertions, 16 deletions
diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs
index 2953cb7..3be9544 100644
--- a/src/transaction/mod.rs
+++ b/src/transaction/mod.rs
@@ -1,5 +1,5 @@
use crate::{
- encoding::{headers::CSeq, request::Request, response::Response},
+ encoding::{headers::CSeq, request::Request, response::Response, Message},
transport::Transport,
};
use anyhow::{anyhow, Result};
@@ -27,20 +27,26 @@ impl<T: Transport> TransactionUser<T> {
}
}
- pub async fn process_responses(&self) -> Result<()> {
- let resp = self.transport.recv().await?;
- let cseq = resp
- .headers
- .get()
- .ok_or(anyhow!("response cseq missing"))??;
- self.pending_requests
- .write()
- .await
- .get_mut(&cseq)
- .ok_or(anyhow!("message was not requested"))?
- .send(resp)
- .await?;
- Ok(())
+ pub async fn process_incoming(&self) -> Result<Request> {
+ loop {
+ let mesg = self.transport.recv().await?;
+ match mesg {
+ Message::Request(req) => break Ok(req),
+ Message::Response(resp) => {
+ let cseq = resp
+ .headers
+ .get()
+ .ok_or(anyhow!("response cseq missing"))??;
+ self.pending_requests
+ .write()
+ .await
+ .get_mut(&cseq)
+ .ok_or(anyhow!("message was not requested"))?
+ .send(resp)
+ .await?;
+ }
+ }
+ }
}
pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> {
@@ -50,7 +56,7 @@ impl<T: Transport> TransactionUser<T> {
let (tx, rx) = channel(4);
- self.transport.send(request).await?;
+ self.transport.send(Message::Request(request)).await?;
self.pending_requests.write().await.insert(cseq, tx);
Ok(rx)