From 3a29113e965a94bdef06655f1583cc6e86edd606 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 21 Dec 2023 23:57:42 +0100 Subject: rework import system pt. 1 --- import/src/lib.rs | 376 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 376 insertions(+) create mode 100644 import/src/lib.rs (limited to 'import/src/lib.rs') diff --git a/import/src/lib.rs b/import/src/lib.rs new file mode 100644 index 0000000..3698f79 --- /dev/null +++ b/import/src/lib.rs @@ -0,0 +1,376 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2023 metamuffin +*/ +#![feature(lazy_cell)] +use anyhow::{Context, Ok}; +use async_recursion::async_recursion; +use futures::{stream::FuturesUnordered, StreamExt}; +use jellybase::{ + cache::async_cache_file, database::Database, federation::Federation, AssetLocationExt, CONF, +}; +use jellyclient::Session; +use jellycommon::{ + AssetLocation, AssetRole, ImportOptions, ImportSource, Node, NodePrivate, NodePublic, +}; +use log::{debug, info}; +use std::{ + cmp::Ordering, + ffi::OsStr, + fs::File, + os::unix::prelude::OsStrExt, + path::{Path, PathBuf}, + sync::{Arc, LazyLock}, +}; +use tokio::sync::Semaphore; + +static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); + +pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { + let permit = IMPORT_SEM.try_acquire()?; + info!("loading sources..."); + import_path(CONF.library_path.clone(), vec![], db, fed) + .await + .context("indexing")?; + info!("merging nodes..."); + merge_nodes(db).context("merging nodes")?; + info!("clearing temporary node tree"); + db.node_import.clear()?; + info!("import completed"); + drop(permit); + Ok(()) +} + +pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { + for r in db.node_import.iter() { + let (id, mut nodes) = r?; + + nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); + + let node = nodes + .into_iter() + .map(|(_, x)| x) + .reduce(|x, y| merge_node(x, y)) + .unwrap(); + + db.node.insert(&id, &node)?; + } + Ok(()) +} + +fn compare_index_path(x: &[usize], y: &[usize]) -> Ordering { + if x.is_empty() { + Ordering::Greater + } else if y.is_empty() { + Ordering::Less + } else { + match x[0].cmp(&y[0]) { + o @ (Ordering::Less | Ordering::Greater) => o, + Ordering::Equal => compare_index_path(&x[1..], &y[1..]), + } + } +} + +#[async_recursion] +pub async fn import_path( + path: PathBuf, + index_path: Vec, + db: &Database, + fed: &Federation, +) -> anyhow::Result<()> { + if path.is_dir() { + let mut children_paths = path + .read_dir()? + .map(Result::unwrap) + .filter_map(|e| { + if e.path().extension() == Some(&OsStr::from_bytes(b"yaml")) + || e.metadata().unwrap().is_dir() + { + Some(e.path()) + } else { + None + } + }) + .collect::>(); + + children_paths.sort(); + + let mut children: FuturesUnordered<_> = children_paths + .into_iter() + .enumerate() + .map(|(i, p)| { + import_path( + p.clone(), + { + let mut path = index_path.clone(); + path.push(i); + path + }, + db, + fed, + ) + }) + .collect(); + + while let Some(k) = children.next().await { + k? + } + } else { + let opts: ImportOptions = serde_yaml::from_reader(File::open(&path)?)?; + + for s in opts.sources { + process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?; + } + } + Ok(()) +} + +async fn process_source( + id: String, + s: ImportSource, + path: &Path, + index_path: &[usize], + db: &Database, + fed: &Federation, +) -> anyhow::Result<()> { + let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { + db.node_import.fetch_and_update(id, |l| { + let mut l = l.unwrap_or_default(); + l.push((index_path.to_vec(), n.clone())); + Some(l) + })?; + Ok(()) + }; + match s { + ImportSource::Override(n) => insert_node(&id, n)?, + ImportSource::Tmdb(_) => todo!(), + ImportSource::Media { location } => { + let path = location.path(); + } + ImportSource::Federated { host } => { + let session = fed.get_session(&host).await.context("creating session")?; + + import_remote(id, &host, db, &session, index_path) + .await + .context("federated import")? + } + ImportSource::AutoChildren => { + // TODO dont forget to update path of children + } + } + Ok(()) +} + +fn merge_node(x: Node, y: Node) -> Node { + Node { + public: NodePublic { + kind: x.public.kind.or(y.public.kind), + title: x.public.title.or(y.public.title), + id: x.public.id.or(y.public.id), + path: vec![], + children: if x.public.children.is_empty() { + x.public.children + } else { + y.public.children + }, + tagline: x.public.tagline.or(y.public.tagline), + description: x.public.description.or(y.public.description), + release_date: x.public.release_date.or(y.public.release_date), + index: x.public.index.or(y.public.index), + media: x.public.media.or(y.public.media), // TODO proper media merging + ratings: x + .public + .ratings + .into_iter() + .chain(y.public.ratings) + .collect(), + federated: x.public.federated.or(y.public.federated), + }, + private: NodePrivate { + id: x.private.id.or(y.private.id), + poster: x.private.poster.or(y.private.poster), + backdrop: x.private.backdrop.or(y.private.backdrop), + source: x.private.source.or(y.private.source), // TODO here too + }, + } +} + +// #[async_recursion] +// pub async fn import_path( +// path: PathBuf, +// db: &Database, +// fed: &Federation, +// mut node_path: Vec, +// ) -> anyhow::Result<(Vec, usize)> { +// if path.is_dir() { +// let mpath = path.join("directory.json"); +// let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| { +// if e.path().extension() == Some(&OsStr::from_bytes(b"jelly")) +// || e.metadata().unwrap().is_dir() +// { +// Some(e.path()) +// } else { +// None +// } +// }); +// let identifier = if mpath.exists() { +// path.file_name().unwrap().to_str().unwrap().to_string() +// } else { +// node_path +// .last() +// .cloned() +// .ok_or(anyhow!("non-root node requires parent"))? +// }; + +// node_path.push(identifier.clone()); +// let mut all: FuturesUnordered<_> = children_paths +// .into_iter() +// .map(|p| import_path(p.clone(), db, fed, node_path.clone()).map_err(|e| (p, e))) +// .collect(); +// node_path.pop(); // we will set the dirs path later and need it to not be included + +// let mut children_ids = Vec::new(); +// let mut errs = 0; +// while let Some(k) = all.next().await { +// match k { +// core::result::Result::Ok((els, errs2)) => { +// errs += errs2; +// children_ids.extend(els) +// } +// Err((p, e)) => { +// errs += 1; +// error!("import of {p:?} failed: {e:?}") +// } +// } +// } +// if mpath.exists() { +// let mut node: Node = +// serde_json::from_reader(File::open(mpath).context("metadata missing")?)?; + +// node.public.children = children_ids; +// node.public.path = node_path; +// node.public.id = Some(identifier.to_owned()); +// info!("adding {identifier}"); +// db.node.insert(&identifier, &node)?; +// Ok((vec![identifier], errs)) +// } else { +// Ok((children_ids, errs)) +// } +// } else if path.is_file() { +// info!("loading {path:?}"); +// let datafile = File::open(path.clone()).context("cant load metadata")?; +// let mut node: Node = serde_json::from_reader(datafile).context("invalid metadata")?; +// let identifier = node.private.id.clone().unwrap_or_else(|| { +// path.file_name() +// .unwrap() +// .to_str() +// .unwrap() +// .strip_suffix(".json") +// .unwrap() +// .to_string() +// }); + +// let idents = if let Some(io) = node.private.import.take() { +// let session = fed +// .get_session(&io.host) +// .await +// .context("creating session")?; + +// import_remote(io, db, &session, identifier.clone(), node_path) +// .await +// .context("federated import")? +// } else { +// debug!("adding {identifier}"); +// node.public.path = node_path; +// node.public.id = Some(identifier.to_owned()); +// let did_insert = db.node.insert(&identifier, &node)?.is_none(); +// if did_insert { +// vec![identifier] +// } else { +// vec![] +// } +// }; +// Ok((idents, 0)) +// } else { +// bail!("did somebody really put a fifo or socket in the library?!") +// } +// } + +static SEM_REMOTE_IMPORT: LazyLock = LazyLock::new(|| Semaphore::new(16)); + +#[async_recursion] +async fn import_remote( + id: String, + host: &str, + db: &Database, + session: &Arc, + index_path: &[usize], +) -> anyhow::Result<()> { + let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { + db.node_import.fetch_and_update(id, |l| { + let mut l = l.unwrap_or_default(); + l.push((index_path.to_vec(), n.clone())); + Some(l) + })?; + Ok(()) + }; + let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); + info!("loading federated node {id:?}"); + + let node = session.node(&id).await.context("fetching remote node")?; + + if node.federated.as_ref() == Some(&CONF.hostname) { + return Ok(()); + } + + // TODO maybe use lazy download + let poster = cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Poster).await?; + let backdrop = + cache_federation_asset(session.to_owned(), id.clone(), AssetRole::Backdrop).await?; + + drop(_permit); + + let node = Node { + public: node.clone(), + private: NodePrivate { + backdrop: Some(backdrop), + poster: Some(poster), + id: None, + source: None, // TODO + }, + }; + + debug!("adding {id}"); + insert_node(&id, node.clone())?; + + let mut children: FuturesUnordered<_> = node + .public + .children + .iter() + .map(|c| import_remote(c.to_owned(), host, db, session, index_path)) + .collect(); + + while let Some(r) = children.next().await { + r?; + } + + Ok(()) +} + +async fn cache_federation_asset( + session: Arc, + identifier: String, + role: AssetRole, +) -> anyhow::Result { + async_cache_file( + &["fed-asset", role.as_str(), &identifier.clone()], + move |out| async move { + let session = session; + session + .node_asset(identifier.as_str(), role, 1024, out) + .await + }, + ) + .await +} -- cgit v1.2.3-70-g09d2 From 826c61c9612e855b19c3adb0e93d80bbfb4dc903 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Fri, 22 Dec 2023 00:29:11 +0100 Subject: rework import system pt. 2 --- Cargo.lock | 2 ++ common/src/lib.rs | 4 +-- import/Cargo.toml | 4 +-- import/src/lib.rs | 65 ++++++++++++++++++++++++++++++++++++++++++-- server/src/routes/ui/home.rs | 2 +- tool/src/main.rs | 13 --------- 6 files changed, 69 insertions(+), 21 deletions(-) (limited to 'import/src/lib.rs') diff --git a/Cargo.lock b/Cargo.lock index 9a12441..bd09a54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1391,9 +1391,11 @@ dependencies = [ "jellybase", "jellyclient", "jellycommon", + "jellymatroska", "jellyremuxer", "log", "reqwest", + "serde", "serde_json", "serde_yaml", "tokio", diff --git a/common/src/lib.rs b/common/src/lib.rs index e953d85..9a3535a 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -21,8 +21,8 @@ use std::{collections::BTreeMap, path::PathBuf}; #[derive(Debug, Clone, Deserialize, Serialize, Default)] pub struct Node { - pub public: NodePublic, - pub private: NodePrivate, + #[serde(default)] pub public: NodePublic, + #[serde(default)] pub private: NodePrivate, } #[rustfmt::skip] diff --git a/import/Cargo.toml b/import/Cargo.toml index f2ba7af..a54967c 100644 --- a/import/Cargo.toml +++ b/import/Cargo.toml @@ -7,14 +7,14 @@ edition = "2021" jellycommon = { path = "../common" } jellybase = { path = "../base" } jellyclient = { path = "../client" } -# jellymatroska = { path = "../matroska" } +jellymatroska = { path = "../matroska" } jellyremuxer = { path = "../remuxer" } log = { workspace = true } anyhow = "1.0.75" reqwest = { version = "0.11.22", features = ["blocking", "json"] } -# serde = { version = "1.0.193", features = ["derive"] } +serde = { version = "1.0.193", features = ["derive"] } serde_json = "1.0.108" serde_yaml = "0.9.27" # bincode = { version = "2.0.0-rc.3", features = ["serde"] } diff --git a/import/src/lib.rs b/import/src/lib.rs index 3698f79..ed0af2d 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -4,6 +4,9 @@ Copyright (C) 2023 metamuffin */ #![feature(lazy_cell)] +pub mod infojson; +pub mod tmdb; + use anyhow::{Context, Ok}; use async_recursion::async_recursion; use futures::{stream::FuturesUnordered, StreamExt}; @@ -14,16 +17,19 @@ use jellyclient::Session; use jellycommon::{ AssetLocation, AssetRole, ImportOptions, ImportSource, Node, NodePrivate, NodePublic, }; +use jellymatroska::read::EbmlReader; +use jellyremuxer::import::import_metadata; use log::{debug, info}; use std::{ cmp::Ordering, ffi::OsStr, fs::File, + io::BufReader, os::unix::prelude::OsStrExt, path::{Path, PathBuf}, sync::{Arc, LazyLock}, }; -use tokio::sync::Semaphore; +use tokio::{sync::Semaphore, task::spawn_blocking}; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); @@ -48,12 +54,15 @@ pub fn merge_nodes(db: &Database) -> anyhow::Result<()> { nodes.sort_by(|(x, _), (y, _)| compare_index_path(x, y)); - let node = nodes + let mut node = nodes .into_iter() .map(|(_, x)| x) .reduce(|x, y| merge_node(x, y)) .unwrap(); + node.public.id = Some(id.clone()); + node.public.path = vec!["library".to_string()]; // TODO reconstruct from children + db.node.insert(&id, &node)?; } Ok(()) @@ -146,7 +155,44 @@ async fn process_source( ImportSource::Override(n) => insert_node(&id, n)?, ImportSource::Tmdb(_) => todo!(), ImportSource::Media { location } => { - let path = location.path(); + let media_path = location.path(); + + let metadata = spawn_blocking(move || { + let input = BufReader::new(File::open(&media_path).unwrap()); + let mut input = EbmlReader::new(input); + import_metadata(&mut input) + }) + .await??; + + // if let Some(cover) = metadata.cover { + // let pu = path.join(format!( + // "cover.{}", + // match mime.as_str() { + // "image/webp" => "webp", + // "image/jpeg" => "jpeg", + // "image/png" => "png", + // _ => { + // warn!("unknown mime, just using webp"); + // "webp" + // } + // } + // )); + // if !pu.exists() { + // let mut f = tokio::fs::File::create(&pu).await?; + // f.write_all(&data).await?; + // } + // } + + let node = Node { + public: NodePublic { + title: metadata.title, + description: metadata.description, + tagline: metadata.tagline, + ..Default::default() + }, + private: NodePrivate::default(), + }; + insert_node(&id, node)?; } ImportSource::Federated { host } => { let session = fed.get_session(&host).await.context("creating session")?; @@ -374,3 +420,16 @@ async fn cache_federation_asset( ) .await } + +fn make_ident(s: &str) -> String { + let mut out = String::new(); + for s in s.chars() { + match s { + 'a'..='z' | '0'..='9' => out.push(s), + 'A'..='Z' => out.push(s.to_ascii_lowercase()), + '-' | ' ' | '_' | ':' => out.push('-'), + _ => (), + } + } + out +} diff --git a/server/src/routes/ui/home.rs b/server/src/routes/ui/home.rs index bcbc847..74e9b02 100644 --- a/server/src/routes/ui/home.rs +++ b/server/src/routes/ui/home.rs @@ -77,7 +77,7 @@ pub fn r_home(sess: Session, db: &State) -> MyResult { Ok(LayoutPage { title: "Home".to_string(), content: markup::new! { - p { "Welcome back " @sess.user.display_name } + p { "Welcome back, " @sess.user.display_name } h2 { "Explore " @CONF.brand } .homelist { ul {@for (id, node, udata) in &toplevel { li { @NodeCard { id, node, udata } } diff --git a/tool/src/main.rs b/tool/src/main.rs index 34337ce..6384822 100644 --- a/tool/src/main.rs +++ b/tool/src/main.rs @@ -193,19 +193,6 @@ fn main() -> anyhow::Result<()> { } } -fn make_ident(s: &str) -> String { - let mut out = String::new(); - for s in s.chars() { - match s { - 'a'..='z' | '0'..='9' => out.push(s), - 'A'..='Z' => out.push(s.to_ascii_lowercase()), - '-' | ' ' | '_' | ':' => out.push('-'), - _ => (), - } - } - out -} - fn ok_or_warn(r: Result) -> Option { match r { Ok(t) => Some(t), -- cgit v1.2.3-70-g09d2