diff options
author | metamuffin <metamuffin@disroot.org> | 2023-09-29 20:56:36 +0200 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2023-09-29 20:56:36 +0200 |
commit | c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914 (patch) | |
tree | 7a32678b59c123ea6fbe6c01237aec5e3b143e87 | |
parent | 29b12a48bcfa3aa0f814f7b39a64868b6313c13d (diff) | |
download | jellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar jellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar.bz2 jellything-c62eb3a2fdaa80f472be6ecbfc2cbf2479d8d914.tar.zst |
move stream generation to new crate
-rw-r--r-- | Cargo.lock | 15 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | client/src/lib.rs | 12 | ||||
-rw-r--r-- | common/src/lib.rs | 5 | ||||
-rw-r--r-- | common/src/stream.rs | 38 | ||||
-rw-r--r-- | server/Cargo.toml | 4 | ||||
-rw-r--r-- | server/src/main.rs | 4 | ||||
-rw-r--r-- | server/src/routes/mod.rs | 3 | ||||
-rw-r--r-- | server/src/routes/stream.rs | 86 | ||||
-rw-r--r-- | server/src/routes/ui/player.rs | 40 | ||||
-rw-r--r-- | stream/Cargo.toml | 14 | ||||
-rw-r--r-- | stream/src/lib.rs | 15 |
12 files changed, 147 insertions, 90 deletions
@@ -1602,6 +1602,19 @@ dependencies = [ ] [[package]] +name = "jellystream" +version = "0.1.0" +dependencies = [ + "anyhow", + "jellybase", + "jellycommon", + "jellytranscoder", + "log", + "tokio", + "tokio-util", +] + +[[package]] name = "jellything" version = "0.1.0" dependencies = [ @@ -1618,7 +1631,7 @@ dependencies = [ "jellybase", "jellyclient", "jellycommon", - "jellyremuxer", + "jellystream", "jellytranscoder", "log", "markup", @@ -15,6 +15,7 @@ resolver = "2" [workspace.dependencies] rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "2cf38a5aa37fe046e46b03740835787f0396307b" } log = "0.4.20" +anyhow = "1.0.75" [profile.dev.package.rav1e] opt-level = 3 diff --git a/client/src/lib.rs b/client/src/lib.rs index a3c61aa..bd9882c 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -4,6 +4,7 @@ Copyright (C) 2023 metamuffin <metamuffin.org> */ use anyhow::Result; +use jc::stream::StreamSpec; use jellycommon::NodePublic; use log::debug; use reqwest::{ @@ -102,17 +103,12 @@ impl Session { Ok(()) } - pub fn stream(&self, id: &str, tracks: &[usize], webm: bool) -> String { + pub fn stream(&self, id: &str, stream_spec: &StreamSpec) -> String { format!( - "{}/n/{}/stream?tracks={}&webm={}&{}", + "{}/n/{}/stream?{}&{}", self.instance.base(), id, - tracks - .iter() - .map(|v| format!("{v}")) - .collect::<Vec<_>>() - .join(","), - if webm { "1" } else { "0" }, + todo!(), self.session_param() ) } diff --git a/common/src/lib.rs b/common/src/lib.rs index 6c76903..b7f975a 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -6,6 +6,7 @@ pub mod config; pub mod helpers; pub mod r#impl; +pub mod stream; use bincode::{Decode, Encode}; #[cfg(feature = "rocket")] @@ -88,10 +89,12 @@ pub enum PublicMediaSource { Remote(String), } +pub type TrackID = usize; + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct LocalTrack { pub path: PathBuf, - pub track: usize, + pub track: TrackID, pub codec_private: Option<Vec<u8>>, } diff --git a/common/src/stream.rs b/common/src/stream.rs new file mode 100644 index 0000000..59166b8 --- /dev/null +++ b/common/src/stream.rs @@ -0,0 +1,38 @@ +#[cfg(feature = "rocket")] +use rocket::{FromForm, FromFormField, UriDisplayQuery}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +#[cfg_attr(feature = "rocket", derive(FromForm, UriDisplayQuery))] +pub struct StreamSpec { + pub tracks: Vec<usize>, + pub format: StreamFormat, + pub abr: Option<usize>, + pub vbr: Option<usize>, + pub index: Option<usize>, +} + +#[rustfmt::skip] +#[derive(Debug, Clone, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +#[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))] +pub enum StreamFormat { + #[cfg_attr(feature = "rocket", field(value = "original"))] Original, + #[cfg_attr(feature = "rocket", field(value = "matroska"))] Matroska, + #[cfg_attr(feature = "rocket", field(value = "webm"))] Webm, + #[cfg_attr(feature = "rocket", field(value = "hls"))] Hls, + #[cfg_attr(feature = "rocket", field(value = "jhls"))] Jhls, + #[cfg_attr(feature = "rocket", field(value = "hlsseg"))] Segment, +} + +impl Default for StreamSpec { + fn default() -> Self { + Self { + tracks: Default::default(), + format: StreamFormat::Webm, + abr: Default::default(), + vbr: Default::default(), + index: Default::default(), + } + } +} diff --git a/server/Cargo.toml b/server/Cargo.toml index 57592cf..9a87f61 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] jellycommon = { path = "../common", features = ["rocket"] } jellybase = { path = "../base" } -jellyremuxer = { path = "../remuxer" } +jellystream = { path = "../stream" } jellyclient = { path = "../client" } jellytranscoder = { path = "../transcoder" } @@ -15,8 +15,8 @@ bincode = { version = "2.0.0-rc.3", features = ["serde", "derive"] } serde_json = "1.0.107" log = { workspace = true } +anyhow = { workspace = true } env_logger = "0.10.0" -anyhow = "1.0.75" rand = "0.8.5" base64 = "0.21.4" chrono = { version = "0.4.31", features = ["serde"] } diff --git a/server/src/main.rs b/server/src/main.rs index 43b1db4..f2c1440 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -10,7 +10,6 @@ use crate::routes::ui::admin::log::enable_logging; use database::Database; use federation::Federation; use jellybase::CONF; -use jellyremuxer::RemuxerContext; use log::{error, warn}; use routes::build_rocket; use tokio::fs::create_dir_all; @@ -33,14 +32,13 @@ fn main() { } async fn async_main() { create_dir_all(&CONF.cache_path).await.unwrap(); - let remuxer = RemuxerContext::new(); let database = Database::open(&CONF.database_path).unwrap(); let federation = Federation::initialize(); database.create_admin(); // if let Err(err) = import::import(&database, &federation).await { // log::error!("import not sucessful: {err:?}") // } - let r = build_rocket(remuxer, database, federation).launch().await; + let r = build_rocket(database, federation).launch().await; match r { Ok(_) => warn!("server shutdown"), Err(e) => error!("server exited: {e}"), diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs index 86a55ac..6fe5018 100644 --- a/server/src/routes/mod.rs +++ b/server/src/routes/mod.rs @@ -7,7 +7,6 @@ use crate::{database::Database, federation::Federation, routes::ui::error::MyRes use api::{r_api_account_login, r_api_node_raw, r_api_root, r_api_version}; use base64::Engine; use jellybase::CONF; -use jellyremuxer::RemuxerContext; use log::warn; use rand::random; use rocket::{ @@ -47,7 +46,6 @@ macro_rules! uri { } pub fn build_rocket( - remuxer: RemuxerContext, database: Database, federation: Federation, ) -> Rocket<Build> { @@ -70,7 +68,6 @@ pub fn build_rocket( ), ..Default::default() }) - .manage(remuxer) .manage(database) .manage(federation) .attach(AdHoc::on_response("set server header", |_req, res| { diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs index e920341..6b268e4 100644 --- a/server/src/routes/stream.rs +++ b/server/src/routes/stream.rs @@ -7,9 +7,8 @@ use super::ui::{account::session::Session, error::MyError}; use crate::{database::Database, federation::Federation}; use anyhow::{anyhow, Result}; use jellybase::CONF; -use jellycommon::MediaSource; -use jellyremuxer::RemuxerContext; -use log::{debug, info, warn}; +use jellycommon::{stream::StreamSpec, MediaSource}; +use log::{info, warn}; use rocket::{ get, http::{ContentType, Header, Status}, @@ -17,56 +16,43 @@ use rocket::{ response::{self, Redirect, Responder}, Either, Request, Response, State, }; -use std::{ - ops::{Deref, Range}, - time::Duration, -}; -use tokio::io::{duplex, DuplexStream}; -use tokio_util::io::SyncIoBridge; +use std::{ops::Range, time::Duration}; +use tokio::io::DuplexStream; -#[get("/n/<id>/stream?<tracks>&<webm>")] +#[get("/n/<id>/stream?<spec>")] pub async fn r_stream( _sess: Session, - id: String, - webm: Option<bool>, - tracks: String, - remuxer: &State<RemuxerContext>, federation: &State<Federation>, db: &State<Database>, + id: String, range: Option<RequestRange>, + spec: StreamSpec, ) -> Result<Either<StreamResponse, Redirect>, MyError> { let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?; let source = node .private .source + .as_ref() .ok_or(anyhow!("item does not contain media"))?; - let tracks = tracks - .split(',') - .map(|e| e.parse().map_err(|_| anyhow!("invalid number"))) - .collect::<Result<Vec<_>, _>>()?; + if let MediaSource::Remote { host, remote_id } = source { + let (username, password, _) = CONF + .remote_credentials + .get(host) + .ok_or(anyhow!("no credentials on the server-side"))?; - let source_tracks = match source { - MediaSource::Local { tracks } => tracks, - MediaSource::Remote { host, remote_id } => { - let (username, password, _) = CONF - .remote_credentials - .get(&host) - .ok_or(anyhow!("no credentials on the server-side"))?; + let instance = federation.get_instance(&host)?.to_owned(); + let session = instance + .login( + username.to_owned(), + password.to_owned(), + Duration::from_secs(60), + ) + .await?; - let instance = federation.get_instance(&host)?.to_owned(); - let session = instance - .login( - username.to_owned(), - password.to_owned(), - Duration::from_secs(60), - ) - .await?; - - let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false)); - return Ok(Either::Right(Redirect::found(uri))); - } - }; + let uri = session.stream(&remote_id, &spec); + return Ok(Either::Right(Redirect::found(uri))); + } info!( "stream request (range={})", @@ -76,10 +62,6 @@ pub async fn r_stream( .unwrap_or(format!("none")) ); - let (a, b) = duplex(4096); - let remuxer = remuxer.deref().clone(); - let b = SyncIoBridge::new(b); - let urange = match &range { Some(r) => { let r = r.0.get(0).unwrap_or(&(None..None)); @@ -88,21 +70,13 @@ pub async fn r_stream( None => 0..(isize::MAX as usize), }; - tokio::task::spawn_blocking(move || { - if let Err(e) = remuxer.generate_into( - b, - urange, - CONF.library_path.clone(), - node.public, - source_tracks, - tracks, - webm.unwrap_or(false), - ) { - warn!("stream stopped: {e}") + match jellystream::stream(node, spec, urange).await { + Ok(stream) => Ok(Either::Left(StreamResponse { stream, range })), + Err(e) => { + warn!("stream error: {e}"); + Err(MyError(e)) } - }); - debug!("starting stream"); - Ok(Either::Left(StreamResponse { stream: a, range })) + } } pub struct StreamResponse { diff --git a/server/src/routes/ui/player.rs b/server/src/routes/ui/player.rs index 005e513..fa9657f 100644 --- a/server/src/routes/ui/player.rs +++ b/server/src/routes/ui/player.rs @@ -17,15 +17,18 @@ use crate::{ uri, }; use anyhow::anyhow; -use jellycommon::{Node, SourceTrackKind}; +use jellycommon::{ + stream::{StreamFormat, StreamSpec}, + Node, SourceTrackKind, TrackID, +}; use markup::DynRender; use rocket::{get, FromForm, State, UriDisplayQuery}; #[derive(FromForm, Default, Clone, Debug, UriDisplayQuery)] pub struct PlayerConfig { - pub a: Option<u64>, - pub v: Option<u64>, - pub s: Option<u64>, + pub a: Option<TrackID>, + pub v: Option<TrackID>, + pub s: Option<TrackID>, pub webm: bool, } @@ -37,24 +40,29 @@ pub fn r_player( conf: PlayerConfig, ) -> MyResult<DynLayoutPage<'_>> { let item = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?; - let tracks = None - .into_iter() - .chain(conf.v.into_iter()) - .chain(conf.a.into_iter()) - .chain(conf.s.into_iter()) - .map(|e| format!("{e}")) - .collect::<Vec<_>>() - .join(","); - let conf = player_conf(item.clone(), !tracks.is_empty())?; + let spec = StreamSpec { + tracks: None + .into_iter() + .chain(conf.v.into_iter()) + .chain(conf.a.into_iter()) + .chain(conf.s.into_iter()) + .collect::<Vec<_>>(), + format: StreamFormat::Webm, + ..Default::default() + }; + + let playing = !spec.tracks.is_empty(); + + let conf = player_conf(item.clone(), playing)?; Ok(LayoutPage { title: item.public.title.to_owned(), class: Some("player"), content: markup::new! { - @if tracks.is_empty() { - img.backdrop[src=uri!(r_item_assets(&id, AssetRole::Backdrop, Some(2048))).to_string()]; + @if playing { + video[src=uri!(r_stream(&id, &spec)), controls, preload="auto"]{} } else { - video[src=uri!(r_stream(&id, &tracks, Some(true))), controls, preload="auto"]{} + img.backdrop[src=uri!(r_item_assets(&id, AssetRole::Backdrop, Some(2048))).to_string()]; } @conf }, diff --git a/stream/Cargo.toml b/stream/Cargo.toml new file mode 100644 index 0000000..804bc1c --- /dev/null +++ b/stream/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "jellystream" +version = "0.1.0" +edition = "2021" + +[dependencies] +jellycommon = { path = "../common", features = ["rocket"] } +jellybase = { path = "../base" } +jellytranscoder = { path = "../transcoder" } + +log = { workspace = true } +anyhow = { workspace = true } +tokio = { version = "1.32.0", features = ["io-util"] } +tokio-util = { version = "0.7.9", features = ["io", "io-util"] } diff --git a/stream/src/lib.rs b/stream/src/lib.rs new file mode 100644 index 0000000..df75cf5 --- /dev/null +++ b/stream/src/lib.rs @@ -0,0 +1,15 @@ +use jellycommon::{stream::StreamSpec, Node}; +use std::ops::Range; +use tokio::io::{duplex, DuplexStream}; +use tokio_util::io::SyncIoBridge; + +pub async fn stream( + node: Node, + spec: StreamSpec, + range: Range<usize>, +) -> anyhow::Result<DuplexStream> { + let (a, b) = duplex(4096); + let b = SyncIoBridge::new(b); + + Ok(a) +} |