aboutsummaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs228
1 files changed, 228 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs
new file mode 100644
index 0000000..30ed0c8
--- /dev/null
+++ b/src/server.rs
@@ -0,0 +1,228 @@
+use crate::{Config, PeerConfig, Serial};
+use anyhow::{anyhow, bail};
+use log::{error, info, warn};
+use serde::{Deserialize, Serialize};
+use std::{
+ fs::{create_dir_all, read_to_string, remove_file, File},
+ io::{copy, BufRead, BufReader, BufWriter, Read, Write},
+ net::{SocketAddr, TcpListener, TcpStream},
+ os::unix::fs::MetadataExt,
+ sync::{Arc, Mutex},
+ thread::spawn,
+ time::SystemTime,
+};
+
+pub fn server(config: Arc<Config>) -> anyhow::Result<!> {
+ let listener = TcpListener::bind(&config.server.address)?;
+ info!("listening on {}", listener.local_addr()?);
+ loop {
+ let Ok((sock, addr)) = listener.accept() else {
+ error!("could not accept connection");
+ continue;
+ };
+ info!("connection from {addr}");
+ let config = config.clone();
+ spawn(
+ move || match handle_connection_wrapper(config, sock, addr) {
+ Ok(()) => (),
+ Err(err) => {
+ warn!("client error {addr}: {err:?}")
+ }
+ },
+ );
+ }
+}
+
+fn handle_connection_wrapper(
+ config: Arc<Config>,
+ sock: TcpStream,
+ addr: SocketAddr,
+) -> anyhow::Result<()> {
+ let mut rsock = BufReader::new(sock.try_clone()?);
+ let mut wsock = BufWriter::new(sock);
+ match handle_connection(config, &mut rsock, &mut wsock, addr) {
+ Ok(()) => Ok(()),
+ Err(e) => {
+ writeln!(wsock, "error,{e}")?;
+ wsock.flush()?;
+ Err(e)
+ }
+ }
+}
+fn handle_connection(
+ config: Arc<Config>,
+ rsock: &mut BufReader<TcpStream>,
+ wsock: &mut BufWriter<TcpStream>,
+ addr: SocketAddr,
+) -> anyhow::Result<()> {
+ let mut line = String::new();
+
+ rsock.by_ref().take(4096).read_line(&mut line)?;
+
+ let provided_secret = line.trim();
+ eprintln!("{:?}", provided_secret);
+ let Some(peer) = config
+ .peer
+ .iter()
+ .find(|p| p.shared_secret == provided_secret)
+ else {
+ bail!("invalid secret");
+ };
+
+ let peerdir = config.storage.root.join(&peer.name);
+ create_dir_all(&peerdir)?;
+
+ loop {
+ line.clear();
+ rsock.by_ref().take(4096).read_line(&mut line)?;
+ let mut toks = line.trim().split(",");
+ let command = toks.next().ok_or(anyhow!("command missing"))?;
+ eprintln!("{command:?}");
+ match command {
+ "quit" => break Ok(()),
+ "list" => {
+ let mut dir = peerdir
+ .read_dir()?
+ .map(|e| {
+ let e = e?;
+ Ok::<_, anyhow::Error>((
+ e.metadata()?.mtime(),
+ e.metadata()?.size(),
+ e.file_name()
+ .to_str()
+ .unwrap()
+ .to_string()
+ .parse::<Serial>()?,
+ ))
+ })
+ .try_collect::<Vec<_>>()?;
+ dir.sort_by_key(|(_, _, a)| *a);
+
+ for (m, s, p) in dir {
+ writeln!(wsock, "{m}:{s}:{p}")?;
+ }
+ writeln!(wsock)?;
+ wsock.flush()?;
+ }
+ "upload" => {
+ let size = toks.next().ok_or(anyhow!("size missing"))?.parse::<u64>()?;
+ if size > config.storage.size {
+ bail!("maximum size exceeded")
+ }
+
+ let serial = transact_user_state(&config, peer, |s| {
+ if s.last_upload.elapsed().unwrap().as_secs() > config.storage.upload_cooldown {
+ s.last_upload = SystemTime::now();
+ s.serial += 1;
+ Some(s.serial)
+ } else {
+ None
+ }
+ })?
+ .ok_or(anyhow!("upload cooldown"))?;
+ writeln!(wsock, "ready")?;
+ wsock.flush()?;
+
+ while peerdir.read_dir()?.fold(0, |a, _| a + 1) > config.storage.versions {
+ let mut dir = peerdir
+ .read_dir()?
+ .map(|e| {
+ let e = e?;
+ Ok::<_, anyhow::Error>(
+ e.file_name()
+ .to_str()
+ .unwrap()
+ .to_string()
+ .parse::<Serial>()?,
+ )
+ })
+ .try_collect::<Vec<_>>()?;
+ dir.sort();
+ let rem = dir[0];
+ info!("removing serial={rem}");
+ remove_file(peerdir.join(rem.to_string()))?;
+ }
+
+ info!("upload from {addr} size={size}");
+ let mut upload = rsock.get_ref().take(size);
+ let mut target =
+ BufWriter::new(File::create_new(peerdir.join(serial.to_string()))?);
+
+ copy(&mut upload, &mut target)?; // TODO speed limit
+
+ info!("done {addr}");
+ writeln!(wsock, "done")?;
+ wsock.flush()?;
+ }
+ "download" => {
+ let serial = toks
+ .next()
+ .ok_or(anyhow!("serial missing"))?
+ .parse::<Serial>()?;
+ let ok = transact_user_state(&config, peer, |s| {
+ if s.last_download.elapsed().unwrap().as_secs()
+ > config.storage.download_cooldown
+ {
+ s.last_download = SystemTime::now();
+ true
+ } else {
+ false
+ }
+ })?;
+ if !ok {
+ bail!("download cooldown")
+ }
+
+ let source = File::open(peerdir.join(serial.to_string()))?;
+ let size = source.metadata()?.size();
+ let mut source = BufReader::new(source);
+
+ writeln!(wsock, "ready,{size}")?;
+ wsock.flush()?;
+ copy(&mut source, wsock)?; // TODO speed limit
+
+ writeln!(wsock, "done")?;
+ wsock.flush()?;
+ }
+ _ => bail!("unknown command"),
+ }
+ }
+}
+
+#[derive(Serialize, Deserialize)]
+struct PeerState {
+ last_upload: SystemTime,
+ last_download: SystemTime,
+ serial: Serial,
+}
+
+static USER_DB: Mutex<()> = Mutex::new(());
+fn transact_user_state<T>(
+ config: &Config,
+ peer: &PeerConfig,
+ update: impl FnOnce(&mut PeerState) -> T,
+) -> anyhow::Result<T> {
+ let _g = USER_DB
+ .lock()
+ .map_err(|_| anyhow!("user database locked"))?;
+
+ let conf_path = config.storage.root.join(&peer.name).with_extension("db");
+
+ let mut state = read_to_string(&conf_path)
+ .map(|s| toml::from_str(&s))
+ .unwrap_or_else(|_| {
+ Ok(PeerState {
+ last_download: SystemTime::UNIX_EPOCH,
+ last_upload: SystemTime::UNIX_EPOCH,
+ serial: 0,
+ })
+ })?;
+
+ let res = update(&mut state);
+ let ser_state = toml::to_string_pretty(&state)?;
+
+ File::create(conf_path)?.write_all(ser_state.as_bytes())?;
+
+ drop(_g);
+ Ok(res)
+}