summaryrefslogtreecommitdiff
path: root/server/src/network.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/network.rs')
-rw-r--r--server/src/network.rs191
1 files changed, 191 insertions, 0 deletions
diff --git a/server/src/network.rs b/server/src/network.rs
new file mode 100644
index 0000000..22cbc65
--- /dev/null
+++ b/server/src/network.rs
@@ -0,0 +1,191 @@
+use anyhow::Result;
+use log::{debug, error, info, warn};
+use std::{
+ collections::HashMap,
+ io::{BufReader, BufWriter, Write},
+ net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
+ sync::{
+ Arc, Mutex,
+ mpsc::{Receiver, Sender, SyncSender, channel, sync_channel},
+ },
+ thread::spawn,
+ time::Instant,
+};
+use weareshared::packets::{Packet, ReadWrite};
+
+pub struct ServerNetwork {
+ conns: Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>,
+ udp_sock: Arc<UdpSocket>,
+ pub recv: Receiver<(u128, Packet)>,
+}
+impl ServerNetwork {
+ pub fn new(addr: SocketAddr) -> Result<Self> {
+ let (recv_tx, recv_rx) = channel();
+ let conns = Arc::new(Mutex::new(HashMap::new()));
+
+ let tcp_listener = TcpListener::bind(addr)?;
+ info!("tcp bound to {}", tcp_listener.local_addr()?);
+ let udp_sock = Arc::new(UdpSocket::bind(addr)?);
+ info!("udp bound to {}", udp_sock.local_addr()?);
+
+ let s = Self {
+ conns: conns.clone(),
+ recv: recv_rx,
+ udp_sock: udp_sock.clone(),
+ };
+ let recv_tx2 = recv_tx.clone();
+ let conns2 = conns.clone();
+ spawn(move || {
+ info!("tcp listener thread started");
+ loop {
+ let (mut sock, addr) = match tcp_listener.accept() {
+ Ok(x) => x,
+ Err(e) => {
+ warn!("cannot accept tcp: {e}");
+ continue;
+ }
+ };
+ info!("TCP connected {addr}");
+
+ let conns = conns.clone();
+ let recv_tx = recv_tx.clone();
+ spawn(move || {
+ let conn = match Packet::read(&mut sock) {
+ Ok(Packet::Connect(x)) => x,
+ Ok(_) => {
+ warn!("client send non-connect packet first");
+ return;
+ }
+ Err(e) => {
+ warn!("client handshake failed: {e}");
+ return;
+ }
+ };
+ let (send_tx, send_rx) = sync_channel(128);
+ conns.lock().unwrap().insert(conn, (send_tx, None));
+
+ let sock2 = match sock.try_clone() {
+ Ok(x) => x,
+ Err(e) => {
+ error!("cannot duplicate client socket: {e}");
+ return;
+ }
+ };
+ spawn(move || {
+ if let Err(e) = handle_conn_write(conn, sock2, send_rx) {
+ warn!("client outbound error: {e}");
+ }
+ });
+ if let Err(e) = handle_conn_read(conn, sock, recv_tx) {
+ warn!("client inbound error: {e}");
+ }
+ conns.lock().unwrap().remove(&conn);
+ });
+ }
+ });
+ spawn(move || {
+ info!("udp listener thread started");
+ let mut buf = [0u8; 8096];
+ struct CState {
+ last_ack: Instant,
+ conn_id: u128,
+ }
+ let mut cstates = HashMap::new();
+ let mut last_check = Instant::now();
+ loop {
+ let (size, addr) = match udp_sock.recv_from(&mut buf) {
+ Err(e) => {
+ warn!("udp recv failed: {e}");
+ continue;
+ }
+ Ok(s) => s,
+ };
+
+ let mut packet = &buf[..size];
+ let packet = match Packet::read(&mut packet) {
+ Ok(p) => p,
+ Err(e) => {
+ warn!("invalid packet from {e}");
+ continue;
+ }
+ };
+ if let Packet::Connect(id) = &packet {
+ if let Some((_send, udp_addr)) = conns2.lock().unwrap().get_mut(id) {
+ *udp_addr = Some(addr);
+ cstates.insert(addr, CState {
+ conn_id: *id,
+ last_ack: Instant::now(),
+ });
+ }
+ }
+ if let Some(conn) = cstates.get(&addr) {
+ let cid = conn.conn_id;
+ if let Packet::Disconnect = &packet {
+ cstates.remove(&addr);
+ }
+ recv_tx2.send((cid, packet)).unwrap();
+ }
+
+ if last_check.elapsed().as_secs() > 10 {
+ cstates.retain(|addr, state| {
+ if state.last_ack.elapsed().as_secs() > 30 {
+ warn!("client dropped: {addr}");
+ recv_tx2.send((state.conn_id, Packet::Disconnect)).unwrap();
+ false
+ } else {
+ true
+ }
+ });
+ last_check = Instant::now();
+ }
+ }
+ });
+ Ok(s)
+ }
+ pub fn broadcast(&self, packet: Packet, reliable: bool) {
+ let ser = Arc::new(packet.write_alloc());
+ for (cid, (tcp, udp)) in self.conns.lock().unwrap().iter() {
+ if !reliable {
+ if let Some(addr) = udp {
+ self.udp_sock.send_to(&ser, addr).unwrap();
+ continue;
+ }
+ }
+ if let Err(_) = tcp.send(ser.clone()) {
+ warn!("{cid}: queue full, packet dropped")
+ }
+ }
+ }
+ pub fn send(&self, conn: u128, packet: Packet, reliable: bool) {
+ let ser = Arc::new(packet.write_alloc());
+ if let Some((tcp, udp)) = self.conns.lock().unwrap().get(&conn) {
+ if !reliable {
+ if let Some(addr) = udp {
+ self.udp_sock.send_to(&ser, addr).unwrap();
+ return;
+ }
+ }
+ if let Err(_) = tcp.send(ser.clone()) {
+ warn!("{conn}: queue full, packet dropped")
+ }
+ }
+ }
+}
+
+fn handle_conn_read(conn: u128, sock: TcpStream, tx: Sender<(u128, Packet)>) -> Result<()> {
+ let mut sock = BufReader::new(sock);
+ loop {
+ let packet = Packet::read(&mut sock)?;
+ debug!("{conn} <- {packet:?}");
+ tx.send((conn, packet)).unwrap();
+ }
+}
+fn handle_conn_write(conn: u128, sock: TcpStream, rx: Receiver<Arc<Vec<u8>>>) -> Result<()> {
+ let mut sock = BufWriter::new(sock);
+ for packet in rx {
+ debug!("{conn} -> {packet:?}");
+ sock.write_all(&packet)?;
+ sock.flush()?;
+ }
+ Ok(())
+}