aboutsummaryrefslogtreecommitdiff
path: root/karld/src/interface/generic.rs
blob: e9d57b5cfc1725740487eb82c0c969e6dde43756 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use crate::{handle_packet, CLIENTS, CLIENT_ID_COUNTER};
use crossbeam_channel::{Receiver, Sender};
use karlcommon::{version, ClientboundPacket, ProtoError, ServerboundPacket};
use log::{debug, error, info, warn};
use std::{
    io::{self, BufRead, BufReader, ErrorKind, Read, Write},
    thread,
};

pub fn handle_connection<F, T>(handle_client: F, arg: T)
where
    F: FnOnce(
        u32,
        (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
        T,
    ) -> anyhow::Result<()>,
{
    let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    let (responder, responses) = crossbeam_channel::unbounded();
    responder
        .send(ClientboundPacket::Handshake {
            version: version!(),
        })
        .unwrap();
    info!("client connected: {id}");
    CLIENTS.write().unwrap().insert(id, responder.clone());
    match handle_client(id, (responder, responses), arg) {
        Ok(_) => info!("client ({id}) dropped properly"),
        Err(e) => error!("client ({id}) dropped bc error: {e}"),
    }
    CLIENTS.write().unwrap().remove(&id);
}

pub fn stream<ReadStream: Read, WriteStream: Write + Send + 'static>(
    id: u32,
    (responder, responses): (Sender<ClientboundPacket>, Receiver<ClientboundPacket>),
    (rstream, wstream): (ReadStream, WriteStream),
) -> anyhow::Result<()> {
    let mut reader = BufReader::new(rstream);

    thread::spawn(move || {
        let mut wstream = wstream;
        for m in responses {
            debug!("{id} -> {m:?}");
            match wstream
                .write_fmt(format_args!("{}\n", serde_json::to_string(&m).unwrap()))
                .map_err(|e| e.kind())
            {
                Ok(_) => (),
                Err(ErrorKind::BrokenPipe) => break,
                Err(e) => error!("network error: {:?}", e),
            }
        }
    });
    {
        let mut buf = String::new();
        loop {
            if reader.read_line(&mut buf)? == 0 {
                break Ok(());
            };
            match serde_json::from_str::<ServerboundPacket>(buf.as_str()) {
                Ok(packet) => {
                    debug!("{id} <- {packet:?}");
                    handle_packet(id, packet, responder.clone());
                }
                Err(err) => {
                    warn!("client error: {:?}", &err);
                    responder
                        .send(ClientboundPacket::Error(ProtoError::FormatError(format!(
                            "{}",
                            &err
                        ))))
                        .map_err(|_| io::Error::from(ErrorKind::InvalidInput))?
                }
            }

            buf.clear();
        }
    }
}