aboutsummaryrefslogtreecommitdiff
path: root/src/transport/tcp.rs
blob: f5986b9550ff8cf5d74d75da0378e2e16e4d1fbf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
use crate::encoding::{request::Request, response::Response};
use anyhow::{anyhow, Result};
use log::debug;
use std::str::FromStr;
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
    net::{tcp::OwnedWriteHalf, TcpStream},
    sync::{
        mpsc::{channel, Receiver},
        Mutex,
    },
};

use super::Transport;

/// [`Transport`] implementation that speaks over a single TCP connection.
///
/// Outgoing requests are written through a buffered writer on the socket's
/// write half; incoming responses are parsed by a background task spawned in
/// [`TcpTransport::new`] and handed over through a channel.
pub struct TcpTransport {
    /// Buffered writer over the write half of the socket. The mutex is locked
    /// for the duration of each `send`, so a request is written and flushed
    /// as one unit.
    write: Mutex<BufWriter<OwnedWriteHalf>>,
    /// Receiving end of the channel fed by the background reader task. The
    /// mutex lets `recv` take `&self` while still getting exclusive access to
    /// the receiver.
    read: Mutex<Receiver<Response>>,
}

impl TcpTransport {
    /// Wraps an established [`TcpStream`] in a transport.
    ///
    /// The stream is split into its read and write halves. The write half is
    /// kept (buffered) for `send`; the read half is moved into a spawned
    /// background task that:
    ///
    /// 1. accumulates lines until the buffer ends with a blank line
    ///    (`"\r\n\r\n"`), which terminates one message,
    /// 2. parses the trimmed text into a [`Response`],
    /// 3. forwards it over a bounded channel (capacity 16) to `recv`.
    ///
    /// The task stops — without panicking — when the peer closes the
    /// connection, a read fails, a message cannot be parsed, or the transport
    /// (and with it the channel receiver) has been dropped. Stopping drops the
    /// channel sender, so once the buffered responses are drained `recv`
    /// reports the end of the stream.
    ///
    /// # Errors
    ///
    /// This constructor currently always returns `Ok`; the `Result` return
    /// type is retained so the signature stays stable for callers.
    pub async fn new(stream: TcpStream) -> Result<Self> {
        let (read, write) = stream.into_split();

        let (tx, rx) = channel(16);

        tokio::task::spawn(async move {
            let mut sock = BufReader::new(read);
            let mut message = String::new();
            'reader: loop {
                // Keep appending lines until the blank-line terminator shows up.
                while !message.ends_with("\r\n\r\n") {
                    match sock.read_line(&mut message).await {
                        // Zero bytes means EOF: without this check the loop
                        // would spin forever once the peer disconnects.
                        Ok(0) => {
                            debug!("connection closed by peer");
                            break 'reader;
                        }
                        Ok(_) => {}
                        Err(err) => {
                            debug!("failed to read from socket: {err}");
                            break 'reader;
                        }
                    }
                }

                let mesg = match Response::from_str(message.trim()) {
                    Ok(mesg) => mesg,
                    Err(err) => {
                        // The stream contents are peer-controlled; a bad
                        // message ends the reader instead of panicking.
                        debug!("failed to parse response: {err:?}");
                        break;
                    }
                };
                debug!("<- {mesg}");

                // A send error means the receiver is gone, i.e. the transport
                // was dropped; there is nobody left to read for.
                if tx.send(mesg).await.is_err() {
                    break;
                }
                message.clear();
            }
        });

        Ok(Self {
            write: BufWriter::new(write).into(),
            read: rx.into(),
        })
    }
}
impl Transport for TcpTransport {
    /// Waits for the next response delivered by the background reader task.
    ///
    /// # Errors
    ///
    /// Returns an error with the message `"end"` once the reader task has
    /// stopped and every already-queued response has been consumed.
    async fn recv(&self) -> Result<Response> {
        let mut responses = self.read.lock().await;
        // `ok_or_else` builds the error only when the channel is actually
        // closed, instead of on every successful receive.
        responses.recv().await.ok_or_else(|| anyhow!("end"))
    }

    /// Serialises `request` with its `Display` implementation, writes it to
    /// the socket and flushes the buffered writer.
    ///
    /// The write lock is held across both the write and the flush so that
    /// concurrent senders cannot interleave their bytes.
    ///
    /// # Errors
    ///
    /// Propagates any I/O error raised while writing or flushing.
    async fn send(&self, request: Request) -> Result<()> {
        debug!("-> {request}");
        let mut writer = self.write.lock().await;
        writer.write_all(request.to_string().as_bytes()).await?;
        writer.flush().await?;
        Ok(())
    }
}