aboutsummaryrefslogtreecommitdiff
path: root/server/src
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-08-02 23:07:55 +0200
committermetamuffin <metamuffin@disroot.org>2023-08-02 23:07:55 +0200
commit8e33fcdfbd9df042c0cfd8e9a2084993313961c9 (patch)
tree9b18183237177b6c2b7060140ed92e62581ab588 /server/src
parentc81d8bbfd46d53fba6e0086b5f859f8af8639f4a (diff)
downloadjellything-8e33fcdfbd9df042c0cfd8e9a2084993313961c9.tar
jellything-8e33fcdfbd9df042c0cfd8e9a2084993313961c9.tar.bz2
jellything-8e33fcdfbd9df042c0cfd8e9a2084993313961c9.tar.zst
federated import works but relies on private data
Diffstat (limited to 'server/src')
-rw-r--r--server/src/federation.rs55
-rw-r--r--server/src/import.rs86
-rw-r--r--server/src/main.rs13
-rw-r--r--server/src/routes/api/mod.rs5
-rw-r--r--server/src/routes/mod.rs9
-rw-r--r--server/src/routes/stream.rs29
-rw-r--r--server/src/routes/ui/account/admin.rs4
-rw-r--r--server/src/routes/ui/error.rs7
8 files changed, 160 insertions, 48 deletions
diff --git a/server/src/federation.rs b/server/src/federation.rs
new file mode 100644
index 0000000..578261b
--- /dev/null
+++ b/server/src/federation.rs
@@ -0,0 +1,55 @@
+use crate::CONF;
+use anyhow::anyhow;
+use jellyclient::{Instance, Session};
+use std::{collections::HashMap, sync::Arc, time::Duration};
+use tokio::sync::RwLock;
+
+pub struct Federation {
+ instances: HashMap<String, Instance>,
+ sessions: RwLock<HashMap<String, Arc<Session>>>,
+}
+
+impl Federation {
+ pub fn initialize() -> Self {
+ let instances = CONF
+ .remote_credentials
+ .iter()
+ .map(|(k, (_, _, tls))| (k.to_owned(), Instance::new(k.to_owned(), *tls)))
+ .collect::<HashMap<_, _>>();
+
+ Self {
+ instances,
+ sessions: Default::default(),
+ }
+ }
+
+ pub fn get_instance(&self, host: &String) -> anyhow::Result<&Instance> {
+ Ok(self
+ .instances
+ .get(host)
+ .ok_or(anyhow!("unknown instance"))?)
+ }
+ pub async fn get_session(&self, host: &String) -> anyhow::Result<Arc<Session>> {
+ let mut w = self.sessions.write().await;
+ if let Some(s) = w.get(host) {
+ Ok(s.to_owned())
+ } else {
+ let (username, password, _) = CONF
+ .remote_credentials
+ .get(host)
+ .ok_or(anyhow!("no credentials of the remote server"))?;
+ let s = Arc::new(
+ self.get_instance(host)?
+ .to_owned()
+ .login(
+ username.to_owned(),
+ password.to_owned(),
+ Duration::from_secs(60 * 60 * 24 * 356),
+ )
+ .await?,
+ );
+ w.insert(host.to_owned(), s.clone());
+ Ok(s)
+ }
+ }
+}
diff --git a/server/src/import.rs b/server/src/import.rs
index b306332..a172ad9 100644
--- a/server/src/import.rs
+++ b/server/src/import.rs
@@ -3,37 +3,42 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use crate::{database::Database, CONF};
-use anyhow::{bail, Context, Ok};
+use crate::{database::Database, federation::Federation, CONF};
+use anyhow::{anyhow, bail, Context, Ok};
use async_recursion::async_recursion;
-use jellycommon::Node;
-use log::info;
+use jellycommon::{Node, RemoteImportOptions};
+use log::{error, info};
use std::{ffi::OsStr, fs::File, os::unix::prelude::OsStrExt, path::PathBuf, sync::LazyLock};
use tokio::sync::Semaphore;
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
-pub async fn import(db: &Database) -> anyhow::Result<()> {
+pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> {
info!("clearing node tree");
let permit = IMPORT_SEM.try_acquire()?;
- {
- db.node.clear()?;
- info!("importing...");
- import_path(CONF.library_path.clone(), db, None)
- .await
- .context("indexing")?;
- info!("import completed");
- }
+ db.node.clear()?;
+ info!("importing...");
+ let (_, errs) = import_path(CONF.library_path.clone(), db, fed, None)
+ .await
+ .context("indexing")?;
+ info!("import completed");
drop(permit);
- Ok(())
+ if errs == 0 {
+ Ok(())
+ } else {
+ Err(anyhow!(
+ "partial import, {errs} errors occured; see server log"
+ ))
+ }
}
#[async_recursion]
pub async fn import_path(
path: PathBuf,
db: &Database,
+ fed: &Federation,
parent: Option<String>,
-) -> anyhow::Result<Vec<String>> {
+) -> anyhow::Result<(Vec<String>, usize)> {
if path.is_dir() {
let mpath = path.join("directory.json");
let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| {
@@ -47,8 +52,19 @@ pub async fn import_path(
});
let identifier = path.file_name().unwrap().to_str().unwrap().to_string();
let mut children_ids = Vec::new();
+ let mut errs = 0;
for p in children_paths {
- children_ids.extend(import_path(p, db, Some(identifier.clone())).await?)
+ let k = import_path(p, db, fed, Some(identifier.clone())).await;
+ match k {
+ core::result::Result::Ok((els, errs2)) => {
+ errs += errs2;
+ children_ids.extend(els)
+ }
+ Err(e) => {
+ errs += 1;
+ error!("import failed: {e:?}")
+ }
+ }
}
if mpath.exists() {
let mut data: Node =
@@ -56,14 +72,14 @@ pub async fn import_path(
data.public.children = children_ids;
data.public.parent = parent;
- info!("insert {identifier}");
+ info!("adding {identifier}");
db.node.insert(&identifier, &data)?;
- Ok(vec![identifier])
+ Ok((vec![identifier], errs))
} else {
- Ok(children_ids)
+ Ok((children_ids, errs))
}
} else if path.is_file() {
- info!("loading item {path:?}");
+ info!("loading {path:?}");
let datafile = File::open(path.clone()).context("cant load metadata")?;
let mut data: Node = serde_json::from_reader(datafile).context("invalid metadata")?;
let identifier = path
@@ -75,11 +91,37 @@ pub async fn import_path(
.unwrap()
.to_string();
- info!("insert {identifier}");
+ if let Some(io) = data.private.import.take() {
+ let title = data.public.title;
+ data = import_remote(io, db, fed)
+ .await
+ .context("federated import")?;
+ data.public.title = title;
+ }
+
+ info!("adding {identifier}");
data.public.parent = parent;
db.node.insert(&identifier, &data)?;
- Ok(vec![identifier])
+ Ok((vec![identifier], 0))
} else {
bail!("did somebody really put a fifo or socket in the library?!")
}
}
+
+async fn import_remote(
+ opts: RemoteImportOptions,
+ db: &Database,
+ fed: &Federation,
+) -> anyhow::Result<Node> {
+ let sess = fed
+ .get_session(&opts.host)
+ .await
+ .context("creating session")?;
+ let node = sess.node(&opts.id).await.context("fetching remote node")?;
+
+ if !node.public.children.is_empty() {
+ todo!()
+ }
+
+ Ok(node)
+}
diff --git a/server/src/main.rs b/server/src/main.rs
index 5b2d070..6a75c30 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -6,13 +6,16 @@
#![feature(lazy_cell)]
use database::Database;
+use federation::Federation;
use jellycommon::config::GlobalConfig;
use jellyremuxer::RemuxerContext;
+use log::error;
use once_cell::sync::Lazy;
use routes::build_rocket;
use std::fs::File;
pub mod database;
+pub mod federation;
pub mod import;
pub mod routes;
@@ -37,7 +40,13 @@ fn main() {
async fn async_main() {
let remuxer = RemuxerContext::new();
let database = Database::open(&CONF.database_path).unwrap();
- import::import(&database).await.unwrap();
+ let federation = Federation::initialize();
database.create_admin();
- build_rocket(remuxer, database).launch().await.unwrap();
+ if let Err(err) = import::import(&database, &federation).await {
+ error!("import not sucessful: {err:?}")
+ }
+ build_rocket(remuxer, database, federation)
+ .launch()
+ .await
+ .unwrap();
}
diff --git a/server/src/routes/api/mod.rs b/server/src/routes/api/mod.rs
index e74b134..cc87525 100644
--- a/server/src/routes/api/mod.rs
+++ b/server/src/routes/api/mod.rs
@@ -4,8 +4,6 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use std::ops::Deref;
-
use super::ui::{
account::{login_logic, LoginForm},
error::MyResult,
@@ -22,6 +20,7 @@ use rocket::{
Request, State,
};
use serde_json::{json, Value};
+use std::ops::Deref;
#[get("/api")]
pub fn r_api_root() -> Redirect {
@@ -33,7 +32,7 @@ pub fn r_api_version() -> &'static str {
"2"
}
-#[post("/api/account/login", data = "<data>")]
+#[post("/api/create_session", data = "<data>")]
pub fn r_api_account_login(database: &State<Database>, data: Json<LoginForm>) -> MyResult<Value> {
let token = login_logic(database, &data.username, &data.password)?;
Ok(json!(token))
diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs
index be4d2cb..42eae1b 100644
--- a/server/src/routes/mod.rs
+++ b/server/src/routes/mod.rs
@@ -3,7 +3,7 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use crate::{database::Database, routes::ui::error::MyResult, CONF};
+use crate::{database::Database, federation::Federation, routes::ui::error::MyResult, CONF};
use api::{r_api_account_login, r_api_root, r_api_version};
use base64::Engine;
use jellyremuxer::RemuxerContext;
@@ -45,7 +45,11 @@ macro_rules! uri {
};
}
-pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build> {
+pub fn build_rocket(
+ remuxer: RemuxerContext,
+ database: Database,
+ federation: Federation,
+) -> Rocket<Build> {
rocket::build()
.configure(Config {
secret_key: SecretKey::derive_from(
@@ -61,6 +65,7 @@ pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build
})
.manage(remuxer)
.manage(database)
+ .manage(federation)
.attach(AdHoc::on_response("set server header", |_req, res| {
res.set_header(Header::new("server", "jellything"));
Box::pin(async {})
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index 1277646..ee7880c 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -4,7 +4,7 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
use super::ui::{account::session::Session, error::MyError};
-use crate::{database::Database, CONF};
+use crate::{database::Database, federation::Federation, CONF};
use anyhow::{anyhow, Result};
use jellycommon::MediaSource;
use jellyremuxer::RemuxerContext;
@@ -30,6 +30,7 @@ pub async fn r_stream(
webm: Option<bool>,
tracks: String,
remuxer: &State<RemuxerContext>,
+ federation: &State<Federation>,
db: &State<Database>,
range: Option<RequestRange>,
) -> Result<Either<StreamResponse, Redirect>, MyError> {
@@ -47,27 +48,21 @@ pub async fn r_stream(
let source_tracks = match source {
MediaSource::Local { tracks } => tracks,
MediaSource::Remote { host, remote_id } => {
- let (username, password, tls) = CONF
+ let (username, password, _) = CONF
.remote_credentials
.get(&host)
.ok_or(anyhow!("no credentials on the server-side"))?;
- let instance = jellyclient::Instance { host, tls: *tls };
- let session = jellyclient::login(
- &instance,
- username.to_owned(),
- password.to_owned(),
- Duration::from_secs(60),
- )
- .await?;
+ let instance = federation.get_instance(&host)?.to_owned();
+ let session = instance
+ .login(
+ username.to_owned(),
+ password.to_owned(),
+ Duration::from_secs(60),
+ )
+ .await?;
- let uri = jellyclient::stream(
- &instance,
- &session,
- &remote_id,
- &tracks,
- webm.unwrap_or(false),
- );
+ let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false));
return Ok(Either::Right(Redirect::found(uri)));
}
};
diff --git a/server/src/routes/ui/account/admin.rs b/server/src/routes/ui/account/admin.rs
index 7124f4a..d0ad433 100644
--- a/server/src/routes/ui/account/admin.rs
+++ b/server/src/routes/ui/account/admin.rs
@@ -5,6 +5,7 @@
*/
use crate::{
database::Database,
+ federation::Federation,
import::import,
routes::ui::{
account::session::Session,
@@ -135,11 +136,12 @@ pub fn r_account_admin_remove_invite(
pub async fn r_account_admin_import(
session: Session,
database: &State<Database>,
+ federation: &State<Federation>,
) -> MyResult<DynLayoutPage<'static>> {
if !session.user.admin {
Err(anyhow!("you not admin"))?
}
- let r = import(&database).await;
+ let r = import(&database, &federation).await;
admin_dashboard(
&database,
Some(r.map_err(|e| e.into()).map(|_| "Import successful".into())),
diff --git a/server/src/routes/ui/error.rs b/server/src/routes/ui/error.rs
index 190650f..b538a06 100644
--- a/server/src/routes/ui/error.rs
+++ b/server/src/routes/ui/error.rs
@@ -36,7 +36,6 @@ pub fn r_api_catch<'a>(status: Status, _request: &Request) -> Value {
pub type MyResult<T> = Result<T, MyError>;
-#[derive(Debug)]
pub struct MyError(pub anyhow::Error);
impl<'r> Responder<'r, 'static> for MyError {
@@ -61,6 +60,12 @@ impl<'r> Responder<'r, 'static> for MyError {
}
}
+impl std::fmt::Debug for MyError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_fmt(format_args!("{:?}", self.0))
+ }
+}
+
impl Display for MyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)