author    metamuffin <metamuffin@disroot.org>    2025-01-30 01:27:11 +0100
committer metamuffin <metamuffin@disroot.org>    2025-01-30 01:27:11 +0100
commit    a5a73dc868c714391e4da4a53b4e4993fc77372e (patch)
tree      1bad34a616a7e20b40ef6a8100b8aa20ef5fb783 /import
parent    59825d68efa1077382fd6acac73f75ae9dc3680a (diff)
parallel import
Diffstat (limited to 'import')
-rw-r--r--  import/Cargo.toml  |   3
-rw-r--r--  import/src/lib.rs  | 140
2 files changed, 98 insertions, 45 deletions
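
The commit replaces the sequential `VecDeque` walk in `import` with a level-synchronous parallel walk: each frontier of paths is drained with rayon's `par_drain`, directories expand into their children, files are imported in place, and the collected children become the next frontier. A minimal standalone sketch of the pattern, with the database and error reporting stubbed out (`walk_parallel` and `process_file` are illustrative names, not from the codebase):

```rust
use rayon::iter::{ParallelDrainRange, ParallelIterator};
use std::mem::swap;
use std::path::PathBuf;

/// Level-synchronous parallel walk: drain the current frontier in parallel,
/// expand directories into children, process files, repeat until empty.
fn walk_parallel(root: PathBuf, process_file: impl Fn(&PathBuf) + Send + Sync) {
    let mut frontier = vec![root];
    let mut next;
    while !frontier.is_empty() {
        next = frontier
            .par_drain(..)
            .flat_map_iter(|path| {
                if path.is_dir() {
                    // I/O errors are silently dropped in this sketch; the
                    // real code pushes them onto IMPORT_ERRORS instead.
                    path.read_dir()
                        .map(|rd| rd.filter_map(|e| e.ok()).map(|e| e.path()).collect())
                        .unwrap_or_default()
                } else {
                    process_file(&path);
                    Vec::new()
                }
            })
            .collect::<Vec<PathBuf>>();
        swap(&mut next, &mut frontier);
    }
}
```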
diff --git a/import/Cargo.toml b/import/Cargo.toml
index b9bd6db..d0342df 100644
--- a/import/Cargo.toml
+++ b/import/Cargo.toml
@@ -10,6 +10,9 @@ jellyclient = { path = "../client" }
ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" }
+rayon = "1.10.0"
+crossbeam-channel = "0.5.14"
+
log = { workspace = true }
anyhow = "1.0.95"
reqwest = { workspace = true }
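
`rayon` powers the parallel traversal below; `crossbeam-channel` is added alongside it, though no channel appears in the hunks shown here, so its use is presumably elsewhere in the tree. For reference, the crate's basic bounded-channel round trip:

```rust
use crossbeam_channel::bounded;
use std::thread;

fn main() {
    // Senders block once `cap` (here 4) messages are queued.
    let (tx, rx) = bounded::<u32>(4);
    let producer = thread::spawn(move || {
        for i in 0..8 {
            tx.send(i).unwrap();
        }
    });
    // `iter()` yields until every sender is dropped.
    let sum: u32 = rx.iter().sum();
    producer.join().unwrap();
    assert_eq!(sum, 28);
}
```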
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 34780a8..243ac8a 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -11,17 +11,25 @@ use ebml_struct::{
};
use jellybase::{assetfed::AssetInner, cache::cache_file, database::Database, CONF, SECRETS};
use jellycommon::{
- Chapter, LocalTrack, MediaInfo, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
+ Chapter, LocalTrack, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
TrackSource,
};
-use log::info;
+use log::{info, warn};
+use rayon::iter::{
+ IntoParallelIterator, IntoParallelRefIterator, ParallelBridge, ParallelDrainRange,
+ ParallelIterator,
+};
use regex::Regex;
use std::{
collections::{HashMap, VecDeque},
fs::File,
io::{BufReader, ErrorKind, Read, Write},
- path::Path,
- sync::LazyLock,
+ mem::swap,
+ path::{Path, PathBuf},
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ LazyLock,
+ },
time::UNIX_EPOCH,
};
use tmdb::Tmdb;
@@ -54,11 +62,9 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
let _sem = IMPORT_SEM.try_acquire()?;
let jh = spawn_blocking(move || {
- let errs = match import(&db, incremental) {
- Err(e) => vec![format!("{e:#}")],
- Ok(e) => e,
- };
- *IMPORT_ERRORS.blocking_write() = errs;
+ if let Err(e) = import(&db, incremental) {
+ IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
+ }
});
let _ = jh.await;
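
With this change, errors accumulate into `IMPORT_ERRORS` as they occur rather than replacing its contents wholesale, and `import` no longer returns an error list. The `blocking_write` call implies the static is a tokio `RwLock`; a minimal sketch under that assumption (the diff only shows its use sites, not its definition):

```rust
use std::sync::LazyLock;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;

// Assumed shape of the shared error list.
static IMPORT_ERRORS: LazyLock<RwLock<Vec<String>>> =
    LazyLock::new(|| RwLock::new(Vec::new()));

async fn import_wrap_sketch() {
    let jh = spawn_blocking(|| {
        // blocking_write() is the synchronous guard of tokio's RwLock; it is
        // fine here because this closure runs on the blocking thread pool.
        IMPORT_ERRORS
            .blocking_write()
            .push("example error".to_string());
    });
    let _ = jh.await;
    // Readers on the async side use the regular awaitable guard.
    println!("{:?}", *IMPORT_ERRORS.read().await);
}
```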
@@ -66,67 +72,109 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
Ok(())
}
-fn import(db: &Database, incremental: bool) -> Result<Vec<String>> {
- let mut queue = VecDeque::from_iter(Some(CONF.media_path.clone()));
- let mut errors = Vec::new();
+fn import(db: &Database, incremental: bool) -> Result<()> {
+ let mut queue_prev = vec![CONF.media_path.clone()];
+ let mut queue_next;
let apis = Apis {
trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
};
- let mut num_skipped = 0;
- let mut num_imported = 0;
- while let Some(path) = queue.pop_front() {
- if path.is_dir() {
- for e in path.read_dir()? {
- queue.push_back(e?.path());
- }
+ while !queue_prev.is_empty() {
+ queue_next = queue_prev
+ .par_drain(..)
+ .flat_map_iter(
+ move |path| match import_iter_inner(&path, db, incremental) {
+ Ok(ch) => ch,
+ Err(e) => {
+ IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
+ Vec::new()
+ }
+ },
+ )
+ .collect::<Vec<_>>();
+ swap(&mut queue_next, &mut queue_prev);
+ }
+ Ok(())
+}
+
+fn import_iter_inner(path: &Path, db: &Database, incremental: bool) -> Result<Vec<PathBuf>> {
+ if path.is_dir() {
+ let mut o = Vec::new();
+ for e in path.read_dir()? {
+ o.push(e?.path());
}
- if path.is_file() {
- let meta = path.metadata()?;
- let mtime = meta
- .modified()?
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_secs();
+ return Ok(o);
+ }
+ if path.is_file() {
+ let meta = path.metadata()?;
+ let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
- if incremental {
- if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
- if last_mtime >= mtime {
- num_skipped += 1;
- continue;
- }
+ if incremental {
+ if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
+ if last_mtime >= mtime {
+ return Ok(Vec::new());
}
}
- num_imported += 1;
- if let Err(e) = import_file(db, &path).context(anyhow!("{path:?}")) {
- errors.push(format!("{e:#}"));
- }
- db.set_import_file_mtime(&path, mtime)?;
}
+
+ import_file(&db, &path).context(anyhow!("{path:?}"))?;
+ db.set_import_file_mtime(&path, mtime)?;
}
- info!("import finished. skipped={num_skipped} imported={num_imported}");
- Ok(errors)
+ return Ok(Vec::new());
}
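
`import_iter_inner` now carries the old loop body's contract: a directory returns its children as the next frontier, a file is imported in place and returns nothing, and the incremental path consults the stored mtime first. Note the `unwrap()` on `duration_since` became `?`, so a pre-epoch mtime is reported as a per-file error instead of panicking the whole import. The mtime computation in isolation (`file_mtime` is an illustrative name):

```rust
use anyhow::Result;
use std::path::Path;
use std::time::UNIX_EPOCH;

/// Seconds-since-epoch modification time, as used by the incremental check.
fn file_mtime(path: &Path) -> Result<u64> {
    Ok(path
        .metadata()?
        .modified()?
        .duration_since(UNIX_EPOCH)? // a pre-1970 mtime propagates as an error
        .as_secs())
}
```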
fn import_file(db: &Database, path: &Path) -> Result<()> {
let filename = path.file_stem().unwrap().to_string_lossy();
+ let parent = NodeID::from_slug(
+ &path
+ .parent()
+ .ok_or(anyhow!("no parent"))?
+ .file_name()
+ .ok_or(anyhow!("parent no filename"))?
+ .to_string_lossy(),
+ );
match filename.as_ref() {
- "poster" => (),
+ "poster" => {
+ db.update_node_init(parent, |node| {
+ node.poster = Some(AssetInner::Media(path.to_owned()).ser());
+ Ok(())
+ })?;
+ }
+ "backdrop" => {
+ db.update_node_init(parent, |node| {
+ node.backdrop = Some(AssetInner::Media(path.to_owned()).ser());
+ Ok(())
+ })?;
+ }
+ "info" => {
+ let data = serde_yaml::from_reader::<_, Node>(BufReader::new(File::open(path)?))?;
+ db.update_node_init(parent, |node| {
+ fn merge_option<T>(a: &mut Option<T>, b: Option<T>) {
+ if b.is_some() {
+ *a = b;
+ }
+ }
+ merge_option(&mut node.title, data.title);
+ merge_option(&mut node.tagline, data.tagline);
+ merge_option(&mut node.description, data.description);
+ Ok(())
+ })?;
+ }
_ => (),
}
let mut magic = [0; 4];
File::open(path)?.read_exact(&mut magic).ok();
if matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) {
- import_media_file(db, path).context("media file")?;
+ import_media_file(db, path, parent).context("media file")?;
}
Ok(())
}
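
Sidecar files named `poster`, `backdrop`, or `info` (parsed as YAML) now attach metadata to the node of their containing directory, whose `NodeID` is derived by slugging the directory name; `info` files overwrite only the fields they actually set, via the local `merge_option` helper.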
-fn import_media_file(db: &Database, path: &Path) -> Result<()> {
+fn import_media_file(db: &Database, path: &Path, parent: NodeID) -> Result<()> {
info!("reading media file {path:?}");
let mut file = BufReader::new(File::open(path)?);
let mut file = file.by_ref().take(u64::MAX);
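
Files reach `import_media_file` through the magic-number sniff in `import_file` above: `0x1A 0x45 0xDF 0xA3` is the EBML header ID shared by Matroska and WebM. The same check as a self-contained helper (`looks_like_ebml` is an illustrative name):

```rust
use std::fs::File;
use std::io::{ErrorKind, Read};
use std::path::Path;

/// EBML header ID, the first four bytes of Matroska/WebM files.
const EBML_MAGIC: [u8; 4] = [0x1A, 0x45, 0xDF, 0xA3];

fn looks_like_ebml(path: &Path) -> std::io::Result<bool> {
    let mut magic = [0u8; 4];
    match File::open(path)?.read_exact(&mut magic) {
        Ok(()) => Ok(magic == EBML_MAGIC),
        // A file shorter than four bytes simply fails the check.
        Err(e) if e.kind() == ErrorKind::UnexpectedEof => Ok(false),
        Err(e) => Err(e),
    }
}
```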
@@ -165,7 +213,7 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
.context("infojson")?,
);
}
- "cover.webp" => {
+ "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg" | "cover.avif" => {
cover = Some(
AssetInner::Cache(cache_file(
&["att-cover", path.to_string_lossy().as_ref()],
@@ -181,7 +229,6 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
}
}
}
-
EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => {
seg.consume()?;
}
@@ -216,7 +263,7 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
let slug = infojson
.as_ref()
- .map(|ij| ij.id.to_owned())
+ .map(|ij| format!("youtube-{}", ij.id))
.unwrap_or(make_kebab(&filepath_stem));
db.update_node_init(NodeID::from_slug(&slug), |node| {
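
Prefixing the infojson id with `youtube-` namespaces yt-dlp-derived slugs, presumably so a video id can never collide with a kebab-case slug generated from another file's name.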
@@ -225,6 +272,9 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
node.poster = cover;
node.description = tags.remove("DESCRIPTION");
node.tagline = tags.remove("COMMENT");
+ if !node.parents.contains(&parent) {
+ node.parents.push(parent)
+ }
if let Some(infojson) = infojson {
node.kind = Some(
if infojson.duration.unwrap_or(0.) < 600.