summaryrefslogtreecommitdiff
path: root/server/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-01-06 18:54:00 +0100
committermetamuffin <metamuffin@disroot.org>2025-01-06 18:54:00 +0100
commit6af8b165fe8cbab35721a8797ca85cda454a5ff4 (patch)
tree05006c8b378dca5f033a72b0f0b3196fb8691423 /server/src
parent14d348febd549b944d03030bf748f7817a2bedac (diff)
downloadweareserver-6af8b165fe8cbab35721a8797ca85cda454a5ff4.tar
weareserver-6af8b165fe8cbab35721a8797ca85cda454a5ff4.tar.bz2
weareserver-6af8b165fe8cbab35721a8797ca85cda454a5ff4.tar.zst
new network
Diffstat (limited to 'server/src')
-rw-r--r--server/src/main.rs106
-rw-r--r--server/src/network.rs191
2 files changed, 222 insertions, 75 deletions
diff --git a/server/src/main.rs b/server/src/main.rs
index 6b2911f..a8473c6 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -1,18 +1,12 @@
+pub mod network;
+
use anyhow::Result;
use clap::Parser;
-use log::{debug, info};
-use std::{
- collections::HashMap,
- io::{BufReader, BufWriter, Write},
- net::{IpAddr, TcpListener, TcpStream},
- sync::{
- Arc, Mutex,
- mpsc::{Receiver, Sender, channel},
- },
- thread::spawn,
-};
+use log::warn;
+use network::ServerNetwork;
+use std::net::{IpAddr, SocketAddr};
use weareshared::{
- packets::{Packet, ReadWrite},
+ packets::{Data, Packet},
store::ResourceStore,
tree::SceneTree,
};
@@ -26,7 +20,6 @@ struct Args {
}
struct State {
- tx: HashMap<usize, Sender<Packet>>,
store: ResourceStore,
tree: SceneTree,
}
@@ -35,101 +28,64 @@ fn main() -> Result<()> {
env_logger::init_from_env("LOG");
let args = Args::parse();
- let listener = TcpListener::bind((args.bind_addr, args.port))?;
- info!("bound to {}", listener.local_addr()?);
- let state = Arc::new(Mutex::new(State::new()?));
- for conn in 0.. {
- let (sock, addr) = listener.accept()?;
- info!("{addr} connected");
- let (tx, rx) = channel();
- let state2 = state.clone();
- let sock2 = sock.try_clone().unwrap();
- spawn(move || {
- for p in state2.lock().unwrap().tree.prime_client() {
- tx.send(p).unwrap();
- }
- state2.lock().unwrap().tx.insert(conn, tx);
- let _ = handle_conn_read(conn, sock, state2.clone());
- info!("{addr} disconnected");
- state2.lock().unwrap().tx.remove(&conn);
- });
- spawn(move || {
- let _ = handle_conn_write(conn, sock2, rx);
- });
- }
- Ok(())
-}
+ let net = ServerNetwork::new(SocketAddr::new(args.bind_addr, args.port))?;
+ let mut state = State::new()?;
-fn handle_conn_read(conn: usize, sock: TcpStream, state: Arc<Mutex<State>>) -> Result<()> {
- let mut sock = BufReader::new(sock);
- loop {
- let packet = Packet::read(&mut sock)?;
- debug!("{conn} <- {packet:?}");
- state.lock().unwrap().handle_packet(conn, packet)?;
- }
-}
-fn handle_conn_write(conn: usize, sock: TcpStream, rx: Receiver<Packet>) -> Result<()> {
- let mut sock = BufWriter::new(sock);
- for packet in rx {
- debug!("{conn} -> {packet:?}");
- packet.write(&mut sock)?;
- sock.flush()?;
+ while let Ok((conn, packet)) = net.recv.recv() {
+ if let Err(e) = state.handle_packet(conn, packet, &net) {
+ warn!("state handler error: ({conn}) {e}");
+ }
}
+
Ok(())
}
+
impl State {
pub fn new() -> Result<Self> {
Ok(Self {
- tx: HashMap::new(),
store: ResourceStore::new_persistent(
&xdg::BaseDirectories::with_prefix("weareserver")?.place_cache_file("resources")?,
)?,
tree: SceneTree::default(),
})
}
- pub fn broadcast(&self, packet: Packet) -> Result<()> {
- for tx in self.tx.values() {
- tx.send(packet.clone())?;
- }
- Ok(())
- }
- pub fn send(&self, conn: usize, packet: Packet) -> Result<()> {
- if let Some(tx) = self.tx.get(&conn) {
- tx.send(packet)?;
- }
- Ok(())
- }
- pub fn handle_packet(&mut self, conn: usize, packet: Packet) -> Result<()> {
+ pub fn handle_packet(&mut self, conn: u128, packet: Packet, net: &ServerNetwork) -> Result<()> {
self.tree.update(&packet);
match packet {
+ Packet::Connect(_) => {
+ for p in self.tree.prime_client() {
+ net.send(conn, p, true);
+ }
+ }
+ Packet::Disconnect => {}
Packet::RequestResource(resource) => {
if let Some(r) = self.store.get(resource)? {
- self.send(conn, Packet::RespondResource(r))?;
+ net.send(conn, Packet::RespondResource(Data(r)), true);
} else {
- self.broadcast(Packet::RequestResource(resource))?;
+ net.broadcast(Packet::RequestResource(resource), true);
}
}
Packet::RespondResource(data) => {
- self.store.set(&data)?;
- self.broadcast(Packet::RespondResource(data))?;
+ self.store.set(&data.0)?;
+ net.broadcast(Packet::RespondResource(data), true);
}
Packet::Add(object, resource) => {
- self.broadcast(Packet::Add(object, resource))?;
+ net.broadcast(Packet::Add(object, resource), true);
}
Packet::Remove(object) => {
- self.broadcast(Packet::Remove(object))?;
+ net.broadcast(Packet::Remove(object), true);
}
Packet::Position(object, pos, rot) => {
- self.broadcast(Packet::Position(object, pos, rot))?;
+ net.broadcast(Packet::Position(object, pos, rot), true);
}
Packet::Pose(object, vec) => {
- self.broadcast(Packet::Pose(object, vec))?;
+ net.broadcast(Packet::Pose(object, vec), true);
}
Packet::Parent(parent, child) => {
- self.broadcast(Packet::Parent(parent, child))?;
+ net.broadcast(Packet::Parent(parent, child), true);
}
Packet::Sound(object, vec) => {
- self.broadcast(Packet::Sound(object, vec))?;
+ net.broadcast(Packet::Sound(object, vec), true);
}
}
Ok(())
diff --git a/server/src/network.rs b/server/src/network.rs
new file mode 100644
index 0000000..22cbc65
--- /dev/null
+++ b/server/src/network.rs
@@ -0,0 +1,191 @@
+use anyhow::Result;
+use log::{debug, error, info, warn};
+use std::{
+ collections::HashMap,
+ io::{BufReader, BufWriter, Write},
+ net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
+ sync::{
+ Arc, Mutex,
+ mpsc::{Receiver, Sender, SyncSender, channel, sync_channel},
+ },
+ thread::spawn,
+ time::Instant,
+};
+use weareshared::packets::{Packet, ReadWrite};
+
+pub struct ServerNetwork {
+ conns: Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>,
+ udp_sock: Arc<UdpSocket>,
+ pub recv: Receiver<(u128, Packet)>,
+}
+impl ServerNetwork {
+ pub fn new(addr: SocketAddr) -> Result<Self> {
+ let (recv_tx, recv_rx) = channel();
+ let conns = Arc::new(Mutex::new(HashMap::new()));
+
+ let tcp_listener = TcpListener::bind(addr)?;
+ info!("tcp bound to {}", tcp_listener.local_addr()?);
+ let udp_sock = Arc::new(UdpSocket::bind(addr)?);
+ info!("udp bound to {}", udp_sock.local_addr()?);
+
+ let s = Self {
+ conns: conns.clone(),
+ recv: recv_rx,
+ udp_sock: udp_sock.clone(),
+ };
+ let recv_tx2 = recv_tx.clone();
+ let conns2 = conns.clone();
+ spawn(move || {
+ info!("tcp listener thread started");
+ loop {
+ let (mut sock, addr) = match tcp_listener.accept() {
+ Ok(x) => x,
+ Err(e) => {
+ warn!("cannot accept tcp: {e}");
+ continue;
+ }
+ };
+ info!("TCP connected {addr}");
+
+ let conns = conns.clone();
+ let recv_tx = recv_tx.clone();
+ spawn(move || {
+ let conn = match Packet::read(&mut sock) {
+ Ok(Packet::Connect(x)) => x,
+ Ok(_) => {
+ warn!("client send non-connect packet first");
+ return;
+ }
+ Err(e) => {
+ warn!("client handshake failed: {e}");
+ return;
+ }
+ };
+ let (send_tx, send_rx) = sync_channel(128);
+ conns.lock().unwrap().insert(conn, (send_tx, None));
+
+ let sock2 = match sock.try_clone() {
+ Ok(x) => x,
+ Err(e) => {
+ error!("cannot duplicate client socket: {e}");
+ return;
+ }
+ };
+ spawn(move || {
+ if let Err(e) = handle_conn_write(conn, sock2, send_rx) {
+ warn!("client outbound error: {e}");
+ }
+ });
+ if let Err(e) = handle_conn_read(conn, sock, recv_tx) {
+ warn!("client inbound error: {e}");
+ }
+ conns.lock().unwrap().remove(&conn);
+ });
+ }
+ });
+ spawn(move || {
+ info!("udp listener thread started");
+ let mut buf = [0u8; 8096];
+ struct CState {
+ last_ack: Instant,
+ conn_id: u128,
+ }
+ let mut cstates = HashMap::new();
+ let mut last_check = Instant::now();
+ loop {
+ let (size, addr) = match udp_sock.recv_from(&mut buf) {
+ Err(e) => {
+ warn!("udp recv failed: {e}");
+ continue;
+ }
+ Ok(s) => s,
+ };
+
+ let mut packet = &buf[..size];
+ let packet = match Packet::read(&mut packet) {
+ Ok(p) => p,
+ Err(e) => {
+ warn!("invalid packet from {e}");
+ continue;
+ }
+ };
+ if let Packet::Connect(id) = &packet {
+ if let Some((_send, udp_addr)) = conns2.lock().unwrap().get_mut(id) {
+ *udp_addr = Some(addr);
+ cstates.insert(addr, CState {
+ conn_id: *id,
+ last_ack: Instant::now(),
+ });
+ }
+ }
+ if let Some(conn) = cstates.get(&addr) {
+ let cid = conn.conn_id;
+ if let Packet::Disconnect = &packet {
+ cstates.remove(&addr);
+ }
+ recv_tx2.send((cid, packet)).unwrap();
+ }
+
+ if last_check.elapsed().as_secs() > 10 {
+ cstates.retain(|addr, state| {
+ if state.last_ack.elapsed().as_secs() > 30 {
+ warn!("client dropped: {addr}");
+ recv_tx2.send((state.conn_id, Packet::Disconnect)).unwrap();
+ false
+ } else {
+ true
+ }
+ });
+ last_check = Instant::now();
+ }
+ }
+ });
+ Ok(s)
+ }
+ pub fn broadcast(&self, packet: Packet, reliable: bool) {
+ let ser = Arc::new(packet.write_alloc());
+ for (cid, (tcp, udp)) in self.conns.lock().unwrap().iter() {
+ if !reliable {
+ if let Some(addr) = udp {
+ self.udp_sock.send_to(&ser, addr).unwrap();
+ continue;
+ }
+ }
+ if let Err(_) = tcp.send(ser.clone()) {
+ warn!("{cid}: queue full, packet dropped")
+ }
+ }
+ }
+ pub fn send(&self, conn: u128, packet: Packet, reliable: bool) {
+ let ser = Arc::new(packet.write_alloc());
+ if let Some((tcp, udp)) = self.conns.lock().unwrap().get(&conn) {
+ if !reliable {
+ if let Some(addr) = udp {
+ self.udp_sock.send_to(&ser, addr).unwrap();
+ return;
+ }
+ }
+ if let Err(_) = tcp.send(ser.clone()) {
+ warn!("{conn}: queue full, packet dropped")
+ }
+ }
+ }
+}
+
+fn handle_conn_read(conn: u128, sock: TcpStream, tx: Sender<(u128, Packet)>) -> Result<()> {
+ let mut sock = BufReader::new(sock);
+ loop {
+ let packet = Packet::read(&mut sock)?;
+ debug!("{conn} <- {packet:?}");
+ tx.send((conn, packet)).unwrap();
+ }
+}
+fn handle_conn_write(conn: u128, sock: TcpStream, rx: Receiver<Arc<Vec<u8>>>) -> Result<()> {
+ let mut sock = BufWriter::new(sock);
+ for packet in rx {
+ debug!("{conn} -> {packet:?}");
+ sock.write_all(&packet)?;
+ sock.flush()?;
+ }
+ Ok(())
+}