aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2023-12-22 23:57:03 +0100
committermetamuffin <metamuffin@disroot.org>2023-12-22 23:57:03 +0100
commit75949cebdd61dd8f0d06f2e47081c460e2a442f0 (patch)
treeea0fc2ff003b7694ee06555d731bce1f08199136
parentc4682c231cbfa2bd4b44e14548800a64cc9cdbb8 (diff)
downloadjellything-75949cebdd61dd8f0d06f2e47081c460e2a442f0.tar
jellything-75949cebdd61dd8f0d06f2e47081c460e2a442f0.tar.bz2
jellything-75949cebdd61dd8f0d06f2e47081c460e2a442f0.tar.zst
rework import system pt. 8: federated streams & change jhls
-rw-r--r--base/src/federation.rs7
-rw-r--r--client/src/lib.rs14
-rw-r--r--common/src/jhls.rs10
-rw-r--r--common/src/user.rs13
-rw-r--r--server/src/routes/api/mod.rs13
-rw-r--r--server/src/routes/stream.rs75
-rw-r--r--stream/src/jhls.rs55
-rw-r--r--stream/src/lib.rs4
-rw-r--r--tool/src/main.rs8
9 files changed, 88 insertions, 111 deletions
diff --git a/base/src/federation.rs b/base/src/federation.rs
index 509b87c..9cd04ae 100644
--- a/base/src/federation.rs
+++ b/base/src/federation.rs
@@ -3,9 +3,10 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use anyhow::anyhow;
use crate::CONF;
-use jellyclient::{Instance, LoginDetails, Session};
+use anyhow::anyhow;
+use jellyclient::{Instance, Session};
+use jellycommon::user::CreateSessionParams;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock;
@@ -46,7 +47,7 @@ impl Federation {
let s = Arc::new(
self.get_instance(host)?
.to_owned()
- .login(LoginDetails {
+ .login(CreateSessionParams {
username: username.to_owned(),
password: password.to_owned(),
expire: None,
diff --git a/client/src/lib.rs b/client/src/lib.rs
index 96349ed..f88ecc5 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -4,14 +4,12 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
use anyhow::Result;
-use jellycommon::user::UserPermission;
+use jellycommon::user::CreateSessionParams;
use log::{debug, info};
use reqwest::{
header::{HeaderMap, HeaderValue},
Client,
};
-use serde::Serialize;
-use std::collections::HashSet;
use stream::StreamSpec;
use tokio::io::AsyncWriteExt;
@@ -23,14 +21,6 @@ pub struct Instance {
pub tls: bool,
}
-#[derive(Serialize)]
-pub struct LoginDetails {
- pub username: String,
- pub password: String,
- pub expire: Option<i64>,
- pub drop_permissions: Option<HashSet<UserPermission>>,
-}
-
impl Instance {
pub fn new(host: String, tls: bool) -> Self {
Self { host, tls }
@@ -42,7 +32,7 @@ impl Instance {
self.host
)
}
- pub async fn login(self, data: LoginDetails) -> anyhow::Result<Session> {
+ pub async fn login(self, data: CreateSessionParams) -> anyhow::Result<Session> {
let session_token = Client::builder()
.build()?
.post(format!("{}/api/create_session", self.base()))
diff --git a/common/src/jhls.rs b/common/src/jhls.rs
index eadf9a2..33c67d2 100644
--- a/common/src/jhls.rs
+++ b/common/src/jhls.rs
@@ -3,20 +3,12 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
-use crate::SourceTrack;
use serde::{Deserialize, Serialize};
use std::ops::Range;
#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct JhlsMetadata {
- pub duration: f64,
+pub struct JhlsTrackIndex {
pub extra_profiles: Vec<EncodingProfile>,
- pub tracks: Vec<JhlsTrack>,
-}
-
-#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct JhlsTrack {
- pub info: SourceTrack,
pub segments: Vec<Range<f64>>,
}
diff --git a/common/src/user.rs b/common/src/user.rs
index 466423c..7e654a9 100644
--- a/common/src/user.rs
+++ b/common/src/user.rs
@@ -7,7 +7,10 @@ use crate::{stream::StreamFormat, user};
#[cfg(feature = "rocket")]
use rocket::{FromFormField, UriDisplayQuery};
use serde::{Deserialize, Serialize};
-use std::{collections::HashMap, fmt::Display};
+use std::{
+ collections::{HashMap, HashSet},
+ fmt::Display,
+};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct User {
@@ -32,6 +35,14 @@ pub enum WatchedState {
Watched,
}
+#[derive(Debug, Serialize, Deserialize)]
+pub struct CreateSessionParams {
+ pub username: String,
+ pub password: String,
+ pub expire: Option<i64>,
+ pub drop_permissions: Option<HashSet<UserPermission>>,
+}
+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
#[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))]
#[serde(rename_all = "snake_case")]
diff --git a/server/src/routes/api/mod.rs b/server/src/routes/api/mod.rs
index 87ed0e9..214c5de 100644
--- a/server/src/routes/api/mod.rs
+++ b/server/src/routes/api/mod.rs
@@ -9,7 +9,7 @@ use super::ui::{
};
use crate::database::Database;
use anyhow::{anyhow, Context};
-use jellycommon::{user::UserPermission, Node};
+use jellycommon::{user::CreateSessionParams, Node};
use rocket::{
get,
http::MediaType,
@@ -20,9 +20,8 @@ use rocket::{
serde::json::Json,
Request, State,
};
-use serde::Deserialize;
use serde_json::{json, Value};
-use std::{collections::HashSet, ops::Deref};
+use std::ops::Deref;
#[get("/api")]
pub fn r_api_root() -> Redirect {
@@ -34,14 +33,6 @@ pub fn r_api_version() -> &'static str {
"2"
}
-#[derive(Deserialize)]
-pub struct CreateSessionParams {
- username: String,
- password: String,
- expire: Option<i64>,
- drop_permissions: Option<HashSet<UserPermission>>,
-}
-
#[post("/api/create_session", data = "<data>")]
pub fn r_api_account_login(
database: &State<Database>,
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index 14db462..8d16ded 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -11,7 +11,11 @@ use jellybase::{
permission::{NodePermissionExt, PermissionSetExt},
CONF,
};
-use jellycommon::{stream::StreamSpec, user::UserPermission, TrackSource};
+use jellycommon::{
+ stream::StreamSpec,
+ user::{CreateSessionParams, UserPermission},
+ TrackSource,
+};
use log::{info, warn};
use rocket::{
get, head,
@@ -58,36 +62,51 @@ pub async fn r_stream(
.as_ref()
.ok_or(anyhow!("item does not contain media"))?;
- // TODO federated streams
- // if let MediaSource::Remote { host, remote_id } = source {
- // session
- // .user
- // .permissions
- // .assert(&UserPermission::FederatedContent)?;
+ // TODO its unclear how requests with multiple tracks should be handled.
+ if spec.tracks.len() == 1 {
+ let ti = spec.tracks[0];
+ if let TrackSource::Remote(remote_index) = source[ti] {
+ session
+ .user
+ .permissions
+ .assert(&UserPermission::FederatedContent)?;
+
+ let track = &node.public.media.ok_or(anyhow!("no media"))?.tracks[ti];
+ let host = track
+ .federated
+ .last()
+ .ok_or(anyhow!("federation inconsistent"))?;
- // let (username, password, _) = CONF
- // .remote_credentials
- // .get(host)
- // .ok_or(anyhow!("no credentials on the server-side"))?;
+ let (username, password, _) = CONF
+ .remote_credentials
+ .get(host)
+ .ok_or(anyhow!("no credentials on the server-side"))?;
- // info!("creating session on {host}");
- // let instance = federation.get_instance(&host)?.to_owned();
- // let session = instance
- // .login(LoginDetails {
- // username: username.to_owned(),
- // password: password.to_owned(),
- // expire: Some(60),
- // drop_permissions: Some(HashSet::from_iter([
- // UserPermission::ManageSelf,
- // UserPermission::Admin, // in case somebody federated the admin :)))
- // ])),
- // })
- // .await?;
+ info!("creating session on {host}");
+ let instance = federation.get_instance(&host)?.to_owned();
+ let session = instance
+ .login(CreateSessionParams {
+ username: username.to_owned(),
+ password: password.to_owned(),
+ expire: Some(60),
+ drop_permissions: Some(HashSet::from_iter([
+ UserPermission::ManageSelf,
+ UserPermission::Admin, // in case somebody federated the admin :)))
+ ])),
+ })
+ .await?;
- // let uri = session.stream(&remote_id, &spec);
- // info!("federation redirect");
- // return Ok(Either::Right(RedirectResponse(uri)));
- // }
+ let uri = session.stream(
+ node.public.id.as_ref().unwrap(),
+ &StreamSpec {
+ tracks: vec![remote_index],
+ ..spec
+ },
+ );
+ info!("federation redirect");
+ return Ok(Either::Right(RedirectResponse(uri)));
+ }
+ }
info!(
"stream request (range={})",
diff --git a/stream/src/jhls.rs b/stream/src/jhls.rs
index e61e918..e58aafe 100644
--- a/stream/src/jhls.rs
+++ b/stream/src/jhls.rs
@@ -6,66 +6,37 @@
use anyhow::{anyhow, Result};
use jellybase::{permission::PermissionSetExt, CONF};
use jellycommon::{
- jhls::{JhlsMetadata, JhlsTrack},
+ jhls::JhlsTrackIndex,
stream::StreamSpec,
user::{PermissionSet, UserPermission},
- Node, TrackSource,
+ LocalTrack, Node,
};
use tokio::io::{AsyncWriteExt, DuplexStream};
-pub async fn jhls_stream(
+pub async fn jhls_index(
node: Node,
- track_sources: &[TrackSource],
+ local_tracks: &[LocalTrack],
_spec: StreamSpec,
mut b: DuplexStream,
perms: &PermissionSet,
) -> Result<()> {
- let track_sources = track_sources.to_vec();
- let media = node.public.media.clone().unwrap();
- let tracks = tokio::task::spawn_blocking(move || {
- media
- .tracks
- .iter()
- .enumerate()
- .filter_map(|(i, t)| {
- // TODO can we maybe stream the subtitles as .mks in the future? that would be cool
- // subtitles can only be supported by snippet_index if we relax constraints on the contents of each cluster
- if matches!(t.kind, jellycommon::SourceTrackKind::Subtitles) {
- return Some(Ok(JhlsTrack {
- info: t.to_owned(),
- segments: vec![], // clients need to ignore this
- }));
- }
- match jellyremuxer::snippet::snippet_index(
- &CONF.library_path,
- &node.public,
- match &track_sources[i] {
- TrackSource::Local(x) => x,
- // TODO fetch seek index from the remote and create a single session to be sent in jhls
- TrackSource::Remote(_) => {
- return Some(Err(anyhow!("remote tracks dont work yet")))
- }
- },
- ) {
- Ok(segments) => Some(Ok::<_, anyhow::Error>(JhlsTrack {
- info: t.to_owned(),
- segments,
- })),
- Err(e) => Some(Err(e)),
- }
- })
- .try_collect::<Vec<_>>()
+ let local_track = local_tracks
+ .get(0)
+ .ok_or(anyhow!("track missing"))?
+ .to_owned();
+
+ let segments = tokio::task::spawn_blocking(move || {
+ jellyremuxer::snippet::snippet_index(&CONF.library_path, &node.public, &local_track)
})
.await??;
- let out = serde_json::to_string(&JhlsMetadata {
- tracks,
+ let out = serde_json::to_string(&JhlsTrackIndex {
extra_profiles: if perms.check(&UserPermission::Transcode) {
CONF.transcoding_profiles.clone()
} else {
vec![]
},
- duration: media.duration,
+ segments,
})?;
tokio::spawn(async move { b.write_all(out.as_bytes()).await });
Ok(())
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index e2137d3..e86230c 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -17,7 +17,7 @@ use jellycommon::{
user::{PermissionSet, UserPermission},
LocalTrack, Node, TrackSource,
};
-use jhls::jhls_stream;
+use jhls::jhls_index;
use segment::segment_stream;
use std::{io::SeekFrom, ops::Range};
use tokio::{
@@ -85,7 +85,7 @@ pub async fn stream(
StreamFormat::Matroska => remux_stream(node, local_tracks, spec, range, b).await?,
StreamFormat::HlsMaster => hls_master_stream(node, local_tracks, spec, b).await?,
StreamFormat::HlsVariant => hls_variant_stream(node, local_tracks, spec, b).await?,
- StreamFormat::Jhls => jhls_stream(node, &track_sources, spec, b, perms).await?,
+ StreamFormat::Jhls => jhls_index(node, &local_tracks, spec, b, perms).await?,
StreamFormat::Segment => segment_stream(node, local_tracks, spec, b, perms).await?,
StreamFormat::Webvtt => webvtt_stream(node, local_tracks, spec, b).await?,
}
diff --git a/tool/src/main.rs b/tool/src/main.rs
index 7b58125..6dc7f8a 100644
--- a/tool/src/main.rs
+++ b/tool/src/main.rs
@@ -10,8 +10,10 @@ pub mod migrate;
use anyhow::anyhow;
use base64::Engine;
use clap::{Parser, Subcommand, ValueEnum};
-use jellyclient::{Instance, LoginDetails};
-use jellycommon::{config::GlobalConfig, Node, NodeKind, NodePrivate, NodePublic};
+use jellyclient::Instance;
+use jellycommon::{
+ config::GlobalConfig, user::CreateSessionParams, Node, NodeKind, NodePrivate, NodePublic,
+};
use log::{info, warn};
use migrate::migrate;
use rand::random;
@@ -142,7 +144,7 @@ fn main() -> anyhow::Result<()> {
let inst = Instance::new(hostname.unwrap_or(config.hostname.clone()), !no_tls);
info!("login");
let session = inst
- .login(LoginDetails {
+ .login(CreateSessionParams {
drop_permissions: None,
expire: None,
password: config.admin_password,