diff options
Diffstat (limited to 'src/server.rs')
-rw-r--r-- | src/server.rs | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..30ed0c8 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,228 @@ +use crate::{Config, PeerConfig, Serial}; +use anyhow::{anyhow, bail}; +use log::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::{create_dir_all, read_to_string, remove_file, File}, + io::{copy, BufRead, BufReader, BufWriter, Read, Write}, + net::{SocketAddr, TcpListener, TcpStream}, + os::unix::fs::MetadataExt, + sync::{Arc, Mutex}, + thread::spawn, + time::SystemTime, +}; + +pub fn server(config: Arc<Config>) -> anyhow::Result<!> { + let listener = TcpListener::bind(&config.server.address)?; + info!("listening on {}", listener.local_addr()?); + loop { + let Ok((sock, addr)) = listener.accept() else { + error!("could not accept connection"); + continue; + }; + info!("connection from {addr}"); + let config = config.clone(); + spawn( + move || match handle_connection_wrapper(config, sock, addr) { + Ok(()) => (), + Err(err) => { + warn!("client error {addr}: {err:?}") + } + }, + ); + } +} + +fn handle_connection_wrapper( + config: Arc<Config>, + sock: TcpStream, + addr: SocketAddr, +) -> anyhow::Result<()> { + let mut rsock = BufReader::new(sock.try_clone()?); + let mut wsock = BufWriter::new(sock); + match handle_connection(config, &mut rsock, &mut wsock, addr) { + Ok(()) => Ok(()), + Err(e) => { + writeln!(wsock, "error,{e}")?; + wsock.flush()?; + Err(e) + } + } +} +fn handle_connection( + config: Arc<Config>, + rsock: &mut BufReader<TcpStream>, + wsock: &mut BufWriter<TcpStream>, + addr: SocketAddr, +) -> anyhow::Result<()> { + let mut line = String::new(); + + rsock.by_ref().take(4096).read_line(&mut line)?; + + let provided_secret = line.trim(); + eprintln!("{:?}", provided_secret); + let Some(peer) = config + .peer + .iter() + .find(|p| p.shared_secret == provided_secret) + else { + bail!("invalid secret"); + }; + + let peerdir = config.storage.root.join(&peer.name); + create_dir_all(&peerdir)?; + + loop { + line.clear(); + rsock.by_ref().take(4096).read_line(&mut line)?; + let mut toks = line.trim().split(","); + let command = toks.next().ok_or(anyhow!("command missing"))?; + eprintln!("{command:?}"); + match command { + "quit" => break Ok(()), + "list" => { + let mut dir = peerdir + .read_dir()? + .map(|e| { + let e = e?; + Ok::<_, anyhow::Error>(( + e.metadata()?.mtime(), + e.metadata()?.size(), + e.file_name() + .to_str() + .unwrap() + .to_string() + .parse::<Serial>()?, + )) + }) + .try_collect::<Vec<_>>()?; + dir.sort_by_key(|(_, _, a)| *a); + + for (m, s, p) in dir { + writeln!(wsock, "{m}:{s}:{p}")?; + } + writeln!(wsock)?; + wsock.flush()?; + } + "upload" => { + let size = toks.next().ok_or(anyhow!("size missing"))?.parse::<u64>()?; + if size > config.storage.size { + bail!("maximum size exceeded") + } + + let serial = transact_user_state(&config, peer, |s| { + if s.last_upload.elapsed().unwrap().as_secs() > config.storage.upload_cooldown { + s.last_upload = SystemTime::now(); + s.serial += 1; + Some(s.serial) + } else { + None + } + })? + .ok_or(anyhow!("upload cooldown"))?; + writeln!(wsock, "ready")?; + wsock.flush()?; + + while peerdir.read_dir()?.fold(0, |a, _| a + 1) > config.storage.versions { + let mut dir = peerdir + .read_dir()? + .map(|e| { + let e = e?; + Ok::<_, anyhow::Error>( + e.file_name() + .to_str() + .unwrap() + .to_string() + .parse::<Serial>()?, + ) + }) + .try_collect::<Vec<_>>()?; + dir.sort(); + let rem = dir[0]; + info!("removing serial={rem}"); + remove_file(peerdir.join(rem.to_string()))?; + } + + info!("upload from {addr} size={size}"); + let mut upload = rsock.get_ref().take(size); + let mut target = + BufWriter::new(File::create_new(peerdir.join(serial.to_string()))?); + + copy(&mut upload, &mut target)?; // TODO speed limit + + info!("done {addr}"); + writeln!(wsock, "done")?; + wsock.flush()?; + } + "download" => { + let serial = toks + .next() + .ok_or(anyhow!("serial missing"))? + .parse::<Serial>()?; + let ok = transact_user_state(&config, peer, |s| { + if s.last_download.elapsed().unwrap().as_secs() + > config.storage.download_cooldown + { + s.last_download = SystemTime::now(); + true + } else { + false + } + })?; + if !ok { + bail!("download cooldown") + } + + let source = File::open(peerdir.join(serial.to_string()))?; + let size = source.metadata()?.size(); + let mut source = BufReader::new(source); + + writeln!(wsock, "ready,{size}")?; + wsock.flush()?; + copy(&mut source, wsock)?; // TODO speed limit + + writeln!(wsock, "done")?; + wsock.flush()?; + } + _ => bail!("unknown command"), + } + } +} + +#[derive(Serialize, Deserialize)] +struct PeerState { + last_upload: SystemTime, + last_download: SystemTime, + serial: Serial, +} + +static USER_DB: Mutex<()> = Mutex::new(()); +fn transact_user_state<T>( + config: &Config, + peer: &PeerConfig, + update: impl FnOnce(&mut PeerState) -> T, +) -> anyhow::Result<T> { + let _g = USER_DB + .lock() + .map_err(|_| anyhow!("user database locked"))?; + + let conf_path = config.storage.root.join(&peer.name).with_extension("db"); + + let mut state = read_to_string(&conf_path) + .map(|s| toml::from_str(&s)) + .unwrap_or_else(|_| { + Ok(PeerState { + last_download: SystemTime::UNIX_EPOCH, + last_upload: SystemTime::UNIX_EPOCH, + serial: 0, + }) + })?; + + let res = update(&mut state); + let ser_state = toml::to_string_pretty(&state)?; + + File::create(conf_path)?.write_all(ser_state.as_bytes())?; + + drop(_g); + Ok(res) +} |