diff options
-rw-r--r-- | client/src/lib.rs | 128 | ||||
-rw-r--r-- | common/src/lib.rs | 4 | ||||
-rw-r--r-- | server/src/federation.rs | 55 | ||||
-rw-r--r-- | server/src/import.rs | 86 | ||||
-rw-r--r-- | server/src/main.rs | 13 | ||||
-rw-r--r-- | server/src/routes/api/mod.rs | 5 | ||||
-rw-r--r-- | server/src/routes/mod.rs | 9 | ||||
-rw-r--r-- | server/src/routes/stream.rs | 29 | ||||
-rw-r--r-- | server/src/routes/ui/account/admin.rs | 4 | ||||
-rw-r--r-- | server/src/routes/ui/error.rs | 7 |
10 files changed, 244 insertions, 96 deletions
diff --git a/client/src/lib.rs b/client/src/lib.rs index a4d0b01..4b111d1 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,14 +1,22 @@ -use reqwest::Client; +use anyhow::Result; +use jellycommon::Node; +use reqwest::{ + header::{HeaderMap, HeaderValue}, + Client, +}; use serde_json::json; use std::time::Duration; +#[derive(Debug, Clone)] pub struct Instance { pub host: String, pub tls: bool, } -pub struct Session(pub String); impl Instance { + pub fn new(host: String, tls: bool) -> Self { + Self { host, tls } + } pub fn base(&self) -> String { format!( "{}://{}", @@ -16,55 +24,83 @@ impl Instance { self.host ) } -} -impl Session { - pub fn session_param(&self) -> String { - format!("session={}", self.0) + pub async fn login( + self, + username: String, + password: String, + expire: Duration, + ) -> anyhow::Result<Session> { + let session_token = Client::builder() + .build()? + .post(format!("{}/api/create_session", self.base())) + .json(&json!({ + "expire": expire.as_secs(), + "password": password, + "username": username, + })) + .send() + .await? + .json() + .await?; + + let mut headers = HeaderMap::new(); + headers.insert( + "Cookie", + HeaderValue::from_str(&format!("session={session_token}")).unwrap(), + ); + headers.insert("Accept", HeaderValue::from_static("application/json")); + + Ok(Session { + instance: self, + session_token, + client: Client::builder().default_headers(headers).build().unwrap(), + }) } } -pub fn stream( - instance: &Instance, - session: &Session, - id: &str, - tracks: &[usize], - webm: bool, -) -> String { - format!( - "{}/n/{}/stream?tracks={}&webm={}&{}", - instance.base(), - id, - tracks - .iter() - .map(|v| format!("{v}")) - .collect::<Vec<_>>() - .join(","), - if webm { "1" } else { "0" }, - session.session_param() - ) +pub struct Session { + client: Client, + instance: Instance, + session_token: String, } -pub async fn login( - instance: &Instance, - username: String, - password: String, - expire: Duration, -) -> anyhow::Result<Session> { - let p = serde_json::to_string(&json!({ - "expire": expire.as_secs(), - "password": password, - "username": username, - })) - .unwrap(); +impl Session { + fn session_param(&self) -> String { + format!("session={}", self.session_token) + } + + pub async fn node(&self, id: &str) -> Result<Node> { + Ok(self + .client + .get(format!("{}/n/{id}", self.instance.base())) + .send() + .await? + .json() + .await?) + } - let r = Client::builder() - .build()? - .post(format!("{}/api/account/login", instance.base())) - .body(p) - .send() - .await? - .json() - .await?; + // pub async fn node_asset(&self, id: &str, role: AssetRole) -> Result<Node> { + // Ok(self + // .client + // .get(format!("/n/{id}")) + // .send() + // .await? + // .bytes() + // .await?) + // } - Ok(Session(r)) + pub fn stream(&self, id: &str, tracks: &[usize], webm: bool) -> String { + format!( + "{}/n/{}/stream?tracks={}&webm={}&{}", + self.instance.base(), + id, + tracks + .iter() + .map(|v| format!("{v}")) + .collect::<Vec<_>>() + .join(","), + if webm { "1" } else { "0" }, + self.session_param() + ) + } } diff --git a/common/src/lib.rs b/common/src/lib.rs index 34de946..6f6caf5 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -19,7 +19,7 @@ pub struct Node { #[rustfmt::skip] #[derive(Debug, Clone, Deserialize, Serialize)] pub struct NodePrivate { - #[serde(default)] pub import: Option<ImportOptions>, + #[serde(default)] pub import: Option<RemoteImportOptions>, #[serde(default)] pub poster: Option<PathBuf>, #[serde(default)] pub backdrop: Option<PathBuf>, #[serde(default)] pub source: Option<MediaSource>, @@ -40,7 +40,7 @@ pub struct NodePublic { #[rustfmt::skip] #[derive(Debug, Clone, Deserialize, Serialize)] -pub struct ImportOptions { +pub struct RemoteImportOptions { pub host: String, pub id: String, #[serde(default)] pub prefix: Option<String>, diff --git a/server/src/federation.rs b/server/src/federation.rs new file mode 100644 index 0000000..578261b --- /dev/null +++ b/server/src/federation.rs @@ -0,0 +1,55 @@ +use crate::CONF; +use anyhow::anyhow; +use jellyclient::{Instance, Session}; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tokio::sync::RwLock; + +pub struct Federation { + instances: HashMap<String, Instance>, + sessions: RwLock<HashMap<String, Arc<Session>>>, +} + +impl Federation { + pub fn initialize() -> Self { + let instances = CONF + .remote_credentials + .iter() + .map(|(k, (_, _, tls))| (k.to_owned(), Instance::new(k.to_owned(), *tls))) + .collect::<HashMap<_, _>>(); + + Self { + instances, + sessions: Default::default(), + } + } + + pub fn get_instance(&self, host: &String) -> anyhow::Result<&Instance> { + Ok(self + .instances + .get(host) + .ok_or(anyhow!("unknown instance"))?) + } + pub async fn get_session(&self, host: &String) -> anyhow::Result<Arc<Session>> { + let mut w = self.sessions.write().await; + if let Some(s) = w.get(host) { + Ok(s.to_owned()) + } else { + let (username, password, _) = CONF + .remote_credentials + .get(host) + .ok_or(anyhow!("no credentials of the remote server"))?; + let s = Arc::new( + self.get_instance(host)? + .to_owned() + .login( + username.to_owned(), + password.to_owned(), + Duration::from_secs(60 * 60 * 24 * 356), + ) + .await?, + ); + w.insert(host.to_owned(), s.clone()); + Ok(s) + } + } +} diff --git a/server/src/import.rs b/server/src/import.rs index b306332..a172ad9 100644 --- a/server/src/import.rs +++ b/server/src/import.rs @@ -3,37 +3,42 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin <metamuffin.org> */ -use crate::{database::Database, CONF}; -use anyhow::{bail, Context, Ok}; +use crate::{database::Database, federation::Federation, CONF}; +use anyhow::{anyhow, bail, Context, Ok}; use async_recursion::async_recursion; -use jellycommon::Node; -use log::info; +use jellycommon::{Node, RemoteImportOptions}; +use log::{error, info}; use std::{ffi::OsStr, fs::File, os::unix::prelude::OsStrExt, path::PathBuf, sync::LazyLock}; use tokio::sync::Semaphore; static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); -pub async fn import(db: &Database) -> anyhow::Result<()> { +pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { info!("clearing node tree"); let permit = IMPORT_SEM.try_acquire()?; - { - db.node.clear()?; - info!("importing..."); - import_path(CONF.library_path.clone(), db, None) - .await - .context("indexing")?; - info!("import completed"); - } + db.node.clear()?; + info!("importing..."); + let (_, errs) = import_path(CONF.library_path.clone(), db, fed, None) + .await + .context("indexing")?; + info!("import completed"); drop(permit); - Ok(()) + if errs == 0 { + Ok(()) + } else { + Err(anyhow!( + "partial import, {errs} errors occured; see server log" + )) + } } #[async_recursion] pub async fn import_path( path: PathBuf, db: &Database, + fed: &Federation, parent: Option<String>, -) -> anyhow::Result<Vec<String>> { +) -> anyhow::Result<(Vec<String>, usize)> { if path.is_dir() { let mpath = path.join("directory.json"); let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| { @@ -47,8 +52,19 @@ pub async fn import_path( }); let identifier = path.file_name().unwrap().to_str().unwrap().to_string(); let mut children_ids = Vec::new(); + let mut errs = 0; for p in children_paths { - children_ids.extend(import_path(p, db, Some(identifier.clone())).await?) + let k = import_path(p, db, fed, Some(identifier.clone())).await; + match k { + core::result::Result::Ok((els, errs2)) => { + errs += errs2; + children_ids.extend(els) + } + Err(e) => { + errs += 1; + error!("import failed: {e:?}") + } + } } if mpath.exists() { let mut data: Node = @@ -56,14 +72,14 @@ pub async fn import_path( data.public.children = children_ids; data.public.parent = parent; - info!("insert {identifier}"); + info!("adding {identifier}"); db.node.insert(&identifier, &data)?; - Ok(vec![identifier]) + Ok((vec![identifier], errs)) } else { - Ok(children_ids) + Ok((children_ids, errs)) } } else if path.is_file() { - info!("loading item {path:?}"); + info!("loading {path:?}"); let datafile = File::open(path.clone()).context("cant load metadata")?; let mut data: Node = serde_json::from_reader(datafile).context("invalid metadata")?; let identifier = path @@ -75,11 +91,37 @@ pub async fn import_path( .unwrap() .to_string(); - info!("insert {identifier}"); + if let Some(io) = data.private.import.take() { + let title = data.public.title; + data = import_remote(io, db, fed) + .await + .context("federated import")?; + data.public.title = title; + } + + info!("adding {identifier}"); data.public.parent = parent; db.node.insert(&identifier, &data)?; - Ok(vec![identifier]) + Ok((vec![identifier], 0)) } else { bail!("did somebody really put a fifo or socket in the library?!") } } + +async fn import_remote( + opts: RemoteImportOptions, + db: &Database, + fed: &Federation, +) -> anyhow::Result<Node> { + let sess = fed + .get_session(&opts.host) + .await + .context("creating session")?; + let node = sess.node(&opts.id).await.context("fetching remote node")?; + + if !node.public.children.is_empty() { + todo!() + } + + Ok(node) +} diff --git a/server/src/main.rs b/server/src/main.rs index 5b2d070..6a75c30 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,13 +6,16 @@ #![feature(lazy_cell)] use database::Database; +use federation::Federation; use jellycommon::config::GlobalConfig; use jellyremuxer::RemuxerContext; +use log::error; use once_cell::sync::Lazy; use routes::build_rocket; use std::fs::File; pub mod database; +pub mod federation; pub mod import; pub mod routes; @@ -37,7 +40,13 @@ fn main() { async fn async_main() { let remuxer = RemuxerContext::new(); let database = Database::open(&CONF.database_path).unwrap(); - import::import(&database).await.unwrap(); + let federation = Federation::initialize(); database.create_admin(); - build_rocket(remuxer, database).launch().await.unwrap(); + if let Err(err) = import::import(&database, &federation).await { + error!("import not sucessful: {err:?}") + } + build_rocket(remuxer, database, federation) + .launch() + .await + .unwrap(); } diff --git a/server/src/routes/api/mod.rs b/server/src/routes/api/mod.rs index e74b134..cc87525 100644 --- a/server/src/routes/api/mod.rs +++ b/server/src/routes/api/mod.rs @@ -4,8 +4,6 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ -use std::ops::Deref; - use super::ui::{ account::{login_logic, LoginForm}, error::MyResult, @@ -22,6 +20,7 @@ use rocket::{ Request, State, }; use serde_json::{json, Value}; +use std::ops::Deref; #[get("/api")] pub fn r_api_root() -> Redirect { @@ -33,7 +32,7 @@ pub fn r_api_version() -> &'static str { "2" } -#[post("/api/account/login", data = "<data>")] +#[post("/api/create_session", data = "<data>")] pub fn r_api_account_login(database: &State<Database>, data: Json<LoginForm>) -> MyResult<Value> { let token = login_logic(database, &data.username, &data.password)?; Ok(json!(token)) diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index be4d2cb..42eae1b 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -3,7 +3,7 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin <metamuffin.org> */ -use crate::{database::Database, routes::ui::error::MyResult, CONF}; +use crate::{database::Database, federation::Federation, routes::ui::error::MyResult, CONF}; use api::{r_api_account_login, r_api_root, r_api_version}; use base64::Engine; use jellyremuxer::RemuxerContext; @@ -45,7 +45,11 @@ macro_rules! uri { }; } -pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build> { +pub fn build_rocket( + remuxer: RemuxerContext, + database: Database, + federation: Federation, +) -> Rocket<Build> { rocket::build() .configure(Config { secret_key: SecretKey::derive_from( @@ -61,6 +65,7 @@ pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build }) .manage(remuxer) .manage(database) + .manage(federation) .attach(AdHoc::on_response("set server header", |_req, res| { res.set_header(Header::new("server", "jellything")); Box::pin(async {}) diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 1277646..ee7880c 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -4,7 +4,7 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ use super::ui::{account::session::Session, error::MyError}; -use crate::{database::Database, CONF}; +use crate::{database::Database, federation::Federation, CONF}; use anyhow::{anyhow, Result}; use jellycommon::MediaSource; use jellyremuxer::RemuxerContext; @@ -30,6 +30,7 @@ pub async fn r_stream( webm: Option<bool>, tracks: String, remuxer: &State<RemuxerContext>, + federation: &State<Federation>, db: &State<Database>, range: Option<RequestRange>, ) -> Result<Either<StreamResponse, Redirect>, MyError> { @@ -47,27 +48,21 @@ pub async fn r_stream( let source_tracks = match source { MediaSource::Local { tracks } => tracks, MediaSource::Remote { host, remote_id } => { - let (username, password, tls) = CONF + let (username, password, _) = CONF .remote_credentials .get(&host) .ok_or(anyhow!("no credentials on the server-side"))?; - let instance = jellyclient::Instance { host, tls: *tls }; - let session = jellyclient::login( - &instance, - username.to_owned(), - password.to_owned(), - Duration::from_secs(60), - ) - .await?; + let instance = federation.get_instance(&host)?.to_owned(); + let session = instance + .login( + username.to_owned(), + password.to_owned(), + Duration::from_secs(60), + ) + .await?; - let uri = jellyclient::stream( - &instance, - &session, - &remote_id, - &tracks, - webm.unwrap_or(false), - ); + let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false)); return Ok(Either::Right(Redirect::found(uri))); } }; diff --git a/server/src/routes/ui/account/admin.rs b/server/src/routes/ui/account/admin.rs index 7124f4a..d0ad433 100644 --- a/server/src/routes/ui/account/admin.rs +++ b/server/src/routes/ui/account/admin.rs @@ -5,6 +5,7 @@ */ use crate::{ database::Database, + federation::Federation, import::import, routes::ui::{ account::session::Session, @@ -135,11 +136,12 @@ pub fn r_account_admin_remove_invite( pub async fn r_account_admin_import( session: Session, database: &State<Database>, + federation: &State<Federation>, ) -> MyResult<DynLayoutPage<'static>> { if !session.user.admin { Err(anyhow!("you not admin"))? } - let r = import(&database).await; + let r = import(&database, &federation).await; admin_dashboard( &database, Some(r.map_err(|e| e.into()).map(|_| "Import successful".into())), diff --git a/server/src/routes/ui/error.rs b/server/src/routes/ui/error.rs index 190650f..b538a06 100644 --- a/server/src/routes/ui/error.rs +++ b/server/src/routes/ui/error.rs @@ -36,7 +36,6 @@ pub fn r_api_catch<'a>(status: Status, _request: &Request) -> Value { pub type MyResult<T> = Result<T, MyError>; -#[derive(Debug)] pub struct MyError(pub anyhow::Error); impl<'r> Responder<'r, 'static> for MyError { @@ -61,6 +60,12 @@ impl<'r> Responder<'r, 'static> for MyError { } } +impl std::fmt::Debug for MyError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{:?}", self.0)) + } +} + impl Display for MyError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.0.fmt(f) |