From 384ddd782b989218ceb55b7147aa8698425d1464 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 5 Jul 2024 02:27:48 +0200 Subject: simpletransaction works --- src/transport/tcp.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 src/transport/tcp.rs (limited to 'src/transport/tcp.rs') diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs new file mode 100644 index 0000000..e196db3 --- /dev/null +++ b/src/transport/tcp.rs @@ -0,0 +1,58 @@ +use crate::encoding::{request::Request, response::Response}; +use anyhow::{anyhow, Result}; +use log::debug; +use std::str::FromStr; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, + net::{tcp::OwnedWriteHalf, TcpStream}, + sync::{ + mpsc::{channel, Receiver}, + Mutex, + }, +}; + +use super::Transport; + +pub struct TcpTransport { + write: Mutex>, + read: Mutex>, +} + +impl TcpTransport { + pub async fn new(stream: TcpStream) -> Result { + let (read, write) = stream.into_split(); + + let (tx, rx) = channel(16); + + tokio::task::spawn(async move { + let mut sock = BufReader::new(read); + let mut message = String::new(); + loop { + while !message.ends_with("\r\n\r\n") { + sock.read_line(&mut message).await.unwrap(); + } + let mesg = Response::from_str(message.trim()).unwrap(); + debug!("<- {mesg:?}"); + tx.send(mesg).await.unwrap(); + message.clear(); + } + }); + + Ok(Self { + write: BufWriter::new(write).into(), + read: rx.into(), + }) + } +} +impl Transport for TcpTransport { + async fn recv(&self) -> Result { + self.read.lock().await.recv().await.ok_or(anyhow!("end")) + } + async fn send(&self, request: Request) -> Result<()> { + debug!("-> {request:?}"); + let mut g = self.write.lock().await; + g.write_all(format!("{request}").as_bytes()).await?; + g.flush().await?; + Ok(()) + } +} -- cgit v1.2.3-70-g09d2