/* This file is part of jellything (https://codeberg.org/metamuffin/jellything) which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2023 metamuffin */ use crate::{database::Database, federation::Federation, CONF}; use anyhow::{anyhow, bail, Context, Ok}; use async_recursion::async_recursion; use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt}; use jellybase::cache::async_cache_file; use jellyclient::Session; use jellycommon::{AssetLocation, MediaSource, Node, NodePrivate, RemoteImportOptions}; use log::{debug, error, info}; use std::{ ffi::OsStr, fs::File, os::unix::prelude::OsStrExt, path::PathBuf, sync::{Arc, LazyLock}, }; use tokio::sync::Semaphore; static IMPORT_SEM: LazyLock = LazyLock::new(|| Semaphore::new(1)); pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> { info!("clearing node tree"); let permit = IMPORT_SEM.try_acquire()?; db.node.clear()?; info!("importing..."); let (_, errs) = import_path(CONF.library_path.clone(), db, fed, vec![]) .await .context("indexing")?; info!("import completed"); drop(permit); if errs == 0 { Ok(()) } else { Err(anyhow!( "partial import, {errs} errors occured; see server log" )) } } #[async_recursion] pub async fn import_path( path: PathBuf, db: &Database, fed: &Federation, mut node_path: Vec, ) -> anyhow::Result<(Vec, usize)> { if path.is_dir() { let mpath = path.join("directory.json"); let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| { if e.path().extension() == Some(&OsStr::from_bytes(b"jelly")) || e.metadata().unwrap().is_dir() { Some(e.path()) } else { None } }); let identifier = if mpath.exists() { path.file_name().unwrap().to_str().unwrap().to_string() } else { node_path .last() .cloned() .ok_or(anyhow!("non-root node requires parent"))? }; node_path.push(identifier.clone()); let mut all: FuturesUnordered<_> = children_paths .into_iter() .map(|p| import_path(p.clone(), db, fed, node_path.clone()).map_err(|e| (p, e))) .collect(); node_path.pop(); // we will set the dirs path later and need it to not be included let mut children_ids = Vec::new(); let mut errs = 0; while let Some(k) = all.next().await { match k { core::result::Result::Ok((els, errs2)) => { errs += errs2; children_ids.extend(els) } Err((p, e)) => { errs += 1; error!("import of {p:?} failed: {e:?}") } } } if mpath.exists() { let mut node: Node = serde_json::from_reader(File::open(mpath).context("metadata missing")?)?; node.public.children = children_ids; node.public.path = node_path; node.public.id = Some(identifier.to_owned()); info!("adding {identifier}"); db.node.insert(&identifier, &node)?; Ok((vec![identifier], errs)) } else { Ok((children_ids, errs)) } } else if path.is_file() { info!("loading {path:?}"); let datafile = File::open(path.clone()).context("cant load metadata")?; let mut node: Node = serde_json::from_reader(datafile).context("invalid metadata")?; let identifier = node.private.id.clone().unwrap_or_else(|| { path.file_name() .unwrap() .to_str() .unwrap() .strip_suffix(".jelly") .unwrap() .to_string() }); let idents = if let Some(io) = node.private.import.take() { let session = fed .get_session(&io.host) .await .context("creating session")?; import_remote(io, db, &session, identifier.clone(), node_path) .await .context("federated import")? } else { debug!("adding {identifier}"); node.public.path = node_path; node.public.id = Some(identifier.to_owned()); db.node.insert(&identifier, &node)?; vec![identifier] }; Ok((idents, 0)) } else { bail!("did somebody really put a fifo or socket in the library?!") } } static SEM_REMOTE_IMPORT: LazyLock = LazyLock::new(|| Semaphore::new(16)); #[async_recursion] async fn import_remote( mut opts: RemoteImportOptions, db: &Database, session: &Arc, identifier: String, mut node_path: Vec, ) -> anyhow::Result> { let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap(); info!("loading federated node {identifier:?}"); let flatten = opts.flatten; opts.flatten = false; let node = session .node(&opts.id) .await .context("fetching remote node")?; if node.federated.as_ref() == Some(&CONF.hostname) { return Ok(vec![]); // node is federated from us, lets not import it } let poster = cache_federation_asset(session.to_owned(), opts.id.clone(), "poster").await?; let backdrop = cache_federation_asset(session.to_owned(), opts.id.clone(), "backdrop").await?; drop(_permit); if !flatten { let mut node = Node { public: node.clone(), private: NodePrivate { backdrop: Some(backdrop), poster: Some(poster), import: None, id: None, source: Some(MediaSource::Remote { host: opts.host.clone(), remote_id: opts.id.clone(), }), }, }; node.public.path = node_path.clone(); node.public.federated = Some(opts.host.clone()); node.public .children .iter_mut() .for_each(|c| *c = format!("{}{c}", opts.prefix.clone().unwrap_or(String::new()))); debug!("adding {identifier}"); node.public.id = Some(identifier.to_owned()); db.node.insert(&identifier, &node)?; node_path.push(opts.id.clone()); } let mut children: FuturesUnordered<_> = node .children .iter() .map(|c| { let prefixed = format!("{}{c}", opts.prefix.clone().unwrap_or(String::new())); import_remote( RemoteImportOptions { id: c.to_owned(), ..opts.clone() }, db, session, prefixed, node_path.clone(), ) }) .collect(); let mut children_idents = Vec::new(); while let Some(r) = children.next().await { children_idents.extend(r?); } Ok(if flatten { children_idents } else { vec![identifier] }) } async fn cache_federation_asset( session: Arc, identifier: String, role: &'static str, ) -> anyhow::Result { async_cache_file( &["federation-asset", role, &identifier.clone()], move |out| async move { let session = session; session.node_asset(identifier.as_str(), role, out).await }, ) .await }