aboutsummaryrefslogtreecommitdiff
path: root/src/transaction/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/transaction/mod.rs')
-rw-r--r--src/transaction/mod.rs58
1 files changed, 58 insertions, 0 deletions
diff --git a/src/transaction/mod.rs b/src/transaction/mod.rs
new file mode 100644
index 0000000..2953cb7
--- /dev/null
+++ b/src/transaction/mod.rs
@@ -0,0 +1,58 @@
+use crate::{
+ encoding::{headers::CSeq, request::Request, response::Response},
+ transport::Transport,
+};
+use anyhow::{anyhow, Result};
+use std::{
+ collections::HashMap,
+ sync::atomic::{AtomicU32, Ordering},
+};
+use tokio::sync::{
+ mpsc::{self, channel},
+ RwLock,
+};
+
+pub struct TransactionUser<T> {
+ transport: T,
+ sequence: AtomicU32,
+ pending_requests: RwLock<HashMap<CSeq, mpsc::Sender<Response>>>,
+}
+
+impl<T: Transport> TransactionUser<T> {
+ pub fn new(transport: T) -> Self {
+ Self {
+ sequence: 0.into(),
+ pending_requests: Default::default(),
+ transport,
+ }
+ }
+
+ pub async fn process_responses(&self) -> Result<()> {
+ let resp = self.transport.recv().await?;
+ let cseq = resp
+ .headers
+ .get()
+ .ok_or(anyhow!("response cseq missing"))??;
+ self.pending_requests
+ .write()
+ .await
+ .get_mut(&cseq)
+ .ok_or(anyhow!("message was not requested"))?
+ .send(resp)
+ .await?;
+ Ok(())
+ }
+
+ pub async fn transact(&self, mut request: Request) -> Result<mpsc::Receiver<Response>> {
+ let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
+ let cseq = CSeq(seq, request.method);
+ request.headers.insert(cseq);
+
+ let (tx, rx) = channel(4);
+
+ self.transport.send(request).await?;
+ self.pending_requests.write().await.insert(cseq, tx);
+
+ Ok(rx)
+ }
+}