/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin */ #![feature(duration_constants)] pub mod plugins; pub mod reporting; use crate::{ plugins::{ImportContext, ImportPlugin, infojson::is_info_json, init_plugins, misc::is_cover}, reporting::IMPORT_PROGRESS, }; use anyhow::{Context, Result, anyhow}; use jellycache::{HashKey, cache_memory, cache_store}; use jellycommon::{NodeID, Visibility}; use jellydb::Database; use jellyremuxer::{ demuxers::create_demuxer_autodetect, matroska::{self, AttachedFile, Segment}, }; use log::info; use rayon::{ ThreadPoolBuilder, iter::{IntoParallelIterator, ParallelBridge, ParallelIterator}, }; use serde::{Deserialize, Serialize}; use std::{ collections::HashSet, fs::{File, read_to_string}, mem::swap, path::{Path, PathBuf}, sync::{Arc, LazyLock, Mutex}, time::UNIX_EPOCH, }; use tokio::{runtime::Handle, sync::Semaphore, task::spawn_blocking}; #[rustfmt::skip] #[derive(Debug, Deserialize, Serialize, Default)] pub struct Config { media_path: PathBuf, api: ApiSecrets, num_threads: usize } #[derive(Serialize, Deserialize, Debug, Default)] pub struct ApiSecrets { pub acoustid: Option, pub tmdb: Option, pub tvdb: Option, pub imdb: Option, pub omdb: Option, pub fanart_tv: Option, pub trakt: Option, } pub static CONF_PRELOAD: Mutex> = Mutex::new(None); static CONF: LazyLock = LazyLock::new(|| { CONF_PRELOAD .lock() .unwrap() .take() .expect("import config not preloaded. logic error") }); pub const USER_AGENT: &str = concat!( "jellything/", env!("CARGO_PKG_VERSION"), " +https://codeberg.org/metamuffin/jellything" ); static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); pub fn is_importing() -> bool { IMPORT_SEM.available_permits() == 0 } pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> { let _sem = IMPORT_SEM.try_acquire().context("already importing")?; let rt = Handle::current(); let tp = ThreadPoolBuilder::new() .thread_name(|n| format!("import #{n}")) .num_threads(CONF.num_threads) .build()?; let jh = spawn_blocking(move || { tp.install(|| { reporting::start_import(); reporting::catch(import(&db, &rt, incremental)); reporting::end_import(); }) }); let _ = jh.await; Ok(()) } fn import(db: &Database, rt: &Handle, incremental: bool) -> Result<()> { let plugins = init_plugins(&CONF.api); let files = Mutex::new(Vec::new()); import_traverse( &CONF.media_path, db, incremental, NodeID::MIN, InheritedFlags::default(), &files, )?; let files = files.into_inner().unwrap(); let mut nodes = Mutex::new(HashSet::new()); files.into_par_iter().for_each(|(path, parent, iflags)| { reporting::set_task(format!("unknown: {path:?}")); import_file(db, &rt, &nodes, &plugins, &path, parent, iflags); IMPORT_PROGRESS .blocking_write() .as_mut() .unwrap() .finished_items += 1; reporting::set_task("idle".to_owned()); }); IMPORT_PROGRESS .blocking_write() .as_mut() .unwrap() .finished_items = 0; while !nodes.get_mut().unwrap().is_empty() { info!("process iter (size={})", nodes.get_mut().unwrap().len()); let mut cur_nodes = HashSet::new(); swap(nodes.get_mut().unwrap(), &mut cur_nodes); cur_nodes.into_par_iter().for_each(|node| { reporting::set_task(format!("unknown: {node}")); process_node(db, &rt, &plugins, &nodes, node); IMPORT_PROGRESS .blocking_write() .as_mut() .unwrap() .finished_items += 1; reporting::set_task("idle".to_owned()); }); } Ok(()) } #[derive(Debug, Clone, Copy)] pub struct InheritedFlags { visibility: Visibility, use_acoustid: bool, } impl Default for InheritedFlags { fn default() -> Self { Self { visibility: Visibility::Visible, use_acoustid: false, } } } fn import_traverse( path: &Path, db: &Database, incremental: bool, parent: NodeID, mut iflags: InheritedFlags, out: &Mutex>, ) -> Result<()> { if path.is_dir() { reporting::set_task(format!("indexing {path:?}")); let slug = get_node_slug(path).unwrap(); let node = NodeID::from_slug(&slug); // Some flags need to applied immediatly because they are inherited if let Ok(content) = read_to_string(path.join("flags")) { for flag in content.lines() { match flag.trim() { "hidden" => iflags.visibility = iflags.visibility.min(Visibility::Hidden), "reduced" => iflags.visibility = iflags.visibility.min(Visibility::Reduced), "use_acoustid" => iflags.use_acoustid = true, _ => (), } } } db.update_node_init(node, |n| { if parent != NodeID::MIN { n.parents.insert(parent); } n.slug = slug; n.visibility = iflags.visibility; })?; path.read_dir()?.par_bridge().try_for_each(|e| { let path = e?.path(); reporting::catch( import_traverse(&path, db, incremental, node, iflags, out) .context(anyhow!("index {:?}", path.file_name().unwrap())), ); anyhow::Ok(()) })?; return Ok(()); } if path.is_file() { let meta = path.metadata()?; let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); if incremental { if let Some(last_mtime) = db.get_import_file_mtime(path)? { if last_mtime >= mtime { return Ok(()); } } } IMPORT_PROGRESS .blocking_write() .as_mut() .unwrap() .total_items += 1; out.lock().unwrap().push((path.to_owned(), parent, iflags)); } Ok(()) } fn import_file( db: &Database, rt: &Handle, nodes: &Mutex>, plugins: &[Box], path: &Path, parent: NodeID, iflags: InheritedFlags, ) { let mut all_ok = true; let ct = ImportContext { db, rt, iflags, nodes, }; let filename = path.file_name().unwrap().to_string_lossy(); if filename == "flags" { let Some(content) = reporting::catch(read_to_string(path).context(anyhow!("read flags at {path:?}"))) else { return; }; nodes.lock().unwrap().insert(parent); for line in content.lines() { for p in plugins { let inf = p.info(); if inf.handle_instruction { reporting::set_task(format!("{}(inst): {path:?}", inf.name)); all_ok &= reporting::catch( p.instruction(&ct, parent, line) .context(anyhow!("{}(inst) {path:?}", inf.name)), ) .is_some(); } } } } if filename.ends_with("mkv") || filename.ends_with("mka") || filename.ends_with("mks") { let slug = get_node_slug(path).unwrap(); let node = NodeID::from_slug(&slug); nodes.lock().unwrap().insert(node); all_ok &= reporting::catch(db.update_node_init(node, |node| { node.slug = slug; if parent != NodeID::MIN { node.parents.insert(parent); } node.visibility = iflags.visibility; })) .is_some(); let flags = filename .split(".") .collect::>() .into_iter() .skip(1) .rev() .skip(1) .rev(); for line in flags { for p in plugins { let inf = p.info(); if inf.handle_instruction { reporting::set_task(format!("{}(inst): {path:?}", inf.name)); all_ok &= reporting::catch( p.instruction(&ct, node, line) .context(anyhow!("{}(inst) {path:?}", inf.name)), ) .is_some(); } } } reporting::set_task(format!("demuxer meta: {path:?}")); let Some(seg) = reporting::catch(read_media_metadata(path).context(anyhow!("media {path:?}"))) else { return; }; for p in plugins { let inf = p.info(); if inf.handle_media { reporting::set_task(format!("{}(media): {path:?}", inf.name)); all_ok &= reporting::catch( p.media(&ct, node, path, &seg) .context(anyhow!("{}(media) {path:?}", inf.name)), ) .is_some(); } } } else { for p in plugins { let inf = p.info(); if inf.handle_file { reporting::set_task(format!("{}(file): {path:?}", inf.name)); all_ok &= reporting::catch( p.file(&ct, parent, path) .context(anyhow!("{}(file) {path:?}", inf.name)), ) .is_some(); } } } if all_ok { reporting::catch(update_mtime(db, path).context("updating mtime")); } } fn process_node( db: &Database, rt: &Handle, plugins: &[Box], nodes: &Mutex>, node: NodeID, ) { let Some(data) = reporting::catch( db.get_node(node) .and_then(|e| e.ok_or(anyhow!("node missing"))), ) else { return; }; let slug = &data.slug; for p in plugins { let inf = p.info(); if inf.handle_process { reporting::set_task(format!("{}(proc): {slug}", inf.name)); let Some(data) = reporting::catch( db.get_node(node) .and_then(|e| e.ok_or(anyhow!("node missing"))), ) else { return; }; reporting::catch( p.process( &ImportContext { db, rt, iflags: InheritedFlags::default(), nodes, }, node, &data, ) .context(anyhow!("{}(proc) {slug}", inf.name)), ); } } } fn update_mtime(db: &Database, path: &Path) -> Result<()> { let meta = path.metadata()?; let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs(); db.set_import_file_mtime(path, mtime)?; Ok(()) } fn get_node_slug(path: &Path) -> Option { if path == CONF.media_path { return Some("library".to_string()); } let filename = path.file_name()?.to_string_lossy(); let filestem = filename.split_once(".").unwrap_or((&filename, "")).0; if path.parent()? == &CONF.media_path { Some(format!("{filestem}")) } else { let parent_filename = path.parent()?.file_name()?.to_string_lossy(); let parent_filestem = parent_filename .split_once(".") .unwrap_or((&parent_filename, "")) .0; Some(format!("{parent_filestem}-{filestem}")) } } pub fn read_media_metadata(path: &Path) -> Result> { cache_memory( &format!("media/metadata/{}.json", HashKey(path)), move || { let media = File::open(path)?; let mut media = create_demuxer_autodetect(Box::new(media))? .ok_or(anyhow!("media format unknown"))?; let info = media.info()?; let tracks = media.tracks()?; let tags = media.tags()?; let mut attachments = media.attachments()?; let chapters = media.chapters()?; // Replace data of useful attachments with cache key; delete data of all others if let Some(attachments) = &mut attachments { for att in &mut attachments.files { if let Some(fname) = is_useful_attachment(&att) { let key = cache_store( format!("media/attachment/{}-{fname}", HashKey(path)), || Ok(att.data.clone()), )?; att.data = key.as_bytes().to_vec(); } else { att.data.clear(); } } } Ok(Segment { info, tracks, tags: tags.into_iter().collect(), attachments, chapters, ..Default::default() }) }, ) .context("reading media metadata") } pub fn is_useful_attachment(a: &AttachedFile) -> Option<&'static str> { match a { _ if is_info_json(&a) => Some("info.json"), _ if is_cover(&a) => Some("cover.image"), _ => None, } } // for tok in filename_toks { // apply_node_flag(db, rthandle, apis, node, tok)?; // } // fn apply_musicbrainz_recording( // db: &Database, // rthandle: &Handle, // apis: &Apis, // node: NodeID, // mbid: String, // ) -> Result<()> { // let rec = apis.musicbrainz.lookup_recording(mbid, rthandle)?; // db.update_node_init(node, |node| { // node.title = Some(rec.title.clone()); // node.identifiers // .insert(IdentifierType::MusicbrainzRecording, rec.id.to_string()); // if let Some(a) = rec.artist_credit.first() { // node.subtitle = Some(a.artist.name.clone()); // node.identifiers // .insert(IdentifierType::MusicbrainzArtist, a.artist.id.to_string()); // } // // // TODO proper dedup // // node.people.clear(); // for rel in &rec.relations { // use musicbrainz::reltypes::*; // let a = match rel.type_id.as_str() { // INSTRUMENT => Some(("", CreditCategory::Instrument)), // VOCAL => Some(("", CreditCategory::Vocal)), // PRODUCER => Some(("", CreditCategory::Producer)), // MIX => Some(("mix ", CreditCategory::Engineer)), // PHONOGRAPHIC_COPYRIGHT => { // Some(("phonographic copyright ", CreditCategory::Engineer)) // } // PROGRAMMING => Some(("programming ", CreditCategory::Engineer)), // _ => None, // }; // if let Some((note, group)) = a { // let artist = rel.artist.as_ref().unwrap(); // let artist = apis // .musicbrainz // .lookup_artist(artist.id.clone(), rthandle)?; // let mut image_1 = None; // let mut image_2 = None; // for rel in &artist.relations { // match rel.type_id.as_str() { // WIKIDATA => { // let url = rel.url.as_ref().unwrap().resource.clone(); // if let Some(id) = url.strip_prefix("https://www.wikidata.org/wiki/") { // if let Some(filename) = // apis.wikidata.query_image_path(id.to_owned(), rthandle)? // { // image_1 = Some( // apis.wikimedia_commons // .image_by_filename(filename, rthandle)?, // ); // } // } // } // VGMDB => { // let url = rel.url.as_ref().unwrap().resource.clone(); // if let Some(id) = url.strip_prefix("https://vgmdb.net/artist/") { // let id = id.parse::().context("parse vgmdb id")?; // if let Some(path) = apis.vgmdb.get_artist_image(id, rthandle)? { // image_2 = Some(path); // } // } // } // _ => (), // } // } // let mut jobs = vec![]; // if !note.is_empty() { // jobs.push(note.to_string()); // } // jobs.extend(rel.attributes.clone()); // let _headshot = match image_1.or(image_2) { // Some(x) => x, // None => Asset(cache_store( // format!("fallback/{}.image", HashKey(&artist.sort_name)), // || generate_fallback(&artist.sort_name), // )?), // }; // node.credits.entry(group).or_default().push(Appearance { // jobs, // characters: vec![], // node: NodeID([0; 32]), // TODO // }); // } // } // for isrc in &rec.isrcs { // node.identifiers // .insert(IdentifierType::Isrc, isrc.to_string()); // } // Ok(()) // })?; // Ok(()) // } // fn make_kebab(i: &str) -> String { // let mut o = String::with_capacity(i.len()); // for c in i.chars() { // o.extend(match c { // 'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c), // ' ' => Some('-'), // _ => None, // }); // } // o // }