use crate::{Config, PeerConfig, Serial}; use anyhow::{anyhow, bail}; use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use std::{ fs::{create_dir_all, read_to_string, remove_file, File}, io::{copy, BufRead, BufReader, BufWriter, Read, Write}, net::{SocketAddr, TcpListener, TcpStream}, os::unix::fs::MetadataExt, sync::{Arc, Mutex}, thread::spawn, time::SystemTime, }; pub fn server(config: Arc) -> anyhow::Result { let listener = TcpListener::bind(&config.server.address)?; info!("listening on {}", listener.local_addr()?); loop { let Ok((sock, addr)) = listener.accept() else { error!("could not accept connection"); continue; }; info!("connection from {addr}"); let config = config.clone(); spawn( move || match handle_connection_wrapper(config, sock, addr) { Ok(()) => { info!("connection closed gracefully") } Err(err) => { warn!("client error {addr}: {err:?}") } }, ); } } fn handle_connection_wrapper( config: Arc, sock: TcpStream, addr: SocketAddr, ) -> anyhow::Result<()> { let mut rsock = BufReader::new(sock.try_clone()?); let mut wsock = BufWriter::new(sock); match handle_connection(config, &mut rsock, &mut wsock, addr) { Ok(()) => Ok(()), Err(e) => { writeln!(wsock, "error,{e}")?; wsock.flush()?; Err(e) } } } fn handle_connection( config: Arc, rsock: &mut BufReader, wsock: &mut BufWriter, addr: SocketAddr, ) -> anyhow::Result<()> { let mut line = String::new(); rsock.by_ref().take(4096).read_line(&mut line)?; let provided_secret = line.trim(); let Some(peer) = config .peer .iter() .find(|p| p.shared_secret == provided_secret) else { bail!("invalid secret"); }; let peerdir = config.storage.root.join(&peer.name); create_dir_all(&peerdir)?; loop { line.clear(); rsock.by_ref().take(4096).read_line(&mut line)?; let mut toks = line.trim().split(","); let command = toks.next().ok_or(anyhow!("command missing"))?; debug!("command {command:?} issued"); match command { "quit" => break Ok(()), "list" => { let mut dir = peerdir .read_dir()? .map(|e| { let e = e?; Ok::<_, anyhow::Error>(( e.metadata()?.mtime(), e.metadata()?.size(), e.file_name() .to_str() .unwrap() .to_string() .parse::()?, )) }) .try_collect::>()?; dir.sort_by_key(|(_, _, a)| *a); for (m, s, p) in dir { writeln!(wsock, "{m}:{s}:{p}")?; } writeln!(wsock)?; wsock.flush()?; } "upload" => { let size = toks.next().ok_or(anyhow!("size missing"))?.parse::()?; if size > config.storage.size { bail!("maximum size exceeded") } let serial = transact_user_state(&config, peer, |s| { if s.last_upload.elapsed().unwrap().as_secs() > config.storage.upload_cooldown { s.last_upload = SystemTime::now(); s.serial += 1; Some(s.serial) } else { None } })? .ok_or(anyhow!("upload cooldown"))?; writeln!(wsock, "ready")?; wsock.flush()?; while peerdir.read_dir()?.fold(0, |a, _| a + 1) >= config.storage.versions { let mut dir = peerdir .read_dir()? .map(|e| { let e = e?; Ok::<_, anyhow::Error>( e.file_name() .to_str() .unwrap() .to_string() .parse::()?, ) }) .try_collect::>()?; dir.sort(); let rem = dir[0]; info!("removing serial={rem}"); remove_file(peerdir.join(rem.to_string()))?; } info!("upload from {addr} size={size} serial={serial}"); let mut upload = rsock.get_ref().take(size); let mut target = BufWriter::new(File::create_new(peerdir.join(serial.to_string()))?); copy(&mut upload, &mut target)?; // TODO speed limit info!("done {addr}"); writeln!(wsock, "done")?; wsock.flush()?; } "download" => { let serial = toks .next() .ok_or(anyhow!("serial missing"))? .parse::()?; let ok = transact_user_state(&config, peer, |s| { if s.last_download.elapsed().unwrap().as_secs() > config.storage.download_cooldown { s.last_download = SystemTime::now(); true } else { false } })?; if !ok { bail!("download cooldown") } let source = File::open(peerdir.join(serial.to_string()))?; let size = source.metadata()?.size(); let mut source = BufReader::new(source); info!("download for {addr} size={size} serial={serial}"); writeln!(wsock, "ready,{size}")?; wsock.flush()?; copy(&mut source, wsock)?; // TODO speed limit writeln!(wsock, "done")?; wsock.flush()?; } _ => bail!("unknown command"), } } } #[derive(Serialize, Deserialize)] struct PeerState { last_upload: SystemTime, last_download: SystemTime, serial: Serial, } static USER_DB: Mutex<()> = Mutex::new(()); fn transact_user_state( config: &Config, peer: &PeerConfig, update: impl FnOnce(&mut PeerState) -> T, ) -> anyhow::Result { let _g = USER_DB .lock() .map_err(|_| anyhow!("user database locked"))?; let conf_path = config.storage.root.join(&peer.name).with_extension("db"); let mut state = read_to_string(&conf_path) .map(|s| toml::from_str(&s)) .unwrap_or_else(|_| { Ok(PeerState { last_download: SystemTime::UNIX_EPOCH, last_upload: SystemTime::UNIX_EPOCH, serial: 0, }) })?; let res = update(&mut state); let ser_state = toml::to_string_pretty(&state)?; File::create(conf_path)?.write_all(ser_state.as_bytes())?; drop(_g); Ok(res) }