/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin */ #![feature(duration_constants, exit_status_error)] pub mod helpers; pub mod plugins; pub mod reporting; pub mod source_rank; use crate::{ plugins::{ImportPlugin, PluginContext, infojson::is_info_json, init_plugins, misc::is_cover}, source_rank::{ImportSource, SourceRanks}, }; use anyhow::{Context, Result, anyhow}; use jellycache::{Cache, HashKey}; use jellycommon::{ internal::{IM_MTIME, IM_PATH}, jellyobject::{self, ObjectBuffer, Path as TagPath, Tag}, *, }; use jellydb::{Database, Filter, Query, RowNum}; use jellyremuxer::{ demuxers::create_demuxer_autodetect, matroska::{self, AttachedFile, Segment}, }; use log::info; use rayon::{ ThreadPoolBuilder, iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}, }; use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, fs::{File, read_to_string}, mem::swap, path::{Path, PathBuf}, sync::{Arc, LazyLock, Mutex}, time::UNIX_EPOCH, }; use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking}; pub use jellyimport_fallback_generator::generate_person_fallback; #[derive(Debug, Deserialize, Serialize, Default, Clone)] pub struct Config { media_path: PathBuf, api: ApiSecrets, num_threads: usize, } #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct ApiSecrets { pub acoustid: Option, pub tmdb: Option, pub tvdb: Option, pub imdb: Option, pub omdb: Option, pub fanart_tv: Option, pub trakt: Option, } pub const USER_AGENT: &str = concat!( "jellything/", env!("CARGO_PKG_VERSION"), " +https://codeberg.org/metamuffin/jellything" ); static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); pub fn is_importing() -> bool { IMPORT_SEM.available_permits() == 0 } #[derive(Clone)] pub struct ImportConfig { pub config: Config, pub cache: Arc, pub db: Arc, } fn node_slug_query<'a>(slug: &'a str) -> Query<'a> { Query { filter: Filter::Match(jellyobject::Path(vec![NO_SLUG.0]), slug.into()), ..Default::default() } } impl ImportConfig { pub fn update_node( &self, node: RowNum, mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer, ) -> Result<()> { self.db.transaction(&mut |txn| { let ob_before = txn.get(node)?.unwrap(); let ob_after = update(ob_before); txn.update(node, ob_after)?; Ok(()) })?; Ok(()) } pub fn update_node_slug( &self, slug: &str, mut update: impl FnMut(ObjectBuffer) -> ObjectBuffer, ) -> Result { let mut row = 0; self.db.transaction(&mut |txn| { row = match txn.query_single(node_slug_query(slug))? { Some(r) => r, None => txn.insert(ObjectBuffer::new(&mut [(NO_SLUG.0, &slug)]))?, }; let node = txn.get(row)?.unwrap(); let node = update(node); let node = node.as_object().insert(NO_SLUG, slug); txn.update(row, node)?; Ok(()) })?; Ok(row) } pub fn get_node(&self, node: RowNum) -> Result> { let mut buf = None; self.db.transaction(&mut |txn| { buf = txn.get(node)?; Ok(()) })?; Ok(buf) } } pub async fn import_wrap(ic: ImportConfig, incremental: bool) -> Result<()> { let _sem = IMPORT_SEM.try_acquire().context("already importing")?; let rt = Handle::current(); let tp = ThreadPoolBuilder::new() .thread_name(|n| format!("import #{n}")) .num_threads(ic.config.num_threads) .build()?; let jh = spawn_blocking(move || { tp.install(move || { reporting::start_import(); reporting::catch(import(ic, &rt, incremental)); reporting::end_import(); }) }); let _ = jh.await; Ok(()) } fn import(ic: ImportConfig, rt: &Handle, incremental: bool) -> Result<()> { reporting::set_stage(format!("Initializing Plugins"), 0); let plugins = init_plugins(&ic.config.api); let ranks = SourceRanks::new(); reporting::set_stage(format!("Indexing files"), 0); let files = Mutex::new(Vec::new()); import_traverse( &ic.config.media_path, &ic, incremental, None, InheritedFlags::default(), &files, )?; let files = files.into_inner().unwrap(); let mut nodes = Mutex::new(HashSet::new()); reporting::set_stage(format!("Importing files"), files.len()); files.into_par_iter().for_each(|(path, parent, iflags)| { reporting::set_task(format!("unknown: {path:?}")); import_file(&ic, &rt, &ranks, &nodes, &plugins, &path, parent, iflags); reporting::inc_finished(); reporting::set_task("idle".to_owned()); }); for n in 1.. { let size = nodes.get_mut().unwrap().len(); if size == 0 { break; } reporting::set_stage(format!("Processing nodes (pass #{n})"), size); info!("process iter (size={})", nodes.get_mut().unwrap().len()); let mut cur_nodes = HashSet::new(); swap(nodes.get_mut().unwrap(), &mut cur_nodes); cur_nodes.into_par_iter().for_each(|node| { reporting::set_task(format!("unknown: {node}")); process_node(&ic, &rt, &ranks, &plugins, &nodes, node); reporting::inc_finished(); reporting::set_task("idle".to_owned()); }); } Ok(()) } #[derive(Debug, Clone, Copy, Default)] pub struct InheritedFlags { hidden: bool, reduced: bool, use_acoustid: bool, } fn import_traverse( path: &Path, ic: &ImportConfig, incremental: bool, parent: Option, mut iflags: InheritedFlags, out: &Mutex>, ) -> Result<()> { if path.is_dir() { reporting::set_task(format!("indexing {path:?}")); let slug = get_node_slug(ic, path).unwrap(); // Some flags need to applied immediatly because they are inherited if let Ok(content) = read_to_string(path.join("flags")) { for flag in content.lines() { match flag.trim() { "hidden" => iflags.hidden = true, "reduced" => iflags.reduced = true, "use_acoustid" => iflags.use_acoustid = true, _ => (), } } } let row = ic.update_node_slug(&slug, |mut node| { if let Some(parent) = parent { node = node.as_object().extend(NO_PARENT, [parent]); } node = node.as_object().insert( NO_VISIBILITY, if iflags.hidden { VISI_HIDDEN } else if iflags.reduced { VISI_REDUCED } else { VISI_VISIBLE }, ); node })?; path.read_dir()?.par_bridge().try_for_each(|e| { let path = e?.path(); reporting::catch( import_traverse(&path, ic, incremental, Some(row), iflags, out) .context(anyhow!("index {:?}", path.file_name().unwrap())), ); anyhow::Ok(()) })?; return Ok(()); } if path.is_file() && let Some(parent) = parent { if incremental { if !compare_mtime(ic, path)? { return Ok(()); } } reporting::inc_total(); out.lock().unwrap().push((path.to_owned(), parent, iflags)); } Ok(()) } fn import_file( ic: &ImportConfig, rt: &Handle, ranks: &SourceRanks, pending_nodes: &Mutex>, plugins: &[Box], path: &Path, parent: RowNum, iflags: InheritedFlags, ) { let mut all_ok = true; let mut ct = PluginContext { ic, rt, is: ImportSource { tag: Tag::new(b"xxxx"), ranks, }, iflags, pending_nodes, }; let filename = path.file_name().unwrap().to_string_lossy(); if filename == "flags" { let Some(content) = reporting::catch(read_to_string(path).context(anyhow!("read flags at {path:?}"))) else { return; }; pending_nodes.lock().unwrap().insert(parent); for line in content.lines() { for p in plugins { let inf = p.info(); if inf.handle_instruction { ct.is.tag = inf.tag; reporting::set_task(format!("{}(inst): {path:?}", inf.name)); all_ok &= reporting::catch( p.instruction(&ct, parent, line) .context(anyhow!("{}(inst) {path:?}", inf.name)), ) .is_some(); } } } } if filename.ends_with("mkv") || filename.ends_with("mka") || filename.ends_with("mks") { let slug = get_node_slug(ic, path).unwrap(); let Some(row) = reporting::catch(ic.update_node_slug(&slug, |mut node| { node = node.as_object().extend(NO_PARENT, [parent]); node = node.as_object().insert( NO_VISIBILITY, if iflags.hidden { VISI_HIDDEN } else if iflags.reduced { VISI_REDUCED } else { VISI_VISIBLE }, ); node })) else { return; }; pending_nodes.lock().unwrap().insert(row); let flags = filename .split(".") .collect::>() .into_iter() .skip(1) .rev() .skip(1) .rev(); for line in flags { for p in plugins { let inf = p.info(); if inf.handle_instruction { ct.is.tag = inf.tag; reporting::set_task(format!("{}(inst): {path:?}", inf.name)); all_ok &= reporting::catch( p.instruction(&ct, row, line) .context(anyhow!("{}(inst) {path:?}", inf.name)), ) .is_some(); } } } reporting::set_task(format!("demuxer meta: {path:?}")); let Some(seg) = reporting::catch( read_media_metadata(&ct.ic.cache, path).context(anyhow!("media {path:?}")), ) else { return; }; for p in plugins { let inf = p.info(); if inf.handle_media { ct.is.tag = inf.tag; reporting::set_task(format!("{}(media): {path:?}", inf.name)); all_ok &= reporting::catch( p.media(&ct, row, path, &seg) .context(anyhow!("{}(media) {path:?}", inf.name)), ) .is_some(); } } } else { for p in plugins { let inf = p.info(); if inf.handle_file { ct.is.tag = inf.tag; reporting::set_task(format!("{}(file): {path:?}", inf.name)); all_ok &= reporting::catch( p.file(&ct, parent, path) .context(anyhow!("{}(file) {path:?}", inf.name)), ) .is_some(); } } } if all_ok { reporting::catch(update_mtime(ic, path).context("updating mtime")); } } fn process_node( dba: &ImportConfig, rt: &Handle, ranks: &SourceRanks, plugins: &[Box], pending_nodes: &Mutex>, node: RowNum, ) { let mut slug = String::new(); reporting::catch(dba.db.transaction(&mut |txn| { let no = txn.get(node)?.unwrap(); if let Some(s) = no.as_object().get(NO_SLUG) { slug = s.to_owned(); } Ok(()) })); for p in plugins { let inf = p.info(); if inf.handle_process { reporting::set_task(format!("{}(proc): {slug}", inf.name)); reporting::catch( p.process( &PluginContext { ic: dba, is: ImportSource { tag: inf.tag, ranks, }, rt, iflags: InheritedFlags::default(), pending_nodes, }, node, ) .context(anyhow!("{}(proc) {slug}", inf.name)), ); } } } fn compare_mtime(dba: &ImportConfig, path: &Path) -> Result { let meta = path.metadata()?; let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); let mut was_changed = false; dba.db.transaction(&mut |txn| { match txn.query_single(Query { filter: Filter::Match( TagPath(vec![IM_PATH.0]), path.to_string_lossy().to_string().into(), ), ..Default::default() })? { None => was_changed = true, Some(row) => { let meta = txn.get(row)?.unwrap(); let prev_mtime = meta.as_object().get(IM_MTIME).unwrap_or_default(); was_changed = mtime > prev_mtime } } Ok(()) })?; Ok(was_changed) } fn update_mtime(dba: &ImportConfig, path: &Path) -> Result<()> { let meta = path.metadata()?; let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); dba.db.transaction(&mut |txn| { let row = match txn.query_single(Query { filter: Filter::Match( TagPath(vec![IM_PATH.0]), path.to_string_lossy().to_string().into(), ), ..Default::default() })? { Some(row) => row, None => txn.insert(ObjectBuffer::new(&mut [( IM_PATH.0, &path.to_string_lossy().as_bytes(), )]))?, }; let mut ob = txn.get(row)?.unwrap(); ob = ob.as_object().insert(IM_MTIME, mtime); txn.update(row, ob)?; Ok(()) })?; Ok(()) } fn get_node_slug(ic: &ImportConfig, path: &Path) -> Option { if path == ic.config.media_path { return Some("library".to_string()); } let filename = path.file_name()?.to_string_lossy(); let filestem = filename.split_once(".").unwrap_or((&filename, "")).0; if path.parent()? == &ic.config.media_path { Some(format!("{filestem}")) } else { let parent_filename = path.parent()?.file_name()?.to_string_lossy(); let parent_filestem = parent_filename .split_once(".") .unwrap_or((&parent_filename, "")) .0; Some(format!("{parent_filestem}-{filestem}")) } } pub fn read_media_metadata(cache: &Cache, path: &Path) -> Result> { cache .cache_memory( &format!("media/metadata/{}.json", HashKey(path)), move || { let media = File::open(path)?; let mut media = create_demuxer_autodetect(Box::new(media))? .ok_or(anyhow!("media format unknown"))?; let info = media.info()?; let tracks = media.tracks()?; let tags = media.tags()?; let mut attachments = media.attachments()?; let chapters = media.chapters()?; // Replace data of useful attachments with cache key; delete data of all others if let Some(attachments) = &mut attachments { for att in &mut attachments.files { if let Some(fname) = is_useful_attachment(&att) { let key = cache.store( format!("media/attachment/{}-{fname}", HashKey(path)), || Ok(att.data.clone()), )?; att.data = key.as_bytes().to_vec(); } else { att.data.clear(); } } } Ok(Segment { info, tracks, tags: tags.into_iter().collect(), attachments, chapters, ..Default::default() }) }, ) .context("reading media metadata") } pub fn is_useful_attachment(a: &AttachedFile) -> Option<&'static str> { match a { _ if is_info_json(&a) => Some("info.json"), _ if is_cover(&a) => Some("cover.image"), _ => None, } } // fn make_kebab(i: &str) -> String { // let mut o = String::with_capacity(i.len()); // for c in i.chars() { // o.extend(match c { // 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), // ' ' => Some('-'), // _ => None, // }); // } // o // }