From f8d4c438d10450ead56c0082b037e466ef5f9f24 Mon Sep 17 00:00:00 2001 From: metamuffin Date: Thu, 25 Sep 2025 04:00:51 +0200 Subject: start media processing refactor --- Cargo.lock | 157 +++++++++-------- cache/Cargo.toml | 2 +- cache/src/lib.rs | 19 +- import/asset_token/Cargo.toml | 2 +- import/asset_token/src/lib.rs | 2 +- import/src/acoustid.rs | 12 +- import/src/musicbrainz.rs | 27 ++- import/src/tmdb.rs | 19 +- remuxer/Cargo.toml | 5 +- remuxer/src/bin/average_cluster_duration.rs | 38 ++++ remuxer/src/demuxers/flac.rs | 13 +- remuxer/src/demuxers/matroska.rs | 15 +- remuxer/src/demuxers/mod.rs | 14 +- remuxer/src/lib.rs | 3 + remuxer/src/muxers/matroska.rs | 63 +++++++ remuxer/src/muxers/mod.rs | 40 +++++ server/src/logic/stream.rs | 39 +++-- server/src/ui/player.rs | 1 - stream/Cargo.toml | 2 - stream/src/cues.rs | 57 ++++++ stream/src/fragment.rs | 262 ++++++++++++++++------------ stream/src/fragment_index.rs | 53 ++++-- stream/src/hls.rs | 86 +++------ stream/src/lib.rs | 110 +++++------- stream/src/metadata.rs | 26 +++ stream/src/stream_info.rs | 79 ++++----- stream/types/src/lib.rs | 78 ++------- 27 files changed, 698 insertions(+), 526 deletions(-) create mode 100644 remuxer/src/bin/average_cluster_duration.rs create mode 100644 remuxer/src/muxers/matroska.rs create mode 100644 remuxer/src/muxers/mod.rs create mode 100644 stream/src/cues.rs create mode 100644 stream/src/metadata.rs diff --git a/Cargo.lock b/Cargo.lock index 9af686d..aa95ea7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -168,9 +168,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.99" +version = "1.0.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0674a1ddeecb70197781e945de4b3b8ffb61fa939a5597bcf48503737663100" +checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "approx" @@ -471,9 +471,9 @@ checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" [[package]] name = "cc" -version = "1.2.37" +version = "1.2.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65193589c6404eb80b450d618eaf9a2cafaaafd57ecce47370519ef674a7bd44" +checksum = "80f41ae168f955c12fb8960b057d70d0ca153fb83182b57d86380443527be7e9" dependencies = [ "find-msvc-tools", "jobserver", @@ -545,9 +545,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.47" +version = "4.5.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7eac00902d9d136acd712710d71823fb8ac8004ca445a89e73a41d45aa712931" +checksum = "e2134bb3ea021b78629caa971416385309e0131b351b25e01dc16fb54e1b5fae" dependencies = [ "clap_builder", "clap_derive", @@ -555,9 +555,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.47" +version = "4.5.48" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ad9bbf750e73b5884fb8a211a9424a1906c1e156724260fdae972f31d70e1d6" +checksum = "c2ba64afa3c0a6df7fa517765e31314e983f51dda798ffba27b988194fb65dc9" dependencies = [ "anstream", "anstyle", @@ -567,9 +567,9 @@ dependencies = [ [[package]] name = "clap_complete" -version = "4.5.57" +version = "4.5.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d9501bd3f5f09f7bbee01da9a511073ed30a80cd7a509f1214bb74eadea71ad" +checksum = "75bf0b32ad2e152de789bb635ea4d3078f6b838ad7974143e99b99f45a04af4a" dependencies = [ "clap", ] @@ -776,12 +776,12 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" [[package]] name = "deranged" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d630bccd429a5bb5a64b5e94f693bfc48c9f8566418fda4c494cc94f911f87cc" +checksum = "a41953f86f8a05768a6cda24def994fd2f424b04ec5c719cf89989779f199071" dependencies = [ "powerfmt", - "serde", + "serde_core", ] [[package]] @@ -862,7 +862,7 @@ checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" [[package]] name = "ebml" version = "0.1.0" -source = "git+https://codeberg.org/metamuffin/ebml-rs#8a50865c9733737fffd05f4c088c9e150cf98520" +source = "git+https://codeberg.org/metamuffin/ebml-rs#1f66a95d8ed381ca48e0cbf5f7e6808ee720d966" dependencies = [ "ebml-derive", ] @@ -870,7 +870,7 @@ dependencies = [ [[package]] name = "ebml-derive" version = "0.1.0" -source = "git+https://codeberg.org/metamuffin/ebml-rs#8a50865c9733737fffd05f4c088c9e150cf98520" +source = "git+https://codeberg.org/metamuffin/ebml-rs#1f66a95d8ed381ca48e0cbf5f7e6808ee720d966" dependencies = [ "darling", "quote", @@ -1037,9 +1037,9 @@ dependencies = [ [[package]] name = "find-msvc-tools" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fd99930f64d146689264c637b5af2f0233a933bef0d8570e2526bf9e083192d" +checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" [[package]] name = "flate2" @@ -1232,7 +1232,7 @@ dependencies = [ "js-sys", "libc", "r-efi", - "wasi 0.14.5+wasi-0.2.4", + "wasi 0.14.7+wasi-0.2.4", "wasm-bindgen", ] @@ -1308,6 +1308,12 @@ dependencies = [ "foldhash", ] +[[package]] +name = "hashbrown" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" + [[package]] name = "heck" version = "0.5.0" @@ -1491,9 +1497,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.16" +version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d9b05277c7e8da2c93a568989bb6207bef0112e8d17df7a6eda4a3cf143bc5e" +checksum = "3c6995591a8f1380fcb4ba966a252a4b29188d51d2b89e3a252f5305be65aea8" dependencies = [ "base64", "bytes", @@ -1704,19 +1710,20 @@ dependencies = [ [[package]] name = "imgref" -version = "1.11.0" +version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0263a3d970d5c054ed9312c0057b4f3bde9c0b33836d3637361d4a9e6e7a408" +checksum = "e7c5cedc30da3a610cac6b4ba17597bdf7152cf974e8aab3afb3d54455e371c8" [[package]] name = "indexmap" -version = "2.11.1" +version = "2.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "206a8042aec68fa4a62e8d3f7aa4ceb508177d9324faf261e1959e495b7a1921" +checksum = "4b0f83760fb341a774ed326568e19f5a863af4a952def8c39f9ab92fd95b88e5" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.0", "serde", + "serde_core", ] [[package]] @@ -1835,11 +1842,11 @@ version = "0.1.0" dependencies = [ "anyhow", "base64", - "bincode", "humansize", "log", "rand 0.9.2", "serde", + "serde_json", "sha2", "tokio", ] @@ -1956,7 +1963,6 @@ dependencies = [ "ebml", "env_logger", "hex", - "jellycache", "log", "matroska", "serde", @@ -1975,8 +1981,6 @@ dependencies = [ "log", "serde", "serde_json", - "tokio", - "tokio-util", ] [[package]] @@ -2109,9 +2113,9 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.78" +version = "0.3.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c0b063578492ceec17683ef2f8c5e89121fbd0b172cbc280635ab7567db2738" +checksum = "ec48937a97411dcb524a265206ccd4c90bb711fca92b2792c407f268825b9305" dependencies = [ "once_cell", "wasm-bindgen", @@ -2167,9 +2171,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.175" +version = "0.2.176" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543" +checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" [[package]] name = "libdav1d-sys" @@ -2270,7 +2274,7 @@ version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" dependencies = [ - "hashbrown", + "hashbrown 0.15.5", ] [[package]] @@ -2327,9 +2331,10 @@ dependencies = [ [[package]] name = "matroska" version = "0.1.0" -source = "git+https://codeberg.org/metamuffin/ebml-rs#8a50865c9733737fffd05f4c088c9e150cf98520" +source = "git+https://codeberg.org/metamuffin/ebml-rs#1f66a95d8ed381ca48e0cbf5f7e6808ee720d966" dependencies = [ "ebml", + "serde", ] [[package]] @@ -2879,9 +2884,9 @@ dependencies = [ [[package]] name = "pxfm" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55f4fedc84ed39cb7a489322318976425e42a147e2be79d8f878e2884f94e84" +checksum = "83f9b339b02259ada5c0f4a389b7fb472f933aa17ce176fd2ad98f28bb401fde" dependencies = [ "num-traits", ] @@ -3433,9 +3438,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.31" +version = "0.23.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0ebcbd2f03de0fc1122ad9bb24b127a5a6cd51d72604a3f3c50ac459762b6cc" +checksum = "cd3c25631629d034ce7cd9940adc9d45762d46de2b0f57193c4443b92c6d4d40" dependencies = [ "once_cell", "ring", @@ -3457,9 +3462,9 @@ dependencies = [ [[package]] name = "rustls-webpki" -version = "0.103.5" +version = "0.103.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5a37813727b78798e53c2bec3f5e8fe12a6d6f8389bf9ca7802add4c9905ad8" +checksum = "8572f3c2cb9934231157b45499fc41e1f58c589fdfb81a844ba873265e80f8eb" dependencies = [ "ring", "rustls-pki-types", @@ -3501,9 +3506,9 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" [[package]] name = "serde" -version = "1.0.221" +version = "1.0.226" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "341877e04a22458705eb4e131a1508483c877dca2792b3781d4e5d8a6019ec43" +checksum = "0dca6411025b24b60bfa7ec1fe1f8e710ac09782dca409ee8237ba74b51295fd" dependencies = [ "serde_core", "serde_derive", @@ -3511,18 +3516,18 @@ dependencies = [ [[package]] name = "serde_core" -version = "1.0.221" +version = "1.0.226" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c459bc0a14c840cb403fc14b148620de1e0778c96ecd6e0c8c3cacb6d8d00fe" +checksum = "ba2ba63999edb9dac981fb34b3e5c0d111a69b0924e253ed29d83f7c99e966a4" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.221" +version = "1.0.226" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6185cf75117e20e62b1ff867b9518577271e58abe0037c40bb4794969355ab0" +checksum = "8db53ae22f34573731bafa1db20f04027b2d25e02d8205921b569171699cdb33" dependencies = [ "proc-macro2", "quote", @@ -3531,13 +3536,14 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.144" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56177480b00303e689183f110b4e727bb4211d692c62d4fcd16d02be93077d40" +checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ "itoa", "memchr", "ryu", + "serde", "serde_core", ] @@ -3955,9 +3961,9 @@ checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" [[package]] name = "tempfile" -version = "3.22.0" +version = "3.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84fa4d11fadde498443cca10fd3ac23c951f0dc59e080e9f4b93d4df4e4eea53" +checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" dependencies = [ "fastrand", "getrandom 0.3.3", @@ -4031,11 +4037,12 @@ dependencies = [ [[package]] name = "time" -version = "0.3.43" +version = "0.3.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83bde6f1ec10e72d583d91623c939f623002284ef622b87de38cfd546cbf2031" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" dependencies = [ "deranged", + "itoa", "num-conv", "powerfmt", "serde", @@ -4117,9 +4124,9 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.26.2" +version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b" +checksum = "05f63835928ca123f1bef57abbcd23bb2ba0ac9ae1235f1e65bda0d06e7786bd" dependencies = [ "rustls", "tokio", @@ -4526,27 +4533,27 @@ checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" [[package]] name = "wasi" -version = "0.14.5+wasi-0.2.4" +version = "0.14.7+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4494f6290a82f5fe584817a676a34b9d6763e8d9d18204009fb31dceca98fd4" +checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" dependencies = [ "wasip2", ] [[package]] name = "wasip2" -version = "1.0.0+wasi-0.2.4" +version = "1.0.1+wasi-0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03fa2761397e5bd52002cd7e73110c71af2109aca4e521a9f40473fe685b0a24" +checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" dependencies = [ "wit-bindgen", ] [[package]] name = "wasm-bindgen" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e14915cadd45b529bb8d1f343c4ed0ac1de926144b746e2710f9cd05df6603b" +checksum = "c1da10c01ae9f1ae40cbfac0bac3b1e724b320abfcf52229f80b547c0d250e2d" dependencies = [ "cfg-if", "once_cell", @@ -4557,9 +4564,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e28d1ba982ca7923fd01448d5c30c6864d0a14109560296a162f80f305fb93bb" +checksum = "671c9a5a66f49d8a47345ab942e2cb93c7d1d0339065d4f8139c486121b43b19" dependencies = [ "bumpalo", "log", @@ -4571,9 +4578,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.51" +version = "0.4.54" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca85039a9b469b38336411d6d6ced91f3fc87109a2a27b0c197663f5144dffe" +checksum = "7e038d41e478cc73bae0ff9b36c60cff1c98b8f38f8d7e8061e79ee63608ac5c" dependencies = [ "cfg-if", "js-sys", @@ -4584,9 +4591,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c3d463ae3eff775b0c45df9da45d68837702ac35af998361e2c84e7c5ec1b0d" +checksum = "7ca60477e4c59f5f2986c50191cd972e3a50d8a95603bc9434501cf156a9a119" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4594,9 +4601,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bb4ce89b08211f923caf51d527662b75bdc9c9c7aab40f86dcb9fb85ac552aa" +checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7" dependencies = [ "proc-macro2", "quote", @@ -4607,18 +4614,18 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.101" +version = "0.2.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f143854a3b13752c6950862c906306adb27c7e839f7414cec8fea35beab624c1" +checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1" dependencies = [ "unicode-ident", ] [[package]] name = "web-sys" -version = "0.3.78" +version = "0.3.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77e4b637749ff0d92b8fad63aa1f7cff3cbe125fd49c175cd6345e7272638b12" +checksum = "9367c417a924a74cae129e6a2ae3b47fabb1f8995595ab474029da749a8be120" dependencies = [ "js-sys", "wasm-bindgen", @@ -4988,9 +4995,9 @@ dependencies = [ [[package]] name = "wit-bindgen" -version = "0.45.1" +version = "0.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c573471f125075647d03df72e026074b7203790d41351cd6edc96f46bcccd36" +checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" [[package]] name = "writeable" diff --git a/cache/Cargo.toml b/cache/Cargo.toml index 42f5e00..aaf83e0 100644 --- a/cache/Cargo.toml +++ b/cache/Cargo.toml @@ -5,7 +5,6 @@ edition = "2024" [dependencies] base64 = "0.22.1" -bincode = "2.0.0-rc.3" humansize = "2.1.3" anyhow = "1.0.95" log = { workspace = true } @@ -13,3 +12,4 @@ tokio = { workspace = true } sha2 = "0.10.8" rand = "0.9.1" serde = "1.0.217" +serde_json = "1.0.145" diff --git a/cache/src/lib.rs b/cache/src/lib.rs index 9dc2f09..52245d6 100644 --- a/cache/src/lib.rs +++ b/cache/src/lib.rs @@ -5,7 +5,6 @@ */ use anyhow::{Context, anyhow}; use base64::Engine; -use bincode::{Decode, Encode}; use log::{info, warn}; use rand::random; use serde::{Deserialize, Serialize}; @@ -44,7 +43,7 @@ static CONF: LazyLock = LazyLock::new(|| { .expect("cache config not preloaded. logic error") }); -#[derive(Debug, Encode, Decode, Serialize, Clone, PartialEq, Eq)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct CachePath(pub PathBuf); impl CachePath { pub fn abs(&self) -> PathBuf { @@ -182,7 +181,7 @@ pub fn cache_memory( ) -> Result, anyhow::Error> where Fun: FnMut() -> Result, - T: Encode + Decode<()> + Send + Sync + 'static, + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, { let (_, location) = cache_location(kind, &key); { @@ -201,14 +200,12 @@ where let location = cache_file(kind, &key, move |file| { let object = generate()?; let mut file = std::io::BufWriter::new(file); - bincode::encode_into_std_write(&object, &mut file, bincode::config::standard()) - .context("encoding cache object")?; + serde_json::to_writer(&mut file, &object).context("encoding cache object")?; file.flush()?; Ok(()) })?; let mut file = std::io::BufReader::new(std::fs::File::open(location.abs())?); - let object = bincode::decode_from_std_read::(&mut file, bincode::config::standard()) - .context("decoding cache object")?; + let object = serde_json::from_reader::<_, T>(&mut file).context("decoding cache object")?; let object = Arc::new(object); let size = file.stream_position()? as usize; // this is an approximation mainly since varint is used in bincode @@ -238,7 +235,7 @@ pub async fn async_cache_memory( where Fun: FnOnce() -> Fut, Fut: Future>, - T: Encode + Decode<()> + Send + Sync + 'static, + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, { let (_, location) = cache_location(kind, &key); { @@ -256,8 +253,7 @@ where let location = async_cache_file(kind, &key, move |mut file| async move { let object = generate().await?; - let data = bincode::encode_to_vec(&object, bincode::config::standard()) - .context("encoding cache object")?; + let data = serde_json::to_vec(&object).context("encoding cache object")?; file.write_all(&data).await?; @@ -269,8 +265,7 @@ where file.read_to_end(&mut data) .await .context("reading cache object")?; - let (object, _) = bincode::decode_from_slice::(&data, bincode::config::standard()) - .context("decoding cache object")?; + let object = serde_json::from_slice::(&data).context("decoding cache object")?; let object = Arc::new(object); let size = file.stream_position().await? as usize; // this is an approximation mainly since varint is used in bincode diff --git a/import/asset_token/Cargo.toml b/import/asset_token/Cargo.toml index 95615ce..af20507 100644 --- a/import/asset_token/Cargo.toml +++ b/import/asset_token/Cargo.toml @@ -13,7 +13,7 @@ sha2 = "0.10.8" base64 = "0.22.1" tokio = { workspace = true } anyhow = "1.0.95" -bincode = "2.0.0-rc.3" +bincode = { version = "2.0.1", features = ["serde"] } rand = "0.9.0" serde_json = "1.0.138" aes-gcm-siv = "0.11.1" diff --git a/import/asset_token/src/lib.rs b/import/asset_token/src/lib.rs index 7334076..6f4ad7a 100644 --- a/import/asset_token/src/lib.rs +++ b/import/asset_token/src/lib.rs @@ -51,7 +51,7 @@ static ASSET_KEY: LazyLock = LazyLock::new(|| { #[derive(Debug, Encode, Decode, Serialize, PartialEq, Eq)] pub enum AssetInner { Federated { host: String, asset: Vec }, - Cache(CachePath), + Cache(#[bincode(with_serde)] CachePath), Assets(PathBuf), Media(PathBuf), LocalTrack(LocalTrack), diff --git a/import/src/acoustid.rs b/import/src/acoustid.rs index 741d491..cbdfc7a 100644 --- a/import/src/acoustid.rs +++ b/import/src/acoustid.rs @@ -12,7 +12,7 @@ use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, Client, ClientBuilder, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{path::Path, process::Stdio, sync::Arc, time::Duration}; use tokio::{ io::AsyncReadExt, @@ -27,30 +27,30 @@ pub(crate) struct AcoustID { rate_limit: Arc, } -#[derive(Debug, Hash, Clone, Encode, Decode)] +#[derive(Debug, Hash, Clone, Serialize, Deserialize)] pub(crate) struct Fingerprint { duration: u32, fingerprint: String, } -#[derive(Debug, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub(crate) struct FpCalcOutput { duration: f32, fingerprint: String, } -#[derive(Deserialize, Encode, Decode)] +#[derive(Serialize, Deserialize, Encode, Decode)] pub(crate) struct AcoustIDLookupResultRecording { id: String, } -#[derive(Deserialize, Encode, Decode)] +#[derive(Serialize, Deserialize, Encode, Decode)] pub(crate) struct AcoustIDLookupResult { id: String, score: f32, #[serde(default)] recordings: Vec, } -#[derive(Deserialize, Encode, Decode)] +#[derive(Serialize, Deserialize, Encode, Decode)] pub(crate) struct AcoustIDLookupResponse { status: String, results: Vec, diff --git a/import/src/musicbrainz.rs b/import/src/musicbrainz.rs index 612c4ba..e4f38da 100644 --- a/import/src/musicbrainz.rs +++ b/import/src/musicbrainz.rs @@ -6,14 +6,13 @@ use crate::USER_AGENT; use anyhow::{Context, Result}; -use bincode::{Decode, Encode}; use jellycache::async_cache_memory; use log::info; use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, Client, ClientBuilder, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, sync::Arc, time::Duration}; use tokio::{ sync::Semaphore, @@ -40,7 +39,7 @@ pub struct MusicBrainz { rate_limit: Arc, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbRecordingRel { pub id: String, @@ -54,7 +53,7 @@ pub struct MbRecordingRel { pub artist_credit: Vec, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbArtistRel { pub id: String, @@ -72,14 +71,14 @@ pub struct MbArtistRel { pub relations: Vec, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbArtistCredit { pub name: String, pub artist: MbArtist, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbRelation { pub direction: String, @@ -103,7 +102,7 @@ pub struct MbRelation { pub event: Option, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbSeries { pub id: String, @@ -113,7 +112,7 @@ pub struct MbSeries { pub disambiguation: String, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbRecording { pub id: String, @@ -127,7 +126,7 @@ pub struct MbRecording { pub artist_credit: Vec, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbWork { pub id: String, @@ -141,7 +140,7 @@ pub struct MbWork { pub disambiguation: String, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbEvent { pub id: String, @@ -154,7 +153,7 @@ pub struct MbEvent { pub life_span: MbTimespan, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbArtist { pub id: String, @@ -166,7 +165,7 @@ pub struct MbArtist { pub sort_name: String, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbTimespan { pub begin: Option, @@ -174,7 +173,7 @@ pub struct MbTimespan { pub ended: bool, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbArea { pub name: String, @@ -185,7 +184,7 @@ pub struct MbArea { pub disambiguation: String, } -#[derive(Debug, Deserialize, Encode, Decode)] +#[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "kebab-case")] pub struct MbUrl { pub id: String, diff --git a/import/src/tmdb.rs b/import/src/tmdb.rs index ceb1650..ad99fde 100644 --- a/import/src/tmdb.rs +++ b/import/src/tmdb.rs @@ -5,7 +5,6 @@ */ use crate::USER_AGENT; use anyhow::{anyhow, bail, Context}; -use bincode::{Decode, Encode}; use jellycache::{async_cache_file, async_cache_memory, CachePath}; use jellycommon::{ chrono::{format::Parsed, Utc}, @@ -16,7 +15,7 @@ use reqwest::{ header::{HeaderMap, HeaderName, HeaderValue}, Client, ClientBuilder, }; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::io::AsyncWriteExt; @@ -165,7 +164,7 @@ pub fn parse_release_date(d: &str) -> anyhow::Result> { Ok(Some(p.to_datetime_with_timezone(&Utc)?.timestamp_millis())) } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbEpisode { pub air_date: String, pub overview: String, @@ -177,12 +176,12 @@ pub struct TmdbEpisode { pub vote_count: usize, } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbPersonImage { pub id: u64, pub profiles: Vec, } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbPersonImageProfile { pub aspect_ratio: f64, pub height: u32, @@ -190,7 +189,7 @@ pub struct TmdbPersonImageProfile { pub file_path: String, } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbQuery { pub page: usize, pub results: Vec, @@ -198,7 +197,7 @@ pub struct TmdbQuery { pub total_results: usize, } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbQueryResult { pub adult: bool, pub backdrop_path: Option, @@ -216,7 +215,7 @@ pub struct TmdbQueryResult { pub vote_count: usize, } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbDetails { pub adult: bool, pub backdrop_path: Option, @@ -240,13 +239,13 @@ pub struct TmdbDetails { pub tagline: Option, } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbGenre { pub id: u64, pub name: String, } -#[derive(Debug, Clone, Deserialize, Encode, Decode)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct TmdbProductionCompany { pub id: u64, pub name: String, diff --git a/remuxer/Cargo.toml b/remuxer/Cargo.toml index 24cd9ab..a8fd18f 100644 --- a/remuxer/Cargo.toml +++ b/remuxer/Cargo.toml @@ -4,7 +4,6 @@ version = "0.1.0" edition = "2024" [dependencies] -jellycache = { path = "../cache" } hex = "0.4.3" anyhow = "1.0.95" @@ -15,4 +14,6 @@ serde = { version = "1.0.217", features = ["derive"] } bincode = { version = "2.0.0-rc.3", features = ["serde"] } winter-ebml = { git = "https://codeberg.org/metamuffin/ebml-rs", package = "ebml" } -winter-matroska = { git = "https://codeberg.org/metamuffin/ebml-rs", package = "matroska" } +winter-matroska = { git = "https://codeberg.org/metamuffin/ebml-rs", package = "matroska", features = [ + "serde", +] } diff --git a/remuxer/src/bin/average_cluster_duration.rs b/remuxer/src/bin/average_cluster_duration.rs new file mode 100644 index 0000000..69bb79c --- /dev/null +++ b/remuxer/src/bin/average_cluster_duration.rs @@ -0,0 +1,38 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ + +use anyhow::{Result, anyhow}; +use jellyremuxer::demuxers::{Demuxer, DemuxerNew, matroska::MatroskaDemuxer}; +use std::{env::args, fs::File}; + +fn main() -> Result<()> { + env_logger::init_from_env("LOG"); + let path = args().nth(1).ok_or(anyhow!("first arg is input path"))?; + let file = File::open(path)?; + let mut reader = MatroskaDemuxer::new(Box::new(file)); + + let info = reader.info()?; + + reader.seek_cluster(None)?; + let mut num_clusters = 0; + let mut last_ts = 0; + let mut total_size = 0; + while let Some((_, cluster)) = reader.read_cluster()? { + last_ts = cluster.timestamp * info.timestamp_scale; + num_clusters += 1; + total_size += cluster + .simple_blocks + .iter() + .map(|b| b.data.len()) + .sum::() as u64 + } + + let average_duration = (last_ts / num_clusters) as f64 / 1_000_000_000.; + let average_size = (total_size / num_clusters) as f64 / 1_000_000.; + println!("{average_duration:>6.02}s {average_size:>6.02}MB"); + + Ok(()) +} diff --git a/remuxer/src/demuxers/flac.rs b/remuxer/src/demuxers/flac.rs index 04d15e0..b397a8c 100644 --- a/remuxer/src/demuxers/flac.rs +++ b/remuxer/src/demuxers/flac.rs @@ -7,7 +7,7 @@ use crate::demuxers::{Demuxer, DemuxerNew, ReadSeek}; use anyhow::{Result, anyhow, bail}; use std::io::{BufReader, Read, Seek, SeekFrom}; -use winter_matroska::{Audio, Cluster, TrackEntry, TrackType, Tracks}; +use winter_matroska::{Audio, Cluster, Info, TrackEntry, TrackType, Tracks}; pub struct FlacDemuxer { reader: BufReader>, @@ -28,6 +28,7 @@ struct MetadataBlock { r#type: u8, data: Vec, } +#[allow(unused)] impl MetadataBlock { const TY_STREAMINFO: u8 = 0; const TY_PADDING: u8 = 1; @@ -38,6 +39,7 @@ impl MetadataBlock { const TY_PICTURE: u8 = 6; } +#[allow(unused)] struct StreamInfo { min_block_size: u16, max_block_size: u16, @@ -109,6 +111,13 @@ impl FlacDemuxer { } } impl Demuxer for FlacDemuxer { + fn info(&mut self) -> Result { + Ok(Info { + duration: Some(120000.), // TODO + timestamp_scale: 1_000_000, + ..Default::default() + }) + } fn tracks(&mut self) -> Result> { let si = self.stream_info()?; let mut buf = Vec::new(); @@ -243,8 +252,6 @@ impl Demuxer for FlacDemuxer { let mut crc_buf = [0u8; 1]; self.reader.read_exact(&mut crc_buf)?; - - Ok(None) } } diff --git a/remuxer/src/demuxers/matroska.rs b/remuxer/src/demuxers/matroska.rs index 6301f15..b70054d 100644 --- a/remuxer/src/demuxers/matroska.rs +++ b/remuxer/src/demuxers/matroska.rs @@ -7,7 +7,7 @@ use crate::demuxers::{Demuxer, DemuxerNew, ReadSeek}; use anyhow::{Context, Result, anyhow, bail}; use log::debug; -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::io::{BufReader, ErrorKind, Read, Seek, SeekFrom}; use winter_ebml::{Ebml, EbmlHeader, VintReadExt, read_vint_slice}; use winter_matroska::{ Attachments, Chapters, Cluster, Cues, Info, MatroskaFile, SeekHead, Segment, Tags, Tracks, @@ -47,6 +47,7 @@ impl MatroskaDemuxer { if !matches!(header.doc_type.as_str(), "matroska" | "webm") { bail!("file is {:?} but not matroska/webm", header.doc_type) } + eprintln!("{header:?}"); if header.ebml_max_id_length != 4 { bail!( "file has invalid EBMLMaxIDLength of {}", @@ -156,8 +157,10 @@ impl MatroskaDemuxer { } } impl Demuxer for MatroskaDemuxer { - fn info(&mut self) -> Result> { - self.read_segment_tag("Info", Segment::TAG_INFO) + fn info(&mut self) -> Result { + Ok(self + .read_segment_tag("Info", Segment::TAG_INFO)? + .ok_or(anyhow!("info missing"))?) } fn tracks(&mut self) -> Result> { self.read_segment_tag("Tracks", Segment::TAG_TRACKS) @@ -187,7 +190,11 @@ impl Demuxer for MatroskaDemuxer { loop { let position = self.reader.stream_position()?; // TODO handle eof - let tag = self.reader.read_vint()?; + let tag = match self.reader.read_vint() { + Ok(val) => val, + Err(e) if e.kind() == ErrorKind::UnexpectedEof => return Ok(None), + Err(e) => return Err(e.into()), + }; let size = self.reader.read_vint()?; if tag != Segment::TAG_CLUSTERS { self.reader.seek_relative(size as i64)?; diff --git a/remuxer/src/demuxers/mod.rs b/remuxer/src/demuxers/mod.rs index 8940ca5..f001250 100644 --- a/remuxer/src/demuxers/mod.rs +++ b/remuxer/src/demuxers/mod.rs @@ -10,9 +10,10 @@ pub mod matroska; use crate::{ ContainerFormat, demuxers::{flac::FlacDemuxer, matroska::MatroskaDemuxer}, + magic::detect_container_format, }; use anyhow::Result; -use std::io::{Read, Seek}; +use std::io::{Read, Seek, SeekFrom}; use winter_matroska::{Attachments, Chapters, Cluster, Cues, Info, Tags, Tracks}; pub trait ReadSeek: Read + Seek {} @@ -24,7 +25,7 @@ pub trait DemuxerNew: Demuxer + Sized { #[rustfmt::skip] pub trait Demuxer { - fn info(&mut self) -> Result> { Ok(None) } + fn info(&mut self) -> Result; fn tracks(&mut self) -> Result> { Ok(None) } fn chapters(&mut self) -> Result> { Ok(None) } fn attachments(&mut self) -> Result> { Ok(None) } @@ -41,3 +42,12 @@ pub fn create_demuxer(container: ContainerFormat, reader: Box) -> ContainerFormat::Flac => Box::new(FlacDemuxer::new(reader)), } } +pub fn create_demuxer_autodetect( + mut reader: Box, +) -> Result>> { + let Some(container) = detect_container_format(&mut reader)? else { + return Ok(None); + }; + reader.seek(SeekFrom::Start(0))?; + Ok(Some(create_demuxer(container, reader))) +} diff --git a/remuxer/src/lib.rs b/remuxer/src/lib.rs index 049c12f..13ae06f 100644 --- a/remuxer/src/lib.rs +++ b/remuxer/src/lib.rs @@ -6,6 +6,9 @@ pub mod demuxers; pub mod magic; +pub mod muxers; + +pub use winter_matroska as matroska; #[derive(Debug, Clone, Copy, PartialEq)] pub enum ContainerFormat { diff --git a/remuxer/src/muxers/matroska.rs b/remuxer/src/muxers/matroska.rs new file mode 100644 index 0000000..47210c9 --- /dev/null +++ b/remuxer/src/muxers/matroska.rs @@ -0,0 +1,63 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ + +use crate::muxers::FragmentMuxer; +use anyhow::Result; +use std::io::Write; +use winter_ebml::{EbmlHeader, EbmlToVec}; +use winter_matroska::{Cluster, Info, MatroskaFile, Segment, Tracks}; + +fn write_fragment_shared( + out: &mut dyn Write, + info: Info, + tracks: Tracks, + cluster: Cluster, + webm: bool, +) -> Result<()> { + let file = MatroskaFile { + ebml_header: EbmlHeader { + ebml_version: 1, + ebml_read_version: 1, + ebml_max_id_length: 4, + ebml_max_size_length: 8, + doc_type: if webm { "webm" } else { "matroska" }.to_string(), + doc_type_version: 4, + doc_type_read_version: 2, + ..Default::default() + }, + segment: Segment { + info, + tracks: Some(tracks), + clusters: vec![cluster], + ..Default::default() + }, + }; + out.write_all(&file.to_vec())?; + Ok(()) +} + +pub struct MatroskaFragmentMuxer; +impl FragmentMuxer for MatroskaFragmentMuxer { + fn write_fragment( + out: &mut dyn Write, + info: Info, + tracks: Tracks, + cluster: Cluster, + ) -> Result<()> { + write_fragment_shared(out, info, tracks, cluster, false) + } +} +pub struct WebmFragmentMuxer; +impl FragmentMuxer for WebmFragmentMuxer { + fn write_fragment( + out: &mut dyn Write, + info: Info, + tracks: Tracks, + cluster: Cluster, + ) -> Result<()> { + write_fragment_shared(out, info, tracks, cluster, true) + } +} diff --git a/remuxer/src/muxers/mod.rs b/remuxer/src/muxers/mod.rs new file mode 100644 index 0000000..8752373 --- /dev/null +++ b/remuxer/src/muxers/mod.rs @@ -0,0 +1,40 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ + +pub mod matroska; + +use crate::{ + ContainerFormat, + muxers::matroska::{MatroskaFragmentMuxer, WebmFragmentMuxer}, +}; +use anyhow::Result; +use std::io::Write; +use winter_matroska::{Cluster, Info, Tracks}; + +pub trait FragmentMuxer { + fn write_fragment( + out: &mut dyn Write, + info: Info, + tracks: Tracks, + cluster: Cluster, + ) -> Result<()>; +} + +pub fn write_fragment( + container: ContainerFormat, + out: &mut dyn Write, + info: Info, + tracks: Tracks, + cluster: Cluster, +) -> Result<()> { + match container { + ContainerFormat::Matroska => { + MatroskaFragmentMuxer::write_fragment(out, info, tracks, cluster) + } + ContainerFormat::Webm => WebmFragmentMuxer::write_fragment(out, info, tracks, cluster), + _ => unimplemented!(), + } +} diff --git a/server/src/logic/stream.rs b/server/src/logic/stream.rs index e3650ec..dfe2f86 100644 --- a/server/src/logic/stream.rs +++ b/server/src/logic/stream.rs @@ -9,7 +9,7 @@ use jellycommon::{api::NodeFilterSort, stream::StreamSpec, NodeID, TrackSource}; use jellyimport::asset_token::AssetInner; use jellylogic::{node::get_node, session::Session}; use jellystream::SMediaInfo; -use log::{info, warn}; +use log::info; use rocket::{ get, head, http::{Header, Status}, @@ -22,7 +22,11 @@ use std::{ ops::Range, sync::Arc, }; -use tokio::io::{duplex, DuplexStream}; +use tokio::{ + io::{duplex, DuplexStream}, + task::spawn_blocking, +}; +use tokio_util::io::SyncIoBridge; #[head("/n/<_id>/stream?")] pub async fn r_stream_head( @@ -123,9 +127,9 @@ pub async fn r_stream( let urange = match &range { Some(r) => { let r = r.0.first().unwrap_or(&(None..None)); - r.start.unwrap_or(0)..r.end.unwrap_or(isize::MAX as usize) + r.start.unwrap_or(0)..r.end.unwrap_or(u64::MAX) } - None => 0..(isize::MAX as usize), + None => 0..u64::MAX, }; let head = jellystream::stream_head(&spec); @@ -143,18 +147,19 @@ pub async fn r_stream( title: node.title.clone(), }); - match jellystream::stream(media, spec, urange).await { - Ok(stream) => Ok(Either::Left(StreamResponse { - stream, - range, - advertise_range: head.range_supported, - content_type: head.content_type, - })), - Err(e) => { - warn!("stream error: {e}"); - Err(MyError(e)) - } - } + // TODO cleaner solution needed + let mut reader = spawn_blocking(move || jellystream::stream(media, spec, urange)) + .await + .unwrap()?; + let (stream_write, stream_read) = duplex(4096); + spawn_blocking(move || std::io::copy(&mut reader, &mut SyncIoBridge::new(stream_write))); + + Ok(Either::Left(StreamResponse { + stream: stream_read, + range, + advertise_range: head.range_supported, + content_type: head.content_type, + })) } pub struct RedirectResponse(String); @@ -199,7 +204,7 @@ impl<'r> Responder<'r, 'static> for StreamResponse { } #[derive(Debug)] -pub struct RequestRange(Vec>>); +pub struct RequestRange(Vec>>); impl RequestRange { pub fn to_cr_hv(&self) -> String { diff --git a/server/src/ui/player.rs b/server/src/ui/player.rs index 4938c14..091169b 100644 --- a/server/src/ui/player.rs +++ b/server/src/ui/player.rs @@ -29,7 +29,6 @@ fn jellynative_url(action: &str, seek: f64, secret: &str, node: &str, session: & let stream_url = format!( "/n/{node}/stream{}", StreamSpec::HlsMultiVariant { - segment: 0, container: StreamContainer::Matroska } .to_query() diff --git a/stream/Cargo.toml b/stream/Cargo.toml index fb8cfe2..8e71e1c 100644 --- a/stream/Cargo.toml +++ b/stream/Cargo.toml @@ -12,7 +12,5 @@ jellystream-types = { path = "types" } log = { workspace = true } anyhow = { workspace = true } -tokio = { version = "1.43.0", features = ["io-util"] } -tokio-util = { version = "0.7.13", features = ["io", "io-util"] } serde_json = "1.0.138" serde = { version = "1.0.217", features = ["derive"] } diff --git a/stream/src/cues.rs b/stream/src/cues.rs new file mode 100644 index 0000000..b486a6f --- /dev/null +++ b/stream/src/cues.rs @@ -0,0 +1,57 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ + +use anyhow::{anyhow, Result}; +use jellycache::cache_memory; +use jellyremuxer::demuxers::create_demuxer_autodetect; +use serde::{Deserialize, Serialize}; +use std::{collections::BTreeMap, fs::File, path::Path, sync::Arc}; + +#[derive(Serialize, Deserialize, Default)] +pub struct TrackStat { + pub num_blocks: usize, + pub total_size: u64, +} + +#[derive(Serialize, Deserialize)] +pub struct GeneratedCue { + pub position: u64, + pub time: u64, +} + +#[derive(Serialize, Deserialize)] +pub struct StatsAndCues { + pub stats: BTreeMap, + pub cues: Vec, +} + +pub fn generate_cues(path: &Path) -> Result> { + cache_memory("generated-cues", path, move || { + let media = File::open(path)?; + let mut media = + create_demuxer_autodetect(Box::new(media))?.ok_or(anyhow!("media format unknown"))?; + + let info = media.info()?; + media.seek_cluster(None)?; + + let mut stats = BTreeMap::::new(); + let mut cues = Vec::new(); + + while let Some((position, cluster)) = media.read_cluster()? { + cues.push(GeneratedCue { + position, + time: cluster.timestamp * info.timestamp_scale, + }); + for block in cluster.simple_blocks { + let e = stats.entry(block.track).or_default(); + e.num_blocks += 1; + e.total_size += block.data.len() as u64; + } + } + + Ok(StatsAndCues { stats, cues }) + }) +} diff --git a/stream/src/fragment.rs b/stream/src/fragment.rs index 89ce94f..3b4bb0f 100644 --- a/stream/src/fragment.rs +++ b/stream/src/fragment.rs @@ -3,44 +3,39 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -use crate::{stream_info, SMediaInfo}; -use anyhow::{anyhow, bail, Result}; -use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm}; -use jellystream_types::StreamContainer; -use jellytranscoder::fragment::transcode; -use log::warn; +use crate::{cues::generate_cues, stream_info, SMediaInfo}; +use anyhow::{anyhow, Result}; +use jellyremuxer::{ + demuxers::create_demuxer_autodetect, matroska, muxers::write_fragment, ContainerFormat, +}; +use jellystream_types::{FormatNum, IndexNum, StreamContainer, TrackNum}; use std::{ - io::{Cursor, Seek, SeekFrom}, + fs::File, + io::{Cursor, Read}, sync::Arc, }; -use tokio::{fs::File, io::DuplexStream}; -use tokio_util::io::SyncIoBridge; -pub async fn fragment_stream( - mut b: DuplexStream, +pub fn fragment_stream( info: Arc, - track: usize, - segment: usize, - index: usize, - format_num: usize, + track: TrackNum, + index: IndexNum, + format_num: FormatNum, container: StreamContainer, -) -> Result<()> { - let (iinfo, info) = stream_info(info).await?; +) -> Result> { + let (iinfo, info) = stream_info(info)?; + let (file_index, track_num) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("track not found"))?; - let path = iinfo.paths[file_index].clone(); - let seg = info - .segments - .get(segment) - .ok_or(anyhow!("segment not found"))?; - let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?; + let media_path = iinfo.paths[file_index].clone(); + let track = info.tracks.get(track).ok_or(anyhow!("track not found"))?; let format = track .formats .get(format_num) .ok_or(anyhow!("format not found"))?; - let orig_track = iinfo.metadata[file_index] + + let mk_track = iinfo.metadata[file_index] .tracks .as_ref() .unwrap() @@ -49,97 +44,138 @@ pub async fn fragment_stream( .find(|t| t.track_number == track_num) .unwrap(); + let mk_info = matroska::Info { + duration: Some(info.duration), + timestamp_scale: iinfo.metadata[file_index].info.timestamp_scale, + ..Default::default() + }; + let mk_tracks = matroska::Tracks { + entries: vec![mk_track.to_owned()], + }; + + let cue_stat = generate_cues(&media_path)?; + let cluster_offset = cue_stat + .cues + .get(index) + .ok_or(anyhow!("fragment index out of range"))? + .position; + + let cluster = { + let media_file = File::open(media_path)?; + let mut media = create_demuxer_autodetect(Box::new(media_file))? + .ok_or(anyhow!("media container unknown"))?; + media.seek_cluster(Some(cluster_offset))?; + media + .read_cluster()? + .ok_or(anyhow!("cluster unexpectedly missing"))? + .1 + }; + + let jr_container = match container { + StreamContainer::WebM => ContainerFormat::Webm, + StreamContainer::Matroska => ContainerFormat::Matroska, + StreamContainer::WebVTT => todo!(), + StreamContainer::MPEG4 => todo!(), + StreamContainer::JVTT => todo!(), + }; + if format.remux { - match container { - StreamContainer::WebM | StreamContainer::Matroska => { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &path, - track_num, - container == StreamContainer::WebM, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - }); - } - StreamContainer::MPEG4 => { - tokio::task::spawn_blocking(move || { - let mut buf = Cursor::new(Vec::new()); - if let Err(err) = jellyremuxer::write_fragment_into( - &mut buf, - &path, - track_num, - false, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - buf.seek(SeekFrom::Start(0)).unwrap(); - if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { - warn!("mpeg4 transmux failed: {err}"); - } - }); - } - StreamContainer::JVTT => {} - _ => bail!("not yet supported"), - } + let mut out = Vec::new(); + write_fragment(jr_container, &mut out, mk_info, mk_tracks, cluster)?; + Ok(Box::new(Cursor::new(out))) } else { - let location = transcode( - track.kind, - orig_track, - format, - &format!("{path:?} {track_num} {index}"), - move |b| { - tokio::task::spawn_blocking(move || { - if let Err(err) = jellyremuxer::write_fragment_into( - SyncIoBridge::new(b), - &path, - track_num, - false, - &info.name.unwrap_or_default(), - index, - ) { - warn!("segment stream error: {err}"); - } - }); - }, - ) - .await?; - - let mut frag = File::open(location.abs()).await?; - match container { - StreamContainer::WebM => { - tokio::task::spawn_blocking(move || { - if let Err(err) = - matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - { - warn!("webm transmux failed: {err}"); - } - }); - } - StreamContainer::Matroska => { - tokio::task::spawn(async move { - if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { - warn!("cannot write stream: {err}") - } - }); - } - StreamContainer::MPEG4 => { - tokio::task::spawn_blocking(move || { - if let Err(err) = - matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) - { - warn!("mpeg4 transmux failed: {err}"); - } - }); - } - _ => bail!("unsupported"), - } + todo!() } - Ok(()) + // if format.remux { + // match container { + // StreamContainer::WebM | StreamContainer::Matroska => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &path, + // track_num, + // container == StreamContainer::WebM, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // } + // StreamContainer::MPEG4 => { + // tokio::task::spawn_blocking(move || { + // let mut buf = Cursor::new(Vec::new()); + // if let Err(err) = jellyremuxer::write_fragment_into( + // &mut buf, + // &path, + // track_num, + // false, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // buf.seek(SeekFrom::Start(0)).unwrap(); + // if let Err(err) = matroska_to_mpeg4(buf, SyncIoBridge::new(b)) { + // warn!("mpeg4 transmux failed: {err}"); + // } + // }); + // } + // StreamContainer::JVTT => {} + // _ => bail!("not yet supported"), + // } + // } else { + // let location = transcode( + // track.kind, + // orig_track, + // format, + // &format!("{path:?} {track_num} {index}"), + // move |b| { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = jellyremuxer::write_fragment_into( + // SyncIoBridge::new(b), + // &path, + // track_num, + // false, + // &info.name.unwrap_or_default(), + // index, + // ) { + // warn!("segment stream error: {err}"); + // } + // }); + // }, + // ) + // .await?; + + // let mut frag = File::open(location.abs()).await?; + // match container { + // StreamContainer::WebM => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = + // matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + // { + // warn!("webm transmux failed: {err}"); + // } + // }); + // } + // StreamContainer::Matroska => { + // tokio::task::spawn(async move { + // if let Err(err) = tokio::io::copy(&mut frag, &mut b).await { + // warn!("cannot write stream: {err}") + // } + // }); + // } + // StreamContainer::MPEG4 => { + // tokio::task::spawn_blocking(move || { + // if let Err(err) = + // matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b)) + // { + // warn!("mpeg4 transmux failed: {err}"); + // } + // }); + // } + // _ => bail!("unsupported"), + // } + // } } diff --git a/stream/src/fragment_index.rs b/stream/src/fragment_index.rs index cb54948..9d82cd7 100644 --- a/stream/src/fragment_index.rs +++ b/stream/src/fragment_index.rs @@ -3,30 +3,45 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -use crate::{stream_info, SMediaInfo}; +use crate::{cues::generate_cues, stream_info, SMediaInfo}; use anyhow::{anyhow, Result}; -use jellystream_types::{SegmentNum, TrackNum}; -use std::sync::Arc; -use tokio::io::{AsyncWriteExt, DuplexStream}; +use jellystream_types::TrackNum; +use std::{ + io::{Cursor, Read}, + ops::Range, + sync::Arc, +}; -pub async fn fragment_index_stream( - mut b: DuplexStream, - info: Arc, - _segment: SegmentNum, - track: TrackNum, -) -> Result<()> { - let (iinfo, _info) = stream_info(info).await?; - let (file_index, track_num) = *iinfo +pub fn fragment_index(info: Arc, track: TrackNum) -> Result>> { + let (iinfo, info) = stream_info(info)?; + let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("track not found"))?; - let fragments = tokio::task::spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) - }) - .await??; + let cue_stat = generate_cues(&iinfo.paths[file_index])?; + + Ok(cue_stat + .cues + .iter() + .map(|c| c.time as f64 / 1_000_000_000.) + .zip( + cue_stat + .cues + .iter() + .skip(1) + .map(|c| c.time as f64 / 1_000_000_000.) + .chain([info.duration]), + ) + .map(|(start, end)| start..end) + .collect()) +} - let out = serde_json::to_string(&fragments)?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) +pub fn fragment_index_stream( + info: Arc, + track: TrackNum, +) -> Result> { + Ok(Box::new(Cursor::new(serde_json::to_string( + &fragment_index(info, track)?, + )?))) } diff --git a/stream/src/hls.rs b/stream/src/hls.rs index 949ddb4..2c91365 100644 --- a/stream/src/hls.rs +++ b/stream/src/hls.rs @@ -4,60 +4,30 @@ Copyright (C) 2025 metamuffin */ -use crate::{stream_info, SMediaInfo}; -use anyhow::{anyhow, Result}; -use jellystream_types::{FormatNum, SegmentNum, StreamContainer, StreamSpec, TrackKind, TrackNum}; -use std::{fmt::Write, ops::Range, sync::Arc}; -use tokio::{ - io::{AsyncWriteExt, DuplexStream}, - task::spawn_blocking, +use crate::{fragment_index::fragment_index, stream_info, SMediaInfo}; +use anyhow::Result; +use jellystream_types::{FormatNum, StreamContainer, StreamSpec, TrackKind, TrackNum}; +use std::{ + fmt::Write, + io::{Cursor, Read}, + ops::Range, + sync::Arc, }; -pub async fn hls_supermultivariant_stream( - mut b: DuplexStream, +pub fn hls_multivariant_stream( info: Arc, container: StreamContainer, -) -> Result<()> { - let (_iinfo, info) = stream_info(info).await?; - let mut out = String::new(); - writeln!(out, "#EXTM3U")?; - writeln!(out, "#EXT-X-VERSION:4")?; - for (i, _seg) in info.segments.iter().enumerate() { - let uri = format!( - "stream{}", - StreamSpec::HlsMultiVariant { - segment: i, - container, - } - .to_query() - ); - writeln!(out, "{uri}")?; - } - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) -} - -pub async fn hls_multivariant_stream( - mut b: DuplexStream, - info: Arc, - segment: SegmentNum, - container: StreamContainer, -) -> Result<()> { - let (_iinfo, info) = stream_info(info).await?; - let seg = info - .segments - .get(segment) - .ok_or(anyhow!("segment not found"))?; +) -> Result> { + let (_iinfo, info) = stream_info(info)?; let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-VERSION:4")?; // writeln!(out, "#EXT-X-INDEPENDENT-SEGMENTS")?; - for (i, t) in seg.tracks.iter().enumerate() { + for (i, t) in info.tracks.iter().enumerate() { let uri = format!( "stream{}", StreamSpec::HlsVariant { - segment, track: i, container, format: 0 @@ -73,37 +43,23 @@ pub async fn hls_multivariant_stream( writeln!(out, "#EXT-X-STREAM-INF:BANDWIDTH=5000000,TYPE={type}")?; writeln!(out, "{uri}")?; } - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) + + Ok(Box::new(Cursor::new(out))) } -pub async fn hls_variant_stream( - mut b: DuplexStream, +pub fn hls_variant_stream( info: Arc, - segment: SegmentNum, track: TrackNum, format: FormatNum, container: StreamContainer, -) -> Result<()> { - let (iinfo, info) = stream_info(info).await?; - let (file_index, track_num) = *iinfo - .track_to_file - .get(track) - .ok_or(anyhow!("track not found"))?; - let seg = info - .segments - .get(segment) - .ok_or(anyhow!("segment not found"))?; - - let frags = spawn_blocking(move || { - jellyremuxer::fragment::fragment_index(&iinfo.paths[file_index], track_num) - }) - .await??; +) -> Result> { + let frags = fragment_index(info.clone(), track)?; + let (_, info) = stream_info(info)?; let mut out = String::new(); writeln!(out, "#EXTM3U")?; writeln!(out, "#EXT-X-PLAYLIST-TYPE:VOD")?; - writeln!(out, "#EXT-X-TARGETDURATION:{}", seg.duration)?; + writeln!(out, "#EXT-X-TARGETDURATION:{}", info.duration)?; writeln!(out, "#EXT-X-VERSION:4")?; writeln!(out, "#EXT-X-MEDIA-SEQUENCE:0")?; @@ -113,7 +69,6 @@ pub async fn hls_variant_stream( out, "stream{}", StreamSpec::Fragment { - segment, track, index, container, @@ -125,6 +80,5 @@ pub async fn hls_variant_stream( writeln!(out, "#EXT-X-ENDLIST")?; - tokio::spawn(async move { b.write_all(out.as_bytes()).await }); - Ok(()) + Ok(Box::new(Cursor::new(out))) } diff --git a/stream/src/lib.rs b/stream/src/lib.rs index 5b4e8ed..60c283c 100644 --- a/stream/src/lib.rs +++ b/stream/src/lib.rs @@ -4,30 +4,29 @@ Copyright (C) 2025 metamuffin */ #![feature(iterator_try_collect)] +pub mod cues; mod fragment; mod fragment_index; mod hls; +pub mod metadata; mod stream_info; mod webvtt; use anyhow::{anyhow, bail, Context, Result}; use fragment::fragment_stream; use fragment_index::fragment_index_stream; -use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream}; +use hls::{hls_multivariant_stream, hls_variant_stream}; use jellystream_types::{StreamContainer, StreamSpec}; use serde::{Deserialize, Serialize}; use std::{ collections::BTreeSet, - io::SeekFrom, + fs::File, + io::{Read, Seek, SeekFrom}, ops::Range, path::PathBuf, sync::{Arc, LazyLock, Mutex}, }; use stream_info::{stream_info, write_stream_info}; -use tokio::{ - fs::File, - io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream}, -}; #[rustfmt::skip] #[derive(Debug, Deserialize, Serialize, Default)] @@ -60,98 +59,69 @@ pub struct StreamHead { } pub fn stream_head(spec: &StreamSpec) -> StreamHead { - let cons = |ct: &'static str, rs: bool| StreamHead { - content_type: ct, - range_supported: rs, - }; + use StreamContainer::*; + use StreamSpec::*; let container_ct = |x: StreamContainer| match x { - StreamContainer::WebM => "video/webm", - StreamContainer::Matroska => "video/x-matroska", - StreamContainer::WebVTT => "text/vtt", - StreamContainer::JVTT => "application/jellything-vtt+json", - StreamContainer::MPEG4 => "video/mp4", + WebM => "video/webm", + Matroska => "video/x-matroska", + WebVTT => "text/vtt", + JVTT => "application/jellything-vtt+json", + MPEG4 => "video/mp4", }; - match spec { - StreamSpec::Remux { container, .. } => cons(container_ct(*container), true), - StreamSpec::Original { .. } => cons("video/x-matroska", true), - StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false), - StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false), - StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false), - StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false), + let range_supported = matches!(spec, Remux { .. } | Original { .. }); + let content_type = match spec { + Original { .. } => "video/x-matroska", + HlsMultiVariant { .. } => "application/vnd.apple.mpegurl", + HlsVariant { .. } => "application/vnd.apple.mpegurl", + Info { .. } => "application/jellything-stream-info+json", + FragmentIndex { .. } => "application/jellything-frag-index+json", + Fragment { container, .. } => container_ct(*container), + Remux { container, .. } => container_ct(*container), + }; + StreamHead { + content_type, + range_supported, } } -pub async fn stream( +pub fn stream( info: Arc, spec: StreamSpec, - range: Range, -) -> Result { - let (a, b) = duplex(4096); - + range: Range, +) -> Result> { match spec { - StreamSpec::Original { track } => original_stream(info, track, range, b).await?, - StreamSpec::HlsSuperMultiVariant { container } => { - hls_supermultivariant_stream(b, info, container).await?; - } - StreamSpec::HlsMultiVariant { segment, container } => { - hls_multivariant_stream(b, info, segment, container).await? - } + StreamSpec::Original { track } => original_stream(info, track, range), + StreamSpec::HlsMultiVariant { container } => hls_multivariant_stream(info, container), StreamSpec::HlsVariant { - segment, track, container, format, - } => hls_variant_stream(b, info, segment, track, format, container).await?, - StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?, - StreamSpec::FragmentIndex { segment, track } => { - fragment_index_stream(b, info, segment, track).await? - } + } => hls_variant_stream(info, track, format, container), + StreamSpec::Info => write_stream_info(info), + StreamSpec::FragmentIndex { track } => fragment_index_stream(info, track), StreamSpec::Fragment { - segment, track, index, container, format, - } => fragment_stream(b, info, track, segment, index, format, container).await?, + } => fragment_stream(info, track, index, format, container), _ => bail!("todo"), } - - Ok(a) } -async fn original_stream( +fn original_stream( info: Arc, track: usize, - range: Range, - b: DuplexStream, -) -> Result<()> { - let (iinfo, _info) = stream_info(info).await?; + range: Range, +) -> Result> { + let (iinfo, _info) = stream_info(info)?; let (file_index, _) = *iinfo .track_to_file .get(track) .ok_or(anyhow!("unknown track"))?; - let mut file = File::open(&iinfo.paths[file_index]) - .await - .context("opening source")?; + let mut file = File::open(&iinfo.paths[file_index]).context("opening source")?; file.seek(SeekFrom::Start(range.start as u64)) - .await .context("seek source")?; - tokio::task::spawn(copy_stream(file, b, range.end - range.start)); - - Ok(()) -} - -async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> { - let mut buf = [0u8; 4096]; - loop { - let size = inp.read(&mut buf[..amount.min(4096)]).await?; - if size == 0 { - break Ok(()); - } - out.write_all(&buf[..size]).await?; - amount -= size; - } + Ok(Box::new(file.take(range.end - range.start))) } diff --git a/stream/src/metadata.rs b/stream/src/metadata.rs new file mode 100644 index 0000000..9bfa3aa --- /dev/null +++ b/stream/src/metadata.rs @@ -0,0 +1,26 @@ +/* + This file is part of jellything (https://codeberg.org/metamuffin/jellything) + which is licensed under the GNU Affero General Public License (version 3); see /COPYING. + Copyright (C) 2025 metamuffin +*/ + +use anyhow::{anyhow, Result}; +use jellycache::cache_memory; +use jellyremuxer::{demuxers::create_demuxer_autodetect, matroska::Segment}; +use std::{fs::File, path::Path, sync::Arc}; + +pub fn read_metadata(path: &Path) -> Result> { + cache_memory("mkmeta-v4", path, move || { + let media = File::open(path)?; + let mut media = + create_demuxer_autodetect(Box::new(media))?.ok_or(anyhow!("media format unknown"))?; + + let info = media.info()?; + let tracks = media.tracks()?; + Ok(Segment { + info, + tracks, + ..Default::default() + }) + }) +} diff --git a/stream/src/stream_info.rs b/stream/src/stream_info.rs index 560ec9b..7ebc399 100644 --- a/stream/src/stream_info.rs +++ b/stream/src/stream_info.rs @@ -3,59 +3,51 @@ which is licensed under the GNU Affero General Public License (version 3); see /COPYING. Copyright (C) 2025 metamuffin */ -use crate::{SMediaInfo, CONF}; +use crate::{cues::generate_cues, metadata::read_metadata, SMediaInfo, CONF}; use anyhow::Result; -use jellyremuxer::{ - metadata::{matroska_metadata, MatroskaMetadata, MatroskaTrackEntry}, - seek_index::get_track_sizes, -}; +use jellyremuxer::matroska::{self, Segment, TrackEntry, TrackType}; use jellystream_types::{ - StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamTrackInfo, TrackKind, + StreamContainer, StreamFormatInfo, StreamInfo, StreamTrackInfo, TrackKind, }; -use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; -use tokio::{ - io::{AsyncWriteExt, DuplexStream}, - spawn, - task::spawn_blocking, +use std::{ + io::{Cursor, Read}, + path::PathBuf, + sync::Arc, }; -async fn async_matroska_metadata(path: PathBuf) -> Result> { - spawn_blocking(move || matroska_metadata(&path)).await? -} - -async fn async_get_track_sizes(path: PathBuf) -> Result> { - spawn_blocking(move || get_track_sizes(&path)).await? -} - pub(crate) struct InternalStreamInfo { pub paths: Vec, - pub metadata: Vec>, + pub metadata: Vec>, pub track_to_file: Vec<(usize, u64)>, } // TODO cache mem -pub(crate) async fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { +pub(crate) fn stream_info(info: Arc) -> Result<(InternalStreamInfo, StreamInfo)> { let mut tracks = Vec::new(); let mut track_to_file = Vec::new(); let mut metadata_arr = Vec::new(); let mut paths = Vec::new(); for (i, path) in info.files.iter().enumerate() { - let metadata = async_matroska_metadata(path.clone()).await?; - let sizes = async_get_track_sizes(path.clone()).await?; + let metadata = read_metadata(&path)?; + let cue_stat = generate_cues(&path)?; if let Some(t) = &metadata.tracks { - let duration = media_duration(&metadata); + let duration = media_duration(&metadata.info); for t in &t.entries { - let bitrate = - sizes.get(&t.track_number).copied().unwrap_or_default() as f64 / duration * 8.; + let byterate = cue_stat + .stats + .get(&t.track_number) + .map(|e| e.total_size) + .unwrap_or_default() as f64 + / duration; tracks.push(StreamTrackInfo { name: None, kind: match t.track_type { - 1 => TrackKind::Video, - 2 => TrackKind::Audio, - 17 => TrackKind::Subtitle, + matroska::TrackType::Video => TrackKind::Video, + matroska::TrackType::Audio => TrackKind::Audio, + matroska::TrackType::Subtitle => TrackKind::Subtitle, _ => todo!(), }, - formats: stream_formats(t, bitrate), + formats: stream_formats(t, byterate * 8.), }); track_to_file.push((i, t.track_number)); } @@ -64,11 +56,7 @@ pub(crate) async fn stream_info(info: Arc) -> Result<(InternalStream paths.push(path.to_owned()); } - let segment = StreamSegmentInfo { - name: None, - duration: media_duration(&metadata_arr[0]), - tracks, - }; + let duration = media_duration(&metadata_arr[0].info); // TODO different durations?! Ok(( InternalStreamInfo { metadata: metadata_arr, @@ -77,12 +65,13 @@ pub(crate) async fn stream_info(info: Arc) -> Result<(InternalStream }, StreamInfo { name: info.title.clone(), - segments: vec![segment], + duration, + tracks, }, )) } -fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec { +fn stream_formats(t: &TrackEntry, remux_bitrate: f64) -> Vec { let mut formats = Vec::new(); formats.push(StreamFormatInfo { codec: t.codec_id.to_string(), @@ -97,7 +86,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec { + TrackType::Video => { let sw = t.video.as_ref().unwrap().pixel_width; let sh = t.video.as_ref().unwrap().pixel_height; for (w, br) in [ @@ -136,7 +125,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec { + TrackType::Audio => { for br in [256e3, 128e3, 64e3] { formats.push(StreamFormatInfo { codec: "A_OPUS".to_string(), @@ -151,7 +140,7 @@ fn stream_formats(t: &MatroskaTrackEntry, remux_bitrate: f64) -> Vec {} + TrackType::Subtitle => {} _ => {} } @@ -168,13 +157,11 @@ fn containers_by_codec(codec: &str) -> Vec { } } -pub(crate) async fn write_stream_info(info: Arc, mut b: DuplexStream) -> Result<()> { - let (_, info) = stream_info(info).await?; - spawn(async move { b.write_all(&serde_json::to_vec(&info)?).await }); - Ok(()) +pub(crate) fn write_stream_info(info: Arc) -> Result> { + let (_, info) = stream_info(info)?; + Ok(Box::new(Cursor::new(serde_json::to_vec(&info)?))) } -fn media_duration(m: &MatroskaMetadata) -> f64 { - let info = m.info.as_ref().unwrap(); +fn media_duration(info: &matroska::Info) -> f64 { (info.duration.unwrap_or_default() * info.timestamp_scale as f64) / 1_000_000_000. } diff --git a/stream/types/src/lib.rs b/stream/types/src/lib.rs index a90db03..a031f3a 100644 --- a/stream/types/src/lib.rs +++ b/stream/types/src/lib.rs @@ -6,7 +6,6 @@ use serde::{Deserialize, Serialize}; use std::{collections::BTreeMap, fmt::Display, str::FromStr}; -pub type SegmentNum = usize; pub type TrackNum = usize; pub type FormatNum = usize; pub type IndexNum = usize; @@ -27,28 +26,19 @@ pub enum StreamSpec { Original { track: TrackNum, }, - HlsSuperMultiVariant { - container: StreamContainer, - }, HlsMultiVariant { - segment: SegmentNum, container: StreamContainer, }, HlsVariant { - segment: SegmentNum, track: TrackNum, container: StreamContainer, format: FormatNum, }, - Info { - segment: Option, - }, + Info, FragmentIndex { - segment: SegmentNum, track: TrackNum, }, Fragment { - segment: SegmentNum, track: TrackNum, index: IndexNum, container: StreamContainer, @@ -64,12 +54,6 @@ pub enum StreamSpec { #[derive(Debug, Clone, Deserialize, Serialize)] pub struct StreamInfo { - pub name: Option, - pub segments: Vec, -} - -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct StreamSegmentInfo { pub name: Option, pub duration: f64, pub tracks: Vec, @@ -128,35 +112,25 @@ impl StreamSpec { ) } StreamSpec::Original { track } => format!("?original&track={track}"), - StreamSpec::HlsSuperMultiVariant { container } => { - format!("?hlssupermultivariant&container={container}") - } - StreamSpec::HlsMultiVariant { segment, container } => { - format!("?hlsmultivariant&segment={segment}&container={container}") + StreamSpec::HlsMultiVariant { container } => { + format!("?hlsmultivariant&container={container}") } StreamSpec::HlsVariant { - segment, track, container, format, - } => format!( - "?hlsvariant&segment={segment}&track={track}&container={container}&format={format}" - ), - StreamSpec::Info { - segment: Some(segment), - } => format!("?info&segment={segment}"), - StreamSpec::Info { segment: None } => "?info".to_string(), - StreamSpec::FragmentIndex { segment, track } => { - format!("?fragmentindex&segment={segment}&track={track}") + } => format!("?hlsvariant&track={track}&container={container}&format={format}"), + StreamSpec::Info => "?info".to_string(), + StreamSpec::FragmentIndex { track } => { + format!("?fragmentindex&track={track}") } StreamSpec::Fragment { - segment, track, index, container, format, } => format!( - "?fragment&segment={segment}&track={track}&index={index}&container={container}&format={format}" + "?fragment&track={track}&index={index}&container={container}&format={format}" ), } } @@ -173,32 +147,24 @@ impl StreamSpec { ) } StreamSpec::Original { track } => format!("?original&t={track}"), - StreamSpec::HlsSuperMultiVariant { container } => { - format!("?hlssupermultivariant&c={container}") - } - StreamSpec::HlsMultiVariant { segment, container } => { - format!("?hlsmultivariant&s={segment}&c={container}") + StreamSpec::HlsMultiVariant { container } => { + format!("?hlsmultivariant&c={container}") } StreamSpec::HlsVariant { - segment, track, container, format, - } => format!("?hlsvariant&s={segment}&t={track}&c={container}&f={format}"), - StreamSpec::Info { - segment: Some(segment), - } => format!("?info&s={segment}"), - StreamSpec::Info { segment: None } => "?info".to_string(), - StreamSpec::FragmentIndex { segment, track } => { - format!("?fragmentindex&s={segment}&t={track}") + } => format!("?hlsvariant&t={track}&c={container}&f={format}"), + StreamSpec::Info => "?info".to_string(), + StreamSpec::FragmentIndex { track } => { + format!("?fragmentindex&t={track}") } StreamSpec::Fragment { - segment, track, index, container, format, - } => format!("?fragment&s={segment}&t={track}&i={index}&c={container}&f={format}"), + } => format!("?fragment&t={track}&i={index}&c={container}&f={format}"), } } pub fn from_query_kv(query: &BTreeMap) -> Result { @@ -207,7 +173,7 @@ impl StreamSpec { .get(k) .or(query.get(ks)) .ok_or(k) - .and_then(|a| a.parse().map_err(|_| "invalid number")) + .and_then(|a| a.parse::().map_err(|_| "invalid number")) }; let get_container = || { query @@ -217,28 +183,19 @@ impl StreamSpec { .and_then(|s| s.parse().map_err(|()| "unknown container")) }; if query.contains_key("info") { - Ok(Self::Info { - segment: get_num("segment", "s").ok(), - }) - } else if query.contains_key("hlssupermultivariant") { - Ok(Self::HlsSuperMultiVariant { - container: get_container().ok().unwrap_or(StreamContainer::Matroska), - }) + Ok(Self::Info) } else if query.contains_key("hlsmultivariant") { Ok(Self::HlsMultiVariant { - segment: get_num("segment", "s")? as SegmentNum, container: get_container()?, }) } else if query.contains_key("hlsvariant") { Ok(Self::HlsVariant { - segment: get_num("segment", "s")? as SegmentNum, track: get_num("track", "t")? as TrackNum, format: get_num("format", "f")? as FormatNum, container: get_container()?, }) } else if query.contains_key("fragment") { Ok(Self::Fragment { - segment: get_num("segment", "s")? as SegmentNum, track: get_num("track", "t")? as TrackNum, format: get_num("format", "f")? as FormatNum, index: get_num("index", "i")? as IndexNum, @@ -246,7 +203,6 @@ impl StreamSpec { }) } else if query.contains_key("fragmentindex") { Ok(Self::FragmentIndex { - segment: get_num("segment", "s")? as SegmentNum, track: get_num("track", "t")? as TrackNum, }) } else { -- cgit v1.2.3-70-g09d2