aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--client/src/lib.rs128
-rw-r--r--common/src/lib.rs4
-rw-r--r--server/src/federation.rs55
-rw-r--r--server/src/import.rs86
-rw-r--r--server/src/main.rs13
-rw-r--r--server/src/routes/api/mod.rs5
-rw-r--r--server/src/routes/mod.rs9
-rw-r--r--server/src/routes/stream.rs29
-rw-r--r--server/src/routes/ui/account/admin.rs4
-rw-r--r--server/src/routes/ui/error.rs7
10 files changed, 244 insertions, 96 deletions
diff --git a/client/src/lib.rs b/client/src/lib.rs
index a4d0b01..4b111d1 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -1,14 +1,22 @@
-use reqwest::Client;
+use anyhow::Result;
+use jellycommon::Node;
+use reqwest::{
+ header::{HeaderMap, HeaderValue},
+ Client,
+};
use serde_json::json;
use std::time::Duration;
+#[derive(Debug, Clone)]
pub struct Instance {
pub host: String,
pub tls: bool,
}
-pub struct Session(pub String);
impl Instance {
+ pub fn new(host: String, tls: bool) -> Self {
+ Self { host, tls }
+ }
pub fn base(&self) -> String {
format!(
"{}://{}",
@@ -16,55 +24,83 @@ impl Instance {
self.host
)
}
-}
-impl Session {
- pub fn session_param(&self) -> String {
- format!("session={}", self.0)
+ pub async fn login(
+ self,
+ username: String,
+ password: String,
+ expire: Duration,
+ ) -> anyhow::Result<Session> {
+ let session_token = Client::builder()
+ .build()?
+ .post(format!("{}/api/create_session", self.base()))
+ .json(&json!({
+ "expire": expire.as_secs(),
+ "password": password,
+ "username": username,
+ }))
+ .send()
+ .await?
+ .json()
+ .await?;
+
+ let mut headers = HeaderMap::new();
+ headers.insert(
+ "Cookie",
+ HeaderValue::from_str(&format!("session={session_token}")).unwrap(),
+ );
+ headers.insert("Accept", HeaderValue::from_static("application/json"));
+
+ Ok(Session {
+ instance: self,
+ session_token,
+ client: Client::builder().default_headers(headers).build().unwrap(),
+ })
}
}
-pub fn stream(
- instance: &Instance,
- session: &Session,
- id: &str,
- tracks: &[usize],
- webm: bool,
-) -> String {
- format!(
- "{}/n/{}/stream?tracks={}&webm={}&{}",
- instance.base(),
- id,
- tracks
- .iter()
- .map(|v| format!("{v}"))
- .collect::<Vec<_>>()
- .join(","),
- if webm { "1" } else { "0" },
- session.session_param()
- )
+pub struct Session {
+ client: Client,
+ instance: Instance,
+ session_token: String,
}
-pub async fn login(
- instance: &Instance,
- username: String,
- password: String,
- expire: Duration,
-) -> anyhow::Result<Session> {
- let p = serde_json::to_string(&json!({
- "expire": expire.as_secs(),
- "password": password,
- "username": username,
- }))
- .unwrap();
+impl Session {
+ fn session_param(&self) -> String {
+ format!("session={}", self.session_token)
+ }
+
+ pub async fn node(&self, id: &str) -> Result<Node> {
+ Ok(self
+ .client
+ .get(format!("{}/n/{id}", self.instance.base()))
+ .send()
+ .await?
+ .json()
+ .await?)
+ }
- let r = Client::builder()
- .build()?
- .post(format!("{}/api/account/login", instance.base()))
- .body(p)
- .send()
- .await?
- .json()
- .await?;
+ // pub async fn node_asset(&self, id: &str, role: AssetRole) -> Result<Node> {
+ // Ok(self
+ // .client
+ // .get(format!("/n/{id}"))
+ // .send()
+ // .await?
+ // .bytes()
+ // .await?)
+ // }
- Ok(Session(r))
+ pub fn stream(&self, id: &str, tracks: &[usize], webm: bool) -> String {
+ format!(
+ "{}/n/{}/stream?tracks={}&webm={}&{}",
+ self.instance.base(),
+ id,
+ tracks
+ .iter()
+ .map(|v| format!("{v}"))
+ .collect::<Vec<_>>()
+ .join(","),
+ if webm { "1" } else { "0" },
+ self.session_param()
+ )
+ }
}
diff --git a/common/src/lib.rs b/common/src/lib.rs
index 34de946..6f6caf5 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -19,7 +19,7 @@ pub struct Node {
#[rustfmt::skip]
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct NodePrivate {
- #[serde(default)] pub import: Option<ImportOptions>,
+ #[serde(default)] pub import: Option<RemoteImportOptions>,
#[serde(default)] pub poster: Option<PathBuf>,
#[serde(default)] pub backdrop: Option<PathBuf>,
#[serde(default)] pub source: Option<MediaSource>,
@@ -40,7 +40,7 @@ pub struct NodePublic {
#[rustfmt::skip]
#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct ImportOptions {
+pub struct RemoteImportOptions {
pub host: String,
pub id: String,
#[serde(default)] pub prefix: Option<String>,
diff --git a/server/src/federation.rs b/server/src/federation.rs
new file mode 100644
index 0000000..578261b
--- /dev/null
+++ b/server/src/federation.rs
@@ -0,0 +1,55 @@
+use crate::CONF;
+use anyhow::anyhow;
+use jellyclient::{Instance, Session};
+use std::{collections::HashMap, sync::Arc, time::Duration};
+use tokio::sync::RwLock;
+
+pub struct Federation {
+ instances: HashMap<String, Instance>,
+ sessions: RwLock<HashMap<String, Arc<Session>>>,
+}
+
+impl Federation {
+ pub fn initialize() -> Self {
+ let instances = CONF
+ .remote_credentials
+ .iter()
+ .map(|(k, (_, _, tls))| (k.to_owned(), Instance::new(k.to_owned(), *tls)))
+ .collect::<HashMap<_, _>>();
+
+ Self {
+ instances,
+ sessions: Default::default(),
+ }
+ }
+
+ pub fn get_instance(&self, host: &String) -> anyhow::Result<&Instance> {
+ Ok(self
+ .instances
+ .get(host)
+ .ok_or(anyhow!("unknown instance"))?)
+ }
+ pub async fn get_session(&self, host: &String) -> anyhow::Result<Arc<Session>> {
+ let mut w = self.sessions.write().await;
+ if let Some(s) = w.get(host) {
+ Ok(s.to_owned())
+ } else {
+ let (username, password, _) = CONF
+ .remote_credentials
+ .get(host)
+ .ok_or(anyhow!("no credentials of the remote server"))?;
+ let s = Arc::new(
+ self.get_instance(host)?
+ .to_owned()
+ .login(
+ username.to_owned(),
+ password.to_owned(),
+ Duration::from_secs(60 * 60 * 24 * 356),
+ )
+ .await?,
+ );
+ w.insert(host.to_owned(), s.clone());
+ Ok(s)
+ }
+ }
+}
diff --git a/server/src/import.rs b/server/src/import.rs
index b306332..a172ad9 100644
--- a/server/src/import.rs
+++ b/server/src/import.rs
@@ -3,37 +3,42 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use crate::{database::Database, CONF};
-use anyhow::{bail, Context, Ok};
+use crate::{database::Database, federation::Federation, CONF};
+use anyhow::{anyhow, bail, Context, Ok};
use async_recursion::async_recursion;
-use jellycommon::Node;
-use log::info;
+use jellycommon::{Node, RemoteImportOptions};
+use log::{error, info};
use std::{ffi::OsStr, fs::File, os::unix::prelude::OsStrExt, path::PathBuf, sync::LazyLock};
use tokio::sync::Semaphore;
static IMPORT_SEM: LazyLock<Semaphore> = LazyLock::new(|| Semaphore::new(1));
-pub async fn import(db: &Database) -> anyhow::Result<()> {
+pub async fn import(db: &Database, fed: &Federation) -> anyhow::Result<()> {
info!("clearing node tree");
let permit = IMPORT_SEM.try_acquire()?;
- {
- db.node.clear()?;
- info!("importing...");
- import_path(CONF.library_path.clone(), db, None)
- .await
- .context("indexing")?;
- info!("import completed");
- }
+ db.node.clear()?;
+ info!("importing...");
+ let (_, errs) = import_path(CONF.library_path.clone(), db, fed, None)
+ .await
+ .context("indexing")?;
+ info!("import completed");
drop(permit);
- Ok(())
+ if errs == 0 {
+ Ok(())
+ } else {
+ Err(anyhow!(
+ "partial import, {errs} errors occured; see server log"
+ ))
+ }
}
#[async_recursion]
pub async fn import_path(
path: PathBuf,
db: &Database,
+ fed: &Federation,
parent: Option<String>,
-) -> anyhow::Result<Vec<String>> {
+) -> anyhow::Result<(Vec<String>, usize)> {
if path.is_dir() {
let mpath = path.join("directory.json");
let children_paths = path.read_dir()?.map(Result::unwrap).filter_map(|e| {
@@ -47,8 +52,19 @@ pub async fn import_path(
});
let identifier = path.file_name().unwrap().to_str().unwrap().to_string();
let mut children_ids = Vec::new();
+ let mut errs = 0;
for p in children_paths {
- children_ids.extend(import_path(p, db, Some(identifier.clone())).await?)
+ let k = import_path(p, db, fed, Some(identifier.clone())).await;
+ match k {
+ core::result::Result::Ok((els, errs2)) => {
+ errs += errs2;
+ children_ids.extend(els)
+ }
+ Err(e) => {
+ errs += 1;
+ error!("import failed: {e:?}")
+ }
+ }
}
if mpath.exists() {
let mut data: Node =
@@ -56,14 +72,14 @@ pub async fn import_path(
data.public.children = children_ids;
data.public.parent = parent;
- info!("insert {identifier}");
+ info!("adding {identifier}");
db.node.insert(&identifier, &data)?;
- Ok(vec![identifier])
+ Ok((vec![identifier], errs))
} else {
- Ok(children_ids)
+ Ok((children_ids, errs))
}
} else if path.is_file() {
- info!("loading item {path:?}");
+ info!("loading {path:?}");
let datafile = File::open(path.clone()).context("cant load metadata")?;
let mut data: Node = serde_json::from_reader(datafile).context("invalid metadata")?;
let identifier = path
@@ -75,11 +91,37 @@ pub async fn import_path(
.unwrap()
.to_string();
- info!("insert {identifier}");
+ if let Some(io) = data.private.import.take() {
+ let title = data.public.title;
+ data = import_remote(io, db, fed)
+ .await
+ .context("federated import")?;
+ data.public.title = title;
+ }
+
+ info!("adding {identifier}");
data.public.parent = parent;
db.node.insert(&identifier, &data)?;
- Ok(vec![identifier])
+ Ok((vec![identifier], 0))
} else {
bail!("did somebody really put a fifo or socket in the library?!")
}
}
+
+async fn import_remote(
+ opts: RemoteImportOptions,
+ db: &Database,
+ fed: &Federation,
+) -> anyhow::Result<Node> {
+ let sess = fed
+ .get_session(&opts.host)
+ .await
+ .context("creating session")?;
+ let node = sess.node(&opts.id).await.context("fetching remote node")?;
+
+ if !node.public.children.is_empty() {
+ todo!()
+ }
+
+ Ok(node)
+}
diff --git a/server/src/main.rs b/server/src/main.rs
index 5b2d070..6a75c30 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -6,13 +6,16 @@
#![feature(lazy_cell)]
use database::Database;
+use federation::Federation;
use jellycommon::config::GlobalConfig;
use jellyremuxer::RemuxerContext;
+use log::error;
use once_cell::sync::Lazy;
use routes::build_rocket;
use std::fs::File;
pub mod database;
+pub mod federation;
pub mod import;
pub mod routes;
@@ -37,7 +40,13 @@ fn main() {
async fn async_main() {
let remuxer = RemuxerContext::new();
let database = Database::open(&CONF.database_path).unwrap();
- import::import(&database).await.unwrap();
+ let federation = Federation::initialize();
database.create_admin();
- build_rocket(remuxer, database).launch().await.unwrap();
+ if let Err(err) = import::import(&database, &federation).await {
+ error!("import not sucessful: {err:?}")
+ }
+ build_rocket(remuxer, database, federation)
+ .launch()
+ .await
+ .unwrap();
}
diff --git a/server/src/routes/api/mod.rs b/server/src/routes/api/mod.rs
index e74b134..cc87525 100644
--- a/server/src/routes/api/mod.rs
+++ b/server/src/routes/api/mod.rs
@@ -4,8 +4,6 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use std::ops::Deref;
-
use super::ui::{
account::{login_logic, LoginForm},
error::MyResult,
@@ -22,6 +20,7 @@ use rocket::{
Request, State,
};
use serde_json::{json, Value};
+use std::ops::Deref;
#[get("/api")]
pub fn r_api_root() -> Redirect {
@@ -33,7 +32,7 @@ pub fn r_api_version() -> &'static str {
"2"
}
-#[post("/api/account/login", data = "<data>")]
+#[post("/api/create_session", data = "<data>")]
pub fn r_api_account_login(database: &State<Database>, data: Json<LoginForm>) -> MyResult<Value> {
let token = login_logic(database, &data.username, &data.password)?;
Ok(json!(token))
diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs
index be4d2cb..42eae1b 100644
--- a/server/src/routes/mod.rs
+++ b/server/src/routes/mod.rs
@@ -3,7 +3,7 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use crate::{database::Database, routes::ui::error::MyResult, CONF};
+use crate::{database::Database, federation::Federation, routes::ui::error::MyResult, CONF};
use api::{r_api_account_login, r_api_root, r_api_version};
use base64::Engine;
use jellyremuxer::RemuxerContext;
@@ -45,7 +45,11 @@ macro_rules! uri {
};
}
-pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build> {
+pub fn build_rocket(
+ remuxer: RemuxerContext,
+ database: Database,
+ federation: Federation,
+) -> Rocket<Build> {
rocket::build()
.configure(Config {
secret_key: SecretKey::derive_from(
@@ -61,6 +65,7 @@ pub fn build_rocket(remuxer: RemuxerContext, database: Database) -> Rocket<Build
})
.manage(remuxer)
.manage(database)
+ .manage(federation)
.attach(AdHoc::on_response("set server header", |_req, res| {
res.set_header(Header::new("server", "jellything"));
Box::pin(async {})
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index 1277646..ee7880c 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -4,7 +4,7 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
use super::ui::{account::session::Session, error::MyError};
-use crate::{database::Database, CONF};
+use crate::{database::Database, federation::Federation, CONF};
use anyhow::{anyhow, Result};
use jellycommon::MediaSource;
use jellyremuxer::RemuxerContext;
@@ -30,6 +30,7 @@ pub async fn r_stream(
webm: Option<bool>,
tracks: String,
remuxer: &State<RemuxerContext>,
+ federation: &State<Federation>,
db: &State<Database>,
range: Option<RequestRange>,
) -> Result<Either<StreamResponse, Redirect>, MyError> {
@@ -47,27 +48,21 @@ pub async fn r_stream(
let source_tracks = match source {
MediaSource::Local { tracks } => tracks,
MediaSource::Remote { host, remote_id } => {
- let (username, password, tls) = CONF
+ let (username, password, _) = CONF
.remote_credentials
.get(&host)
.ok_or(anyhow!("no credentials on the server-side"))?;
- let instance = jellyclient::Instance { host, tls: *tls };
- let session = jellyclient::login(
- &instance,
- username.to_owned(),
- password.to_owned(),
- Duration::from_secs(60),
- )
- .await?;
+ let instance = federation.get_instance(&host)?.to_owned();
+ let session = instance
+ .login(
+ username.to_owned(),
+ password.to_owned(),
+ Duration::from_secs(60),
+ )
+ .await?;
- let uri = jellyclient::stream(
- &instance,
- &session,
- &remote_id,
- &tracks,
- webm.unwrap_or(false),
- );
+ let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false));
return Ok(Either::Right(Redirect::found(uri)));
}
};
diff --git a/server/src/routes/ui/account/admin.rs b/server/src/routes/ui/account/admin.rs
index 7124f4a..d0ad433 100644
--- a/server/src/routes/ui/account/admin.rs
+++ b/server/src/routes/ui/account/admin.rs
@@ -5,6 +5,7 @@
*/
use crate::{
database::Database,
+ federation::Federation,
import::import,
routes::ui::{
account::session::Session,
@@ -135,11 +136,12 @@ pub fn r_account_admin_remove_invite(
pub async fn r_account_admin_import(
session: Session,
database: &State<Database>,
+ federation: &State<Federation>,
) -> MyResult<DynLayoutPage<'static>> {
if !session.user.admin {
Err(anyhow!("you not admin"))?
}
- let r = import(&database).await;
+ let r = import(&database, &federation).await;
admin_dashboard(
&database,
Some(r.map_err(|e| e.into()).map(|_| "Import successful".into())),
diff --git a/server/src/routes/ui/error.rs b/server/src/routes/ui/error.rs
index 190650f..b538a06 100644
--- a/server/src/routes/ui/error.rs
+++ b/server/src/routes/ui/error.rs
@@ -36,7 +36,6 @@ pub fn r_api_catch<'a>(status: Status, _request: &Request) -> Value {
pub type MyResult<T> = Result<T, MyError>;
-#[derive(Debug)]
pub struct MyError(pub anyhow::Error);
impl<'r> Responder<'r, 'static> for MyError {
@@ -61,6 +60,12 @@ impl<'r> Responder<'r, 'static> for MyError {
}
}
+impl std::fmt::Debug for MyError {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.write_fmt(format_args!("{:?}", self.0))
+ }
+}
+
impl Display for MyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)