/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin */
#![feature(duration_constants)]
pub mod helpers;
pub mod plugins;
pub mod reporting;
pub mod source_rank;
use crate::{
    plugins::{ImportPlugin, PluginContext, infojson::is_info_json, init_plugins, misc::is_cover},
    source_rank::{ImportSource, SourceRanks},
};
use anyhow::{Context, Result, anyhow};
use jellycache::{Cache, HashKey};
use jellycommon::{
    internal::{IM_MTIME, IM_PATH},
    jellyobject::{self, ObjectBuffer, Path as TagPath, Tag},
    *,
};
use jellydb::{Database, Filter, Query, RowNum, Sort};
use jellyremuxer::{
    demuxers::create_demuxer_autodetect,
    matroska::{self, AttachedFile, Segment},
};
use log::info;
use rayon::{
    ThreadPoolBuilder,
    iter::{IntoParallelIterator, ParallelBridge, ParallelIterator},
};
use serde::{Deserialize, Serialize};
use std::{
    collections::HashSet,
    fs::{File, read_to_string},
    mem::swap,
    path::{Path, PathBuf},
    sync::{Arc, LazyLock, Mutex},
    time::UNIX_EPOCH,
};
use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking};

/// Importer configuration (deserialized from the server configuration).
#[derive(Debug, Deserialize, Serialize, Default, Clone)]
pub struct Config {
    // Root directory of the media library that gets traversed on import.
    media_path: PathBuf,
    // Secrets for the external metadata providers.
    api: ApiSecrets,
    // Size of the rayon thread pool used by `import_wrap`.
    num_threads: usize,
}

/// API keys for external metadata providers; each one is optional and the
/// corresponding plugin is simply disabled when the key is absent.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct ApiSecrets {
    pub acoustid: Option<String>,
    pub tmdb: Option<String>,
    pub tvdb: Option<String>,
    pub imdb: Option<String>,
    pub omdb: Option<String>,
    pub fanart_tv: Option<String>,
    pub trakt: Option<String>,
}

/// User agent sent with all outgoing metadata-provider requests.
pub const USER_AGENT: &str = concat!(
    "jellything/",
    env!("CARGO_PKG_VERSION"),
    " +https://codeberg.org/metamuffin/jellything"
);

// Single-permit semaphore guarding against concurrent imports.
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));

/// Returns true while an import is in progress (the single permit is taken).
pub fn is_importing() -> bool {
    IMPORT_SEM.available_permits() == 0
}

/// Shared handles needed by the importer: configuration, cache and database.
#[derive(Clone)]
pub struct ImportConfig {
    pub config: Config,
    pub cache: Arc<Cache>,
    pub db: Arc<Database>,
}

/// Build a query that matches a node by its slug.
fn node_slug_query(slug: &str) -> Query<'_> {
    Query {
        filter:
Filter::Match(jellyobject::Path(vec![NO_SLUG.0]), slug.into()),
        sort: Sort::None,
    }
}

impl ImportConfig {
    /// Apply `update` to the node stored at row `node`, inside a single
    /// database transaction. Panics if the row does not exist.
    pub fn update_node(
        &self,
        node: RowNum,
        mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer,
    ) -> Result<()> {
        self.db.transaction(&mut |txn| {
            let ob_before = txn.get(node)?.unwrap();
            let ob_after = update(ob_before);
            txn.update(node, ob_after)?;
            Ok(())
        })?;
        Ok(())
    }

    /// Apply `update` to the node identified by `slug`, inserting a fresh node
    /// first if none exists yet. Returns the row number of the node.
    pub fn update_node_slug(
        &self,
        slug: &str,
        mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer,
    ) -> Result<RowNum> {
        let mut row = 0;
        self.db.transaction(&mut |txn| {
            row = match txn.query_single(node_slug_query(slug))? {
                Some(r) => r,
                None => txn.insert(ObjectBuffer::new(&mut [(NO_SLUG.0, &slug)]))?,
            };
            let node = txn.get(row)?.unwrap();
            let node = update(node);
            // Re-insert the slug in case `update` dropped or replaced it.
            let node = node.as_object().insert(NO_SLUG, slug);
            txn.update(row, node)?;
            Ok(())
        })?;
        Ok(row)
    }

