aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--server/src/logic/stream.rs181
-rw-r--r--server/src/main.rs1
-rw-r--r--stream/src/lib.rs12
-rw-r--r--stream/src/stream_info.rs16
4 files changed, 81 insertions, 129 deletions
diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs
index 430c10c..6f0fdc4 100644
--- a/server/src/logic/stream.rs
+++ b/server/src/logic/stream.rs
@@ -5,7 +5,11 @@
*/
use crate::{request_info::RequestInfo, ui::error::MyError};
use anyhow::{Result, anyhow};
-use jellycommon::stream::StreamSpec;
+use jellycommon::{
+ NO_SLUG, NO_TITLE, NO_TRACK, TR_SOURCE, TRSOURCE_LOCAL_PATH, jellyobject::Path,
+ stream::StreamSpec,
+};
+use jellydb::{Filter, Query, Sort};
use jellystream::SMediaInfo;
use log::{info, warn};
use rocket::{
@@ -41,128 +45,83 @@ pub async fn r_stream_head(
}))
}
-#[get("/n/<id>/stream?<spec..>")]
+#[get("/n/<slug>/stream?<spec..>")]
pub async fn r_stream(
- session: RequestInfo<'_>,
- id: &str,
+ ri: RequestInfo<'_>,
+ slug: &str,
range: Option<RequestRange>,
spec: BTreeMap<String, String>,
-) -> Result<Either<StreamResponse, RedirectResponse>, MyError> {
+) -> Result<StreamResponse, MyError> {
let spec = StreamSpec::from_query_kv(&spec).map_err(|x| anyhow!("spec invalid: {x}"))?;
- // // TODO perm
- // let node = get_node(
- // &session.0,
- // NodeID::from_slug(id),
- // false,
- // false,
- // NodeFilterSort::default(),
- // )?
- // .node;
-
- // let media = Arc::new(
- // node.media
- // .clone()
- // .ok_or(anyhow!("item does not contain media"))?,
- // );
-
- // TODO its unclear how requests with multiple tracks should be handled.
- // if spec.track.len() == 1 {
- // let ti = spec.track[0];
- // if let TrackSource::Remote(remote_index) = media.tracks[ti].source {
- // session
- // .user
- // .permissions
- // .assert(&UserPermission::FederatedContent)?;
-
- // let track = &node.media.as_ref().ok_or(anyhow!("no media"))?.tracks[ti];
- // let host = track
- // .federated
- // .last()
- // .ok_or(anyhow!("federation inconsistent"))?;
- // let FederationAccount {
- // password, username, ..
- // } = SECRETS
- // .federation
- // .get(host)
- // .ok_or(anyhow!("no credentials on the server-side"))?;
-
- // info!("creating session on {host}");
- // let instance = federation.get_instance(host)?.to_owned();
- // let session = instance
- // .login(CreateSessionParams {
- // username: username.to_owned(),
- // password: password.to_owned(),
- // expire: Some(60),
- // drop_permissions: Some(HashSet::from_iter([
- // UserPermission::ManageSelf,
- // UserPermission::Admin, // in case somebody federated the admin :)))
- // ])),
- // })
- // .await?;
+ let mut node = None;
+ ri.state.database.transaction(&mut |txn| {
+ if let Some(row) = txn.query_single(Query {
+ filter: Filter::Match(Path(vec![NO_SLUG.0]), slug.into()),
+ sort: Sort::None,
+ })? {
+ node = txn.get(row)?;
+ }
+ Ok(())
+ })?;
- // let uri = session.stream_url(
- // node.slug.clone().into(),
- // &StreamSpec {
- // track: vec![remote_index],
- // ..spec
- // },
- // );
- // info!("federation redirect");
- // return Ok(Either::Right(RedirectResponse(uri)));
- // }
- // }
+ let Some(node) = node.as_ref().map(|n| n.as_object()) else {
+ Err(anyhow!("node not found"))?
+ };
- // info!(
- // "stream request (range={})",
- // range
- // .as_ref()
- // .map(|r| r.to_cr_hv())
- // .unwrap_or("none".to_string())
- // );
+ info!(
+ "stream request (range={})",
+ range
+ .as_ref()
+ .map(|r| r.to_cr_hv())
+ .unwrap_or("none".to_string())
+ );
- // let urange = match &range {
- // Some(r) => {
- // let r = r.0.first().unwrap_or(&(None..None));
- // r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX)
- // }
- // None => 0..u64::MAX,
- // };
+ let urange = match &range {
+ Some(r) => {
+ let r = r.0.first().unwrap_or(&(None..None));
+ r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX)
+ }
+ None => 0..u64::MAX,
+ };
- // let head = jellystream::stream_head(&spec);
+ let head = jellystream::stream_head(&spec);
- // let mut sources = BTreeSet::new();
- // for t in &media.tracks {
- // if let TrackSource::Local(path, _) = &t.source {
- // sources.insert(path.to_owned());
- // }
- // }
- // let media = Arc::new(SMediaInfo {
- // files: sources,
- // title: node.title.clone(),
- // });
+ let mut sources = BTreeSet::new();
+ for track in node.iter(NO_TRACK) {
+ if let Some(s) = track.get(TR_SOURCE) {
+ if let Some(path) = s.get(TRSOURCE_LOCAL_PATH) {
+ sources.insert(path.into());
+ }
+ }
+ }
+ let media = Arc::new(SMediaInfo {
+ files: sources,
+ title: node.get(NO_TITLE).map(String::from),
+ cache: ri.state.cache.clone(),
+ config: ri.state.config.stream.clone(),
+ });
- // // TODO cleaner solution needed
- // let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange))
- // .await
- // .unwrap()
- // {
- // Ok(o) => o,
- // Err(e) => {
- // warn!("stream error: {e:?}");
- // Err(e)?
- // }
- // };
- // let (stream_write, stream_read) = duplex(4096);
- // spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write)));
+ // TODO too many threads
+ let mut reader = match spawn_blocking(move || jellystream::stream(media, spec, urange))
+ .await
+ .unwrap()
+ {
+ Ok(o) => o,
+ Err(e) => {
+ warn!("stream error: {e:?}");
+ Err(e)?
+ }
+ };
+ let (stream_write, stream_read) = duplex(4096);
+ spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write)));
- // Ok(Either::Left(StreamResponse {
- // stream: stream_read,
- // range,
- // advertise_range: head.range_supported,
- // content_type: head.content_type,
- // }))
- todo!()
+ Ok(StreamResponse {
+ stream: stream_read,
+ range,
+ advertise_range: head.range_supported,
+ content_type: head.content_type,
+ })
}
pub struct RedirectResponse(String);
diff --git a/server/src/main.rs b/server/src/main.rs
index 8b4beb5..cbad704 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -65,6 +65,7 @@ pub struct State {
pub struct Config {
pub import: jellyimport::Config,
pub ui: jellyui::Config,
+ pub stream: Arc<jellystream::Config>,
pub session_key: String,
pub admin_password: String,
pub asset_path: PathBuf,
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 424e44b..1407643 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -25,7 +25,7 @@ use std::{
io::{Read, Seek, SeekFrom},
ops::Range,
path::PathBuf,
- sync::{Arc, LazyLock, Mutex},
+ sync::Arc,
};
use stream_info::{stream_info, write_stream_info};
@@ -39,19 +39,11 @@ pub struct Config {
#[serde(default)] pub offer_av1: bool,
}
-pub static CONF_PRELOAD: Mutex<Option<Config>> = Mutex::new(None);
-static CONF: LazyLock<Config> = LazyLock::new(|| {
- CONF_PRELOAD
- .lock()
- .unwrap()
- .take()
- .expect("stream config not preloaded. logic error")
-});
-
pub struct SMediaInfo {
pub title: Option<String>,
pub files: BTreeSet<PathBuf>,
pub cache: Arc<Cache>,
+ pub config: Arc<Config>,
}
pub struct StreamHead {
diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs
index 33dd288..694a44f 100644
--- a/stream/src/stream_info.rs
+++ b/stream/src/stream_info.rs
@@ -3,7 +3,7 @@
which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
Copyright (C) 2026 metamuffin <metamuffin.org>
*/
-use crate::{CONF, SMediaInfo, cues::generate_cues, metadata::read_metadata};
+use crate::{Config, SMediaInfo, cues::generate_cues, metadata::read_metadata};
use anyhow::Result;
use jellyremuxer::matroska::{self, Segment, TrackEntry, TrackType};
use jellystream_types::{
@@ -47,7 +47,7 @@ pub(crate) fn stream_info(info: &SMediaInfo) -> Result<(InternalStreamInfo, Stre
matroska::TrackType::Subtitle => TrackKind::Subtitle,
_ => todo!(),
},
- formats: stream_formats(t, byterate * 8.),
+ formats: stream_formats(&info.config, t, byterate * 8.),
});
track_to_file.push((i, t.track_number));
}
@@ -71,7 +71,7 @@ pub(crate) fn stream_info(info: &SMediaInfo) -> Result<(InternalStreamInfo, Stre
))
}
-fn stream_formats(t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> {
+fn stream_formats(config: &Config, t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> {
let mut formats = Vec::new();
formats.push(StreamFormatInfo {
codec: t.codec_id.to_string(),
@@ -103,11 +103,11 @@ fn stream_formats(t: &TrackEntry, remux_bitrate: f64) -> Vec<StreamFormatInfo> {
// most codecs use chroma subsampling that requires even dims
let h = ((w * sh) / sw) & !1; // clear last bit to ensure even height.
for (cid, enable) in [
- ("V_AV1", CONF.offer_av1),
- ("V_VP8", CONF.offer_vp8),
- ("V_VP9", CONF.offer_vp9),
- ("V_MPEG4/ISO/AVC", CONF.offer_avc),
- ("V_MPEGH/ISO/HEVC", CONF.offer_hevc),
+ ("V_AV1", config.offer_av1),
+ ("V_VP8", config.offer_vp8),
+ ("V_VP9", config.offer_vp9),
+ ("V_MPEG4/ISO/AVC", config.offer_avc),
+ ("V_MPEGH/ISO/HEVC", config.offer_hevc),
] {
if enable {
formats.push(StreamFormatInfo {