diff options
author | metamuffin <metamuffin@disroot.org> | 2024-01-05 01:52:00 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-01-05 01:52:00 +0100 |
commit | 7608522c8fdf46b93877da3fee4f21df3496beec (patch) | |
tree | 33b5819d17be535d7d44feedc675ebc0da4e16d4 /import/src/lib.rs | |
parent | be510f1167f316eb18a798e6413220e7f91f782a (diff) | |
download | jellything-7608522c8fdf46b93877da3fee4f21df3496beec.tar jellything-7608522c8fdf46b93877da3fee4f21df3496beec.tar.bz2 jellything-7608522c8fdf46b93877da3fee4f21df3496beec.tar.zst |
recursive import fixes (untested)
Diffstat (limited to 'import/src/lib.rs')
-rw-r--r-- | import/src/lib.rs | 69 |
1 files changed, 44 insertions, 25 deletions
diff --git a/import/src/lib.rs b/import/src/lib.rs index 015b0b4..2dbf994 100644 --- a/import/src/lib.rs +++ b/import/src/lib.rs @@ -31,7 +31,7 @@ use std::{ ffi::OsStr, fs::File, io::BufReader, - os::unix::prelude::OsStrExt, + ops::Deref, path::{Path, PathBuf}, sync::{Arc, LazyLock}, }; @@ -95,7 +95,7 @@ pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> { } nc })? - .ok_or(anyhow!("node missing"))?; + .ok_or(anyhow!("node {c:?} missing"))?; path.push(c); for c in node.public.children { @@ -132,7 +132,8 @@ pub async fn import_path( .read_dir()? .map(Result::unwrap) .filter_map(|e| { - if e.path().extension() == Some(&OsStr::from_bytes(b"yaml")) + if e.path().extension() == Some(&OsStr::new("yaml")) + || e.path().extension() == Some(&OsStr::new("jelly")) || e.metadata().unwrap().is_dir() { Some(e.path()) @@ -165,7 +166,12 @@ pub async fn import_path( k? } } else { - let opts: ImportOptions = serde_yaml::from_reader(File::open(&path)?)?; + let opts = File::open(&path).context(anyhow!("opening {path:?}"))?; + let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) { + serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))? + } else { + serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))? + }; for s in opts.sources { process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?; @@ -174,6 +180,8 @@ pub async fn import_path( Ok(()) } +static SEM_IMPORT: Semaphore = Semaphore::const_new(2); + #[async_recursion] async fn process_source( id: String, @@ -183,7 +191,6 @@ async fn process_source( db: &Database, fed: &Federation, ) -> anyhow::Result<()> { - debug!("{id} enter"); let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> { db.node_import.fetch_and_update(id, |l| { let mut l = l.unwrap_or_default(); @@ -246,7 +253,6 @@ async fn process_source( let media_path = location.path(); if media_path.is_dir() { let mut node = Node::default(); - let mut errors = Vec::new(); for f in media_path.read_dir()? { let f = f?; let child_path = f.path(); @@ -255,7 +261,7 @@ async fn process_source( if &inf_id == "archive" { continue; } - if let Err(err) = process_source( + process_source( inf_id.clone(), ImportSource::Media { location: match &location { @@ -274,24 +280,31 @@ async fn process_source( fed, ) .await - .context("recursive media import") - { - errors.push(err); - } + .context(anyhow!("recursive media import: {:?}", f.path()))?; node.public.children.push(inf_id); } insert_node(&id, node)?; - if !errors.is_empty() { - bail!("errors: {errors:?}"); - } } else if media_path.is_file() { - let metadata = spawn_blocking(move || { - let input = - BufReader::new(File::open(&location.path()).context("opening media file")?); - let mut input = EbmlReader::new(input); - import_metadata(&mut input) - }) - .await??; + let location_path = location.path(); + let metadata = { + let _permit = SEM_IMPORT.acquire().await.unwrap(); + spawn_blocking(move || { + cache_memory( + &["mkv-probe", location.path().to_str().unwrap()], + move || { + let input = BufReader::new( + File::open(&location.path()).context("opening media file")?, + ); + let mut input = EbmlReader::new(input); + import_metadata(&mut input) + }, + ) + }) + } + .await? + .context(anyhow!("probing {location_path:?}"))? + .deref() + .to_owned(); let mut node = Node::default(); @@ -371,11 +384,18 @@ async fn process_source( .read_dir()? .map(Result::unwrap) .map(|e| e.path()) - .filter(|e| e.extension() == Some(&OsStr::from_bytes(b"yaml"))); + .filter(|e| { + e.extension() == Some(OsStr::new("yaml")) + || e.extension() == Some(&OsStr::new("jelly")) + }); let mut children = Vec::new(); for p in paths { - let opts: ImportOptions = serde_yaml::from_reader(File::open(&p)?)?; + let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) { + serde_json::from_reader(File::open(&p)?)? + } else { + serde_yaml::from_reader(File::open(&p)?)? + }; if opts.id != id { children.push(opts.id); } @@ -392,7 +412,6 @@ async fn process_source( )?; } } - debug!("{id} exit"); Ok(()) } @@ -457,7 +476,7 @@ fn merge_node(x: Node, y: Node) -> Node { } } -static SEM_REMOTE_IMPORT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(16)); +static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16); #[async_recursion] async fn import_remote( |