aboutsummaryrefslogtreecommitdiff
path: root/server/src/logic/stream.rs
diff options
context:
space:
mode:
authormetamuffin <metamuffin@disroot.org>2025-04-27 19:25:11 +0200
committermetamuffin <metamuffin@disroot.org>2025-04-27 19:25:11 +0200
commit11a585b3dbe620dcc8772e713b22f1d9ba80d598 (patch)
tree44f8d97137412aefc79a2425a489c34fa3e5f6c5 /server/src/logic/stream.rs
parentd871aa7c5bba49ff55170b5d2dac9cd440ae7170 (diff)
downloadjellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar
jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar.bz2
jellything-11a585b3dbe620dcc8772e713b22f1d9ba80d598.tar.zst
move files around
Diffstat (limited to 'server/src/logic/stream.rs')
-rw-r--r--server/src/logic/stream.rs246
1 files changed, 246 insertions, 0 deletions
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs
new file mode 100644
index 0000000..5bba9c2
--- /dev/null
+++ b/server/src/logic/stream.rs
@@ -0,0 +1,246 @@
+/*
+ This file is part of jellything (https://codeberg.org/metamuffin/jellything)
+ which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
+ Copyright (C) 2025 metamuffin <metamuffin.org>
+*/
+use super::session::Session;
+use crate::{database::Database, ui::error::MyError};
+use anyhow::{anyhow, Result};
+use jellybase::{assetfed::AssetInner, federation::Federation};
+use jellycommon::{stream::StreamSpec, TrackSource};
+use jellystream::SMediaInfo;
+use log::{info, warn};
+use rocket::{
+ get, head,
+ http::{Header, Status},
+ request::{self, FromRequest},
+ response::{self, Redirect, Responder},
+ Either, Request, Response, State,
+};
+use std::{
+ collections::{BTreeMap, BTreeSet},
+ ops::Range,
+ sync::Arc,
+};
+use tokio::io::{duplex, DuplexStream};
+
+#[head("/n/<_id>/stream?<spec..>")]
+pub async fn r_stream_head(
+ _sess: Session,
+ _id: &str,
+ spec: BTreeMap<String, String>,
+) -> Result<Either<StreamResponse, Redirect>, MyError> {
+ let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
+ let head = jellystream::stream_head(&spec);
+ Ok(Either::Left(StreamResponse {
+ stream: duplex(0).0,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ range: None,
+ }))
+}
+
+#[get("/n/<id>/stream?<spec..>")]
+pub async fn r_stream(
+ _session: Session,
+ _federation: &State<Federation>,
+ db: &State<Database>,
+ id: &str,
+ range: Option<RequestRange>,
+ spec: BTreeMap<String, String>,
+) -> Result<Either<StreamResponse, RedirectResponse>, MyError> {
+ let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
+ // TODO perm
+ let node = db
+ .get_node_slug(id)?
+ .ok_or(anyhow!("node does not exist"))?;
+
+ let media = Arc::new(
+ node.media
+ .clone()
+ .ok_or(anyhow!("item does not contain media"))?,
+ );
+
+ // TODO its unclear how requests with multiple tracks should be handled.
+ // if spec.track.len() == 1 {
+ // let ti = spec.track[0];
+ // if let TrackSource::Remote(remote_index) = media.tracks[ti].source {
+ // session
+ // .user
+ // .permissions
+ // .assert(&UserPermission::FederatedContent)?;
+
+ // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti];
+ // let host = track
+ // .federated
+ // .last()
+ // .ok_or(anyhow!("federation inconsistent"))?;
+
+ // let FederationAccount {
+ // password, username, ..
+ // } = SECRETS
+ // .federation
+ // .get(host)
+ // .ok_or(anyhow!("no credentials on the server-side"))?;
+
+ // info!("creating session on {host}");
+ // let instance = federation.get_instance(host)?.to_owned();
+ // let session = instance
+ // .login(CreateSessionParams {
+ // username: username.to_owned(),
+ // password: password.to_owned(),
+ // expire: Some(60),
+ // drop_permissions: Some(HashSet::from_iter([
+ // UserPermission::ManageSelf,
+ // UserPermission::Admin, // in case somebody federated the admin :)))
+ // ])),
+ // })
+ // .await?;
+
+ // let uri = session.stream_url(
+ // node.slug.clone().into(),
+ // &StreamSpec {
+ // track: vec![remote_index],
+ // ..spec
+ // },
+ // );
+ // info!("federation redirect");
+ // return Ok(Either::Right(RedirectResponse(uri)));
+ // }
+ // }
+
+ info!(
+ "stream request (range={})",
+ range
+ .as_ref()
+ .map(|r| r.to_cr_hv())
+ .unwrap_or("none".to_string())
+ );
+
+ let urange = match &range {
+ Some(r) => {
+ let r = r.0.first().unwrap_or(&(None..None));
+ r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize)
+ }
+ None => 0..(isize::MAX as usize),
+ };
+
+ let head = jellystream::stream_head(&spec);
+
+ let mut sources = BTreeSet::new();
+ for t in &media.tracks {
+ if let TrackSource::Local(x) = &t.source {
+ if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? {
+ sources.insert(m.path);
+ }
+ }
+ }
+ let media = Arc::new(SMediaInfo {
+ files: sources,
+ info: node,
+ });
+
+ match jellystream::stream(media, spec, urange).await {
+ Ok(stream) => Ok(Either::Left(StreamResponse {
+ stream,
+ range,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ })),
+ Err(e) => {
+ warn!("stream error: {e}");
+ Err(MyError(e))
+ }
+ }
+}
+
+pub struct RedirectResponse(String);
+
+#[rocket::async_trait]
+impl<'r> Responder<'r, 'static> for RedirectResponse {
+ fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
+ let mut b = Response::build();
+ b.status(Status::Found);
+ b.header(Header::new("access-control-allow-origin", "*"));
+ b.header(Header::new("location", self.0));
+ Ok(b.finalize())
+ }
+}
+
+pub struct StreamResponse {
+ stream: DuplexStream,
+ advertise_range: bool,
+ content_type: &'static str,
+ range: Option<RequestRange>,
+}
+
+#[rocket::async_trait]
+impl<'r> Responder<'r, 'static> for StreamResponse {
+ fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
+ let mut b = Response::build();
+ b.status(Status::Ok);
+ b.header(Header::new("access-control-allow-origin", "*"));
+ if self.advertise_range {
+ //* it is very important here to not reply with content range if we didnt advertise.
+ //* mpv requests range but will crash if we dont pretend to not support it.
+ if let Some(range) = self.range {
+ b.status(Status::PartialContent);
+ b.header(Header::new("content-range", range.to_cr_hv()));
+ }
+ b.header(Header::new("accept-ranges", "bytes"));
+ }
+ b.header(Header::new("content-type", self.content_type))
+ .streamed_body(self.stream)
+ .ok()
+ }
+}
+
+#[derive(Debug)]
+pub struct RequestRange(Vec<Range<Option<usize>>>);
+
+impl RequestRange {
+ pub fn to_cr_hv(&self) -> String {
+ assert_eq!(self.0.len(), 1);
+ format!(
+ "bytes {}-{}/*",
+ self.0[0].start.map(|e| e.to_string()).unwrap_or_default(),
+ self.0[0].end.map(|e| e.to_string()).unwrap_or_default()
+ )
+ }
+ pub fn from_hv(s: &str) -> Result<Self> {
+ Ok(Self(
+ s.strip_prefix("bytes=")
+ .ok_or(anyhow!("prefix expected"))?
+ .split(',')
+ .map(|s| {
+ let (l, r) = s
+ .split_once('-')
+ .ok_or(anyhow!("range delimeter missing"))?;
+ let km = |s: &str| {
+ if s.is_empty() {
+ Ok::<_, anyhow::Error>(None)
+ } else {
+ Ok(Some(s.parse()?))
+ }
+ };
+ Ok(km(l)?..km(r)?)
+ })
+ .collect::<Result<Vec<_>>>()?,
+ ))
+ }
+}
+
+#[rocket::async_trait]
+impl<'r> FromRequest<'r> for RequestRange {
+ type Error = anyhow::Error;
+
+ async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
+ match req.headers().get("range").next() {
+ Some(v) => match Self::from_hv(v) {
+ Ok(v) => rocket::outcome::Outcome::Success(v),
+ Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)),
+ },
+ None => rocket::outcome::Outcome::Forward(Status::Ok),
+ }
+ }
+}