diff options
author | metamuffin <metamuffin@disroot.org> | 2023-08-02 23:07:55 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-08-02 23:07:55 +0200 |
commit | 8e33fcdfbd9df042c0cfd8e9a2084993313961c9 (patch) | |
tree | 9b18183237177b6c2b7060140ed92e62581ab588 /server/src | |
parent | c81d8bbfd46d53fba6e0086b5f859f8af8639f4a (diff) | |
download | jellything-8e33fcdfbd9df042c0cfd8e9a2084993313961c9.tar jellything-8e33fcdfbd9df042c0cfd8e9a2084993313961c9.tar.bz2 jellything-8e33fcdfbd9df042c0cfd8e9a2084993313961c9.tar.zst |
federated import works but relies on private data
Diffstat (limited to 'server/src')
-rw-r--r-- | server/src/federation.rs | 55 | ||||
-rw-r--r-- | server/src/import.rs | 86 | ||||
-rw-r--r-- | server/src/main.rs | 13 | ||||
-rw-r--r-- | server/src/routes/api/mod.rs | 5 | ||||
-rw-r--r-- | server/src/routes/mod.rs | 9 | ||||
-rw-r--r-- | server/src/routes/stream.rs | 29 | ||||
-rw-r--r-- | server/src/routes/ui/account/admin.rs | 4 | ||||
-rw-r--r-- | server/src/routes/ui/error.rs | 7 |
8 files changed, 160 insertions, 48 deletions
diff --git a/server/src/federation.rs b/server/src/federation.rs new file mode 100644 index 0000000..578261b --- /dev/null +++ b/server/src/federation.rs @@ -0,0 +1,55 @@ +use crate::CONF; +use anyhow::anyhow; +use jellyclient::{Instance, Session}; +use std::{collections::HashMap, sync::Arc, time::Duration}; +use tokio::sync::RwLock; + +pub struct Federation { + instances: HashMap<String, Instance>, + sessions: RwLock<HashMap<String, Arc<Session>>>, +} + +impl Federation { + pub fn initialize() -> Self { + let instances = CONF + .remote_credentials + .iter() + .map(|(k, (_, _, tls))| (k.to_owned(), Instance::new(k.to_owned(), *tls))) + .collect::<HashMap<_, _>>(); + + Self { + instances, + sessions: Default::default(), + } + } + + pub fn get_instance(&self, host: &String) -> anyhow::Result<&Instance> { + Ok(self + .instances + .get(host) + .ok_or(anyhow!("unknown instance"))?) + } + pub async fn get_session(&self, host: &String) -> anyhow::Result<Arc<Session>> { + let mut w = self.sessions.write().await; + if let Some(s) = w.get(host) { + Ok(s.to_owned()) + } else { + let (username, password, _) = CONF + .remote_credentials + .get(host) + .ok_or(anyhow!("no credentials of the remote server"))?; + let s = Arc::new( + self.get_instance(host)? + .to_owned() + .login( + username.to_owned(), + password.to_owned(), + Duration::from_secs(60 * 60 * 24 * 356), + ) + .await?, + ); + w.insert(host.to_owned(), s.clone()); + Ok(s) + } + } +} diff --git a/server/src/import.rs b/server/src/import.rs index b306332..a172ad9 100644 --- a/server/src/import.rs +++ b/server/src/import.rs @@ -3,37 +3,42 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin <metamuffin.org> */ -use crate::{database::Database, CONF}; -use anyhow::{bail, Context, Ok}; +use crate::{database::Database, federation::Federation, CONF}; +use anyhow::{anyhow, bail, Context, Ok}; use async_recursion::async_recursion; -use jellycommon::Node; -use log::info; +use jellycommon::{Node, RemoteImportOptions}; +use log::{error, info}; use std::{ffi::OsStr, fs::File, os::unix::prelude::OsStrExt, path::PathBuf, sync::LazyLock}; use tokio::sync::Semaphore; static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1)); -pub async fn import(db: &Database) -> anyhow::Result<()> { +pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { info!("clearing node tree"); let permit = IMPORT_SEM.try_acquire()?; - { - db.node.clear()?; - info!("importing..."); - import_path(CONF.library_path.clone(), db, None) - .await - .context("indexing")?; - info!("import completed"); - } + db.node.clear()?; + info!("importing..."); + let (_, errs) = import_path(CONF.library_path.clone(), db, fed, None) + .await + .context("indexing")?; + info!("import completed"); drop(permit); - Ok(()) + if errs == 0 { + Ok(()) + } else { + Err(anyhow!( + "partial import, {errs} errors occured; see server log" + )) + } } #[async_recursion] pub async fn import_path( path: PathBuf, db: &Database, + fed: &Federation, parent: Option<String>, -) -> anyhow::Result<Vec<String>> { +) -> anyhow::Result<(Vec<String>, usize)> { if path.is_dir() { let mpath = path.join("directory.json"); let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| { @@ -47,8 +52,19 @@ pub async fn import_path( }); let identifier = path.file_name().unwrap().to_str().unwrap().to_string(); let mut children_ids = Vec::new(); + let mut errs = 0; for p in children_paths { - children_ids.extend(import_path(p, db, Some(identifier.clone())).await?) + let k = import_path(p, db, fed, Some(identifier.clone())).await; + match k { + core::result::Result::Ok((els, errs2)) => { + errs += errs2; + children_ids.extend(els) + } + Err(e) => { + errs += 1; + error!("import failed: {e:?}") + } + } } if mpath.exists() { let mut data: Node = @@ -56,14 +72,14 @@ pub async fn import_path( data.public.children = children_ids; data.public.parent = parent; - info!("insert {identifier}"); + info!("adding {identifier}"); db.node.insert(&identifier, &data)?; - Ok(vec![identifier]) + Ok((vec![identifier], errs)) } else { - Ok(children_ids) + Ok((children_ids, errs)) } } else if path.is_file() { - info!("loading item {path:?}"); + info!("loading {path:?}"); let datafile = File::open(path.clone()).context("cant load metadata")?; let mut data: Node = serde_json::from_reader(datafile).context("invalid metadata")?; let identifier = path @@ -75,11 +91,37 @@ pub async fn import_path( .unwrap() .to_string(); - info!("insert {identifier}"); + if let Some(io) = data.private.import.take() { + let title = data.public.title; + data = import_remote(io, db, fed) + .await + .context("federated import")?; + data.public.title = title; + } + + info!("adding {identifier}"); data.public.parent = parent; db.node.insert(&identifier, &data)?; - Ok(vec![identifier]) + Ok((vec![identifier], 0)) } else { bail!("did somebody really put a fifo or socket in the library?!") } } + +async fn import_remote( + opts: RemoteImportOptions, + db: &Database, + fed: &Federation, +) -> anyhow::Result<Node> { + let sess = fed + .get_session(&opts.host) + .await + .context("creating session")?; + let node = sess.node(&opts.id).await.context("fetching remote node")?; + + if !node.public.children.is_empty() { + todo!() + } + + Ok(node) +} diff --git a/server/src/main.rs b/server/src/main.rs index 5b2d070..6a75c30 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,13 +6,16 @@ #![feature(lazy_cell)] use database::Database; +use federation::Federation; use jellycommon::config::GlobalConfig; use jellyremuxer::RemuxerContext; +use log::error; use once_cell::sync::Lazy; use routes::build_rocket; use std::fs::File; pub mod database; +pub mod federation; pub mod import; pub mod routes; @@ -37,7 +40,13 @@ fn main() { async fn async_main() { let remuxer = RemuxerContext::new(); let database = Database::open(&CONF.database_path).unwrap(); - import::import(&database).await.unwrap(); + let federation = Federation::initialize(); database.create_admin(); - build_rocket(remuxer, database).launch().await.unwrap(); + if let Err(err) = import::import(&database, &federation).await { + error!("import not sucessful: {err:?}") + } + build_rocket(remuxer, database, federation) + .launch() + .await + .unwrap(); } diff --git a/server/src/routes/api/mod.rs b/server/src/routes/api/mod.rs index e74b134..cc87525 100644 --- a/server/src/routes/api/mod.rs +++ b/server/src/routes/api/mod.rs @@ -4,8 +4,6 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ -use std::ops::Deref; - use super::ui::{ account::{login_logic, LoginForm}, error::MyResult, @@ -22,6 +20,7 @@ use rocket::{ Request, State, }; use serde_json::{json, Value}; +use std::ops::Deref; #[get("/api")] pub fn r_api_root() -> Redirect { @@ -33,7 +32,7 @@ pub fn r_api_version() -> &'static str { "2" } -#[post("/api/account/login", data = "<data>")] +#[post("/api/create_session", data = "<data>")] pub fn r_api_account_login(database: &State<Database>, data: Json<LoginForm>) -> MyResult<Value> { let token = login_logic(database, &data.username, &data.password)?; Ok(json!(token)) diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index be4d2cb..42eae1b 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -3,7 +3,7 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin <metamuffin.org> */ -use crate::{database::Database, routes::ui::error::MyResult, CONF}; +use crate::{database::Database, federation::Federation, routes::ui::error::MyResult, CONF}; use api::{r_api_account_login, r_api_root, r_api_version}; use base64::Engine; use jellyremuxer::RemuxerContext; @@ -45,7 +45,11 @@ macro_rules! uri { }; } -pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build> { +pub fn build_rocket( + remuxer: RemuxerContext, + database: Database, + federation: Federation, +) -> Rocket<Build> { rocket::build() .configure(Config { secret_key: SecretKey::derive_from( @@ -61,6 +65,7 @@ pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build }) .manage(remuxer) .manage(database) + .manage(federation) .attach(AdHoc::on_response("set server header", |_req, res| { res.set_header(Header::new("server", "jellything")); Box::pin(async {}) diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index 1277646..ee7880c 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -4,7 +4,7 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ use super::ui::{account::session::Session, error::MyError}; -use crate::{database::Database, CONF}; +use crate::{database::Database, federation::Federation, CONF}; use anyhow::{anyhow, Result}; use jellycommon::MediaSource; use jellyremuxer::RemuxerContext; @@ -30,6 +30,7 @@ pub async fn r_stream( webm: Option<bool>, tracks: String, remuxer: &State<RemuxerContext>, + federation: &State<Federation>, db: &State<Database>, range: Option<RequestRange>, ) -> Result<Either<StreamResponse, Redirect>, MyError> { @@ -47,27 +48,21 @@ pub async fn r_stream( let source_tracks = match source { MediaSource::Local { tracks } => tracks, MediaSource::Remote { host, remote_id } => { - let (username, password, tls) = CONF + let (username, password, _) = CONF .remote_credentials .get(&host) .ok_or(anyhow!("no credentials on the server-side"))?; - let instance = jellyclient::Instance { host, tls: *tls }; - let session = jellyclient::login( - &instance, - username.to_owned(), - password.to_owned(), - Duration::from_secs(60), - ) - .await?; + let instance = federation.get_instance(&host)?.to_owned(); + let session = instance + .login( + username.to_owned(), + password.to_owned(), + Duration::from_secs(60), + ) + .await?; - let uri = jellyclient::stream( - &instance, - &session, - &remote_id, - &tracks, - webm.unwrap_or(false), - ); + let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false)); return Ok(Either::Right(Redirect::found(uri))); } }; diff --git a/server/src/routes/ui/account/admin.rs b/server/src/routes/ui/account/admin.rs index 7124f4a..d0ad433 100644 --- a/server/src/routes/ui/account/admin.rs +++ b/server/src/routes/ui/account/admin.rs @@ -5,6 +5,7 @@ */ use crate::{ database::Database, + federation::Federation, import::import, routes::ui::{ account::session::Session, @@ -135,11 +136,12 @@ pub fn r_account_admin_remove_invite( pub async fn r_account_admin_import( session: Session, database: &State<Database>, + federation: &State<Federation>, ) -> MyResult<DynLayoutPage<'static>> { if !session.user.admin { Err(anyhow!("you not admin"))? } - let r = import(&database).await; + let r = import(&database, &federation).await; admin_dashboard( &database, Some(r.map_err(|e| e.into()).map(|_| "Import successful".into())), diff --git a/server/src/routes/ui/error.rs b/server/src/routes/ui/error.rs index 190650f..b538a06 100644 --- a/server/src/routes/ui/error.rs +++ b/server/src/routes/ui/error.rs @@ -36,7 +36,6 @@ pub fn r_api_catch<'a>(status: Status, _request: &Request) -> Value { pub type MyResult<T> = Result<T, MyError>; -#[derive(Debug)] pub struct MyError(pub anyhow::Error); impl<'r> Responder<'r, 'static> for MyError { @@ -61,6 +60,12 @@ impl<'r> Responder<'r, 'static> for MyError { } } +impl std::fmt::Debug for MyError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("{:?}", self.0)) + } +} + impl Display for MyError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.0.fmt(f) |