/*
    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
    Copyright (C) 2025 metamuffin
*/
use anyhow::{anyhow, Context, Result};
use ebml_struct::{
    ids::*,
    matroska::*,
    read::{EbmlReadExt, TagRead},
};
use jellybase::{assetfed::AssetInner, cache::cache_file, database::Database, CONF, SECRETS};
use jellycommon::{
    Chapter, LocalTrack, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
    TrackSource,
};
use log::{info, warn};
use rayon::iter::{
    IntoParallelIterator, IntoParallelRefIterator, ParallelBridge, ParallelDrainRange,
    ParallelIterator,
};
use regex::Regex;
use std::{
    collections::{HashMap, VecDeque},
    fs::File,
    io::{BufReader, ErrorKind, Read, Write},
    mem::swap,
    path::{Path, PathBuf},
    sync::{
        atomic::{AtomicUsize, Ordering},
        LazyLock,
    },
    time::UNIX_EPOCH,
};
use tmdb::Tmdb;
use tokio::{
    sync::{RwLock, Semaphore},
    task::spawn_blocking,
};
use trakt::Trakt;

pub mod infojson;
pub mod tmdb;
pub mod trakt;

/// Single permit: only one import run at a time.
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
/// Errors collected while importing.
pub static IMPORT_ERRORS: RwLock<Vec<String>> = RwLock::const_new(Vec::new());
static RE_EPISODE_FILENAME: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r#"([sS](\d+))?([eE](\d+))( (.+))?"#).unwrap());

struct Apis {
    trakt: Option<Trakt>,
    tmdb: Option<Tmdb>,
}

pub fn is_importing() -> bool {
    IMPORT_SEM.available_permits() == 0
}

pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
    let _sem = IMPORT_SEM.try_acquire()?;
    let jh = spawn_blocking(move || {
        if let Err(e) = import(&db, incremental) {
            IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
        }
    });
    let _ = jh.await;
    Ok(())
}

fn import(db: &Database, incremental: bool) -> Result<()> {
    let mut queue_prev = vec![CONF.media_path.clone()];
    let mut queue_next;
    let apis = Apis {
        trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
        tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
    };
    // Breadth-first traversal of the media directory: each level of paths is
    // processed in parallel, and directories expand into the next level's queue.
    while !queue_prev.is_empty() {
        queue_next = queue_prev
            .par_drain(..)
            .flat_map_iter(
                move |path| match import_iter_inner(&path, db, incremental) {
                    Ok(ch) => ch,
                    Err(e) => {
                        IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
                        Vec::new()
                    }
                },
            )
            .collect::<Vec<_>>();
        swap(&mut queue_next, &mut queue_prev);
    }
    Ok(())
}

fn import_iter_inner(path: &Path, db: &Database, incremental: bool) -> Result<Vec<PathBuf>> {
    if path.is_dir() {
        let mut o = Vec::new();
        for e in path.read_dir()? {
            o.push(e?.path());
        }
        return Ok(o);
    }
    if path.is_file() {
        let meta = path.metadata()?;
        let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
        // Incremental runs skip files whose mtime has not advanced since the last import.
        if incremental {
            if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
                if last_mtime >= mtime {
                    return Ok(Vec::new());
                }
            }
        }
        import_file(&db, &path).context(anyhow!("{path:?}"))?;
        db.set_import_file_mtime(&path, mtime)?;
    }
    Ok(Vec::new())
}

fn import_file(db: &Database, path: &Path) -> Result<()> {
    let filename = path.file_stem().unwrap().to_string_lossy();
    // The parent node is derived from the name of the directory containing the file.
    let parent = NodeID::from_slug(
        &path
            .parent()
            .ok_or(anyhow!("no parent"))?
            .file_name()
            .ok_or(anyhow!("parent no filename"))?
            .to_string_lossy(),
    );
    // Sidecar files: "poster", "backdrop" and "info" attach metadata to the parent node.
    match filename.as_ref() {
        "poster" => {
            db.update_node_init(parent, |node| {
                node.poster = Some(AssetInner::Media(path.to_owned()).ser());
                Ok(())
            })?;
        }
        "backdrop" => {
            db.update_node_init(parent, |node| {
                node.backdrop = Some(AssetInner::Media(path.to_owned()).ser());
                Ok(())
            })?;
        }
        "info" => {
            let data = serde_yaml::from_reader::<_, Node>(BufReader::new(File::open(path)?))?;
            db.update_node_init(parent, |node| {
                fn merge_option<T>(a: &mut Option<T>, b: Option<T>) {
                    if b.is_some() {
                        *a = b;
                    }
                }
                merge_option(&mut node.title, data.title);
                merge_option(&mut node.tagline, data.tagline);
                merge_option(&mut node.description, data.description);
                Ok(())
            })?;
        }
        _ => (),
    }
    // Only files starting with the EBML magic number (Matroska/WebM) are parsed as media.
    let mut magic = [0; 4];
    File::open(path)?.read_exact(&mut magic).ok();
    if matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) {
        import_media_file(db, path, parent).context("media file")?;
    }
    Ok(())
}

fn import_media_file(db: &Database, path: &Path, parent: NodeID) -> Result<()> {
    info!("reading media file {path:?}");
    let mut file = BufReader::new(File::open(path)?);
    let mut file = file.by_ref().take(u64::MAX);

    // The EBML header must declare a matroska or webm document type.
    let (x, mut ebml) = file.read_tag()?;
    assert_eq!(x, EL_EBML);
    let ebml = Ebml::read(&mut ebml).unwrap();
    assert!(ebml.doc_type == "matroska" || ebml.doc_type == "webm");

    let (x, mut segment) = file.read_tag()?;
    assert_eq!(x, EL_SEGMENT);

    // Walk the top-level children of the Segment until the first Cluster (or EOF).
    let mut info = None;
    let mut infojson = None;
    let mut tracks = None;
    let mut cover = None;
    let mut chapters = None;
    let mut tags = None;
    loop {
        let (x, mut seg) = match segment.read_tag() {
            Ok(o) => o,
            Err(e) if e.kind() == ErrorKind::UnexpectedEof => break,
            Err(e) => return Err(e.into()),
        };
        match x {
            EL_INFO => info = Some(Info::read(&mut seg).context("info")?),
            EL_TRACKS => tracks = Some(Tracks::read(&mut seg).context("tracks")?),
            EL_CHAPTERS => chapters = Some(Chapters::read(&mut seg).context("chapters")?),
            EL_TAGS => tags = Some(Tags::read(&mut seg).context("tags")?),
            EL_ATTACHMENTS => {
                let attachments = Attachments::read(&mut seg).context("attachments")?;
                for f in attachments.files {
                    match f.name.as_str() {
                        "info.json" => {
                            // Embedded video metadata; the target struct is defined in the
                            // `infojson` module (type name assumed here).
                            infojson = Some(
                                serde_json::from_slice::<infojson::InfoJson>(&f.data)
                                    .context("infojson")?,
                            );
                        }
                        "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg"
                        | "cover.avif" => {
                            cover = Some(
                                AssetInner::Cache(cache_file(
                                    &["att-cover", path.to_string_lossy().as_ref()],
                                    move |mut file| {
                                        file.write_all(&f.data)?;
                                        Ok(())
                                    },
                                )?)
                                .ser(),
                            )
                        }
                        a => println!("{a:?}"),
                    }
                }
            }
            EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => {
                seg.consume()?;
            }
            EL_CLUSTER => {
                break;
            }
            id => {
                eprintln!("unknown top-level element {id:x}");
                seg.consume()?;
            }
        }
    }

    let info = info.ok_or(anyhow!("no info"))?;
    let tracks = tracks.ok_or(anyhow!("no tracks"))?;
    let mut tags = tags
        .map(|tags| {
            tags.tags
                .into_iter()
                .flat_map(|t| t.simple_tags)
                .map(|st| (st.name, st.string.unwrap_or_default()))
                .collect::<HashMap<_, _>>()
        })
        .unwrap_or_default();

    let filepath_stem = path
        .file_stem()
        .ok_or(anyhow!("no file stem"))?
        .to_string_lossy()
        .to_string();
    // Slug: stable youtube id if an info.json was attached, otherwise the kebab-cased file stem.
    let slug = infojson
        .as_ref()
        .map(|ij| format!("youtube-{}", ij.id))
        .unwrap_or(make_kebab(&filepath_stem));

    db.update_node_init(NodeID::from_slug(&slug), |node| {
        node.slug = slug;
        node.title = info.title;
        node.poster = cover;
        node.description = tags.remove("DESCRIPTION");
        node.tagline = tags.remove("COMMENT");
        if !node.parents.contains(&parent) {
            node.parents.push(parent)
        }
        if let Some(infojson) = infojson {
            node.kind = Some(
                if infojson.duration.unwrap_or(0.) < 600.
                    && infojson.aspect_ratio.unwrap_or(2.) < 1.
                {
                    NodeKind::ShortFormVideo
                } else {
                    NodeKind::Video
                },
            );
            node.title = Some(infojson.title);
            node.description = Some(infojson.description);
            node.tagline = Some(infojson.webpage_url);
            node.release_date = Some(
                infojson::parse_upload_date(&infojson.upload_date)
                    .context("parsing upload date")?,
            );
            node.ratings
                .insert(Rating::YoutubeViews, infojson.view_count as f64);
            if let Some(lc) = infojson.like_count {
                node.ratings.insert(Rating::YoutubeLikes, lc as f64);
            }
        }
        node.media = Some(MediaInfo {
            chapters: chapters
                .map(|c| {
                    let mut chaps = Vec::new();
                    if let Some(ee) = c.edition_entries.first() {
                        for ca in &ee.chapter_atoms {
                            let mut labels = Vec::new();
                            for cd in &ca.displays {
                                for lang in &cd.languages {
                                    labels.push((lang.to_owned(), cd.string.clone()))
                                }
                            }
                            chaps.push(Chapter {
                                labels,
                                time_start: Some(ca.time_start as f64 * 1e-9),
                                time_end: ca.time_end.map(|ts| ts as f64 * 1e-9),
                            })
                        }
                    }
                    chaps
                })
                .unwrap_or_default(),
            duration: (info.duration.unwrap_or_default() * info.timestamp_scale as f64) * 1e-9,
            tracks: tracks
                .entries
                .into_iter()
                .map(|track| SourceTrack {
                    codec: track.codec_id,
                    language: track.language,
                    name: track.name.unwrap_or_default(),
                    default_duration: track.default_duration,
                    federated: Vec::new(),
                    kind: if let Some(video) = track.video {
                        SourceTrackKind::Video {
                            width: video.pixel_width,
                            height: video.pixel_height,
                            display_width: video.display_width,
                            display_height: video.display_height,
                            display_unit: Some(video.display_unit),
                            fps: video.frame_rate,
                        }
                    } else if let Some(audio) = track.audio {
                        SourceTrackKind::Audio {
                            channels: audio.channels as usize,
                            sample_rate: audio.sampling_frequency,
                            bit_depth: audio.bit_depth.map(|r| r as usize),
                        }
                    } else {
                        SourceTrackKind::Subtitles
                    },
                    source: TrackSource::Local(LocalTrack {
                        codec_private: track.codec_private,
                        path: path.to_owned(),
                        track: track.track_number as usize,
                    }),
                })
                .collect(),
        });
        Ok(())
    })?;
    Ok(())
}

fn make_kebab(i: &str) -> String {
    let mut o = String::with_capacity(i.len());
    for c in i.chars() {
        o.extend(match c {
            'A'..='Z' | 'a'..='z' | '0'..='9' | '_' | '-' => Some(c),
            ' ' => Some('-'),
            _ => None,
        });
    }
    o
}
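
// Basic checks for the two pure helpers in this module. The expected values
// follow directly from RE_EPISODE_FILENAME and `make_kebab` as written above;
// these are illustrative sketches rather than exhaustive tests.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn episode_filename_pattern() {
        // "S01E02 Pilot" yields season "01", episode "02" and the trailing title.
        let c = RE_EPISODE_FILENAME.captures("S01E02 Pilot").unwrap();
        assert_eq!(c.get(2).map(|m| m.as_str()), Some("01"));
        assert_eq!(c.get(4).map(|m| m.as_str()), Some("02"));
        assert_eq!(c.get(6).map(|m| m.as_str()), Some("Pilot"));
    }

    #[test]
    fn kebab_slug() {
        // Letters, digits, '_' and '-' pass through, spaces become '-', everything else is dropped.
        assert_eq!(make_kebab("Some Title (2023)"), "Some-Title-2023");
    }
}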