summaryrefslogtreecommitdiff
path: root/server
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-01-26 21:07:11 +0100
committermetamuffin <metamuffin@disroot.org>2025-01-26 21:07:11 +0100
commitf0b213d51ee7b44362b5618bf5bd1eacf50d2bb8 (patch)
tree57d2ed752f0839c37a8e141413bdcf6b3d9dece5 /server
parentf195ae4a1059270f14163b62e7860c3630ee5895 (diff)
downloadweareserver-f0b213d51ee7b44362b5618bf5bd1eacf50d2bb8.tar
weareserver-f0b213d51ee7b44362b5618bf5bd1eacf50d2bb8.tar.bz2
weareserver-f0b213d51ee7b44362b5618bf5bd1eacf50d2bb8.tar.zst
refactor network
Diffstat (limited to 'server')
-rw-r--r--server/src/network.rs221
1 files changed, 113 insertions, 108 deletions
diff --git a/server/src/network.rs b/server/src/network.rs
index e632e97..c59b54b 100644
--- a/server/src/network.rs
+++ b/server/src/network.rs
@@ -29,8 +29,9 @@ use std::{
};
use weareshared::{helper::ReadWrite, packets::Packet, tree::PacketSink};
+type Conns = Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>;
pub struct ServerNetwork {
- conns: Arc<Mutex<HashMap<u128, (SyncSender<Arc<Vec<u8>>>, Option<SocketAddr>)>>>,
+ conns: Conns,
udp_sock: Arc<UdpSocket>,
pub recv: Receiver<(u128, Packet)>,
}
@@ -51,113 +52,8 @@ impl ServerNetwork {
};
let recv_tx2 = recv_tx.clone();
let conns2 = conns.clone();
- spawn(move || {
- info!("tcp listener thread started");
- loop {
- let (mut sock, addr) = match tcp_listener.accept() {
- Ok(x) => x,
- Err(e) => {
- warn!("cannot accept tcp: {e}");
- continue;
- }
- };
- info!("TCP connected {addr}");
-
- let conns = conns.clone();
- let recv_tx = recv_tx.clone();
- spawn(move || {
- let conn = match Packet::read(&mut sock) {
- Ok(Packet::Connect(x)) => x,
- Ok(_) => {
- warn!("client send non-connect packet first");
- return;
- }
- Err(e) => {
- warn!("client handshake failed: {e}");
- return;
- }
- };
- let (send_tx, send_rx) = sync_channel(128);
- conns.lock().unwrap().insert(conn, (send_tx, None));
- recv_tx.send((conn, Packet::Connect(conn))).unwrap();
-
- let sock2 = match sock.try_clone() {
- Ok(x) => x,
- Err(e) => {
- error!("cannot duplicate client socket: {e}");
- return;
- }
- };
- spawn(move || {
- if let Err(e) = handle_conn_write(sock2, send_rx) {
- warn!("client outbound error: {e}");
- }
- });
- if let Err(e) = handle_conn_read(conn, sock, &recv_tx) {
- warn!("client inbound error: {e}");
- }
- recv_tx.send((conn, Packet::Disconnect)).unwrap();
- conns.lock().unwrap().remove(&conn);
- });
- }
- });
- spawn(move || {
- info!("udp listener thread started");
- let mut buf = [0u8; 8096];
- struct CState {
- last_ack: Instant,
- conn_id: u128,
- }
- let mut cstates = HashMap::new();
- let mut last_check = Instant::now();
- loop {
- let (size, addr) = match udp_sock.recv_from(&mut buf) {
- Err(e) => {
- warn!("udp recv failed: {e}");
- continue;
- }
- Ok(s) => s,
- };
-
- let mut packet = &buf[..size];
- let packet = match Packet::read(&mut packet) {
- Ok(p) => p,
- Err(e) => {
- warn!("invalid packet from {e}");
- continue;
- }
- };
- if let Packet::Connect(id) = &packet {
- if let Some((_send, udp_addr)) = conns2.lock().unwrap().get_mut(id) {
- *udp_addr = Some(addr);
- cstates.insert(addr, CState {
- conn_id: *id,
- last_ack: Instant::now(),
- });
- }
- } else if let Some(conn) = cstates.get(&addr) {
- let cid = conn.conn_id;
- if let Packet::Disconnect = &packet {
- cstates.remove(&addr);
- }
- debug!("{cid} <~ {packet:?}");
- recv_tx2.send((cid, packet)).unwrap();
- }
-
- if last_check.elapsed().as_secs() > 10 {
- cstates.retain(|addr, state| {
- if state.last_ack.elapsed().as_secs() > 30 {
- warn!("client dropped: {addr}");
- recv_tx2.send((state.conn_id, Packet::Disconnect)).unwrap();
- false
- } else {
- true
- }
- });
- last_check = Instant::now();
- }
- }
- });
+ spawn(move || serve_tcp(tcp_listener, conns, recv_tx));
+ spawn(move || serve_udp(udp_sock, conns2, recv_tx2));
Ok(s)
}
pub fn broadcast(&self, packet: Packet, reliable: bool) {
@@ -193,6 +89,115 @@ impl ServerNetwork {
}
}
+fn serve_udp(udp_sock: Arc<UdpSocket>, conns: Conns, recv_tx: Sender<(u128, Packet)>) {
+ info!("udp listener thread started");
+ let mut buf = [0u8; 8096];
+ struct CState {
+ last_ack: Instant,
+ conn_id: u128,
+ }
+ let mut cstates = HashMap::new();
+ let mut last_check = Instant::now();
+ loop {
+ let (size, addr) = match udp_sock.recv_from(&mut buf) {
+ Err(e) => {
+ warn!("udp recv failed: {e}");
+ continue;
+ }
+ Ok(s) => s,
+ };
+
+ let mut packet = &buf[..size];
+ let packet = match Packet::read(&mut packet) {
+ Ok(p) => p,
+ Err(e) => {
+ warn!("invalid packet from {e}");
+ continue;
+ }
+ };
+ if let Packet::Connect(id) = &packet {
+ if let Some((_send, udp_addr)) = conns.lock().unwrap().get_mut(id) {
+ *udp_addr = Some(addr);
+ cstates.insert(addr, CState {
+ conn_id: *id,
+ last_ack: Instant::now(),
+ });
+ }
+ } else if let Some(conn) = cstates.get(&addr) {
+ let cid = conn.conn_id;
+ if let Packet::Disconnect = &packet {
+ cstates.remove(&addr);
+ }
+ debug!("{cid} <~ {packet:?}");
+ recv_tx.send((cid, packet)).unwrap();
+ }
+
+ if last_check.elapsed().as_secs() > 10 {
+ cstates.retain(|addr, state| {
+ if state.last_ack.elapsed().as_secs() > 30 {
+ warn!("client dropped: {addr}");
+ recv_tx.send((state.conn_id, Packet::Disconnect)).unwrap();
+ false
+ } else {
+ true
+ }
+ });
+ last_check = Instant::now();
+ }
+ }
+}
+
+fn serve_tcp(tcp_listener: TcpListener, conns: Conns, recv_tx: Sender<(u128, Packet)>) {
+ info!("tcp listener thread started");
+ loop {
+ let (mut sock, addr) = match tcp_listener.accept() {
+ Ok(x) => x,
+ Err(e) => {
+ warn!("cannot accept tcp: {e}");
+ continue;
+ }
+ };
+ info!("TCP connected {addr}");
+
+ let conns = conns.clone();
+ let recv_tx = recv_tx.clone();
+ spawn(move || {
+ let conn = match Packet::read(&mut sock) {
+ Ok(Packet::Connect(x)) => x,
+ Ok(_) => {
+ warn!("client send non-connect packet first");
+ return;
+ }
+ Err(e) => {
+ warn!("client handshake failed: {e}");
+ return;
+ }
+ };
+ let (send_tx, send_rx) = sync_channel(128);
+ conns.lock().unwrap().insert(conn, (send_tx, None));
+ recv_tx.send((conn, Packet::Connect(conn))).unwrap();
+
+ let sock2 = match sock.try_clone() {
+ Ok(x) => x,
+ Err(e) => {
+ error!("cannot duplicate client socket: {e}");
+ return;
+ }
+ };
+ spawn(move || {
+ if let Err(e) = handle_conn_write(sock2, send_rx) {
+ warn!("client outbound error: {e}");
+ }
+ });
+ if let Err(e) = handle_conn_read(conn, sock, &recv_tx) {
+ warn!("client inbound error: {e}");
+ }
+ recv_tx.send((conn, Packet::Disconnect)).unwrap();
+ conns.lock().unwrap().remove(&conn);
+ });
+ }
+}
+
fn handle_conn_read(conn: u128, sock: TcpStream, tx: &Sender<(u128, Packet)>) -> Result<()> {
let mut sock = BufReader::new(sock);
loop {