aboutsummaryrefslogtreecommitdiff
path: root/src/transport/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/transport/tcp.rs')
-rw-r--r--src/transport/tcp.rs58
1 files changed, 58 insertions, 0 deletions
diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs
new file mode 100644
index 0000000..e196db3
--- /dev/null
+++ b/src/transport/tcp.rs
@@ -0,0 +1,58 @@
+use crate::encoding::{request::Request, response::Response};
+use anyhow::{anyhow, Result};
+use log::debug;
+use std::str::FromStr;
+use tokio::{
+ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
+ net::{tcp::OwnedWriteHalf, TcpStream},
+ sync::{
+ mpsc::{channel, Receiver},
+ Mutex,
+ },
+};
+
+use super::Transport;
+
+pub struct TcpTransport {
+ write: Mutex<BufWriter<OwnedWriteHalf>>,
+ read: Mutex<Receiver<Response>>,
+}
+
+impl TcpTransport {
+ pub async fn new(stream: TcpStream) -> Result<Self> {
+ let (read, write) = stream.into_split();
+
+ let (tx, rx) = channel(16);
+
+ tokio::task::spawn(async move {
+ let mut sock = BufReader::new(read);
+ let mut message = String::new();
+ loop {
+ while !message.ends_with("\r\n\r\n") {
+ sock.read_line(&mut message).await.unwrap();
+ }
+ let mesg = Response::from_str(message.trim()).unwrap();
+ debug!("<- {mesg:?}");
+ tx.send(mesg).await.unwrap();
+ message.clear();
+ }
+ });
+
+ Ok(Self {
+ write: BufWriter::new(write).into(),
+ read: rx.into(),
+ })
+ }
+}
+impl Transport for TcpTransport {
+ async fn recv(&self) -> Result<Response> {
+ self.read.lock().await.recv().await.ok_or(anyhow!("end"))
+ }
+ async fn send(&self, request: Request) -> Result<()> {
+ debug!("-> {request:?}");
+ let mut g = self.write.lock().await;
+ g.write_all(format!("{request}").as_bytes()).await?;
+ g.flush().await?;
+ Ok(())
+ }
+}