diff options
Diffstat (limited to 'server/src/logic/stream.rs')
-rw-r--r-- | server/src/logic/stream.rs | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs new file mode 100644 index 0000000..5bba9c2 --- /dev/null +++ b/server/src/logic/stream.rs @@ -0,0 +1,246 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin <metamuffin.org> +*/ +use super::session::Session; +use crate::{database::Database, ui::error::MyError}; +use anyhow::{anyhow, Result}; +use jellybase::{assetfed::AssetInner, federation::Federation}; +use jellycommon::{stream::StreamSpec, TrackSource}; +use jellystream::SMediaInfo; +use log::{info, warn}; +use rocket::{ + get, head, + http::{Header, Status}, + request::{self, FromRequest}, + response::{self, Redirect, Responder}, + Either, Request, Response, State, +}; +use std::{ + collections::{BTreeMap, BTreeSet}, + ops::Range, + sync::Arc, +}; +use tokio::io::{duplex, DuplexStream}; + +#[head("/n/<_id>/stream?<spec..>")] +pub async fn r_stream_head( + _sess: Session, + _id: &str, + spec: BTreeMap<String, String>, +) -> Result<Either<StreamResponse, Redirect>, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; + let head = jellystream::stream_head(&spec); + Ok(Either::Left(StreamResponse { + stream: duplex(0).0, + advertise_range: head.range_supported, + content_type: head.content_type, + range: None, + })) +} + +#[get("/n/<id>/stream?<spec..>")] +pub async fn r_stream( + _session: Session, + _federation: &State<Federation>, + db: &State<Database>, + id: &str, + range: Option<RequestRange>, + spec: BTreeMap<String, String>, +) -> Result<Either<StreamResponse, RedirectResponse>, MyError> { + let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; + // TODO perm + let node = db + .get_node_slug(id)? + .ok_or(anyhow!("node does not exist"))?; + + let media = Arc::new( + node.media + .clone() + .ok_or(anyhow!("item does not contain media"))?, + ); + + // TODO its unclear how requests with multiple tracks should be handled. + // if spec.track.len() == 1 { + // let ti = spec.track[0]; + // if let TrackSource::Remote(remote_index) = media.tracks[ti].source { + // session + // .user + // .permissions + // .assert(&UserPermission::FederatedContent)?; + + // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti]; + // let host = track + // .federated + // .last() + // .ok_or(anyhow!("federation inconsistent"))?; + + // let FederationAccount { + // password, username, .. + // } = SECRETS + // .federation + // .get(host) + // .ok_or(anyhow!("no credentials on the server-side"))?; + + // info!("creating session on {host}"); + // let instance = federation.get_instance(host)?.to_owned(); + // let session = instance + // .login(CreateSessionParams { + // username: username.to_owned(), + // password: password.to_owned(), + // expire: Some(60), + // drop_permissions: Some(HashSet::from_iter([ + // UserPermission::ManageSelf, + // UserPermission::Admin, // in case somebody federated the admin :))) + // ])), + // }) + // .await?; + + // let uri = session.stream_url( + // node.slug.clone().into(), + // &StreamSpec { + // track: vec![remote_index], + // ..spec + // }, + // ); + // info!("federation redirect"); + // return Ok(Either::Right(RedirectResponse(uri))); + // } + // } + + info!( + "stream request (range={})", + range + .as_ref() + .map(|r| r.to_cr_hv()) + .unwrap_or("none".to_string()) + ); + + let urange = match &range { + Some(r) => { + let r = r.0.first().unwrap_or(&(None..None)); + r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) + } + None => 0..(isize::MAX as usize), + }; + + let head = jellystream::stream_head(&spec); + + let mut sources = BTreeSet::new(); + for t in &media.tracks { + if let TrackSource::Local(x) = &t.source { + if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? { + sources.insert(m.path); + } + } + } + let media = Arc::new(SMediaInfo { + files: sources, + info: node, + }); + + match jellystream::stream(media, spec, urange).await { + Ok(stream) => Ok(Either::Left(StreamResponse { + stream, + range, + advertise_range: head.range_supported, + content_type: head.content_type, + })), + Err(e) => { + warn!("stream error: {e}"); + Err(MyError(e)) + } + } +} + +pub struct RedirectResponse(String); + +#[rocket::async_trait] +impl<'r> Responder<'r, 'static> for RedirectResponse { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + let mut b = Response::build(); + b.status(Status::Found); + b.header(Header::new("access-control-allow-origin", "*")); + b.header(Header::new("location", self.0)); + Ok(b.finalize()) + } +} + +pub struct StreamResponse { + stream: DuplexStream, + advertise_range: bool, + content_type: &'static str, + range: Option<RequestRange>, +} + +#[rocket::async_trait] +impl<'r> Responder<'r, 'static> for StreamResponse { + fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> { + let mut b = Response::build(); + b.status(Status::Ok); + b.header(Header::new("access-control-allow-origin", "*")); + if self.advertise_range { + //* it is very important here to not reply with content range if we didnt advertise. + //* mpv requests range but will crash if we dont pretend to not support it. + if let Some(range) = self.range { + b.status(Status::PartialContent); + b.header(Header::new("content-range", range.to_cr_hv())); + } + b.header(Header::new("accept-ranges", "bytes")); + } + b.header(Header::new("content-type", self.content_type)) + .streamed_body(self.stream) + .ok() + } +} + +#[derive(Debug)] +pub struct RequestRange(Vec<Range<Option<usize>>>); + +impl RequestRange { + pub fn to_cr_hv(&self) -> String { + assert_eq!(self.0.len(), 1); + format!( + "bytes {}-{}/*", + self.0[0].start.map(|e| e.to_string()).unwrap_or_default(), + self.0[0].end.map(|e| e.to_string()).unwrap_or_default() + ) + } + pub fn from_hv(s: &str) -> Result<Self> { + Ok(Self( + s.strip_prefix("bytes=") + .ok_or(anyhow!("prefix expected"))? + .split(',') + .map(|s| { + let (l, r) = s + .split_once('-') + .ok_or(anyhow!("range delimeter missing"))?; + let km = |s: &str| { + if s.is_empty() { + Ok::<_, anyhow::Error>(None) + } else { + Ok(Some(s.parse()?)) + } + }; + Ok(km(l)?..km(r)?) + }) + .collect::<Result<Vec<_>>>()?, + )) + } +} + +#[rocket::async_trait] +impl<'r> FromRequest<'r> for RequestRange { + type Error = anyhow::Error; + + async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> { + match req.headers().get("range").next() { + Some(v) => match Self::from_hv(v) { + Ok(v) => rocket::outcome::Outcome::Success(v), + Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)), + }, + None => rocket::outcome::Outcome::Forward(Status::Ok), + } + } +} |