diff options
| -rw-r--r-- | server/src/logic/stream.rs | 181 | ||||
| -rw-r--r-- | server/src/main.rs | 1 | ||||
| -rw-r--r-- | stream/src/lib.rs | 12 | ||||
| -rw-r--r-- | stream/src/stream_info.rs | 16 |
4 files changed, 81 insertions, 129 deletions
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs index 430c10c..6f0fdc4 100644 --- a/server/src/logic/stream.rs +++ b/server/src/logic/stream.rs @@ -5,7 +5,11 @@ */ use crate::{request_info::RequestInfo, ui::error::MyError}; use anyhow::{Result, anyhow}; -use jellycommon::stream::StreamSpec; +use jellycommon::{ + NO_SLUG, NO_TITLE, NO_TRACK, TR_SOURCE, TRSOURCE_LOCAL_PATH, jellyobject::Path, + stream::StreamSpec, +}; +use jellydb::{Filter, Query, Sort}; use jellystream::SMediaInfo; use log::{info, warn}; use rocket::{ @@ -41,128 +45,83 @@ pub async fn r_stream_head( })) } -#[get("/n/<id>/stream?<spec..>")] +#[get("/n/<slug>/stream?<spec..>")] pub async fn r_stream( - session: RequestInfo<'_>, - id: &str, + ri: RequestInfo<'_>, + slug: &str, range: Option<RequestRange>, spec: BTreeMap<String, String>, -) -> Result<Either<StreamResponse, RedirectResponse>, MyError> { +) -> Result<StreamResponse, MyError> { let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?; - // // TODO perm - // let node = get_node( - // &session.0, - // NodeID::from_slug(id), - // false, - // false, - // NodeFilterSort::default(), - // )? - // .node; - - // let media = Arc::new( - // node.media - // .clone() - // .ok_or(anyhow!("item does not contain media"))?, - // ); - - // TODO its unclear how requests with multiple tracks should be handled. - // if spec.track.len() == 1 { - // let ti = spec.track[0]; - // if let TrackSource::Remote(remote_index) = media.tracks[ti].source { - // session - // .user - // .permissions - // .assert(&UserPermission::FederatedContent)?; - - // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti]; - // let host = track - // .federated - // .last() - // .ok_or(anyhow!("federation inconsistent"))?; - // let FederationAccount { - // password, username, .. - // } = SECRETS - // .federation - // .get(host) - // .ok_or(anyhow!("no credentials on the server-side"))?; - - // info!("creating session on {host}"); - // let instance = federation.get_instance(host)?.to_owned(); - // let session = instance - // .login(CreateSessionParams { - // username: username.to_owned(), - // password: password.to_owned(), - // expire: Some(60), - // drop_permissions: Some(HashSet::from_iter([ - // UserPermission::ManageSelf, - // UserPermission::Admin, // in case somebody federated the admin :))) - // ])), - // }) - // .await?; + let mut node = None; + ri.state.database.transaction(&mut |txn| { + if let Some(row) = txn.query_single(Query { + filter: Filter::Match(Path(vec![NO_SLUG.0]), slug.into()), + sort: Sort::None, + })? { + node = txn.get(row)?; + } + Ok(()) + })?; - // let uri = session.stream_url( - // node.slug.clone().into(), - // &StreamSpec { - // track: vec![remote_index], - // ..spec - // }, - // ); - // info!("federation redirect"); - // return Ok(Either::Right(RedirectResponse(uri))); - // } - // } + let Some(node) = node.as_ref().map(|n| n.as_object()) else { + Err(anyhow!("node not found"))? + }; - // info!( - // "stream request (range={})", - // range - // .as_ref() - // .map(|r| r.to_cr_hv()) - // .unwrap_or("none".to_string()) - // ); + info!( + "stream request (range={})", + range + .as_ref() + .map(|r| r.to_cr_hv()) + .unwrap_or("none".to_string()) + ); - // let urange = match &range { - // Some(r) => { - // let r = r.0.first().unwrap_or(&(None..None)); - // r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX) - // } - // None => 0..u64::MAX, - // }; + let urange = match &range { + Some(r) => { + let r = r.0.first().unwrap_or(&(None..None)); + r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX) + } + None => 0..u64::MAX, + }; - // let head = jellystream::stream_head(&spec); + let head = jellystream::stream_head(&spec); - // let mut sources = BTreeSet::new(); - // for t in &media.tracks { - // if let TrackSource::Local(path, _) = &t.source { - // sources.insert(path.to_owned()); - // } - // } - // let media = Arc::new(SMediaInfo { - // files: sources, - // title: node.title.clone(), - // }); + let mut sources = BTreeSet::new(); + for track in node.iter(NO_TRACK) { + if let Some(s) = track.get(TR_SOURCE) { + if let Some(path) = s.get(TRSOURCE_LOCAL_PATH) { + sources.insert(path.into()); + } + } + } + let media = Arc::new(SMediaInfo { + files: sources, + title: node.get(NO_TITLE).map(String::from), + cache: ri.state.cache.clone(), + config: ri.state.config.stream.clone(), + }); - // // TODO cleaner solution needed - // let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange)) - // .await - // .unwrap() - // { - // Ok(o) => o, - // Err(e) => { - // warn!("stream error: {e:?}"); - // Err(e)? - // } - // }; - // let (stream_write, stream_read) = duplex(4096); - // spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write))); + // TODO too many threads + let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange)) + .await + .unwrap() + { + Ok(o) => o, + Err(e) => { + warn!("stream error: {e:?}"); + Err(e)? + } + }; + let (stream_write, stream_read) = duplex(4096); + spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write))); - // Ok(Either::Left(StreamResponse { - // stream: stream_read, - // range, - // advertise_range: head.range_supported, - // content_type: head.content_type, - // })) - todo!() + Ok(StreamResponse { + stream: stream_read, + range, + advertise_range: head.range_supported, + content_type: head.content_type, + }) } pub struct RedirectResponse(String); diff --git a/server/src/main.rs b/server/src/main.rs index 8b4beb5..cbad704 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -65,6 +65,7 @@ pub struct State { pub struct Config { pub import: jellyimport::Config, pub ui: jellyui::Config, + pub stream: Arc<jellystream::Config>, pub session_key: String, pub admin_password: String, pub asset_path: PathBuf, diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 424e44b..1407643 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -25,7 +25,7 @@ use std::{ io::{Read, Seek, SeekFrom}, ops::Range, path::PathBuf, - sync::{Arc, LazyLock, Mutex}, + sync::Arc, }; use stream_info::{stream_info, write_stream_info}; @@ -39,19 +39,11 @@ pub struct Config { #[serde(default)] pub offer_av1: bool, } -pub static CONF_PRELOAD: Mutex<Option<Config>> = Mutex::new(None); -static CONF: LazyLock<Config> = LazyLock::new(|| { - CONF_PRELOAD - .lock() - .unwrap() - .take() - .expect("stream config not preloaded. logic error") -}); - pub struct SMediaInfo { pub title: Option<String>, pub files: BTreeSet<PathBuf>, pub cache: Arc<Cache>, + pub config: Arc<Config>, } pub struct StreamHead { diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs index 33dd288..694a44f 100644 --- a/stream/src/stream_info.rs +++ b/stream/src/stream_info.rs @@ -3,7 +3,7 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2026 metamuffin <metamuffin.org> */ -use crate::{CONF, SMediaInfo, cues::generate_cues, metadata::read_metadata}; +use crate::{Config, SMediaInfo, cues::generate_cues, metadata::read_metadata}; use anyhow::Result; use jellyremuxer::matroska::{self, Segment, TrackEntry, TrackType}; use jellystream_types::{ @@ -47,7 +47,7 @@ pub(crate) fn stream_info(info: &SMediaInfo) -> Result<(InternalStreamInfo, Stre matroska::TrackType::Subtitle => TrackKind::Subtitle, _ => todo!(), }, - formats: stream_formats(t, byterate * 8.), + formats: stream_formats(&info.config, t, byterate * 8.), }); track_to_file.push((i, t.track_number)); } @@ -71,7 +71,7 @@ pub(crate) fn stream_info(info: &SMediaInfo) -> Result<(InternalStreamInfo, Stre )) } -fn stream_formats(t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> { +fn stream_formats(config: &Config, t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> { let mut formats = Vec::new(); formats.push(StreamFormatInfo { codec: t.codec_id.to_string(), @@ -103,11 +103,11 @@ fn stream_formats(t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> { // most codecs use chroma subsampling that requires even dims let h = ((w * sh) / sw) & !1; // clear last bit to ensure even height. for (cid, enable) in [ - ("V_AV1", CONF.offer_av1), - ("V_VP8", CONF.offer_vp8), - ("V_VP9", CONF.offer_vp9), - ("V_MPEG4/ISO/AVC", CONF.offer_avc), - ("V_MPEGH/ISO/HEVC", CONF.offer_hevc), + ("V_AV1", config.offer_av1), + ("V_VP8", config.offer_vp8), + ("V_VP9", config.offer_vp9), + ("V_MPEG4/ISO/AVC", config.offer_avc), + ("V_MPEGH/ISO/HEVC", config.offer_hevc), ] { if enable { formats.push(StreamFormatInfo { |