use anyhow::Result; use log::{debug, error, info, warn}; use std::{ collections::HashMap, io::{BufReader, BufWriter, Write}, net::{SocketAddr, TcpListener, TcpStream, UdpSocket}, sync::{ Arc, Mutex, mpsc::{Receiver, Sender, SyncSender, channel, sync_channel}, }, thread::spawn, time::Instant, }; use weareshared::packets::{Packet, ReadWrite}; pub struct ServerNetwork { conns: Arc>>, Option)>>>, udp_sock: Arc, pub recv: Receiver<(u128, Packet)>, } impl ServerNetwork { pub fn new(addr: SocketAddr) -> Result { let (recv_tx, recv_rx) = channel(); let conns = Arc::new(Mutex::new(HashMap::new())); let tcp_listener = TcpListener::bind(addr)?; info!("tcp bound to {}", tcp_listener.local_addr()?); let udp_sock = Arc::new(UdpSocket::bind(addr)?); info!("udp bound to {}", udp_sock.local_addr()?); let s = Self { conns: conns.clone(), recv: recv_rx, udp_sock: udp_sock.clone(), }; let recv_tx2 = recv_tx.clone(); let conns2 = conns.clone(); spawn(move || { info!("tcp listener thread started"); loop { let (mut sock, addr) = match tcp_listener.accept() { Ok(x) => x, Err(e) => { warn!("cannot accept tcp: {e}"); continue; } }; info!("TCP connected {addr}"); let conns = conns.clone(); let recv_tx = recv_tx.clone(); spawn(move || { let conn = match Packet::read(&mut sock) { Ok(Packet::Connect(x)) => x, Ok(_) => { warn!("client send non-connect packet first"); return; } Err(e) => { warn!("client handshake failed: {e}"); return; } }; let (send_tx, send_rx) = sync_channel(128); conns.lock().unwrap().insert(conn, (send_tx, None)); let sock2 = match sock.try_clone() { Ok(x) => x, Err(e) => { error!("cannot duplicate client socket: {e}"); return; } }; spawn(move || { if let Err(e) = handle_conn_write(conn, sock2, send_rx) { warn!("client outbound error: {e}"); } }); if let Err(e) = handle_conn_read(conn, sock, recv_tx) { warn!("client inbound error: {e}"); } conns.lock().unwrap().remove(&conn); }); } }); spawn(move || { info!("udp listener thread started"); let mut buf = [0u8; 8096]; struct CState { last_ack: Instant, conn_id: u128, } let mut cstates = HashMap::new(); let mut last_check = Instant::now(); loop { let (size, addr) = match udp_sock.recv_from(&mut buf) { Err(e) => { warn!("udp recv failed: {e}"); continue; } Ok(s) => s, }; let mut packet = &buf[..size]; let packet = match Packet::read(&mut packet) { Ok(p) => p, Err(e) => { warn!("invalid packet from {e}"); continue; } }; if let Packet::Connect(id) = &packet { if let Some((_send, udp_addr)) = conns2.lock().unwrap().get_mut(id) { *udp_addr = Some(addr); cstates.insert(addr, CState { conn_id: *id, last_ack: Instant::now(), }); } } if let Some(conn) = cstates.get(&addr) { let cid = conn.conn_id; if let Packet::Disconnect = &packet { cstates.remove(&addr); } recv_tx2.send((cid, packet)).unwrap(); } if last_check.elapsed().as_secs() > 10 { cstates.retain(|addr, state| { if state.last_ack.elapsed().as_secs() > 30 { warn!("client dropped: {addr}"); recv_tx2.send((state.conn_id, Packet::Disconnect)).unwrap(); false } else { true } }); last_check = Instant::now(); } } }); Ok(s) } pub fn broadcast(&self, packet: Packet, reliable: bool) { let ser = Arc::new(packet.write_alloc()); for (cid, (tcp, udp)) in self.conns.lock().unwrap().iter() { if !reliable { if let Some(addr) = udp { self.udp_sock.send_to(&ser, addr).unwrap(); continue; } } if let Err(_) = tcp.send(ser.clone()) { warn!("{cid}: queue full, packet dropped") } } } pub fn send(&self, conn: u128, packet: Packet, reliable: bool) { let ser = Arc::new(packet.write_alloc()); if let Some((tcp, udp)) = self.conns.lock().unwrap().get(&conn) { if !reliable { if let Some(addr) = udp { self.udp_sock.send_to(&ser, addr).unwrap(); return; } } if let Err(_) = tcp.send(ser.clone()) { warn!("{conn}: queue full, packet dropped") } } } } fn handle_conn_read(conn: u128, sock: TcpStream, tx: Sender<(u128, Packet)>) -> Result<()> { let mut sock = BufReader::new(sock); loop { let packet = Packet::read(&mut sock)?; debug!("{conn} <- {packet:?}"); tx.send((conn, packet)).unwrap(); } } fn handle_conn_write(conn: u128, sock: TcpStream, rx: Receiver>>) -> Result<()> { let mut sock = BufWriter::new(sock); for packet in rx { debug!("{conn} -> {packet:?}"); sock.write_all(&packet)?; sock.flush()?; } Ok(()) }