diff options
Diffstat (limited to 'server/src/network.rs')
-rw-r--r-- | server/src/network.rs | 191 |
1 files changed, 191 insertions, 0 deletions
diff --git a/server/src/network.rs b/server/src/network.rs new file mode 100644 index 0000000..22cbc65 --- /dev/null +++ b/server/src/network.rs @@ -0,0 +1,191 @@ +use anyhow::Result; +use log::{debug, error, info, warn}; +use std::{ + collections::HashMap, + io::{BufReader, BufWriter, Write}, + net::{SocketAddr, TcpListener, TcpStream, UdpSocket}, + sync::{ + Arc, Mutex, + mpsc::{Receiver, Sender, SyncSender, channel, sync_channel}, + }, + thread::spawn, + time::Instant, +}; +use weareshared::packets::{Packet, ReadWrite}; + +pub struct ServerNetwork { + conns: Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>, + udp_sock: Arc<UdpSocket>, + pub recv: Receiver<(u128, Packet)>, +} +impl ServerNetwork { + pub fn new(addr: SocketAddr) -> Result<Self> { + let (recv_tx, recv_rx) = channel(); + let conns = Arc::new(Mutex::new(HashMap::new())); + + let tcp_listener = TcpListener::bind(addr)?; + info!("tcp bound to {}", tcp_listener.local_addr()?); + let udp_sock = Arc::new(UdpSocket::bind(addr)?); + info!("udp bound to {}", udp_sock.local_addr()?); + + let s = Self { + conns: conns.clone(), + recv: recv_rx, + udp_sock: udp_sock.clone(), + }; + let recv_tx2 = recv_tx.clone(); + let conns2 = conns.clone(); + spawn(move || { + info!("tcp listener thread started"); + loop { + let (mut sock, addr) = match tcp_listener.accept() { + Ok(x) => x, + Err(e) => { + warn!("cannot accept tcp: {e}"); + continue; + } + }; + info!("TCP connected {addr}"); + + let conns = conns.clone(); + let recv_tx = recv_tx.clone(); + spawn(move || { + let conn = match Packet::read(&mut sock) { + Ok(Packet::Connect(x)) => x, + Ok(_) => { + warn!("client send non-connect packet first"); + return; + } + Err(e) => { + warn!("client handshake failed: {e}"); + return; + } + }; + let (send_tx, send_rx) = sync_channel(128); + conns.lock().unwrap().insert(conn, (send_tx, None)); + + let sock2 = match sock.try_clone() { + Ok(x) => x, + Err(e) => { + error!("cannot duplicate client socket: {e}"); + return; + } + }; + spawn(move || { + if let Err(e) = handle_conn_write(conn, sock2, send_rx) { + warn!("client outbound error: {e}"); + } + }); + if let Err(e) = handle_conn_read(conn, sock, recv_tx) { + warn!("client inbound error: {e}"); + } + conns.lock().unwrap().remove(&conn); + }); + } + }); + spawn(move || { + info!("udp listener thread started"); + let mut buf = [0u8; 8096]; + struct CState { + last_ack: Instant, + conn_id: u128, + } + let mut cstates = HashMap::new(); + let mut last_check = Instant::now(); + loop { + let (size, addr) = match udp_sock.recv_from(&mut buf) { + Err(e) => { + warn!("udp recv failed: {e}"); + continue; + } + Ok(s) => s, + }; + + let mut packet = &buf[..size]; + let packet = match Packet::read(&mut packet) { + Ok(p) => p, + Err(e) => { + warn!("invalid packet from {e}"); + continue; + } + }; + if let Packet::Connect(id) = &packet { + if let Some((_send, udp_addr)) = conns2.lock().unwrap().get_mut(id) { + *udp_addr = Some(addr); + cstates.insert(addr, CState { + conn_id: *id, + last_ack: Instant::now(), + }); + } + } + if let Some(conn) = cstates.get(&addr) { + let cid = conn.conn_id; + if let Packet::Disconnect = &packet { + cstates.remove(&addr); + } + recv_tx2.send((cid, packet)).unwrap(); + } + + if last_check.elapsed().as_secs() > 10 { + cstates.retain(|addr, state| { + if state.last_ack.elapsed().as_secs() > 30 { + warn!("client dropped: {addr}"); + recv_tx2.send((state.conn_id, Packet::Disconnect)).unwrap(); + false + } else { + true + } + }); + last_check = Instant::now(); + } + } + }); + Ok(s) + } + pub fn broadcast(&self, packet: Packet, reliable: bool) { + let ser = Arc::new(packet.write_alloc()); + for (cid, (tcp, udp)) in self.conns.lock().unwrap().iter() { + if !reliable { + if let Some(addr) = udp { + self.udp_sock.send_to(&ser, addr).unwrap(); + continue; + } + } + if let Err(_) = tcp.send(ser.clone()) { + warn!("{cid}: queue full, packet dropped") + } + } + } + pub fn send(&self, conn: u128, packet: Packet, reliable: bool) { + let ser = Arc::new(packet.write_alloc()); + if let Some((tcp, udp)) = self.conns.lock().unwrap().get(&conn) { + if !reliable { + if let Some(addr) = udp { + self.udp_sock.send_to(&ser, addr).unwrap(); + return; + } + } + if let Err(_) = tcp.send(ser.clone()) { + warn!("{conn}: queue full, packet dropped") + } + } + } +} + +fn handle_conn_read(conn: u128, sock: TcpStream, tx: Sender<(u128, Packet)>) -> Result<()> { + let mut sock = BufReader::new(sock); + loop { + let packet = Packet::read(&mut sock)?; + debug!("{conn} <- {packet:?}"); + tx.send((conn, packet)).unwrap(); + } +} +fn handle_conn_write(conn: u128, sock: TcpStream, rx: Receiver<Arc<Vec<u8>>>) -> Result<()> { + let mut sock = BufWriter::new(sock); + for packet in rx { + debug!("{conn} -> {packet:?}"); + sock.write_all(&packet)?; + sock.flush()?; + } + Ok(()) +} |