aboutsummaryrefslogtreecommitdiff
path: root/stream/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r--stream/src/lib.rs110
1 files changed, 40 insertions, 70 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 5b4e8ed..60c283c 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -4,30 +4,29 @@
Copyright (C) 2025 metamuffin <metamuffin.org>
*/
#![feature(iterator_try_collect)]
+pub mod cues;
mod fragment;
mod fragment_index;
mod hls;
+pub mod metadata;
mod stream_info;
mod webvtt;
use anyhow::{anyhow, bail, Context, Result};
use fragment::fragment_stream;
use fragment_index::fragment_index_stream;
-use hls::{hls_multivariant_stream, hls_supermultivariant_stream, hls_variant_stream};
+use hls::{hls_multivariant_stream, hls_variant_stream};
use jellystream_types::{StreamContainer, StreamSpec};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeSet,
- io::SeekFrom,
+ fs::File,
+ io::{Read, Seek, SeekFrom},
ops::Range,
path::PathBuf,
sync::{Arc, LazyLock, Mutex},
};
use stream_info::{stream_info, write_stream_info};
-use tokio::{
- fs::File,
- io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
-};
#[rustfmt::skip]
#[derive(Debug, Deserialize, Serialize, Default)]
@@ -60,98 +59,69 @@ pub struct StreamHead {
}
pub fn stream_head(spec: &StreamSpec) -> StreamHead {
- let cons = |ct: &'static str, rs: bool| StreamHead {
- content_type: ct,
- range_supported: rs,
- };
+ use StreamContainer::*;
+ use StreamSpec::*;
let container_ct = |x: StreamContainer| match x {
- StreamContainer::WebM => "video/webm",
- StreamContainer::Matroska => "video/x-matroska",
- StreamContainer::WebVTT => "text/vtt",
- StreamContainer::JVTT => "application/jellything-vtt+json",
- StreamContainer::MPEG4 => "video/mp4",
+ WebM => "video/webm",
+ Matroska => "video/x-matroska",
+ WebVTT => "text/vtt",
+ JVTT => "application/jellything-vtt+json",
+ MPEG4 => "video/mp4",
};
- match spec {
- StreamSpec::Remux { container, .. } => cons(container_ct(*container), true),
- StreamSpec::Original { .. } => cons("video/x-matroska", true),
- StreamSpec::HlsSuperMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
- StreamSpec::HlsMultiVariant { .. } => cons("application/vnd.apple.mpegurl", false),
- StreamSpec::HlsVariant { .. } => cons("application/vnd.apple.mpegurl", false),
- StreamSpec::Info { .. } => cons("application/jellything-stream-info+json", false),
- StreamSpec::FragmentIndex { .. } => cons("application/jellything-frag-index+json", false),
- StreamSpec::Fragment { container, .. } => cons(container_ct(*container), false),
+ let range_supported = matches!(spec, Remux { .. } | Original { .. });
+ let content_type = match spec {
+ Original { .. } => "video/x-matroska",
+ HlsMultiVariant { .. } => "application/vnd.apple.mpegurl",
+ HlsVariant { .. } => "application/vnd.apple.mpegurl",
+ Info { .. } => "application/jellything-stream-info+json",
+ FragmentIndex { .. } => "application/jellything-frag-index+json",
+ Fragment { container, .. } => container_ct(*container),
+ Remux { container, .. } => container_ct(*container),
+ };
+ StreamHead {
+ content_type,
+ range_supported,
}
}
-pub async fn stream(
+pub fn stream(
info: Arc<SMediaInfo>,
spec: StreamSpec,
- range: Range<usize>,
-) -> Result<DuplexStream> {
- let (a, b) = duplex(4096);
-
+ range: Range<u64>,
+) -> Result<Box<dyn Read + Send + Sync>> {
match spec {
- StreamSpec::Original { track } => original_stream(info, track, range, b).await?,
- StreamSpec::HlsSuperMultiVariant { container } => {
- hls_supermultivariant_stream(b, info, container).await?;
- }
- StreamSpec::HlsMultiVariant { segment, container } => {
- hls_multivariant_stream(b, info, segment, container).await?
- }
+ StreamSpec::Original { track } => original_stream(info, track, range),
+ StreamSpec::HlsMultiVariant { container } => hls_multivariant_stream(info, container),
StreamSpec::HlsVariant {
- segment,
track,
container,
format,
- } => hls_variant_stream(b, info, segment, track, format, container).await?,
- StreamSpec::Info { segment: _ } => write_stream_info(info, b).await?,
- StreamSpec::FragmentIndex { segment, track } => {
- fragment_index_stream(b, info, segment, track).await?
- }
+ } => hls_variant_stream(info, track, format, container),
+ StreamSpec::Info => write_stream_info(info),
+ StreamSpec::FragmentIndex { track } => fragment_index_stream(info, track),
StreamSpec::Fragment {
- segment,
track,
index,
container,
format,
- } => fragment_stream(b, info, track, segment, index, format, container).await?,
+ } => fragment_stream(info, track, index, format, container),
_ => bail!("todo"),
}
-
- Ok(a)
}
-async fn original_stream(
+fn original_stream(
info: Arc<SMediaInfo>,
track: usize,
- range: Range<usize>,
- b: DuplexStream,
-) -> Result<()> {
- let (iinfo, _info) = stream_info(info).await?;
+ range: Range<u64>,
+) -> Result<Box<dyn Read+ Send + Sync>> {
+ let (iinfo, _info) = stream_info(info)?;
let (file_index, _) = *iinfo
.track_to_file
.get(track)
.ok_or(anyhow!("unknown track"))?;
- let mut file = File::open(&iinfo.paths[file_index])
- .await
- .context("opening source")?;
+ let mut file = File::open(&iinfo.paths[file_index]).context("opening source")?;
file.seek(SeekFrom::Start(range.start as u64))
- .await
.context("seek source")?;
- tokio::task::spawn(copy_stream(file, b, range.end - range.start));
-
- Ok(())
-}
-
-async fn copy_stream(mut inp: File, mut out: DuplexStream, mut amount: usize) -> Result<()> {
- let mut buf = [0u8; 4096];
- loop {
- let size = inp.read(&mut buf[..amount.min(4096)]).await?;
- if size == 0 {
- break Ok(());
- }
- out.write_all(&buf[..size]).await?;
- amount -= size;
- }
+ Ok(Box::new(file.take(range.end - range.start)))
}