use anyhow::Result; use futures_util::{SinkExt, StreamExt}; use log::{debug, info, warn}; use std::{fs::File, sync::Arc, time::Duration}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader}, net::TcpListener, spawn, sync::{broadcast, RwLock}, time::sleep, }; use tokio_tungstenite::tungstenite::Message; use undercooked::{ customer::customer, data::build_gamedata, game::Game, protocol::{PacketC, PacketS}, }; #[tokio::main] async fn main() -> Result<()> { env_logger::init_from_env("LOG"); let raw_listener = TcpListener::bind("0.0.0.0:27031").await?; let ws_listener = TcpListener::bind("0.0.0.0:27032").await?; info!( "listening for line-based tcp on {}", raw_listener.local_addr()? ); info!("listening for websockets on {}", ws_listener.local_addr()?); let data = build_gamedata( serde_yaml::from_reader(File::open("data/recipes.yaml").unwrap()).unwrap(), serde_yaml::from_reader(File::open("data/map.yaml").unwrap()).unwrap(), serde_yaml::from_reader(File::open("data/demands.yaml").unwrap()).unwrap(), ); let game = Arc::new(RwLock::new(Game::new(data.into()))); let (tx, rx) = broadcast::channel::(1024); { let game = game.clone(); spawn(async move { let dt = 1. / 25.; loop { { let mut g = game.write().await; g.tick(dt); while let Some(p) = g.packet_out() { debug!("-> {p:?}"); let _ = tx.send(p); } } sleep(Duration::from_secs_f32(dt)).await; } }); } spawn(customer(game.clone(), rx.resubscribe())); for id in 1.. { tokio::select! { r = raw_listener.accept() => { let (sock, addr) = r?; let (read, mut write) = sock.into_split(); let game = game.clone(); let mut rx = rx.resubscribe(); info!("{addr} connected"); let init = game.write().await.prime_client(id); spawn(async move { for p in init { write .write_all(serde_json::to_string(&p).unwrap().as_bytes()) .await?; write.write_all(b"\n").await?; } while let Ok(packet) = rx.recv().await { write .write_all(serde_json::to_string(&packet).unwrap().as_bytes()) .await?; write.write_all(b"\n").await?; } Ok::<_, anyhow::Error>(()) }); spawn(async move { let mut read = BufReader::new(read).lines(); while let Ok(Some(line)) = read.next_line().await { let Ok(packet): Result = serde_json::from_str(&line) else { warn!("invalid json over tcp"); break }; debug!("<- {id} {packet:?}"); if let Err(e) = game.write().await.packet_in(id, packet) { warn!("client error: {e}"); } } let _ = game.write().await.packet_in(id, PacketS::Leave); }); } r = ws_listener.accept() => { let (sock, addr) = r?; let Ok(sock) = tokio_tungstenite::accept_async(sock).await else { warn!("invalid ws handshake"); continue }; let (mut write, mut read) = sock.split(); let game = game.clone(); let mut rx = rx.resubscribe(); info!("{addr} connected via ws"); let init = game.write().await.prime_client(id); spawn(async move { for p in init { if let Err(e) = write.send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&p).unwrap(), )).await { warn!("ws error on init: {e}"); return; } } while let Ok(packet) = rx.recv().await { if let Err(e) = write.send(tokio_tungstenite::tungstenite::Message::Text( serde_json::to_string(&packet).unwrap(), )).await { warn!("ws error: {e}"); break; } } }); spawn(async move { while let Some(Ok(message)) = read.next().await { match message { Message::Text(line) => { let Ok(packet): Result = serde_json::from_str(&line) else { warn!("invalid json over ws"); break }; debug!("<- {id} {packet:?}"); if let Err(e) = game.write().await.packet_in(id, packet) { warn!("client error: {e}"); } }, Message::Close(_) => break, _ => (), } } let _ = game.write().await.packet_in(id, PacketS::Leave); }); } } } Ok(()) }