    /// Fetch the node at row `node`, or `None` if the row is empty.
    pub fn get_node(&self, node: RowNum) -> Result<Option<ObjectBuffer>> {
        let mut buf = None;
        self.db.transaction(&mut |txn| {
            buf = txn.get(node)?;
            Ok(())
        })?;
        Ok(buf)
    }
}

/// Run a (possibly incremental) import on a dedicated rayon thread pool.
/// Fails immediately with "already importing" if an import is in progress.
pub async fn import_wrap(ic: ImportConfig, incremental: bool) -> Result<()> {
    let _sem = IMPORT_SEM.try_acquire().context("already importing")?;
    let rt = Handle::current();
    let tp = ThreadPoolBuilder::new()
        .thread_name(|n| format!("import #{n}"))
        .num_threads(ic.config.num_threads)
        .build()?;
    // The import itself is blocking work; run it off the async executor.
    let jh = spawn_blocking(move || {
        tp.install(move || {
            reporting::start_import();
            reporting::catch(import(ic, &rt, incremental));
            reporting::end_import();
        })
    });
    let _ = jh.await;
    Ok(())
}

/// The import driver: index files, import them in parallel, then run
/// node-processing passes until the pending set is empty.
fn import(ic: ImportConfig, rt: &Handle, incremental: bool) -> Result<()> {
    reporting::set_stage("Initializing Plugins".to_string(), 0);
    let plugins = init_plugins(&ic.config.api);
    let ranks = SourceRanks::new();
    reporting::set_stage("Indexing files".to_string(), 0);
    let files = Mutex::new(Vec::new());
    import_traverse(
        &ic.config.media_path,
        &ic,
        incremental,
        None,
        InheritedFlags::default(),
        &files,
    )?;
    let files = files.into_inner().unwrap();
    let mut nodes = Mutex::new(HashSet::new());
    reporting::set_stage(format!("Importing 
files"), files.len());
    files.into_par_iter().for_each(|(path, parent, iflags)| {
        reporting::set_task(format!("unknown: {path:?}"));
        import_file(&ic, &rt, &ranks, &nodes, &plugins, &path, parent, iflags);
        reporting::inc_finished();
        reporting::set_task("idle".to_owned());
    });
    // Plugins may queue follow-up nodes while processing; keep running passes
    // until a pass produces no new pending nodes.
    for n in 1.. {
        let size = nodes.get_mut().unwrap().len();
        if size == 0 {
            break;
        }
        reporting::set_stage(format!("Processing nodes (pass #{n})"), size);
        info!("process iter (size={})", nodes.get_mut().unwrap().len());
        let mut cur_nodes = HashSet::new();
        swap(nodes.get_mut().unwrap(), &mut cur_nodes);
        cur_nodes.into_par_iter().for_each(|node| {
            reporting::set_task(format!("unknown: {node}"));
            process_node(&ic, &rt, &ranks, &plugins, &nodes, node);
            reporting::inc_finished();
            reporting::set_task("idle".to_owned());
        });
    }
    Ok(())
}

/// Flags inherited from parent directories during traversal (via `flags`
/// files in the library tree).
#[derive(Debug, Clone, Copy, Default)]
pub struct InheritedFlags {
    hidden: bool,
    reduced: bool,
    use_acoustid: bool,
}

/// Walk the media tree: create a node per directory and collect the files to
/// import (with their parent row and inherited flags) into `out`.
fn import_traverse(
    path: &Path,
    ic: &ImportConfig,
    incremental: bool,
    parent: Option<RowNum>,
    mut iflags: InheritedFlags,
    out: &Mutex<Vec<(PathBuf, RowNum, InheritedFlags)>>,
) -> Result<()> {
    if path.is_dir() {
        reporting::set_task(format!("indexing {path:?}"));
        let slug = get_node_slug(ic, path).unwrap();
        // Some flags need to be applied immediately because they are inherited.
        if let Ok(content) = read_to_string(path.join("flags")) {
            for flag in content.lines() {
                match flag.trim() {
                    "hidden" => iflags.hidden = true,
                    "reduced" => iflags.reduced = true,
                    "use_acoustid" => iflags.use_acoustid = true,
                    _ => (),
                }
            }
        }
        let row = ic.update_node_slug(&slug, |mut node| {
            if let Some(parent) = parent {
                node = node.as_object().extend(NO_PARENT, [parent]);
            }
            node = node.as_object().insert(
                NO_VISIBILITY,
                if iflags.hidden {
                    VISI_HIDDEN
                } else if iflags.reduced {
                    VISI_REDUCED
                } else {
                    VISI_VISIBLE
                },
            );
            node
        })?;
        // Recurse into directory entries in parallel; errors are reported but
        // do not abort sibling traversal.
        path.read_dir()?.par_bridge().try_for_each(|e| {
            let path = e?.path();
            reporting::catch(
                import_traverse(&path, ic, incremental, Some(row), iflags, out)
                    .context(anyhow!("index {:?}", path.file_name().unwrap())),
);
            anyhow::Ok(())
        })?;
        return Ok(());
    }
    if path.is_file()
        && let Some(parent) = parent
    {
        // In incremental mode, skip files whose recorded mtime is unchanged.
        if incremental {
            if !compare_mtime(ic, path)? {
                return Ok(());
            }
        }
        reporting::inc_total();
        out.lock().unwrap().push((path.to_owned(), parent, iflags));
    }
    Ok(())
}

/// Import a single file: handle `flags` files, matroska media files and
/// everything else by dispatching to all interested plugins.
fn import_file(
    ic: &ImportConfig,
    rt: &Handle,
    ranks: &SourceRanks,
    pending_nodes: &Mutex<HashSet<RowNum>>,
    plugins: &[Box<dyn ImportPlugin>],
    path: &Path,
    parent: RowNum,
    iflags: InheritedFlags,
) {
    // Tracks whether every plugin invocation succeeded; only then is the
    // file's mtime recorded (so failures are retried on the next import).
    let mut all_ok = true;
    let mut ct = PluginContext {
        ic,
        rt,
        is: ImportSource {
            // Placeholder tag; overwritten with the plugin's tag before each call.
            tag: Tag::new(b"xxxx"),
            ranks,
        },
        iflags,
        pending_nodes,
    };
    let filename = path.file_name().unwrap().to_string_lossy();
    if filename == "flags" {
        let Some(content) =
            reporting::catch(read_to_string(path).context(anyhow!("read flags at {path:?}")))
        else {
            return;
        };
        pending_nodes.lock().unwrap().insert(parent);
        // Each line of a `flags` file is an instruction applied to the parent.
        for line in content.lines() {
            for p in plugins {
                let inf = p.info();
                if inf.handle_instruction {
                    ct.is.tag = inf.tag;
                    reporting::set_task(format!("{}(inst): {path:?}", inf.name));
                    all_ok &= reporting::catch(
                        p.instruction(&ct, parent, line)
                            .context(anyhow!("{}(inst) {path:?}", inf.name)),
                    )
                    .is_some();
                }
            }
        }
    }
    // NOTE(review): matches any name *ending* in "mkv"/"mka"/"mks", not just
    // dotted extensions — confirm this looseness is intended.
    if filename.ends_with("mkv") || filename.ends_with("mka") || filename.ends_with("mks") {
        let slug = get_node_slug(ic, path).unwrap();
        let Some(row) = reporting::catch(ic.update_node_slug(&slug, |mut node| {
            node = node.as_object().extend(NO_PARENT, [parent]);
            node = node.as_object().insert(
                NO_VISIBILITY,
                if iflags.hidden {
                    VISI_HIDDEN
                } else if iflags.reduced {
                    VISI_REDUCED
                } else {
                    VISI_VISIBLE
                },
            );
            node
        })) else {
            return;
        };
        pending_nodes.lock().unwrap().insert(row);
        // Inline flags are the dot-separated segments between the stem and the
        // extension, e.g. "movie.flag1.flag2.mkv" -> ["flag1", "flag2"].
        let flags = filename
            .split(".")
            .collect::<Vec<_>>()
            .into_iter()
            .skip(1)
            .rev()
            .skip(1)
            .rev();
        for line in flags {
            for p in plugins {
                let inf = p.info();
                if inf.handle_instruction {
                    ct.is.tag = inf.tag;
                    reporting::set_task(format!("{}(inst): {path:?}", inf.name));
                    all_ok &= reporting::catch(
                        p.instruction(&ct, row, line)
                            .context(anyhow!("{}(inst) {path:?}", inf.name)),
                    )
                    .is_some();
                }
            }
        }
reporting::set_task(format!("demuxer meta: {path:?}"));
        let Some(seg) = reporting::catch(
            read_media_metadata(&ct.ic.cache, path).context(anyhow!("media {path:?}")),
        ) else {
            return;
        };
        // Media files go to the `handle_media` plugins with the demuxed segment.
        for p in plugins {
            let inf = p.info();
            if inf.handle_media {
                ct.is.tag = inf.tag;
                reporting::set_task(format!("{}(media): {path:?}", inf.name));
                all_ok &= reporting::catch(
                    p.media(&ct, row, path, &seg)
                        .context(anyhow!("{}(media) {path:?}", inf.name)),
                )
                .is_some();
            }
        }
    } else {
        // All other files go to the `handle_file` plugins.
        for p in plugins {
            let inf = p.info();
            if inf.handle_file {
                ct.is.tag = inf.tag;
                reporting::set_task(format!("{}(file): {path:?}", inf.name));
                all_ok &= reporting::catch(
                    p.file(&ct, parent, path)
                        .context(anyhow!("{}(file) {path:?}", inf.name)),
                )
                .is_some();
            }
        }
    }
    if all_ok {
        // Only record the mtime when every plugin succeeded, so failed files
        // are retried on the next incremental import.
        reporting::catch(update_mtime(ic, path).context("updating mtime"));
    }
}

/// Run all `handle_process` plugins on a node queued in `pending_nodes`.
fn process_node(
    dba: &ImportConfig,
    rt: &Handle,
    ranks: &SourceRanks,
    plugins: &[Box<dyn ImportPlugin>],
    pending_nodes: &Mutex<HashSet<RowNum>>,
    node: RowNum,
) {
    // Fetch the slug only for task/error reporting; empty if absent.
    let mut slug = String::new();
    reporting::catch(dba.db.transaction(&mut |txn| {
        let no = txn.get(node)?.unwrap();
        if let Some(s) = no.as_object().get(NO_SLUG) {
            slug = s.to_owned();
        }
        Ok(())
    }));
    for p in plugins {
        let inf = p.info();
        if inf.handle_process {
            reporting::set_task(format!("{}(proc): {slug}", inf.name));
            reporting::catch(
                p.process(
                    &PluginContext {
                        ic: dba,
                        is: ImportSource {
                            tag: inf.tag,
                            ranks,
                        },
                        rt,
                        iflags: InheritedFlags::default(),
                        pending_nodes,
                    },
                    node,
                )
                .context(anyhow!("{}(proc) {slug}", inf.name)),
            );
        }
    }
}

/// Returns whether `path` changed since the last import: true when the path
/// is unknown to the database or its mtime is newer than the recorded one.
fn compare_mtime(dba: &ImportConfig, path: &Path) -> Result<bool> {
    let meta = path.metadata()?;
    let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
    let mut was_changed = false;
    dba.db.transaction(&mut |txn| {
        match txn.query_single(Query {
            filter: Filter::Match(
                TagPath(vec![IM_PATH.0]),
                path.to_string_lossy().to_string().into(),
            ),
            sort: Sort::None,
        })?
{
            None => was_changed = true,
            Some(row) => {
                let meta = txn.get(row)?.unwrap();
                let prev_mtime = meta.as_object().get(IM_MTIME).unwrap_or_default();
                was_changed = mtime > prev_mtime
            }
        }
        Ok(())
    })?;
    Ok(was_changed)
}

/// Record the current mtime of `path` in the database, inserting the
/// bookkeeping row if it does not exist yet.
fn update_mtime(dba: &ImportConfig, path: &Path) -> Result<()> {
    let meta = path.metadata()?;
    let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
    dba.db.transaction(&mut |txn| {
        let row = match txn.query_single(Query {
            filter: Filter::Match(
                TagPath(vec![IM_PATH.0]),
                path.to_string_lossy().to_string().into(),
            ),
            sort: Sort::None,
        })? {
            Some(row) => row,
            None => txn.insert(ObjectBuffer::new(&mut [(
                IM_PATH.0,
                &path.to_string_lossy().as_bytes(),
            )]))?,
        };
        let mut ob = txn.get(row)?.unwrap();
        ob = ob.as_object().insert(IM_MTIME, mtime);
        txn.update(row, ob)?;
        Ok(())
    })?;
    Ok(())
}

/// Derive a stable slug for `path`: "library" for the media root, the file
/// stem for top-level entries, and "parentstem-stem" for nested entries.
fn get_node_slug(ic: &ImportConfig, path: &Path) -> Option<String> {
    if path == ic.config.media_path {
        return Some("library".to_string());
    }
    let filename = path.file_name()?.to_string_lossy();
    // Everything before the first dot counts as the stem.
    let filestem = filename.split_once(".").unwrap_or((&filename, "")).0;
    if path.parent()? == &ic.config.media_path {
        Some(filestem.to_string())
    } else {
        let parent_filename = path.parent()?.file_name()?.to_string_lossy();
        let parent_filestem = parent_filename
            .split_once(".")
            .unwrap_or((&parent_filename, ""))
            .0;
        Some(format!("{parent_filestem}-{filestem}"))
    }
}

/// Demux `path` and return its cached metadata (info, tracks, tags, chapters,
/// attachments). Useful attachment payloads are swapped for cache keys; the
/// data of all other attachments is dropped.
pub fn read_media_metadata(cache: &Cache, path: &Path) -> Result<Segment> {
    cache
        .cache_memory(
            &format!("media/metadata/{}.json", HashKey(path)),
            move || {
                let media = File::open(path)?;
                let mut media = create_demuxer_autodetect(Box::new(media))?
.ok_or(anyhow!("media format unknown"))?; let info = media.info()?; let tracks = media.tracks()?; let tags = media.tags()?; let mut attachments = media.attachments()?; let chapters = media.chapters()?; // Replace data of useful attachments with cache key; delete data of all others if let Some(attachments) = &mut attachments { for att in &mut attachments.files { if let Some(fname) = is_useful_attachment(&att) { let key = cache.store( format!("media/attachment/{}-{fname}", HashKey(path)), || Ok(att.data.clone()), )?; att.data = key.as_bytes().to_vec(); } else { att.data.clear(); } } } Ok(Segment { info, tracks, tags: tags.into_iter().collect(), attachments, chapters, ..Default::default() }) }, ) .context("reading media metadata") } pub fn is_useful_attachment(a: &AttachedFile) -> Option<&'static str> { match a { _ if is_info_json(&a) => Some("info.json"), _ if is_cover(&a) => Some("cover.image"), _ => None, } } // for tok in filename_toks { // apply_node_flag(db, rthandle, apis, node, tok)?; // } // fn apply_musicbrainz_recording( // db: &Database, // rthandle: &Handle, // apis: &Apis, // node: NodeID, // mbid: String, // ) -> Result<()> { // let rec = apis.musicbrainz.lookup_recording(mbid, rthandle)?; // db.update_node_init(node, |node| { // node.title = Some(rec.title.clone()); // node.identifiers // .insert(IdentifierType::MusicbrainzRecording, rec.id.to_string()); // if let Some(a) = rec.artist_credit.first() { // node.subtitle = Some(a.artist.name.clone()); // node.identifiers // .insert(IdentifierType::MusicbrainzArtist, a.artist.id.to_string()); // } // // // TODO proper dedup // // node.people.clear(); // for rel in &rec.relations { // use musicbrainz::reltypes::*; // let a = match rel.type_id.as_str() { // INSTRUMENT => Some(("", CreditCategory::Instrument)), // VOCAL => Some(("", CreditCategory::Vocal)), // PRODUCER => Some(("", CreditCategory::Producer)), // MIX => Some(("mix ", CreditCategory::Engineer)), // PHONOGRAPHIC_COPYRIGHT => { // 
//                     Some(("phonographic copyright ", CreditCategory::Engineer))
//                 }
//                 PROGRAMMING => Some(("programming ", CreditCategory::Engineer)),
//                 _ => None,
//             };
//             if let Some((note, group)) = a {
//                 let artist = rel.artist.as_ref().unwrap();
//                 let artist = apis
//                     .musicbrainz
//                     .lookup_artist(artist.id.clone(), rthandle)?;
//                 let mut image_1 = None;
//                 let mut image_2 = None;
//                 for rel in &artist.relations {
//                     match rel.type_id.as_str() {
//                         WIKIDATA => {
//                             let url = rel.url.as_ref().unwrap().resource.clone();
//                             if let Some(id) = url.strip_prefix("https://www.wikidata.org/wiki/") {
//                                 if let Some(filename) =
//                                     apis.wikidata.query_image_path(id.to_owned(), rthandle)?
//                                 {
//                                     image_1 = Some(
//                                         apis.wikimedia_commons
//                                             .image_by_filename(filename, rthandle)?,
//                                     );
//                                 }
//                             }
//                         }
//                         VGMDB => {
//                             let url = rel.url.as_ref().unwrap().resource.clone();
//                             if let Some(id) = url.strip_prefix("https://vgmdb.net/artist/") {
//                                 let id = id.parse::<u32>().context("parse vgmdb id")?;
//                                 if let Some(path) = apis.vgmdb.get_artist_image(id, rthandle)? {
//                                     image_2 = Some(path);
//                                 }
//                             }
//                         }
//                         _ => (),
//                     }
//                 }
//                 let mut jobs = vec![];
//                 if !note.is_empty() {
//                     jobs.push(note.to_string());
//                 }
//                 jobs.extend(rel.attributes.clone());
//                 let _headshot = match image_1.or(image_2) {
//                     Some(x) => x,
//                     None => Asset(cache_store(
//                         format!("fallback/{}.image", HashKey(&artist.sort_name)),
//                         || generate_fallback(&artist.sort_name),
//                     )?),
//                 };
//                 node.credits.entry(group).or_default().push(Appearance {
//                     jobs,
//                     characters: vec![],
//                     node: NodeID([0; 32]), // TODO
//                 });
//             }
//         }
//         for isrc in &rec.isrcs {
//             node.identifiers
//                 .insert(IdentifierType::Isrc, isrc.to_string());
//         }
//         Ok(())
//     })?;
//     Ok(())
// }
// fn make_kebab(i: &str) -> String {
//     let mut o = String::with_capacity(i.len());
//     for c in i.chars() {
//         o.extend(match c {
//             'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c),
//             ' ' => Some('-'),
//             _ => None,
//         });
//     }
//     o
// }