use crate::encoding::{request::Request, response::Response}; use anyhow::{anyhow, Result}; use log::debug; use std::str::FromStr; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, net::{tcp::OwnedWriteHalf, TcpStream}, sync::{ mpsc::{channel, Receiver}, Mutex, }, }; use super::Transport; pub struct TcpTransport { write: Mutex>, read: Mutex>, } impl TcpTransport { pub async fn new(stream: TcpStream) -> Result { let (read, write) = stream.into_split(); let (tx, rx) = channel(16); tokio::task::spawn(async move { let mut sock = BufReader::new(read); let mut message = String::new(); loop { while !message.ends_with("\r\n\r\n") { sock.read_line(&mut message).await.unwrap(); } let mesg = Response::from_str(message.trim()).unwrap(); debug!("<- {mesg}"); tx.send(mesg).await.unwrap(); message.clear(); } }); Ok(Self { write: BufWriter::new(write).into(), read: rx.into(), }) } } impl Transport for TcpTransport { async fn recv(&self) -> Result { self.read.lock().await.recv().await.ok_or(anyhow!("end")) } async fn send(&self, request: Request) -> Result<()> { debug!("-> {request}"); let mut g = self.write.lock().await; g.write_all(format!("{request}").as_bytes()).await?; g.flush().await?; Ok(()) } }