diff options
author | metamuffin <metamuffin@disroot.org> | 2025-01-26 21:07:11 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2025-01-26 21:07:11 +0100 |
commit | f0b213d51ee7b44362b5618bf5bd1eacf50d2bb8 (patch) | |
tree | 57d2ed752f0839c37a8e141413bdcf6b3d9dece5 /server | |
parent | f195ae4a1059270f14163b62e7860c3630ee5895 (diff) | |
download | weareserver-f0b213d51ee7b44362b5618bf5bd1eacf50d2bb8.tar weareserver-f0b213d51ee7b44362b5618bf5bd1eacf50d2bb8.tar.bz2 weareserver-f0b213d51ee7b44362b5618bf5bd1eacf50d2bb8.tar.zst |
refactor network
Diffstat (limited to 'server')
-rw-r--r-- | server/src/network.rs | 221 |
1 files changed, 113 insertions, 108 deletions
diff --git a/server/src/network.rs b/server/src/network.rs index e632e97..c59b54b 100644 --- a/server/src/network.rs +++ b/server/src/network.rs @@ -29,8 +29,9 @@ use std::{ }; use weareshared::{helper::ReadWrite, packets::Packet, tree::PacketSink}; +type Conns = Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>; pub struct ServerNetwork { - conns: Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>, + conns: Conns, udp_sock: Arc<UdpSocket>, pub recv: Receiver<(u128, Packet)>, } @@ -51,113 +52,8 @@ impl ServerNetwork { }; let recv_tx2 = recv_tx.clone(); let conns2 = conns.clone(); - spawn(move || { - info!("tcp listener thread started"); - loop { - let (mut sock, addr) = match tcp_listener.accept() { - Ok(x) => x, - Err(e) => { - warn!("cannot accept tcp: {e}"); - continue; - } - }; - info!("TCP connected {addr}"); - - let conns = conns.clone(); - let recv_tx = recv_tx.clone(); - spawn(move || { - let conn = match Packet::read(&mut sock) { - Ok(Packet::Connect(x)) => x, - Ok(_) => { - warn!("client send non-connect packet first"); - return; - } - Err(e) => { - warn!("client handshake failed: {e}"); - return; - } - }; - let (send_tx, send_rx) = sync_channel(128); - conns.lock().unwrap().insert(conn, (send_tx, None)); - recv_tx.send((conn, Packet::Connect(conn))).unwrap(); - - let sock2 = match sock.try_clone() { - Ok(x) => x, - Err(e) => { - error!("cannot duplicate client socket: {e}"); - return; - } - }; - spawn(move || { - if let Err(e) = handle_conn_write(sock2, send_rx) { - warn!("client outbound error: {e}"); - } - }); - if let Err(e) = handle_conn_read(conn, sock, &recv_tx) { - warn!("client inbound error: {e}"); - } - recv_tx.send((conn, Packet::Disconnect)).unwrap(); - conns.lock().unwrap().remove(&conn); - }); - } - }); - spawn(move || { - info!("udp listener thread started"); - let mut buf = [0u8; 8096]; - struct CState { - last_ack: Instant, - conn_id: u128, - } - let mut cstates = HashMap::new(); - let mut last_check = Instant::now(); - loop { - let (size, addr) = match udp_sock.recv_from(&mut buf) { - Err(e) => { - warn!("udp recv failed: {e}"); - continue; - } - Ok(s) => s, - }; - - let mut packet = &buf[..size]; - let packet = match Packet::read(&mut packet) { - Ok(p) => p, - Err(e) => { - warn!("invalid packet from {e}"); - continue; - } - }; - if let Packet::Connect(id) = &packet { - if let Some((_send, udp_addr)) = conns2.lock().unwrap().get_mut(id) { - *udp_addr = Some(addr); - cstates.insert(addr, CState { - conn_id: *id, - last_ack: Instant::now(), - }); - } - } else if let Some(conn) = cstates.get(&addr) { - let cid = conn.conn_id; - if let Packet::Disconnect = &packet { - cstates.remove(&addr); - } - debug!("{cid} <~ {packet:?}"); - recv_tx2.send((cid, packet)).unwrap(); - } - - if last_check.elapsed().as_secs() > 10 { - cstates.retain(|addr, state| { - if state.last_ack.elapsed().as_secs() > 30 { - warn!("client dropped: {addr}"); - recv_tx2.send((state.conn_id, Packet::Disconnect)).unwrap(); - false - } else { - true - } - }); - last_check = Instant::now(); - } - } - }); + spawn(move || serve_tcp(tcp_listener, conns, recv_tx)); + spawn(move || serve_udp(udp_sock, conns2, recv_tx2)); Ok(s) } pub fn broadcast(&self, packet: Packet, reliable: bool) { @@ -193,6 +89,115 @@ impl ServerNetwork { } } +fn serve_udp(udp_sock: Arc<UdpSocket>, conns: Conns, recv_tx: Sender<(u128, Packet)>) { + info!("udp listener thread started"); + let mut buf = [0u8; 8096]; + struct CState { + last_ack: Instant, + conn_id: u128, + } + let mut cstates = HashMap::new(); + let mut last_check = Instant::now(); + loop { + let (size, addr) = match udp_sock.recv_from(&mut buf) { + Err(e) => { + warn!("udp recv failed: {e}"); + continue; + } + Ok(s) => s, + }; + + let mut packet = &buf[..size]; + let packet = match Packet::read(&mut packet) { + Ok(p) => p, + Err(e) => { + warn!("invalid packet from {e}"); + continue; + } + }; + if let Packet::Connect(id) = &packet { + if let Some((_send, udp_addr)) = conns.lock().unwrap().get_mut(id) { + *udp_addr = Some(addr); + cstates.insert(addr, CState { + conn_id: *id, + last_ack: Instant::now(), + }); + } + } else if let Some(conn) = cstates.get(&addr) { + let cid = conn.conn_id; + if let Packet::Disconnect = &packet { + cstates.remove(&addr); + } + debug!("{cid} <~ {packet:?}"); + recv_tx.send((cid, packet)).unwrap(); + } + + if last_check.elapsed().as_secs() > 10 { + cstates.retain(|addr, state| { + if state.last_ack.elapsed().as_secs() > 30 { + warn!("client dropped: {addr}"); + recv_tx.send((state.conn_id, Packet::Disconnect)).unwrap(); + false + } else { + true + } + }); + last_check = Instant::now(); + } + } +} + +fn serve_tcp(tcp_listener: TcpListener, conns: Conns, recv_tx: Sender<(u128, Packet)>) { + info!("tcp listener thread started"); + loop { + let (mut sock, addr) = match tcp_listener.accept() { + Ok(x) => x, + Err(e) => { + warn!("cannot accept tcp: {e}"); + continue; + } + }; + info!("TCP connected {addr}"); + + let conns = conns.clone(); + let recv_tx = recv_tx.clone(); + spawn(move || { + let conn = match Packet::read(&mut sock) { + Ok(Packet::Connect(x)) => x, + Ok(_) => { + warn!("client send non-connect packet first"); + return; + } + Err(e) => { + warn!("client handshake failed: {e}"); + return; + } + }; + let (send_tx, send_rx) = sync_channel(128); + conns.lock().unwrap().insert(conn, (send_tx, None)); + recv_tx.send((conn, Packet::Connect(conn))).unwrap(); + + let sock2 = match sock.try_clone() { + Ok(x) => x, + Err(e) => { + error!("cannot duplicate client socket: {e}"); + return; + } + }; + spawn(move || { + if let Err(e) = handle_conn_write(sock2, send_rx) { + warn!("client outbound error: {e}"); + } + }); + if let Err(e) = handle_conn_read(conn, sock, &recv_tx) { + warn!("client inbound error: {e}"); + } + recv_tx.send((conn, Packet::Disconnect)).unwrap(); + conns.lock().unwrap().remove(&conn); + }); + } +} + fn handle_conn_read(conn: u128, sock: TcpStream, tx: &Sender<(u128, Packet)>) -> Result<()> { let mut sock = BufReader::new(sock); loop { |