From 6af8b165fe8cbab35721a8797ca85cda454a5ff4 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 6 Jan 2025 18:54:00 +0100 Subject: new network --- server/src/main.rs | 106 ++++++++-------------------- server/src/network.rs | 191 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 222 insertions(+), 75 deletions(-) create mode 100644 server/src/network.rs (limited to 'server/src') diff --git a/server/src/main.rs b/server/src/main.rs index 6b2911f..a8473c6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,18 +1,12 @@ +pub mod network; + use anyhow::Result; use clap::Parser; -use log::{debug, info}; -use std::{ - collections::HashMap, - io::{BufReader, BufWriter, Write}, - net::{IpAddr, TcpListener, TcpStream}, - sync::{ - Arc, Mutex, - mpsc::{Receiver, Sender, channel}, - }, - thread::spawn, -}; +use log::warn; +use network::ServerNetwork; +use std::net::{IpAddr, SocketAddr}; use weareshared::{ - packets::{Packet, ReadWrite}, + packets::{Data, Packet}, store::ResourceStore, tree::SceneTree, }; @@ -26,7 +20,6 @@ struct Args { } struct State { - tx: HashMap>, store: ResourceStore, tree: SceneTree, } @@ -35,101 +28,64 @@ fn main() -> Result<()> { env_logger::init_from_env("LOG"); let args = Args::parse(); - let listener = TcpListener::bind((args.bind_addr, args.port))?; - info!("bound to {}", listener.local_addr()?); - let state = Arc::new(Mutex::new(State::new()?)); - for conn in 0.. { - let (sock, addr) = listener.accept()?; - info!("{addr} connected"); - let (tx, rx) = channel(); - let state2 = state.clone(); - let sock2 = sock.try_clone().unwrap(); - spawn(move || { - for p in state2.lock().unwrap().tree.prime_client() { - tx.send(p).unwrap(); - } - state2.lock().unwrap().tx.insert(conn, tx); - let _ = handle_conn_read(conn, sock, state2.clone()); - info!("{addr} disconnected"); - state2.lock().unwrap().tx.remove(&conn); - }); - spawn(move || { - let _ = handle_conn_write(conn, sock2, rx); - }); - } - Ok(()) -} + let net = ServerNetwork::new(SocketAddr::new(args.bind_addr, args.port))?; + let mut state = State::new()?; -fn handle_conn_read(conn: usize, sock: TcpStream, state: Arc>) -> Result<()> { - let mut sock = BufReader::new(sock); - loop { - let packet = Packet::read(&mut sock)?; - debug!("{conn} <- {packet:?}"); - state.lock().unwrap().handle_packet(conn, packet)?; - } -} -fn handle_conn_write(conn: usize, sock: TcpStream, rx: Receiver) -> Result<()> { - let mut sock = BufWriter::new(sock); - for packet in rx { - debug!("{conn} -> {packet:?}"); - packet.write(&mut sock)?; - sock.flush()?; + while let Ok((conn, packet)) = net.recv.recv() { + if let Err(e) = state.handle_packet(conn, packet, &net) { + warn!("state handler error: ({conn}) {e}"); + } } + Ok(()) } + impl State { pub fn new() -> Result { Ok(Self { - tx: HashMap::new(), store: ResourceStore::new_persistent( &xdg::BaseDirectories::with_prefix("weareserver")?.place_cache_file("resources")?, )?, tree: SceneTree::default(), }) } - pub fn broadcast(&self, packet: Packet) -> Result<()> { - for tx in self.tx.values() { - tx.send(packet.clone())?; - } - Ok(()) - } - pub fn send(&self, conn: usize, packet: Packet) -> Result<()> { - if let Some(tx) = self.tx.get(&conn) { - tx.send(packet)?; - } - Ok(()) - } - pub fn handle_packet(&mut self, conn: usize, packet: Packet) -> Result<()> { + pub fn handle_packet(&mut self, conn: u128, packet: Packet, net: &ServerNetwork) -> Result<()> { self.tree.update(&packet); match packet { + Packet::Connect(_) => { + for p in self.tree.prime_client() { + net.send(conn, p, true); + } + } + Packet::Disconnect => {} Packet::RequestResource(resource) => { if let Some(r) = self.store.get(resource)? { - self.send(conn, Packet::RespondResource(r))?; + net.send(conn, Packet::RespondResource(Data(r)), true); } else { - self.broadcast(Packet::RequestResource(resource))?; + net.broadcast(Packet::RequestResource(resource), true); } } Packet::RespondResource(data) => { - self.store.set(&data)?; - self.broadcast(Packet::RespondResource(data))?; + self.store.set(&data.0)?; + net.broadcast(Packet::RespondResource(data), true); } Packet::Add(object, resource) => { - self.broadcast(Packet::Add(object, resource))?; + net.broadcast(Packet::Add(object, resource), true); } Packet::Remove(object) => { - self.broadcast(Packet::Remove(object))?; + net.broadcast(Packet::Remove(object), true); } Packet::Position(object, pos, rot) => { - self.broadcast(Packet::Position(object, pos, rot))?; + net.broadcast(Packet::Position(object, pos, rot), true); } Packet::Pose(object, vec) => { - self.broadcast(Packet::Pose(object, vec))?; + net.broadcast(Packet::Pose(object, vec), true); } Packet::Parent(parent, child) => { - self.broadcast(Packet::Parent(parent, child))?; + net.broadcast(Packet::Parent(parent, child), true); } Packet::Sound(object, vec) => { - self.broadcast(Packet::Sound(object, vec))?; + net.broadcast(Packet::Sound(object, vec), true); } } Ok(()) diff --git a/server/src/network.rs b/server/src/network.rs new file mode 100644 index 0000000..22cbc65 --- /dev/null +++ b/server/src/network.rs @@ -0,0 +1,191 @@ +use anyhow::Result; +use log::{debug, error, info, warn}; +use std::{ + collections::HashMap, + io::{BufReader, BufWriter, Write}, + net::{SocketAddr, TcpListener, TcpStream, UdpSocket}, + sync::{ + Arc, Mutex, + mpsc::{Receiver, Sender, SyncSender, channel, sync_channel}, + }, + thread::spawn, + time::Instant, +}; +use weareshared::packets::{Packet, ReadWrite}; + +pub struct ServerNetwork { + conns: Arc>>, Option)>>>, + udp_sock: Arc, + pub recv: Receiver<(u128, Packet)>, +} +impl ServerNetwork { + pub fn new(addr: SocketAddr) -> Result { + let (recv_tx, recv_rx) = channel(); + let conns = Arc::new(Mutex::new(HashMap::new())); + + let tcp_listener = TcpListener::bind(addr)?; + info!("tcp bound to {}", tcp_listener.local_addr()?); + let udp_sock = Arc::new(UdpSocket::bind(addr)?); + info!("udp bound to {}", udp_sock.local_addr()?); + + let s = Self { + conns: conns.clone(), + recv: recv_rx, + udp_sock: udp_sock.clone(), + }; + let recv_tx2 = recv_tx.clone(); + let conns2 = conns.clone(); + spawn(move || { + info!("tcp listener thread started"); + loop { + let (mut sock, addr) = match tcp_listener.accept() { + Ok(x) => x, + Err(e) => { + warn!("cannot accept tcp: {e}"); + continue; + } + }; + info!("TCP connected {addr}"); + + let conns = conns.clone(); + let recv_tx = recv_tx.clone(); + spawn(move || { + let conn = match Packet::read(&mut sock) { + Ok(Packet::Connect(x)) => x, + Ok(_) => { + warn!("client send non-connect packet first"); + return; + } + Err(e) => { + warn!("client handshake failed: {e}"); + return; + } + }; + let (send_tx, send_rx) = sync_channel(128); + conns.lock().unwrap().insert(conn, (send_tx, None)); + + let sock2 = match sock.try_clone() { + Ok(x) => x, + Err(e) => { + error!("cannot duplicate client socket: {e}"); + return; + } + }; + spawn(move || { + if let Err(e) = handle_conn_write(conn, sock2, send_rx) { + warn!("client outbound error: {e}"); + } + }); + if let Err(e) = handle_conn_read(conn, sock, recv_tx) { + warn!("client inbound error: {e}"); + } + conns.lock().unwrap().remove(&conn); + }); + } + }); + spawn(move || { + info!("udp listener thread started"); + let mut buf = [0u8; 8096]; + struct CState { + last_ack: Instant, + conn_id: u128, + } + let mut cstates = HashMap::new(); + let mut last_check = Instant::now(); + loop { + let (size, addr) = match udp_sock.recv_from(&mut buf) { + Err(e) => { + warn!("udp recv failed: {e}"); + continue; + } + Ok(s) => s, + }; + + let mut packet = &buf[..size]; + let packet = match Packet::read(&mut packet) { + Ok(p) => p, + Err(e) => { + warn!("invalid packet from {e}"); + continue; + } + }; + if let Packet::Connect(id) = &packet { + if let Some((_send, udp_addr)) = conns2.lock().unwrap().get_mut(id) { + *udp_addr = Some(addr); + cstates.insert(addr, CState { + conn_id: *id, + last_ack: Instant::now(), + }); + } + } + if let Some(conn) = cstates.get(&addr) { + let cid = conn.conn_id; + if let Packet::Disconnect = &packet { + cstates.remove(&addr); + } + recv_tx2.send((cid, packet)).unwrap(); + } + + if last_check.elapsed().as_secs() > 10 { + cstates.retain(|addr, state| { + if state.last_ack.elapsed().as_secs() > 30 { + warn!("client dropped: {addr}"); + recv_tx2.send((state.conn_id, Packet::Disconnect)).unwrap(); + false + } else { + true + } + }); + last_check = Instant::now(); + } + } + }); + Ok(s) + } + pub fn broadcast(&self, packet: Packet, reliable: bool) { + let ser = Arc::new(packet.write_alloc()); + for (cid, (tcp, udp)) in self.conns.lock().unwrap().iter() { + if !reliable { + if let Some(addr) = udp { + self.udp_sock.send_to(&ser, addr).unwrap(); + continue; + } + } + if let Err(_) = tcp.send(ser.clone()) { + warn!("{cid}: queue full, packet dropped") + } + } + } + pub fn send(&self, conn: u128, packet: Packet, reliable: bool) { + let ser = Arc::new(packet.write_alloc()); + if let Some((tcp, udp)) = self.conns.lock().unwrap().get(&conn) { + if !reliable { + if let Some(addr) = udp { + self.udp_sock.send_to(&ser, addr).unwrap(); + return; + } + } + if let Err(_) = tcp.send(ser.clone()) { + warn!("{conn}: queue full, packet dropped") + } + } + } +} + +fn handle_conn_read(conn: u128, sock: TcpStream, tx: Sender<(u128, Packet)>) -> Result<()> { + let mut sock = BufReader::new(sock); + loop { + let packet = Packet::read(&mut sock)?; + debug!("{conn} <- {packet:?}"); + tx.send((conn, packet)).unwrap(); + } +} +fn handle_conn_write(conn: u128, sock: TcpStream, rx: Receiver>>) -> Result<()> { + let mut sock = BufWriter::new(sock); + for packet in rx { + debug!("{conn} -> {packet:?}"); + sock.write_all(&packet)?; + sock.flush()?; + } + Ok(()) +} -- cgit v1.2.3-70-g09d2