aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock15
-rw-r--r--Cargo.toml1
-rw-r--r--client/src/lib.rs12
-rw-r--r--common/src/lib.rs5
-rw-r--r--common/src/stream.rs38
-rw-r--r--server/Cargo.toml4
-rw-r--r--server/src/main.rs4
-rw-r--r--server/src/routes/mod.rs3
-rw-r--r--server/src/routes/stream.rs86
-rw-r--r--server/src/routes/ui/player.rs40
-rw-r--r--stream/Cargo.toml14
-rw-r--r--stream/src/lib.rs15
12 files changed, 147 insertions, 90 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ab667da..d7246e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1602,6 +1602,19 @@ dependencies = [
]
[[package]]
+name = "jellystream"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "jellybase",
+ "jellycommon",
+ "jellytranscoder",
+ "log",
+ "tokio",
+ "tokio-util",
+]
+
+[[package]]
name = "jellything"
version = "0.1.0"
dependencies = [
@@ -1618,7 +1631,7 @@ dependencies = [
"jellybase",
"jellyclient",
"jellycommon",
- "jellyremuxer",
+ "jellystream",
"jellytranscoder",
"log",
"markup",
diff --git a/Cargo.toml b/Cargo.toml
index cd408b3..ac93eff 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,6 +15,7 @@ resolver = "2"
[workspace.dependencies]
rocket = { git = "https://github.com/SergioBenitez/Rocket", rev = "2cf38a5aa37fe046e46b03740835787f0396307b" }
log = "0.4.20"
+anyhow = "1.0.75"
[profile.dev.package.rav1e]
opt-level = 3
diff --git a/client/src/lib.rs b/client/src/lib.rs
index a3c61aa..bd9882c 100644
--- a/client/src/lib.rs
+++ b/client/src/lib.rs
@@ -4,6 +4,7 @@
Copyright (C) 2023 metamuffin <metamuffin.org>
*/
use anyhow::Result;
+use jc::stream::StreamSpec;
use jellycommon::NodePublic;
use log::debug;
use reqwest::{
@@ -102,17 +103,12 @@ impl Session {
Ok(())
}
- pub fn stream(&self, id: &str, tracks: &[usize], webm: bool) -> String {
+ pub fn stream(&self, id: &str, stream_spec: &StreamSpec) -> String {
format!(
- "{}/n/{}/stream?tracks={}&webm={}&{}",
+ "{}/n/{}/stream?{}&{}",
self.instance.base(),
id,
- tracks
- .iter()
- .map(|v| format!("{v}"))
- .collect::<Vec<_>>()
- .join(","),
- if webm { "1" } else { "0" },
+ todo!(),
self.session_param()
)
}
diff --git a/common/src/lib.rs b/common/src/lib.rs
index 6c76903..b7f975a 100644
--- a/common/src/lib.rs
+++ b/common/src/lib.rs
@@ -6,6 +6,7 @@
pub mod config;
pub mod helpers;
pub mod r#impl;
+pub mod stream;
use bincode::{Decode, Encode};
#[cfg(feature = "rocket")]
@@ -88,10 +89,12 @@ pub enum PublicMediaSource {
Remote(String),
}
+pub type TrackID = usize;
+
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LocalTrack {
pub path: PathBuf,
- pub track: usize,
+ pub track: TrackID,
pub codec_private: Option<Vec<u8>>,
}
diff --git a/common/src/stream.rs b/common/src/stream.rs
new file mode 100644
index 0000000..59166b8
--- /dev/null
+++ b/common/src/stream.rs
@@ -0,0 +1,38 @@
+#[cfg(feature = "rocket")]
+use rocket::{FromForm, FromFormField, UriDisplayQuery};
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[cfg_attr(feature = "rocket", derive(FromForm, UriDisplayQuery))]
+pub struct StreamSpec {
+ pub tracks: Vec<usize>,
+ pub format: StreamFormat,
+ pub abr: Option<usize>,
+ pub vbr: Option<usize>,
+ pub index: Option<usize>,
+}
+
+#[rustfmt::skip]
+#[derive(Debug, Clone, Deserialize, Serialize)]
+#[serde(rename_all = "snake_case")]
+#[cfg_attr(feature = "rocket", derive(FromFormField, UriDisplayQuery))]
+pub enum StreamFormat {
+ #[cfg_attr(feature = "rocket", field(value = "original"))] Original,
+ #[cfg_attr(feature = "rocket", field(value = "matroska"))] Matroska,
+ #[cfg_attr(feature = "rocket", field(value = "webm"))] Webm,
+ #[cfg_attr(feature = "rocket", field(value = "hls"))] Hls,
+ #[cfg_attr(feature = "rocket", field(value = "jhls"))] Jhls,
+ #[cfg_attr(feature = "rocket", field(value = "hlsseg"))] Segment,
+}
+
+impl Default for StreamSpec {
+ fn default() -> Self {
+ Self {
+ tracks: Default::default(),
+ format: StreamFormat::Webm,
+ abr: Default::default(),
+ vbr: Default::default(),
+ index: Default::default(),
+ }
+ }
+}
diff --git a/server/Cargo.toml b/server/Cargo.toml
index 57592cf..9a87f61 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
jellycommon = { path = "../common", features = ["rocket"] }
jellybase = { path = "../base" }
-jellyremuxer = { path = "../remuxer" }
+jellystream = { path = "../stream" }
jellyclient = { path = "../client" }
jellytranscoder = { path = "../transcoder" }
@@ -15,8 +15,8 @@ bincode = { version = "2.0.0-rc.3", features = ["serde", "derive"] }
serde_json = "1.0.107"
log = { workspace = true }
+anyhow = { workspace = true }
env_logger = "0.10.0"
-anyhow = "1.0.75"
rand = "0.8.5"
base64 = "0.21.4"
chrono = { version = "0.4.31", features = ["serde"] }
diff --git a/server/src/main.rs b/server/src/main.rs
index 43b1db4..f2c1440 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -10,7 +10,6 @@ use crate::routes::ui::admin::log::enable_logging;
use database::Database;
use federation::Federation;
use jellybase::CONF;
-use jellyremuxer::RemuxerContext;
use log::{error, warn};
use routes::build_rocket;
use tokio::fs::create_dir_all;
@@ -33,14 +32,13 @@ fn main() {
}
async fn async_main() {
create_dir_all(&CONF.cache_path).await.unwrap();
- let remuxer = RemuxerContext::new();
let database = Database::open(&CONF.database_path).unwrap();
let federation = Federation::initialize();
database.create_admin();
// if let Err(err) = import::import(&database, &federation).await {
// log::error!("import not sucessful: {err:?}")
// }
- let r = build_rocket(remuxer, database, federation).launch().await;
+ let r = build_rocket(database, federation).launch().await;
match r {
Ok(_) => warn!("server shutdown"),
Err(e) => error!("server exited: {e}"),
diff --git a/server/src/routes/mod.rs b/server/src/routes/mod.rs
index 86a55ac..6fe5018 100644
--- a/server/src/routes/mod.rs
+++ b/server/src/routes/mod.rs
@@ -7,7 +7,6 @@ use crate::{database::Database, federation::Federation, routes::ui::error::MyRes
use api::{r_api_account_login, r_api_node_raw, r_api_root, r_api_version};
use base64::Engine;
use jellybase::CONF;
-use jellyremuxer::RemuxerContext;
use log::warn;
use rand::random;
use rocket::{
@@ -47,7 +46,6 @@ macro_rules! uri {
}
pub fn build_rocket(
- remuxer: RemuxerContext,
database: Database,
federation: Federation,
) -> Rocket<Build> {
@@ -70,7 +68,6 @@ pub fn build_rocket(
),
..Default::default()
})
- .manage(remuxer)
.manage(database)
.manage(federation)
.attach(AdHoc::on_response("set server header", |_req, res| {
diff --git a/server/src/routes/stream.rs b/server/src/routes/stream.rs
index e920341..6b268e4 100644
--- a/server/src/routes/stream.rs
+++ b/server/src/routes/stream.rs
@@ -7,9 +7,8 @@ use super::ui::{account::session::Session, error::MyError};
use crate::{database::Database, federation::Federation};
use anyhow::{anyhow, Result};
use jellybase::CONF;
-use jellycommon::MediaSource;
-use jellyremuxer::RemuxerContext;
-use log::{debug, info, warn};
+use jellycommon::{stream::StreamSpec, MediaSource};
+use log::{info, warn};
use rocket::{
get,
http::{ContentType, Header, Status},
@@ -17,56 +16,43 @@ use rocket::{
response::{self, Redirect, Responder},
Either, Request, Response, State,
};
-use std::{
- ops::{Deref, Range},
- time::Duration,
-};
-use tokio::io::{duplex, DuplexStream};
-use tokio_util::io::SyncIoBridge;
+use std::{ops::Range, time::Duration};
+use tokio::io::DuplexStream;
-#[get("/n/<id>/stream?<tracks>&<webm>")]
+#[get("/n/<id>/stream?<spec>")]
pub async fn r_stream(
_sess: Session,
- id: String,
- webm: Option<bool>,
- tracks: String,
- remuxer: &State<RemuxerContext>,
federation: &State<Federation>,
db: &State<Database>,
+ id: String,
range: Option<RequestRange>,
+ spec: StreamSpec,
) -> Result<Either<StreamResponse, Redirect>, MyError> {
let node = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?;
let source = node
.private
.source
+ .as_ref()
.ok_or(anyhow!("item does not contain media"))?;
- let tracks = tracks
- .split(',')
- .map(|e| e.parse().map_err(|_| anyhow!("invalid number")))
- .collect::<Result<Vec<_>, _>>()?;
+ if let MediaSource::Remote { host, remote_id } = source {
+ let (username, password, _) = CONF
+ .remote_credentials
+ .get(host)
+ .ok_or(anyhow!("no credentials on the server-side"))?;
- let source_tracks = match source {
- MediaSource::Local { tracks } => tracks,
- MediaSource::Remote { host, remote_id } => {
- let (username, password, _) = CONF
- .remote_credentials
- .get(&host)
- .ok_or(anyhow!("no credentials on the server-side"))?;
+ let instance = federation.get_instance(&host)?.to_owned();
+ let session = instance
+ .login(
+ username.to_owned(),
+ password.to_owned(),
+ Duration::from_secs(60),
+ )
+ .await?;
- let instance = federation.get_instance(&host)?.to_owned();
- let session = instance
- .login(
- username.to_owned(),
- password.to_owned(),
- Duration::from_secs(60),
- )
- .await?;
-
- let uri = session.stream(&remote_id, &tracks, webm.unwrap_or(false));
- return Ok(Either::Right(Redirect::found(uri)));
- }
- };
+ let uri = session.stream(&remote_id, &spec);
+ return Ok(Either::Right(Redirect::found(uri)));
+ }
info!(
"stream request (range={})",
@@ -76,10 +62,6 @@ pub async fn r_stream(
.unwrap_or(format!("none"))
);
- let (a, b) = duplex(4096);
- let remuxer = remuxer.deref().clone();
- let b = SyncIoBridge::new(b);
-
let urange = match &range {
Some(r) => {
let r = r.0.get(0).unwrap_or(&(None..None));
@@ -88,21 +70,13 @@ pub async fn r_stream(
None => 0..(isize::MAX as usize),
};
- tokio::task::spawn_blocking(move || {
- if let Err(e) = remuxer.generate_into(
- b,
- urange,
- CONF.library_path.clone(),
- node.public,
- source_tracks,
- tracks,
- webm.unwrap_or(false),
- ) {
- warn!("stream stopped: {e}")
+ match jellystream::stream(node, spec, urange).await {
+ Ok(stream) => Ok(Either::Left(StreamResponse { stream, range })),
+ Err(e) => {
+ warn!("stream error: {e}");
+ Err(MyError(e))
}
- });
- debug!("starting stream");
- Ok(Either::Left(StreamResponse { stream: a, range }))
+ }
}
pub struct StreamResponse {
diff --git a/server/src/routes/ui/player.rs b/server/src/routes/ui/player.rs
index 005e513..fa9657f 100644
--- a/server/src/routes/ui/player.rs
+++ b/server/src/routes/ui/player.rs
@@ -17,15 +17,18 @@ use crate::{
uri,
};
use anyhow::anyhow;
-use jellycommon::{Node, SourceTrackKind};
+use jellycommon::{
+ stream::{StreamFormat, StreamSpec},
+ Node, SourceTrackKind, TrackID,
+};
use markup::DynRender;
use rocket::{get, FromForm, State, UriDisplayQuery};
#[derive(FromForm, Default, Clone, Debug, UriDisplayQuery)]
pub struct PlayerConfig {
- pub a: Option<u64>,
- pub v: Option<u64>,
- pub s: Option<u64>,
+ pub a: Option<TrackID>,
+ pub v: Option<TrackID>,
+ pub s: Option<TrackID>,
pub webm: bool,
}
@@ -37,24 +40,29 @@ pub fn r_player(
conf: PlayerConfig,
) -> MyResult<DynLayoutPage<'_>> {
let item = db.node.get(&id)?.ok_or(anyhow!("node does not exist"))?;
- let tracks = None
- .into_iter()
- .chain(conf.v.into_iter())
- .chain(conf.a.into_iter())
- .chain(conf.s.into_iter())
- .map(|e| format!("{e}"))
- .collect::<Vec<_>>()
- .join(",");
- let conf = player_conf(item.clone(), !tracks.is_empty())?;
+ let spec = StreamSpec {
+ tracks: None
+ .into_iter()
+ .chain(conf.v.into_iter())
+ .chain(conf.a.into_iter())
+ .chain(conf.s.into_iter())
+ .collect::<Vec<_>>(),
+ format: StreamFormat::Webm,
+ ..Default::default()
+ };
+
+ let playing = !spec.tracks.is_empty();
+
+ let conf = player_conf(item.clone(), playing)?;
Ok(LayoutPage {
title: item.public.title.to_owned(),
class: Some("player"),
content: markup::new! {
- @if tracks.is_empty() {
- img.backdrop[src=uri!(r_item_assets(&id, AssetRole::Backdrop, Some(2048))).to_string()];
+ @if playing {
+ video[src=uri!(r_stream(&id, &spec)), controls, preload="auto"]{}
} else {
- video[src=uri!(r_stream(&id, &tracks, Some(true))), controls, preload="auto"]{}
+ img.backdrop[src=uri!(r_item_assets(&id, AssetRole::Backdrop, Some(2048))).to_string()];
}
@conf
},
diff --git a/stream/Cargo.toml b/stream/Cargo.toml
new file mode 100644
index 0000000..804bc1c
--- /dev/null
+++ b/stream/Cargo.toml
@@ -0,0 +1,14 @@
+[package]
+name = "jellystream"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+jellycommon = { path = "../common", features = ["rocket"] }
+jellybase = { path = "../base" }
+jellytranscoder = { path = "../transcoder" }
+
+log = { workspace = true }
+anyhow = { workspace = true }
+tokio = { version = "1.32.0", features = ["io-util"] }
+tokio-util = { version = "0.7.9", features = ["io", "io-util"] }
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
new file mode 100644
index 0000000..df75cf5
--- /dev/null
+++ b/stream/src/lib.rs
@@ -0,0 +1,15 @@
+use jellycommon::{stream::StreamSpec, Node};
+use std::ops::Range;
+use tokio::io::{duplex, DuplexStream};
+use tokio_util::io::SyncIoBridge;
+
+pub async fn stream(
+ node: Node,
+ spec: StreamSpec,
+ range: Range<usize>,
+) -> anyhow::Result<DuplexStream> {
+ let (a, b) = duplex(4096);
+ let b = SyncIoBridge::new(b);
+
+ Ok(a)
+}