aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock6
-rw-r--r--base/src/assetfed.rs2
-rw-r--r--common/src/config.rs4
-rw-r--r--import/Cargo.toml3
-rw-r--r--import/src/lib.rs140
-rw-r--r--server/src/routes/ui/assets.rs2
-rw-r--r--tool/src/add.rs2
7 files changed, 105 insertions, 54 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 8420eb4..4ab33fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -652,9 +652,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
-version = "0.5.13"
+version = "0.5.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2"
+checksum = "06ba6d68e24814cb8de6bb986db8222d3a027d15872cabc0d18817bc3c0e4471"
dependencies = [
"crossbeam-utils",
]
@@ -1748,12 +1748,14 @@ dependencies = [
"async-recursion",
"base64",
"bincode",
+ "crossbeam-channel",
"ebml-struct",
"futures",
"jellybase",
"jellyclient",
"jellycommon",
"log",
+ "rayon",
"regex",
"reqwest",
"serde",
diff --git a/base/src/assetfed.rs b/base/src/assetfed.rs
index bb39bbd..d20446b 100644
--- a/base/src/assetfed.rs
+++ b/base/src/assetfed.rs
@@ -31,7 +31,7 @@ pub enum AssetInner {
Federated { host: String, asset: Vec<u8> },
Cache(CachePath),
Assets(PathBuf),
- Library(PathBuf),
+ Media(PathBuf),
}
impl AssetInner {
diff --git a/common/src/config.rs b/common/src/config.rs
index 467a2ec..682fdd7 100644
--- a/common/src/config.rs
+++ b/common/src/config.rs
@@ -17,7 +17,6 @@ pub struct GlobalConfig {
#[serde(default = "return_true" )] pub tls: bool,
#[serde(default = "default::asset_path")] pub asset_path: PathBuf,
#[serde(default = "default::database_path")] pub database_path: PathBuf,
- #[serde(default = "default::library_path")] pub library_path: PathBuf,
#[serde(default = "default::temp_path")] pub temp_path: PathBuf,
#[serde(default = "default::cache_path")] pub cache_path: PathBuf,
#[serde(default = "default::media_path")] pub media_path: PathBuf,
@@ -74,9 +73,6 @@ mod default {
pub fn database_path() -> PathBuf {
"data/database".into()
}
- pub fn library_path() -> PathBuf {
- "data/library".into()
- }
pub fn cache_path() -> PathBuf {
"data/cache".into()
}
diff --git a/import/Cargo.toml b/import/Cargo.toml
index b9bd6db..d0342df 100644
--- a/import/Cargo.toml
+++ b/import/Cargo.toml
@@ -10,6 +10,9 @@ jellyclient = { path = "../client" }
ebml-struct = { git = "https://codeberg.org/metamuffin/ebml-struct" }
+rayon = "1.10.0"
+crossbeam-channel = "0.5.14"
+
log = { workspace = true }
anyhow = "1.0.95"
reqwest = { workspace = true }
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 34780a8..243ac8a 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -11,17 +11,25 @@ use ebml_struct::{
};
use jellybase::{assetfed::AssetInner, cache::cache_file, database::Database, CONF, SECRETS};
use jellycommon::{
- Chapter, LocalTrack, MediaInfo, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
+ Chapter, LocalTrack, MediaInfo, Node, NodeID, NodeKind, Rating, SourceTrack, SourceTrackKind,
TrackSource,
};
-use log::info;
+use log::{info, warn};
+use rayon::iter::{
+ IntoParallelIterator, IntoParallelRefIterator, ParallelBridge, ParallelDrainRange,
+ ParallelIterator,
+};
use regex::Regex;
use std::{
collections::{HashMap, VecDeque},
fs::File,
io::{BufReader, ErrorKind, Read, Write},
- path::Path,
- sync::LazyLock,
+ mem::swap,
+ path::{Path, PathBuf},
+ sync::{
+ atomic::{AtomicUsize, Ordering},
+ LazyLock,
+ },
time::UNIX_EPOCH,
};
use tmdb::Tmdb;
@@ -54,11 +62,9 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
let _sem = IMPORT_SEM.try_acquire()?;
let jh = spawn_blocking(move || {
- let errs = match import(&db, incremental) {
- Err(e) => vec![format!("{e:#}")],
- Ok(e) => e,
- };
- *IMPORT_ERRORS.blocking_write() = errs;
+ if let Err(e) = import(&db, incremental) {
+ IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
+ }
});
let _ = jh.await;
@@ -66,67 +72,109 @@ pub async fn import_wrap(db: Database, incremental: bool) -> Result<()> {
Ok(())
}
-fn import(db: &Database, incremental: bool) -> Result<Vec<String>> {
- let mut queue = VecDeque::from_iter(Some(CONF.media_path.clone()));
- let mut errors = Vec::new();
+fn import(db: &Database, incremental: bool) -> Result<()> {
+ let mut queue_prev = vec![CONF.media_path.clone()];
+ let mut queue_next;
let apis = Apis {
trakt: SECRETS.api.trakt.as_ref().map(|key| Trakt::new(key)),
tmdb: SECRETS.api.tmdb.as_ref().map(|key| Tmdb::new(key)),
};
- let mut num_skipped = 0;
- let mut num_imported = 0;
- while let Some(path) = queue.pop_front() {
- if path.is_dir() {
- for e in path.read_dir()? {
- queue.push_back(e?.path());
- }
+ while !queue_prev.is_empty() {
+ queue_next = queue_prev
+ .par_drain(..)
+ .flat_map_iter(
+ move |path| match import_iter_inner(&path, db, incremental) {
+ Ok(ch) => ch,
+ Err(e) => {
+ IMPORT_ERRORS.blocking_write().push(format!("{e:#}"));
+ Vec::new()
+ }
+ },
+ )
+ .collect::<Vec<_>>();
+ swap(&mut queue_next, &mut queue_prev);
+ }
+ Ok(())
+}
+
+fn import_iter_inner(path: &Path, db: &Database, incremental: bool) -> Result<Vec<PathBuf>> {
+ if path.is_dir() {
+ let mut o = Vec::new();
+ for e in path.read_dir()? {
+ o.push(e?.path());
}
- if path.is_file() {
- let meta = path.metadata()?;
- let mtime = meta
- .modified()?
- .duration_since(UNIX_EPOCH)
- .unwrap()
- .as_secs();
+ return Ok(o);
+ }
+ if path.is_file() {
+ let meta = path.metadata()?;
+ let mtime = meta.modified()?.duration_since(UNIX_EPOCH)?.as_secs();
- if incremental {
- if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
- if last_mtime >= mtime {
- num_skipped += 1;
- continue;
- }
+ if incremental {
+ if let Some(last_mtime) = db.get_import_file_mtime(&path)? {
+ if last_mtime >= mtime {
+ return Ok(Vec::new());
}
}
- num_imported += 1;
- if let Err(e) = import_file(db, &path).context(anyhow!("{path:?}")) {
- errors.push(format!("{e:#}"));
- }
- db.set_import_file_mtime(&path, mtime)?;
}
+
+ import_file(&db, &path).context(anyhow!("{path:?}"))?;
+ db.set_import_file_mtime(&path, mtime)?;
}
- info!("import finished. skipped={num_skipped} imported={num_imported}");
- Ok(errors)
+ return Ok(Vec::new());
}
fn import_file(db: &Database, path: &Path) -> Result<()> {
let filename = path.file_stem().unwrap().to_string_lossy();
+ let parent = NodeID::from_slug(
+ &path
+ .parent()
+ .ok_or(anyhow!("no parent"))?
+ .file_name()
+ .ok_or(anyhow!("parent no filename"))?
+ .to_string_lossy(),
+ );
match filename.as_ref() {
- "poster" => (),
+ "poster" => {
+ db.update_node_init(parent, |node| {
+ node.poster = Some(AssetInner::Media(path.to_owned()).ser());
+ Ok(())
+ })?;
+ }
+ "backdrop" => {
+ db.update_node_init(parent, |node| {
+ node.backdrop = Some(AssetInner::Media(path.to_owned()).ser());
+ Ok(())
+ })?;
+ }
+ "info" => {
+ let data = serde_yaml::from_reader::<_, Node>(BufReader::new(File::open(path)?))?;
+ db.update_node_init(parent, |node| {
+ fn merge_option<T>(a: &mut Option<T>, b: Option<T>) {
+ if b.is_some() {
+ *a = b;
+ }
+ }
+ merge_option(&mut node.title, data.title);
+ merge_option(&mut node.tagline, data.tagline);
+ merge_option(&mut node.description, data.description);
+ Ok(())
+ })?;
+ }
_ => (),
}
let mut magic = [0; 4];
File::open(path)?.read_exact(&mut magic).ok();
if matches!(magic, [0x1A, 0x45, 0xDF, 0xA3]) {
- import_media_file(db, path).context("media file")?;
+ import_media_file(db, path, parent).context("media file")?;
}
Ok(())
}
-fn import_media_file(db: &Database, path: &Path) -> Result<()> {
+fn import_media_file(db: &Database, path: &Path, parent: NodeID) -> Result<()> {
info!("reading media file {path:?}");
let mut file = BufReader::new(File::open(path)?);
let mut file = file.by_ref().take(u64::MAX);
@@ -165,7 +213,7 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
.context("infojson")?,
);
}
- "cover.webp" => {
+ "cover.webp" | "cover.png" | "cover.jpg" | "cover.jpeg" | "cover.avif" => {
cover = Some(
AssetInner::Cache(cache_file(
&["att-cover", path.to_string_lossy().as_ref()],
@@ -181,7 +229,6 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
}
}
}
-
EL_VOID | EL_CRC32 | EL_CUES | EL_SEEKHEAD => {
seg.consume()?;
}
@@ -216,7 +263,7 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
let slug = infojson
.as_ref()
- .map(|ij| ij.id.to_owned())
+ .map(|ij| format!("youtube-{}", ij.id))
.unwrap_or(make_kebab(&filepath_stem));
db.update_node_init(NodeID::from_slug(&slug), |node| {
@@ -225,6 +272,9 @@ fn import_media_file(db: &Database, path: &Path) -> Result<()> {
node.poster = cover;
node.description = tags.remove("DESCRIPTION");
node.tagline = tags.remove("COMMENT");
+ if !node.parents.contains(&parent) {
+ node.parents.push(parent)
+ }
if let Some(infojson) = infojson {
node.kind = Some(
if infojson.duration.unwrap_or(0.) < 600.
diff --git a/server/src/routes/ui/assets.rs b/server/src/routes/ui/assets.rs
index 689c7f1..ac808b1 100644
--- a/server/src/routes/ui/assets.rs
+++ b/server/src/routes/ui/assets.rs
@@ -55,7 +55,7 @@ pub async fn resolve_asset(asset: AssetInner) -> anyhow::Result<PathBuf> {
match asset {
AssetInner::Cache(c) => Ok(c.abs()),
AssetInner::Assets(c) => Ok(CONF.asset_path.join(c)),
- AssetInner::Library(c) => Ok(CONF.library_path.join(c)),
+ AssetInner::Media(c) => Ok(CONF.media_path.join(c)),
_ => unreachable!(),
}
}
diff --git a/tool/src/add.rs b/tool/src/add.rs
index 368743e..fdaa14e 100644
--- a/tool/src/add.rs
+++ b/tool/src/add.rs
@@ -40,7 +40,7 @@ pub async fn add(action: Action) -> anyhow::Result<()> {
library_path
} else {
let mut directories = Vec::new();
- find_folders(&CONF.library_path, &PathBuf::new(), &mut directories)
+ find_folders(&CONF.media_path, &PathBuf::new(), &mut directories)
.context("listing library directories")?;
let mut default = 0;