/* wearechat - generic multiplayer game with voip Copyright (C) 2025 metamuffin This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3 of the License only. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . */ use anyhow::Result; use log::{debug, error, info, warn}; use std::{ collections::HashMap, io::{BufReader, BufWriter, Write}, net::{SocketAddr, TcpListener, TcpStream, UdpSocket}, sync::{ Arc, Mutex, mpsc::{Receiver, Sender, SyncSender, channel, sync_channel}, }, thread::spawn, time::Instant, }; use weareshared::{helper::ReadWrite, packets::Packet, tree::PacketSink}; type Conns = Arc>>, Option)>>>; pub struct ServerNetwork { conns: Conns, udp_sock: Arc, pub recv: Receiver<(u128, Packet)>, } impl ServerNetwork { pub fn new(addr: SocketAddr) -> Result { let (recv_tx, recv_rx) = channel(); let conns = Arc::new(Mutex::new(HashMap::new())); let tcp_listener = TcpListener::bind(addr)?; info!("tcp bound to {}", tcp_listener.local_addr()?); let udp_sock = Arc::new(UdpSocket::bind(addr)?); info!("udp bound to {}", udp_sock.local_addr()?); let s = Self { conns: conns.clone(), recv: recv_rx, udp_sock: udp_sock.clone(), }; let recv_tx2 = recv_tx.clone(); let conns2 = conns.clone(); spawn(move || serve_tcp(tcp_listener, conns, recv_tx)); spawn(move || serve_udp(udp_sock, conns2, recv_tx2)); Ok(s) } pub fn broadcast(&self, packet: Packet, reliable: bool) { debug!("* -> {packet:?}"); let ser = Arc::new(packet.write_alloc().into_owned()); for (cid, (tcp, udp)) in self.conns.lock().unwrap().iter() { if !reliable { if let Some(addr) = udp { self.udp_sock.send_to(&ser, addr).unwrap(); continue; } } if tcp.send(ser.clone()).is_err() { warn!("{cid}: queue full, packet dropped") } } } pub fn send(&self, conn: u128, packet: Packet, reliable: bool) { let ser = Arc::new(packet.write_alloc().into_owned()); if let Some((tcp, udp)) = self.conns.lock().unwrap().get(&conn) { if !reliable { if let Some(addr) = udp { debug!("{conn} ~> {packet:?}"); self.udp_sock.send_to(&ser, addr).unwrap(); return; } } debug!("{conn} -> {packet:?}"); if tcp.send(ser.clone()).is_err() { warn!("{conn}: queue full, packet dropped") } } } } fn serve_udp(udp_sock: Arc, conns: Conns, recv_tx: Sender<(u128, Packet)>) { info!("udp listener thread started"); let mut buf = [0u8; 8096]; struct CState { last_ack: Instant, conn_id: u128, } let mut cstates = HashMap::new(); let mut last_check = Instant::now(); loop { let (size, addr) = match udp_sock.recv_from(&mut buf) { Err(e) => { warn!("udp recv failed: {e}"); continue; } Ok(s) => s, }; let mut packet = &buf[..size]; let packet = match Packet::read(&mut packet) { Ok(p) => p, Err(e) => { warn!("invalid packet from {e}"); continue; } }; if let Packet::Connect(id) = &packet { if let Some((_send, udp_addr)) = conns.lock().unwrap().get_mut(id) { *udp_addr = Some(addr); cstates.insert( addr, CState { conn_id: *id, last_ack: Instant::now(), }, ); } } else if let Some(conn) = cstates.get(&addr) { let cid = conn.conn_id; if let Packet::Disconnect = &packet { cstates.remove(&addr); } debug!("{cid} <~ {packet:?}"); recv_tx.send((cid, packet)).unwrap(); } if last_check.elapsed().as_secs() > 10 { cstates.retain(|addr, state| { if state.last_ack.elapsed().as_secs() > 30 { warn!("client dropped: {addr}"); recv_tx.send((state.conn_id, Packet::Disconnect)).unwrap(); false } else { true } }); last_check = Instant::now(); } } } fn serve_tcp(tcp_listener: TcpListener, conns: Conns, recv_tx: Sender<(u128, Packet)>) { info!("tcp listener thread started"); loop { let (mut sock, addr) = match tcp_listener.accept() { Ok(x) => x, Err(e) => { warn!("cannot accept tcp: {e}"); continue; } }; info!("TCP connected {addr}"); let conns = conns.clone(); let recv_tx = recv_tx.clone(); spawn(move || { let conn = match Packet::read(&mut sock) { Ok(Packet::Connect(x)) => x, Ok(_) => { warn!("client send non-connect packet first"); return; } Err(e) => { warn!("client handshake failed: {e}"); return; } }; let (send_tx, send_rx) = sync_channel(128); conns.lock().unwrap().insert(conn, (send_tx, None)); recv_tx.send((conn, Packet::Connect(conn))).unwrap(); let sock2 = match sock.try_clone() { Ok(x) => x, Err(e) => { error!("cannot duplicate client socket: {e}"); return; } }; spawn(move || { if let Err(e) = handle_conn_write(sock2, send_rx) { warn!("client outbound error: {e}"); } }); if let Err(e) = handle_conn_read(conn, sock, &recv_tx) { warn!("client inbound error: {e}"); } recv_tx.send((conn, Packet::Disconnect)).unwrap(); conns.lock().unwrap().remove(&conn); }); } } fn handle_conn_read(conn: u128, sock: TcpStream, tx: &Sender<(u128, Packet)>) -> Result<()> { let mut sock = BufReader::new(sock); loop { let packet = Packet::read(&mut sock)?; debug!("{conn} <- {packet:?}"); if matches!(packet, Packet::Disconnect) { return Ok(()); // not sending because generic disconnect sends it too } else { tx.send((conn, packet)).unwrap(); } } } fn handle_conn_write(sock: TcpStream, rx: Receiver>>) -> Result<()> { let mut sock = BufWriter::new(sock); for packet in rx { sock.write_all(&packet)?; sock.flush()?; } Ok(()) } impl ServerNetwork { pub fn br(&self) -> Broadcaster<'_> { Broadcaster(self) } } pub struct Broadcaster<'a>(&'a ServerNetwork); impl PacketSink for Broadcaster<'_> { fn push(&mut self, p: Packet) { self.0.broadcast(p, true); } }