aboutsummaryrefslogtreecommitdiff
path: root/server/src/import.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/import.rs')
-rw-r--r--server/src/import.rs254
1 files changed, 0 insertions, 254 deletions
diff --git a/server/src/import.rs b/server/src/import.rs
deleted file mode 100644
index dc32fbf..0000000
--- a/server/src/import.rs
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2023 metamuffin <metamuffin.org>
-*/
-use crate::{database::Database, federation::Federation, CONF};
-use anyhow::{anyhow, bail, Context, Ok};
-use async_recursion::async_recursion;
-use futures::{stream::FuturesUnordered, StreamExt, TryFutureExt};
-use jellybase::cache::async_cache_file;
-use jellyclient::Session;
-use jellycommon::{AssetLocation, AssetRole, MediaSource, Node, NodePrivate, RemoteImportOptions};
-use log::{debug, error, info};
-use std::{
- ffi::OsStr,
- fs::File,
- os::unix::prelude::OsStrExt,
- path::PathBuf,
- sync::{Arc, LazyLock},
-};
-use tokio::sync::Semaphore;
-
-static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
-
-pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> {
- info!("clearing node tree");
- let permit = IMPORT_SEM.try_acquire()?;
- db.node.clear()?;
- info!("importing...");
- let (_, errs) = import_path(CONF.library_path.clone(), db, fed, vec![])
- .await
- .context("indexing")?;
- info!("import completed");
- drop(permit);
- if errs == 0 {
- Ok(())
- } else {
- Err(anyhow!(
- "partial import, {errs} errors occured; see server log"
- ))
- }
-}
-
-#[async_recursion]
-pub async fn import_path(
- path: PathBuf,
- db: &Database,
- fed: &Federation,
- mut node_path: Vec<String>,
-) -> anyhow::Result<(Vec<String>, usize)> {
- if path.is_dir() {
- let mpath = path.join("directory.json");
- let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| {
- if e.path().extension() == Some(&OsStr::from_bytes(b"jelly"))
- || e.metadata().unwrap().is_dir()
- {
- Some(e.path())
- } else {
- None
- }
- });
- let identifier = if mpath.exists() {
- path.file_name().unwrap().to_str().unwrap().to_string()
- } else {
- node_path
- .last()
- .cloned()
- .ok_or(anyhow!("non-root node requires parent"))?
- };
-
- node_path.push(identifier.clone());
- let mut all: FuturesUnordered<_> = children_paths
- .into_iter()
- .map(|p| import_path(p.clone(), db, fed, node_path.clone()).map_err(|e| (p, e)))
- .collect();
- node_path.pop(); // we will set the dirs path later and need it to not be included
-
- let mut children_ids = Vec::new();
- let mut errs = 0;
- while let Some(k) = all.next().await {
- match k {
- core::result::Result::Ok((els, errs2)) => {
- errs += errs2;
- children_ids.extend(els)
- }
- Err((p, e)) => {
- errs += 1;
- error!("import of {p:?} failed: {e:?}")
- }
- }
- }
- if mpath.exists() {
- let mut node: Node =
- serde_json::from_reader(File::open(mpath).context("metadata missing")?)?;
-
- node.public.children = children_ids;
- node.public.path = node_path;
- node.public.id = Some(identifier.to_owned());
- info!("adding {identifier}");
- db.node.insert(&identifier, &node)?;
- Ok((vec![identifier], errs))
- } else {
- Ok((children_ids, errs))
- }
- } else if path.is_file() {
- info!("loading {path:?}");
- let datafile = File::open(path.clone()).context("cant load metadata")?;
- let mut node: Node = serde_json::from_reader(datafile).context("invalid metadata")?;
- let identifier = node.private.id.clone().unwrap_or_else(|| {
- path.file_name()
- .unwrap()
- .to_str()
- .unwrap()
- .strip_suffix(".jelly")
- .unwrap()
- .to_string()
- });
-
- let idents = if let Some(io) = node.private.import.take() {
- let session = fed
- .get_session(&io.host)
- .await
- .context("creating session")?;
-
- import_remote(io, db, &session, identifier.clone(), node_path)
- .await
- .context("federated import")?
- } else {
- debug!("adding {identifier}");
- node.public.path = node_path;
- node.public.id = Some(identifier.to_owned());
- let did_insert = db.node.insert(&identifier, &node)?.is_none();
- if did_insert {
- vec![identifier]
- } else {
- vec![]
- }
- };
- Ok((idents, 0))
- } else {
- bail!("did somebody really put a fifo or socket in the library?!")
- }
-}
-
-static SEM_REMOTE_IMPORT: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(16));
-
-#[async_recursion]
-async fn import_remote(
- mut opts: RemoteImportOptions,
- db: &Database,
- session: &Arc<Session>,
- identifier: String,
- mut node_path: Vec<String>,
-) -> anyhow::Result<Vec<String>> {
- let _permit = SEM_REMOTE_IMPORT.acquire().await.unwrap();
- info!("loading federated node {identifier:?}");
-
- let flatten = opts.flatten;
- opts.flatten = false;
-
- let node = session
- .node(&opts.id)
- .await
- .context("fetching remote node")?;
-
- if node.federated.as_ref() == Some(&CONF.hostname) {
- return Ok(vec![]); // node is federated from us, lets not import it
- }
-
- let poster =
- cache_federation_asset(session.to_owned(), opts.id.clone(), AssetRole::Poster).await?;
- let backdrop =
- cache_federation_asset(session.to_owned(), opts.id.clone(), AssetRole::Backdrop).await?;
-
- drop(_permit);
-
- let mut did_insert = false;
- if !flatten {
- let mut node = Node {
- public: node.clone(),
- private: NodePrivate {
- backdrop: Some(backdrop),
- poster: Some(poster),
- import: None,
- id: None,
- source: Some(MediaSource::Remote {
- host: opts.host.clone(),
- remote_id: opts.id.clone(),
- }),
- },
- };
- node.public.path = node_path.clone();
- node.public.federated = Some(opts.host.clone());
- node.public
- .children
- .iter_mut()
- .for_each(|c| *c = format!("{}{c}", opts.prefix.clone().unwrap_or(String::new())));
-
- debug!("adding {identifier}");
- node.public.id = Some(identifier.to_owned());
- did_insert = db
- .node
- .fetch_and_update(&identifier, |pnode| Some(pnode.unwrap_or(node.clone())))?
- .is_none();
- node_path.push(opts.id.clone());
- }
-
- let mut children: FuturesUnordered<_> = node
- .children
- .iter()
- .map(|c| {
- let prefixed = format!("{}{c}", opts.prefix.clone().unwrap_or(String::new()));
- import_remote(
- RemoteImportOptions {
- id: c.to_owned(),
- ..opts.clone()
- },
- db,
- session,
- prefixed,
- node_path.clone(),
- )
- })
- .collect();
-
- let mut children_idents = Vec::new();
- while let Some(r) = children.next().await {
- children_idents.extend(r?);
- }
- Ok(if flatten {
- children_idents
- } else if did_insert {
- vec![identifier]
- } else {
- vec![]
- })
-}
-
-async fn cache_federation_asset(
- session: Arc<Session>,
- identifier: String,
- role: AssetRole,
-) -> anyhow::Result<AssetLocation> {
- async_cache_file(
- &["federation-asset", role.as_str(), &identifier.clone()],
- move |out| async move {
- let session = session;
- session
- .node_asset(identifier.as_str(), role, 1024, out)
- .await
- },
- )
- .await
-}