aboutsummaryrefslogtreecommitdiff
path: root/server/src/routes/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'server/src/routes/stream.rs')
-rw-r--r--server/src/routes/stream.rs246
1 files changed, 0 insertions, 246 deletions
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
deleted file mode 100644
index 0fbeb3a..0000000
--- a/server/src/routes/stream.rs
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- This file is part of jellything (https://codeberg.org/metamuffin/jellything)
- which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
- Copyright (C) 2025 metamuffin <metamuffin.org>
-*/
-use super::ui::{account::session::Session, error::MyError};
-use crate::database::Database;
-use anyhow::{anyhow, Result};
-use jellybase::{assetfed::AssetInner, federation::Federation};
-use jellycommon::{stream::StreamSpec, TrackSource};
-use jellystream::SMediaInfo;
-use log::{info, warn};
-use rocket::{
- get, head,
- http::{Header, Status},
- request::{self, FromRequest},
- response::{self, Redirect, Responder},
- Either, Request, Response, State,
-};
-use std::{
- collections::{BTreeMap, BTreeSet},
- ops::Range,
- sync::Arc,
-};
-use tokio::io::{duplex, DuplexStream};
-
-#[head("/n/<_id>/stream?<spec..>")]
-pub async fn r_stream_head(
- _sess: Session,
- _id: &str,
- spec: BTreeMap<String, String>,
-) -> Result<Either<StreamResponse, Redirect>, MyError> {
- let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
- let head = jellystream::stream_head(&spec);
- Ok(Either::Left(StreamResponse {
- stream: duplex(0).0,
- advertise_range: head.range_supported,
- content_type: head.content_type,
- range: None,
- }))
-}
-
-#[get("/n/<id>/stream?<spec..>")]
-pub async fn r_stream(
- _session: Session,
- _federation: &State<Federation>,
- db: &State<Database>,
- id: &str,
- range: Option<RequestRange>,
- spec: BTreeMap<String, String>,
-) -> Result<Either<StreamResponse, RedirectResponse>, MyError> {
- let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
- // TODO perm
- let node = db
- .get_node_slug(id)?
- .ok_or(anyhow!("node does not exist"))?;
-
- let media = Arc::new(
- node.media
- .clone()
- .ok_or(anyhow!("item does not contain media"))?,
- );
-
- // TODO its unclear how requests with multiple tracks should be handled.
- // if spec.track.len() == 1 {
- // let ti = spec.track[0];
- // if let TrackSource::Remote(remote_index) = media.tracks[ti].source {
- // session
- // .user
- // .permissions
- // .assert(&UserPermission::FederatedContent)?;
-
- // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti];
- // let host = track
- // .federated
- // .last()
- // .ok_or(anyhow!("federation inconsistent"))?;
-
- // let FederationAccount {
- // password, username, ..
- // } = SECRETS
- // .federation
- // .get(host)
- // .ok_or(anyhow!("no credentials on the server-side"))?;
-
- // info!("creating session on {host}");
- // let instance = federation.get_instance(host)?.to_owned();
- // let session = instance
- // .login(CreateSessionParams {
- // username: username.to_owned(),
- // password: password.to_owned(),
- // expire: Some(60),
- // drop_permissions: Some(HashSet::from_iter([
- // UserPermission::ManageSelf,
- // UserPermission::Admin, // in case somebody federated the admin :)))
- // ])),
- // })
- // .await?;
-
- // let uri = session.stream_url(
- // node.slug.clone().into(),
- // &StreamSpec {
- // track: vec![remote_index],
- // ..spec
- // },
- // );
- // info!("federation redirect");
- // return Ok(Either::Right(RedirectResponse(uri)));
- // }
- // }
-
- info!(
- "stream request (range={})",
- range
- .as_ref()
- .map(|r| r.to_cr_hv())
- .unwrap_or("none".to_string())
- );
-
- let urange = match &range {
- Some(r) => {
- let r = r.0.first().unwrap_or(&(None..None));
- r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize)
- }
- None => 0..(isize::MAX as usize),
- };
-
- let head = jellystream::stream_head(&spec);
-
- let mut sources = BTreeSet::new();
- for t in &media.tracks {
- if let TrackSource::Local(x) = &t.source {
- if let AssetInner::LocalTrack(m) = AssetInner::deser(&x.0)? {
- sources.insert(m.path);
- }
- }
- }
- let media = Arc::new(SMediaInfo {
- files: sources,
- info: node,
- });
-
- match jellystream::stream(media, spec, urange).await {
- Ok(stream) => Ok(Either::Left(StreamResponse {
- stream,
- range,
- advertise_range: head.range_supported,
- content_type: head.content_type,
- })),
- Err(e) => {
- warn!("stream error: {e}");
- Err(MyError(e))
- }
- }
-}
-
-pub struct RedirectResponse(String);
-
-#[rocket::async_trait]
-impl<'r> Responder<'r, 'static> for RedirectResponse {
- fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
- let mut b = Response::build();
- b.status(Status::Found);
- b.header(Header::new("access-control-allow-origin", "*"));
- b.header(Header::new("location", self.0));
- Ok(b.finalize())
- }
-}
-
-pub struct StreamResponse {
- stream: DuplexStream,
- advertise_range: bool,
- content_type: &'static str,
- range: Option<RequestRange>,
-}
-
-#[rocket::async_trait]
-impl<'r> Responder<'r, 'static> for StreamResponse {
- fn respond_to(self, _: &'r Request<'_>) -> response::Result<'static> {
- let mut b = Response::build();
- b.status(Status::Ok);
- b.header(Header::new("access-control-allow-origin", "*"));
- if self.advertise_range {
- //* it is very important here to not reply with content range if we didnt advertise.
- //* mpv requests range but will crash if we dont pretend to not support it.
- if let Some(range) = self.range {
- b.status(Status::PartialContent);
- b.header(Header::new("content-range", range.to_cr_hv()));
- }
- b.header(Header::new("accept-ranges", "bytes"));
- }
- b.header(Header::new("content-type", self.content_type))
- .streamed_body(self.stream)
- .ok()
- }
-}
-
-#[derive(Debug)]
-pub struct RequestRange(Vec<Range<Option<usize>>>);
-
-impl RequestRange {
- pub fn to_cr_hv(&self) -> String {
- assert_eq!(self.0.len(), 1);
- format!(
- "bytes {}-{}/*",
- self.0[0].start.map(|e| e.to_string()).unwrap_or_default(),
- self.0[0].end.map(|e| e.to_string()).unwrap_or_default()
- )
- }
- pub fn from_hv(s: &str) -> Result<Self> {
- Ok(Self(
- s.strip_prefix("bytes=")
- .ok_or(anyhow!("prefix expected"))?
- .split(',')
- .map(|s| {
- let (l, r) = s
- .split_once('-')
- .ok_or(anyhow!("range delimeter missing"))?;
- let km = |s: &str| {
- if s.is_empty() {
- Ok::<_, anyhow::Error>(None)
- } else {
- Ok(Some(s.parse()?))
- }
- };
- Ok(km(l)?..km(r)?)
- })
- .collect::<Result<Vec<_>>>()?,
- ))
- }
-}
-
-#[rocket::async_trait]
-impl<'r> FromRequest<'r> for RequestRange {
- type Error = anyhow::Error;
-
- async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
- match req.headers().get("range").next() {
- Some(v) => match Self::from_hv(v) {
- Ok(v) => rocket::outcome::Outcome::Success(v),
- Err(e) => rocket::outcome::Outcome::Error((Status::BadRequest, e)),
- },
- None => rocket::outcome::Outcome::Forward(Status::Ok),
- }
- }
-}