/*
wearechat - generic multiplayer game with voip
Copyright (C) 2025 metamuffin
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, version 3 of the License only.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use anyhow::Result;
use log::{debug, error, info, warn};
use std::{
collections::HashMap,
io::{BufReader, BufWriter, Write},
net::{SocketAddr, TcpListener, TcpStream, UdpSocket},
sync::{
Arc, Mutex,
mpsc::{Receiver, Sender, SyncSender, channel, sync_channel},
},
thread::spawn,
time::Instant,
};
use weareshared::{helper::ReadWrite, packets::Packet, tree::PacketSink};
/// Shared connection table, keyed by the connection id a client announces in its
/// `Connect` packet.
///
/// Each entry holds the bounded queue feeding that client's TCP writer thread and,
/// once the client has also announced itself over UDP, the address unreliable
/// packets are sent to.
type Conns = Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>;
pub struct ServerNetwork {
conns: Conns,
udp_sock: Arc,
pub recv: Receiver<(u128, Packet)>,
}
impl ServerNetwork {
pub fn new(addr: SocketAddr) -> Result {
let (recv_tx, recv_rx) = channel();
let conns = Arc::new(Mutex::new(HashMap::new()));
let tcp_listener = TcpListener::bind(addr)?;
info!("tcp bound to {}", tcp_listener.local_addr()?);
let udp_sock = Arc::new(UdpSocket::bind(addr)?);
info!("udp bound to {}", udp_sock.local_addr()?);
let s = Self {
conns: conns.clone(),
recv: recv_rx,
udp_sock: udp_sock.clone(),
};
let recv_tx2 = recv_tx.clone();
let conns2 = conns.clone();
spawn(move || serve_tcp(tcp_listener, conns, recv_tx));
spawn(move || serve_udp(udp_sock, conns2, recv_tx2));
Ok(s)
}
pub fn broadcast(&self, packet: Packet, reliable: bool) {
debug!("* -> {packet:?}");
let ser = Arc::new(packet.write_alloc().into_owned());
for (cid, (tcp, udp)) in self.conns.lock().unwrap().iter() {
if !reliable {
if let Some(addr) = udp {
self.udp_sock.send_to(&ser, addr).unwrap();
continue;
}
}
if tcp.send(ser.clone()).is_err() {
warn!("{cid}: queue full, packet dropped")
}
}
}
pub fn send(&self, conn: u128, packet: Packet, reliable: bool) {
let ser = Arc::new(packet.write_alloc().into_owned());
if let Some((tcp, udp)) = self.conns.lock().unwrap().get(&conn) {
if !reliable {
if let Some(addr) = udp {
debug!("{conn} ~> {packet:?}");
self.udp_sock.send_to(&ser, addr).unwrap();
return;
}
}
debug!("{conn} -> {packet:?}");
if tcp.send(ser.clone()).is_err() {
warn!("{conn}: queue full, packet dropped")
}
}
}
}
fn serve_udp(udp_sock: Arc, conns: Conns, recv_tx: Sender<(u128, Packet)>) {
info!("udp listener thread started");
let mut buf = [0u8; 8096];
struct CState {
last_ack: Instant,
conn_id: u128,
}
let mut cstates = HashMap::new();
let mut last_check = Instant::now();
loop {
let (size, addr) = match udp_sock.recv_from(&mut buf) {
Err(e) => {
warn!("udp recv failed: {e}");
continue;
}
Ok(s) => s,
};
let mut packet = &buf[..size];
let packet = match Packet::read(&mut packet) {
Ok(p) => p,
Err(e) => {
warn!("invalid packet from {e}");
continue;
}
};
if let Packet::Connect(id) = &packet {
if let Some((_send, udp_addr)) = conns.lock().unwrap().get_mut(id) {
*udp_addr = Some(addr);
cstates.insert(
addr,
CState {
conn_id: *id,
last_ack: Instant::now(),
},
);
}
} else if let Some(conn) = cstates.get(&addr) {
let cid = conn.conn_id;
if let Packet::Disconnect = &packet {
cstates.remove(&addr);
}
debug!("{cid} <~ {packet:?}");
recv_tx.send((cid, packet)).unwrap();
}
if last_check.elapsed().as_secs() > 10 {
cstates.retain(|addr, state| {
if state.last_ack.elapsed().as_secs() > 30 {
warn!("client dropped: {addr}");
recv_tx.send((state.conn_id, Packet::Disconnect)).unwrap();
false
} else {
true
}
});
last_check = Instant::now();
}
}
}
/// Accept loop for the TCP listener; spawns one handler thread per client.
///
/// A client must open with a `Connect(id)` packet. The handler then registers
/// the connection in `conns` with a 128-slot outbound queue, forwards the
/// `Connect` on `recv_tx`, starts a writer thread and reads packets until the
/// client disconnects or the stream fails. A `Disconnect` is forwarded
/// afterwards and the table entry is removed.
fn serve_tcp(tcp_listener: TcpListener, conns: Conns, recv_tx: Sender<(u128, Packet)>) {
    info!("tcp listener thread started");
    loop {
        let (mut sock, addr) = match tcp_listener.accept() {
            Ok(x) => x,
            Err(e) => {
                warn!("cannot accept tcp: {e}");
                continue;
            }
        };
        info!("TCP connected {addr}");
        let conns = conns.clone();
        let recv_tx = recv_tx.clone();
        spawn(move || {
            let conn = match Packet::read(&mut sock) {
                Ok(Packet::Connect(x)) => x,
                Ok(_) => {
                    warn!("client send non-connect packet first");
                    return;
                }
                Err(e) => {
                    warn!("client handshake failed: {e}");
                    return;
                }
            };

            // Duplicate the socket before registering anything, so that a failure
            // here cannot leave a dangling table entry or an unmatched `Connect`.
            let sock2 = match sock.try_clone() {
                Ok(x) => x,
                Err(e) => {
                    error!("cannot duplicate client socket: {e}");
                    return;
                }
            };

            let (send_tx, send_rx) = sync_channel(128);
            // NOTE(review): `insert` replaces an existing entry with the same id, so
            // a second client reusing an id takes over the first one's slot.
            conns.lock().unwrap().insert(conn, (send_tx, None));

            if recv_tx.send((conn, Packet::Connect(conn))).is_ok() {
                spawn(move || {
                    if let Err(e) = handle_conn_write(sock2, send_rx) {
                        warn!("client outbound error: {e}");
                    }
                });
                if let Err(e) = handle_conn_read(conn, sock, &recv_tx) {
                    warn!("client inbound error: {e}");
                }
                if recv_tx.send((conn, Packet::Disconnect)).is_err() {
                    warn!("{conn}: packet receiver dropped, disconnect not delivered");
                }
            } else {
                warn!("{conn}: packet receiver dropped, closing connection");
            }

            // Always unregister; dropping the queue sender also ends the writer thread.
            conns.lock().unwrap().remove(&conn);
        });
    }
}
fn handle_conn_read(conn: u128, sock: TcpStream, tx: &Sender<(u128, Packet)>) -> Result<()> {
let mut sock = BufReader::new(sock);
loop {
let packet = Packet::read(&mut sock)?;
debug!("{conn} <- {packet:?}");
if matches!(packet, Packet::Disconnect) {
return Ok(()); // not sending because generic disconnect sends it too
} else {
tx.send((conn, packet)).unwrap();
}
}
}
fn handle_conn_write(sock: TcpStream, rx: Receiver>>) -> Result<()> {
let mut sock = BufWriter::new(sock);
for packet in rx {
sock.write_all(&packet)?;
sock.flush()?;
}
Ok(())
}
impl ServerNetwork {
    /// Borrows this network as a [`Broadcaster`].
    pub fn br(&self) -> Broadcaster<'_> {
        let network: &Self = self;
        Broadcaster(network)
    }
}
/// Borrowed handle to a [`ServerNetwork`]; its `PacketSink` implementation
/// broadcasts every pushed packet reliably.
pub struct Broadcaster<'a>(&'a ServerNetwork);
impl PacketSink for Broadcaster<'_> {
fn push(&mut self, p: Packet) {
self.0.broadcast(p, true);
}
}