author     metamuffin <metamuffin@disroot.org>   2025-01-30 01:27:11 +0100
committer  metamuffin <metamuffin@disroot.org>   2025-01-30 01:27:11 +0100
commit     a5a73dc868c714391e4da4a53b4e4993fc77372e
tree       1bad34a616a7e20b40ef6a8100b8aa20ef5fb783
parent     59825d68efa1077382fd6acac73f75ae9dc3680a
parallel import
-rw-r--r--   Cargo.lock                       |   6
-rw-r--r--   base/src/assetfed.rs             |   2
-rw-r--r--   common/src/config.rs             |   4
-rw-r--r--   import/Cargo.toml                |   3
-rw-r--r--   import/src/lib.rs                | 140
-rw-r--r--   server/src/routes/ui/assets.rs   |   2
-rw-r--r--   tool/src/add.rs                  |   2

7 files changed, 105 insertions(+), 54 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -652,9 +652,9 @@ dependencies = [
 
 [[package]]
 name = "crossbeam-channel"
-version = "0.5.13"
+version = "0.5.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
+checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
 dependencies = [
  "crossbeam-utils",
 ]
@@ -1748,12 +1748,14 @@ dependencies = [
  "async-recursion",
  "base64",
  "bincode",
+ "crossbeam-channel",
  "ebml-struct",
  "futures",
  "jellybase",
  "jellyclient",
  "jellycommon",
  "log",
+ "rayon",
  "regex",
  "reqwest",
  "serde",
diff --git a/base/src/assetfed.rs b/base/src/assetfed.rs
index bb39bbd..d20446b 100644
--- a/base/src/assetfed.rs
+++ b/base/src/assetfed.rs
@@ -31,7 +31,7 @@ pub enum AssetInner {
     Federated { host: String, asset: Vec<u8> },
     Cache(CachePath),
     Assets(PathBuf),
-    Library(PathBuf),
+    Media(PathBuf),
 }
 
 impl AssetInner {
diff --git a/common/src/config.rs b/common/src/config.rs
index 467a2ec..682fdd7 100644
--- a/common/src/config.rs
+++ b/common/src/config.rs
@@ -17,7 +17,6 @@ pub struct GlobalConfig {
     #[serde(default = "return_true")] pub tls: bool,
     #[serde(default = "default::asset_path")] pub asset_path: PathBuf,
     #[serde(default = "default::database_path")] pub database_path: PathBuf,
-    #[serde(default = "default::library_path")] pub library_path: PathBuf,
     #[serde(default = "default::temp_path")] pub temp_path: PathBuf,
     #[serde(default = "default::cache_path")] pub cache_path: PathBuf,
     #[serde(default = "default::media_path")] pub media_path: PathBuf,
@@ -74,9 +73,6 @@ mod default {
     pub fn database_path() -> PathBuf {
         "data/database".into()
     }
-    pub fn library_path() -> PathBuf {
-        "data/library".into()
-    }
     pub fn cache_path() -> PathBuf {
         "data/cache".into()
     }
diff --git a/import/Cargo.toml b/import/Cargo.toml
index b9bd6db..d0342df 100644
--- a/import/Cargo.toml
+++ b/import/Cargo.toml
@@ -10,6 +10,9 @@ jellyclient = { path = "../client" }
 
 ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" }
 
+rayon = "1.10.0"
+crossbeam-channel = "0.5.14"
+
 log = { workspace = true }
 anyhow = "1.0.95"
 reqwest = { workspace = true }
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 34780a8..243ac8a 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -11,17 +11,25 @@ use ebml_struct::{
 };
 use jellybase::{assetfed::AssetInner, cache::cache_file, database::Database, CONF, SECRETS};
 use jellycommon::{
-    Chapter, LocalTrack, MediaInfo, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
+    Chapter, LocalTrack, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
     TrackSource,
 };
-use log::info;
+use log::{info, warn};
+use rayon::iter::{
+    IntoParallelIterator, IntoParallelRefIterator, ParallelBridge, ParallelDrainRange,
+    ParallelIterator,
+};
 use regex::Regex;
 use std::{
     collections::{HashMap, VecDeque},
     fs::File,
     io::{BufReader, ErrorKind, Read, Write},
-    path::Path,
-    sync::LazyLock,
+    mem::swap,
+    path::{Path, PathBuf},
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        LazyLock,
+    },
     time::UNIX_EPOCH,
 };
 use tmdb::Tmdb;
@@ -54,11 +62,9 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
     let _sem = IMPORT_SEM.try_acquire()?;
 
     let jh = spawn_blocking(move || {
-        let errs = match import(&db, incremental) {
-            Err(e) => vec![format!("{e:#}")],
-            Ok(e) => e,
-        };
-        *IMPORT_ERRORS.blocking_write() = errs;
+        if let Err(e) = import(&db, incremental) {
+            IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
+        }
     });
     let _ = jh.await;
 
@@ -66,67 +72,109 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
     Ok(())
 }
 
-fn import(db: &Database, incremental: bool) -> Result<Vec<String>> {
-    let mut queue = VecDeque::from_iter(Some(CONF.media_path.clone()));
-    let mut errors = Vec::new();
+fn import(db: &Database, incremental: bool) -> Result<()> {
+    let mut queue_prev = vec![CONF.media_path.clone()];
+    let mut queue_next;
     let apis = Apis {
         trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
         tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
     };
-    let mut num_skipped = 0;
-    let mut num_imported = 0;
-    while let Some(path) = queue.pop_front() {
-        if path.is_dir() {
-            for e in path.read_dir()? {
-                queue.push_back(e?.path());
-            }
+    while !queue_prev.is_empty() {
+        queue_next = queue_prev
+            .par_drain(..)
+            .flat_map_iter(
+                move |path| match import_iter_inner(&path, db, incremental) {
+                    Ok(ch) => ch,
+                    Err(e) => {
+                        IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
+                        Vec::new()
+                    }
+                },
+            )
+            .collect::<Vec<_>>();
+        swap(&mut queue_next, &mut queue_prev);
+    }
+    Ok(())
+}
+
+fn import_iter_inner(path: &Path, db: &Database, incremental: bool) -> Result<Vec<PathBuf>> {
+    if path.is_dir() {
+        let mut o = Vec::new();
+        for e in path.read_dir()? {
+            o.push(e?.path());
         }
-        if path.is_file() {
-            let meta = path.metadata()?;
-            let mtime = meta
-                .modified()?
-                .duration_since(UNIX_EPOCH)
-                .unwrap()
-                .as_secs();
+        return Ok(o);
+    }
+    if path.is_file() {
+        let meta = path.metadata()?;
+        let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
 
-            if incremental {
-                if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
-                    if last_mtime >= mtime {
-                        num_skipped += 1;
-                        continue;
-                    }
+        if incremental {
+            if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
+                if last_mtime >= mtime {
+                    return Ok(Vec::new());
                 }
             }
-            num_imported += 1;
-            if let Err(e) = import_file(db, &path).context(anyhow!("{path:?}")) {
-                errors.push(format!("{e:#}"));
-            }
-            db.set_import_file_mtime(&path, mtime)?;
         }
+
+        import_file(&db, &path).context(anyhow!("{path:?}"))?;
+        db.set_import_file_mtime(&path, mtime)?;
     }
-    info!("import finished. skipped={num_skipped} imported={num_imported}");
-    Ok(errors)
+    return Ok(Vec::new());
 }
 
 fn import_file(db: &Database, path: &Path) -> Result<()> {
     let filename = path.file_stem().unwrap().to_string_lossy();
+    let parent = NodeID::from_slug(
+        &path
+            .parent()
+            .ok_or(anyhow!("no parent"))?
+            .file_name()
+            .ok_or(anyhow!("parent no filename"))?
+            .to_string_lossy(),
+    );
     match filename.as_ref() {
-        "poster" => (),
+        "poster" => {
+            db.update_node_init(parent, |node| {
+                node.poster = Some(AssetInner::Media(path.to_owned()).ser());
+                Ok(())
+            })?;
+        }
+        "backdrop" => {
+            db.update_node_init(parent, |node| {
+                node.backdrop = Some(AssetInner::Media(path.to_owned()).ser());
+                Ok(())
+            })?;
+        }
+        "info" => {
+            let data = serde_yaml::from_reader::<_, Node>(BufReader::new(File::open(path)?))?;
+            db.update_node_init(parent, |node| {
+                fn merge_option<T>(a: &mut Option<T>, b: Option<T>) {
+                    if b.is_some() {
+                        *a = b;
+                    }
+                }
+                merge_option(&mut node.title, data.title);
+                merge_option(&mut node.tagline, data.tagline);
+                merge_option(&mut node.description, data.description);
+                Ok(())
+            })?;
+        }
         _ => (),
     }
 
     let mut magic = [0; 4];
     File::open(path)?.read_exact(&mut magic).ok();
     if matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) {
-        import_media_file(db, path).context("media file")?;
+        import_media_file(db, path, parent).context("media file")?;
     }
 
     Ok(())
 }
 
-fn import_media_file(db: &Database, path: &Path) -> Result<()> {
+fn import_media_file(db: &Database, path: &Path, parent: NodeID) -> Result<()> {
     info!("reading media file {path:?}");
     let mut file = BufReader::new(File::open(path)?);
     let mut file = file.by_ref().take(u64::MAX);
@@ -165,7 +213,7 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
                         .context("infojson")?,
                     );
                 }
-                "cover.webp" => {
+                "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg" | "cover.avif" => {
                     cover = Some(
                         AssetInner::Cache(cache_file(
                             &["att-cover", path.to_string_lossy().as_ref()],
@@ -181,7 +229,6 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
                     }
                 }
             }
-
             EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => {
                 seg.consume()?;
             }
@@ -216,7 +263,7 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
 
     let slug = infojson
         .as_ref()
-        .map(|ij| ij.id.to_owned())
+        .map(|ij| format!("youtube-{}", ij.id))
         .unwrap_or(make_kebab(&filepath_stem));
 
     db.update_node_init(NodeID::from_slug(&slug), |node| {
@@ -225,6 +272,9 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
         node.poster = cover;
         node.description = tags.remove("DESCRIPTION");
        node.tagline = tags.remove("COMMENT");
+        if !node.parents.contains(&parent) {
+            node.parents.push(parent)
+        }
         if let Some(infojson) = infojson {
             node.kind = Some(
                 if infojson.duration.unwrap_or(0.) < 600.
diff --git a/server/src/routes/ui/assets.rs b/server/src/routes/ui/assets.rs
index 689c7f1..ac808b1 100644
--- a/server/src/routes/ui/assets.rs
+++ b/server/src/routes/ui/assets.rs
@@ -55,7 +55,7 @@ pub async fn resolve_asset(asset: AssetInner) -> anyhow::Result<PathBuf> {
     match asset {
         AssetInner::Cache(c) => Ok(c.abs()),
         AssetInner::Assets(c) => Ok(CONF.asset_path.join(c)),
-        AssetInner::Library(c) => Ok(CONF.library_path.join(c)),
+        AssetInner::Media(c) => Ok(CONF.media_path.join(c)),
         _ => unreachable!(),
     }
 }
diff --git a/tool/src/add.rs b/tool/src/add.rs
index 368743e..fdaa14e 100644
--- a/tool/src/add.rs
+++ b/tool/src/add.rs
@@ -40,7 +40,7 @@ pub async fn add(action: Action) -> anyhow::Result<()> {
         library_path
     } else {
        let mut directories = Vec::new();
-        find_folders(&CONF.library_path, &PathBuf::new(), &mut directories)
+        find_folders(&CONF.media_path, &PathBuf::new(), &mut directories)
            .context("listing library directories")?;
 
         let mut default = 0;
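
The heart of the commit is the new import loop in import/src/lib.rs: instead of popping one path at a time off a VecDeque, each pass drains the current frontier of paths in parallel with rayon, expands directories into their children, and swaps the collected children in as the next frontier. A minimal, self-contained sketch of that queue-swap pattern, assuming only rayon 1.x as a dependency (the names walk and visit are illustrative, not part of the project):

    // Sketch of the level-by-level parallel directory walk; error handling is
    // reduced to logging so the walk never aborts on a single bad entry.
    use rayon::iter::{ParallelDrainRange, ParallelIterator};
    use std::{io, mem::swap, path::PathBuf};

    // Directories expand into their children; a real importer would process
    // regular files here instead of skipping them.
    fn visit(path: &PathBuf) -> io::Result<Vec<PathBuf>> {
        if path.is_dir() {
            let mut children = Vec::new();
            for entry in path.read_dir()? {
                children.push(entry?.path());
            }
            return Ok(children);
        }
        Ok(Vec::new())
    }

    fn walk(root: PathBuf) {
        let mut frontier = vec![root];
        let mut next;
        while !frontier.is_empty() {
            // One tree level per iteration; all paths of a level run in parallel.
            next = frontier
                .par_drain(..)
                .flat_map_iter(|path| match visit(&path) {
                    Ok(children) => children,
                    Err(e) => {
                        eprintln!("error at {path:?}: {e}");
                        Vec::new()
                    }
                })
                .collect::<Vec<_>>();
            swap(&mut next, &mut frontier);
        }
    }

    fn main() {
        walk(PathBuf::from("."));
    }

In the commit itself, failures are pushed into the shared IMPORT_ERRORS list as they occur instead of being collected into a Vec and returned when the walk finishes.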
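import_file also gains handling for sidecar files that sit next to the media: a poster or backdrop image is attached to the parent node as an AssetInner::Media asset, and an info sidecar is parsed with serde_yaml and merged into the node field by field, a sidecar value only overriding a field it actually provides. A rough sketch of that merge rule, using a stand-in NodeMeta struct rather than jellycommon's Node:

    // merge_option mirrors the helper defined inline in the commit: overwrite
    // only when the incoming value is Some, otherwise keep what is already there.
    fn merge_option<T>(a: &mut Option<T>, b: Option<T>) {
        if b.is_some() {
            *a = b;
        }
    }

    #[derive(Default, Debug)]
    struct NodeMeta {
        title: Option<String>,
        tagline: Option<String>,
        description: Option<String>,
    }

    fn merge(node: &mut NodeMeta, sidecar: NodeMeta) {
        merge_option(&mut node.title, sidecar.title);
        merge_option(&mut node.tagline, sidecar.tagline);
        merge_option(&mut node.description, sidecar.description);
    }

    fn main() {
        let mut node = NodeMeta {
            title: Some("from media tags".into()),
            ..Default::default()
        };
        let sidecar = NodeMeta {
            description: Some("from info sidecar".into()),
            ..Default::default()
        };
        merge(&mut node, sidecar);
        // title is kept, description is filled in from the sidecar
        println!("{node:?}");
    }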
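Media files themselves are still detected by content rather than extension: import_file reads the first four bytes and compares them against the EBML magic number 0x1A 0x45 0xDF 0xA3 that Matroska/WebM containers start with, and only then hands the file to import_media_file. A stand-alone version of that check (is_matroska is an illustrative name):

    use std::{fs::File, io::Read, path::Path};

    // True when the file begins with the EBML magic used by Matroska/WebM.
    fn is_matroska(path: &Path) -> std::io::Result<bool> {
        let mut magic = [0u8; 4];
        match File::open(path)?.read_exact(&mut magic) {
            Ok(()) => Ok(magic == [0x1A, 0x45, 0xDF, 0xA3]),
            // A short or unreadable file is simply treated as not Matroska.
            Err(_) => Ok(false),
        }
    }

    fn main() -> std::io::Result<()> {
        for arg in std::env::args().skip(1) {
            println!("{arg}: {}", is_matroska(Path::new(&arg))?);
        }
        Ok(())
    }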