diff options
Diffstat (limited to 'src/transport')
-rw-r--r-- | src/transport/mod.rs | 6 | ||||
-rw-r--r-- | src/transport/tcp.rs | 47 | ||||
-rw-r--r-- | src/transport/udp.rs | 8 |
3 files changed, 26 insertions, 35 deletions
diff --git a/src/transport/mod.rs b/src/transport/mod.rs index cf49d31..b4c512b 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -1,4 +1,4 @@ -use crate::encoding::{request::Request, response::Response}; +use crate::encoding::Message; use anyhow::Result; pub mod tcp; @@ -6,6 +6,6 @@ pub mod udp; #[allow(async_fn_in_trait)] pub trait Transport { - async fn recv(&self) -> Result<Response>; - async fn send(&self, request: Request) -> Result<()>; + async fn recv(&self) -> Result<Message>; + async fn send(&self, message: Message) -> Result<()>; } diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 8c0a024..efe433d 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -1,53 +1,44 @@ use super::Transport; -use crate::encoding::{request::Request, response::Response}; -use anyhow::{anyhow, Result}; +use crate::encoding::Message; +use anyhow::Result; use log::debug; use std::str::FromStr; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, - net::{tcp::OwnedWriteHalf, TcpStream}, - sync::{ - mpsc::{channel, Receiver}, - Mutex, + net::{ + tcp::{OwnedReadHalf, OwnedWriteHalf}, + TcpStream, }, + sync::Mutex, }; pub struct TcpTransport { write: Mutex<BufWriter<OwnedWriteHalf>>, - read: Mutex<Receiver<Response>>, + read: Mutex<BufReader<OwnedReadHalf>>, } impl TcpTransport { pub async fn new(stream: TcpStream) -> Result<Self> { let (read, write) = stream.into_split(); - - let (tx, rx) = channel(16); - - tokio::task::spawn(async move { - let mut sock = BufReader::new(read); - let mut message = String::new(); - loop { - while !message.ends_with("\r\n\r\n") { - sock.read_line(&mut message).await.unwrap(); - } - let mesg = Response::from_str(message.trim()).unwrap(); - debug!("<- {mesg}"); - tx.send(mesg).await.unwrap(); - message.clear(); - } - }); - Ok(Self { write: BufWriter::new(write).into(), - read: rx.into(), + read: BufReader::new(read).into(), }) } } + impl Transport for TcpTransport { - async fn recv(&self) -> Result<Response> { - self.read.lock().await.recv().await.ok_or(anyhow!("end")) + async fn recv(&self) -> Result<Message> { + let mut g = self.read.lock().await; + let mut message = String::new(); + while !message.ends_with("\r\n\r\n") { + g.read_line(&mut message).await?; + } + let mesg = Message::from_str(message.trim())?; + debug!("<- {mesg}"); + Ok(mesg) } - async fn send(&self, request: Request) -> Result<()> { + async fn send(&self, request: Message) -> Result<()> { debug!("-> {request}"); let mut g = self.write.lock().await; g.write_all(format!("{request}").as_bytes()).await?; diff --git a/src/transport/udp.rs b/src/transport/udp.rs index c86ce50..391e2b3 100644 --- a/src/transport/udp.rs +++ b/src/transport/udp.rs @@ -1,5 +1,5 @@ use super::Transport; -use crate::encoding::{request::Request, response::Response}; +use crate::encoding::Message; use anyhow::Result; use log::debug; use std::str::FromStr; @@ -15,14 +15,14 @@ impl UdpTransport { } } impl Transport for UdpTransport { - async fn recv(&self) -> Result<Response> { + async fn recv(&self) -> Result<Message> { let mut buf = [0; 1024]; let size = self.sock.recv(&mut buf).await?; let message = String::from_utf8(buf[..size].to_vec())?; debug!("{message}"); - Response::from_str(message.trim_end()) + Message::from_str(message.trim_end()) } - async fn send(&self, request: Request) -> Result<()> { + async fn send(&self, request: Message) -> Result<()> { debug!("-> {request}"); self.sock.send(format!("{request}").as_bytes()).await?; Ok(()) |