about summary refs log tree commit diff
path: root/import/src
diff options
context:
space:
mode:
author metamuffin <metamuffin@disroot.org> 2024-01-05 01:52:00 +0100
committer metamuffin <metamuffin@disroot.org> 2024-01-05 01:52:00 +0100
commit 7608522c8fdf46b93877da3fee4f21df3496beec (patch)
tree 33b5819d17be535d7d44feedc675ebc0da4e16d4 /import/src
parent be510f1167f316eb18a798e6413220e7f91f782a (diff)
download jellything-7608522c8fdf46b93877da3fee4f21df3496beec.tar
jellything-7608522c8fdf46b93877da3fee4f21df3496beec.tar.bz2
jellything-7608522c8fdf46b93877da3fee4f21df3496beec.tar.zst
recursive import fixes (untested)
Diffstat (limited to 'import/src')
-rw-r--r-- import/src/infojson.rs 8
-rw-r--r-- import/src/lib.rs 69
2 files changed, 48 insertions, 29 deletions
diff --git a/import/src/infojson.rs b/import/src/infojson.rs
index 3f0edc9..c18c19d 100644
--- a/import/src/infojson.rs
+++ b/import/src/infojson.rs
@@ -32,11 +32,11 @@ pub struct YVideo {
pub heatmap: Option<Vec<YHeatmapSample>>,
pub like_count: Option<usize>,
pub channel: Option<String>,
- pub channel_follower_count: usize,
+ pub channel_follower_count: Option<usize>,
pub channel_is_verified: Option<bool>,
- pub uploader: String,
- pub uploader_id: String,
- pub uploader_url: String,
+ pub uploader: Option<String>,
+ pub uploader_id: Option<String>,
+ pub uploader_url: Option<String>,
pub upload_date: String,
pub availability: String, // "public" | "private" | "unlisted",
pub original_url: Option<String>,
diff --git a/import/src/lib.rs b/import/src/lib.rs
index 015b0b4..2dbf994 100644
--- a/import/src/lib.rs
+++ b/import/src/lib.rs
@@ -31,7 +31,7 @@ use std::{
ffi::OsStr,
fs::File,
io::BufReader,
- os::unix::prelude::OsStrExt,
+ ops::Deref,
path::{Path, PathBuf},
sync::{Arc, LazyLock},
};
@@ -95,7 +95,7 @@ pub fn generate_node_paths(db: &Database) -> anyhow::Result<()> {
}
nc
})?
- .ok_or(anyhow!("node missing"))?;
+ .ok_or(anyhow!("node {c:?} missing"))?;
path.push(c);
for c in node.public.children {
@@ -132,7 +132,8 @@ pub async fn import_path(
.read_dir()?
.map(Result::unwrap)
.filter_map(|e| {
- if e.path().extension() == Some(&OsStr::from_bytes(b"yaml"))
+ if e.path().extension() == Some(&OsStr::new("yaml"))
+ || e.path().extension() == Some(&OsStr::new("jelly"))
|| e.metadata().unwrap().is_dir()
{
Some(e.path())
@@ -165,7 +166,12 @@ pub async fn import_path(
k?
}
} else {
- let opts: ImportOptions = serde_yaml::from_reader(File::open(&path)?)?;
+ let opts = File::open(&path).context(anyhow!("opening {path:?}"))?;
+ let opts: ImportOptions = if path.extension() == Some(OsStr::new("jelly")) {
+ serde_json::from_reader(opts).context(anyhow!("parsing json {path:?}"))?
+ } else {
+ serde_yaml::from_reader(opts).context(anyhow!("parsing yaml {path:?}"))?
+ };
for s in opts.sources {
process_source(opts.id.clone(), s, &path, &index_path, db, fed).await?;
@@ -174,6 +180,8 @@ pub async fn import_path(
Ok(())
}
+static SEM_IMPORT: Semaphore = Semaphore::const_new(2);
+
#[async_recursion]
async fn process_source(
id: String,
@@ -183,7 +191,6 @@ async fn process_source(
db: &Database,
fed: &Federation,
) -> anyhow::Result<()> {
- debug!("{id} enter");
let insert_node = move |id: &String, n: Node| -> anyhow::Result<()> {
db.node_import.fetch_and_update(id, |l| {
let mut l = l.unwrap_or_default();
@@ -246,7 +253,6 @@ async fn process_source(
let media_path = location.path();
if media_path.is_dir() {
let mut node = Node::default();
- let mut errors = Vec::new();
for f in media_path.read_dir()? {
let f = f?;
let child_path = f.path();
@@ -255,7 +261,7 @@ async fn process_source(
if &inf_id == "archive" {
continue;
}
- if let Err(err) = process_source(
+ process_source(
inf_id.clone(),
ImportSource::Media {
location: match &location {
@@ -274,24 +280,31 @@ async fn process_source(
fed,
)
.await
- .context("recursive media import")
- {
- errors.push(err);
- }
+ .context(anyhow!("recursive media import: {:?}", f.path()))?;
node.public.children.push(inf_id);
}
insert_node(&id, node)?;
- if !errors.is_empty() {
- bail!("errors: {errors:?}");
- }
} else if media_path.is_file() {
- let metadata = spawn_blocking(move || {
- let input =
- BufReader::new(File::open(&location.path()).context("opening media file")?);
- let mut input = EbmlReader::new(input);
- import_metadata(&mut input)
- })
- .await??;
+ let location_path = location.path();
+ let metadata = {
+ let _permit = SEM_IMPORT.acquire().await.unwrap();
+ spawn_blocking(move || {
+ cache_memory(
+ &["mkv-probe", location.path().to_str().unwrap()],
+ move || {
+ let input = BufReader::new(
+ File::open(&location.path()).context("opening media file")?,
+ );
+ let mut input = EbmlReader::new(input);
+ import_metadata(&mut input)
+ },
+ )
+ })
+ }
+ .await?
+ .context(anyhow!("probing {location_path:?}"))?
+ .deref()
+ .to_owned();
let mut node = Node::default();
@@ -371,11 +384,18 @@ async fn process_source(
.read_dir()?
.map(Result::unwrap)
.map(|e| e.path())
- .filter(|e| e.extension() == Some(&OsStr::from_bytes(b"yaml")));
+ .filter(|e| {
+ e.extension() == Some(OsStr::new("yaml"))
+ || e.extension() == Some(&OsStr::new("jelly"))
+ });
let mut children = Vec::new();
for p in paths {
- let opts: ImportOptions = serde_yaml::from_reader(File::open(&p)?)?;
+ let opts: ImportOptions = if p.extension() == Some(OsStr::new("jelly")) {
+ serde_json::from_reader(File::open(&p)?)?
+ } else {
+ serde_yaml::from_reader(File::open(&p)?)?
+ };
if opts.id != id {
children.push(opts.id);
}
@@ -392,7 +412,6 @@ async fn process_source(
)?;
}
}
- debug!("{id} exit");
Ok(())
}
@@ -457,7 +476,7 @@ fn merge_node(x: Node, y: Node) -> Node {
}
}
-static SEM_REMOTE_IMPORT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(16));
+static SEM_REMOTE_IMPORT: Semaphore = Semaphore::const_new(16);
#[async_recursion]
async fn import_remote(