From 6af8b165fe8cbab35721a8797ca85cda454a5ff4 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Mon, 6 Jan 2025 18:54:00 +0100 Subject: new network --- server/src/main.rs | 106 ++++++++++++++++------------------------------------- 1 file changed, 31 insertions(+), 75 deletions(-) (limited to 'server/src/main.rs') diff --git a/server/src/main.rs b/server/src/main.rs index 6b2911f..a8473c6 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,18 +1,12 @@ +pub mod network; + use anyhow::Result; use clap::Parser; -use log::{debug, info}; -use std::{ - collections::HashMap, - io::{BufReader, BufWriter, Write}, - net::{IpAddr, TcpListener, TcpStream}, - sync::{ - Arc, Mutex, - mpsc::{Receiver, Sender, channel}, - }, - thread::spawn, -}; +use log::warn; +use network::ServerNetwork; +use std::net::{IpAddr, SocketAddr}; use weareshared::{ - packets::{Packet, ReadWrite}, + packets::{Data, Packet}, store::ResourceStore, tree::SceneTree, }; @@ -26,7 +20,6 @@ struct Args { } struct State { - tx: HashMap>, store: ResourceStore, tree: SceneTree, } @@ -35,101 +28,64 @@ fn main() -> Result<()> { env_logger::init_from_env("LOG"); let args = Args::parse(); - let listener = TcpListener::bind((args.bind_addr, args.port))?; - info!("bound to {}", listener.local_addr()?); - let state = Arc::new(Mutex::new(State::new()?)); - for conn in 0.. { - let (sock, addr) = listener.accept()?; - info!("{addr} connected"); - let (tx, rx) = channel(); - let state2 = state.clone(); - let sock2 = sock.try_clone().unwrap(); - spawn(move || { - for p in state2.lock().unwrap().tree.prime_client() { - tx.send(p).unwrap(); - } - state2.lock().unwrap().tx.insert(conn, tx); - let _ = handle_conn_read(conn, sock, state2.clone()); - info!("{addr} disconnected"); - state2.lock().unwrap().tx.remove(&conn); - }); - spawn(move || { - let _ = handle_conn_write(conn, sock2, rx); - }); - } - Ok(()) -} + let net = ServerNetwork::new(SocketAddr::new(args.bind_addr, args.port))?; + let mut state = State::new()?; -fn handle_conn_read(conn: usize, sock: TcpStream, state: Arc>) -> Result<()> { - let mut sock = BufReader::new(sock); - loop { - let packet = Packet::read(&mut sock)?; - debug!("{conn} <- {packet:?}"); - state.lock().unwrap().handle_packet(conn, packet)?; - } -} -fn handle_conn_write(conn: usize, sock: TcpStream, rx: Receiver) -> Result<()> { - let mut sock = BufWriter::new(sock); - for packet in rx { - debug!("{conn} -> {packet:?}"); - packet.write(&mut sock)?; - sock.flush()?; + while let Ok((conn, packet)) = net.recv.recv() { + if let Err(e) = state.handle_packet(conn, packet, &net) { + warn!("state handler error: ({conn}) {e}"); + } } + Ok(()) } + impl State { pub fn new() -> Result { Ok(Self { - tx: HashMap::new(), store: ResourceStore::new_persistent( &xdg::BaseDirectories::with_prefix("weareserver")?.place_cache_file("resources")?, )?, tree: SceneTree::default(), }) } - pub fn broadcast(&self, packet: Packet) -> Result<()> { - for tx in self.tx.values() { - tx.send(packet.clone())?; - } - Ok(()) - } - pub fn send(&self, conn: usize, packet: Packet) -> Result<()> { - if let Some(tx) = self.tx.get(&conn) { - tx.send(packet)?; - } - Ok(()) - } - pub fn handle_packet(&mut self, conn: usize, packet: Packet) -> Result<()> { + pub fn handle_packet(&mut self, conn: u128, packet: Packet, net: &ServerNetwork) -> Result<()> { self.tree.update(&packet); match packet { + Packet::Connect(_) => { + for p in self.tree.prime_client() { + net.send(conn, p, true); + } + } + Packet::Disconnect => {} Packet::RequestResource(resource) => { if let Some(r) = self.store.get(resource)? { - self.send(conn, Packet::RespondResource(r))?; + net.send(conn, Packet::RespondResource(Data(r)), true); } else { - self.broadcast(Packet::RequestResource(resource))?; + net.broadcast(Packet::RequestResource(resource), true); } } Packet::RespondResource(data) => { - self.store.set(&data)?; - self.broadcast(Packet::RespondResource(data))?; + self.store.set(&data.0)?; + net.broadcast(Packet::RespondResource(data), true); } Packet::Add(object, resource) => { - self.broadcast(Packet::Add(object, resource))?; + net.broadcast(Packet::Add(object, resource), true); } Packet::Remove(object) => { - self.broadcast(Packet::Remove(object))?; + net.broadcast(Packet::Remove(object), true); } Packet::Position(object, pos, rot) => { - self.broadcast(Packet::Position(object, pos, rot))?; + net.broadcast(Packet::Position(object, pos, rot), true); } Packet::Pose(object, vec) => { - self.broadcast(Packet::Pose(object, vec))?; + net.broadcast(Packet::Pose(object, vec), true); } Packet::Parent(parent, child) => { - self.broadcast(Packet::Parent(parent, child))?; + net.broadcast(Packet::Parent(parent, child), true); } Packet::Sound(object, vec) => { - self.broadcast(Packet::Sound(object, vec))?; + net.broadcast(Packet::Sound(object, vec), true); } } Ok(()) -- cgit v1.2.3-70-g09d2