use crate::{handle_packet, CLIENTS, CLIENT_ID_COUNTER}; use crossbeam_channel::{Receiver, Sender}; use karlcommon::{version, ClientboundPacket, ProtoError, ServerboundPacket}; use log::{debug, error, info, warn}; use std::{ io::{self, BufRead, BufReader, ErrorKind, Read, Write}, thread, }; pub fn handle_connection(handle_client: F, arg: T) where F: FnOnce( u32, (Sender, Receiver), T, ) -> anyhow::Result<()>, { let id = CLIENT_ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let (responder, responses) = crossbeam_channel::unbounded(); responder .send(ClientboundPacket::Handshake { version: version!(), }) .unwrap(); info!("client connected: {id}"); CLIENTS.write().unwrap().insert(id, responder.clone()); match handle_client(id, (responder, responses), arg) { Ok(_) => info!("client ({id}) dropped properly"), Err(e) => error!("client ({id}) dropped bc error: {e}"), } CLIENTS.write().unwrap().remove(&id); } pub fn stream( id: u32, (responder, responses): (Sender, Receiver), (rstream, wstream): (ReadStream, WriteStream), ) -> anyhow::Result<()> { let mut reader = BufReader::new(rstream); thread::spawn(move || { let mut wstream = wstream; for m in responses { debug!("{id} -> {m:?}"); match wstream .write_fmt(format_args!("{}\n", serde_json::to_string(&m).unwrap())) .map_err(|e| e.kind()) { Ok(_) => (), Err(ErrorKind::BrokenPipe) => break, Err(e) => error!("network error: {:?}", e), } } }); { let mut buf = String::new(); loop { if reader.read_line(&mut buf)? == 0 { break Ok(()); }; match serde_json::from_str::(buf.as_str()) { Ok(packet) => { debug!("{id} <- {packet:?}"); handle_packet(id, packet, responder.clone()); } Err(err) => { warn!("client error: {:?}", &err); responder .send(ClientboundPacket::Error(ProtoError::FormatError(format!( "{}", &err )))) .map_err(|_| io::Error::from(ErrorKind::InvalidInput))? } } buf.clear(); } } }