aboutsummaryrefslogtreecommitdiff
path: root/stream/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src/lib.rs')
-rw-r--r--stream/src/lib.rs74
1 files changed, 46 insertions, 28 deletions
diff --git a/stream/src/lib.rs b/stream/src/lib.rs
index 751ecfa..59b4960 100644
--- a/stream/src/lib.rs
+++ b/stream/src/lib.rs
@@ -9,8 +9,7 @@ pub mod hls;
pub mod jhls;
pub mod webvtt;
-use anyhow::Result;
-use ebml_struct::matroska::{Info, Tracks};
+use anyhow::{anyhow, Context, Result};
use jellybase::common::{
stream::{
StreamContainer, StreamFormatInfo, StreamInfo, StreamSegmentInfo, StreamSpec,
@@ -18,12 +17,11 @@ use jellybase::common::{
},
LocalTrack, Node,
};
-use jellymatroska::block::LacingType;
use jellyremuxer::metadata::{matroska_metadata, MatroskaMetadata};
-use std::{collections::BTreeSet, ops::Range, path::PathBuf, sync::Arc};
+use std::{collections::BTreeSet, io::SeekFrom, ops::Range, path::PathBuf, sync::Arc};
use tokio::{
fs::File,
- io::{duplex, AsyncReadExt, AsyncWriteExt, DuplexStream},
+ io::{duplex, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, DuplexStream},
task::spawn_blocking,
};
use tokio_util::io::SyncIoBridge;
@@ -75,7 +73,7 @@ pub async fn stream(
StreamSpec::Whep { track, seek } => todo!(),
StreamSpec::WhepControl { token } => todo!(),
StreamSpec::Remux { tracks, container } => todo!(),
- StreamSpec::Original { track } => todo!(),
+ StreamSpec::Original { track } => original_stream(info, track, range, b).await?,
StreamSpec::HlsSuperMultiVariant { container } => todo!(),
StreamSpec::HlsMultiVariant { segment, container } => todo!(),
StreamSpec::HlsVariant {
@@ -102,15 +100,26 @@ async fn async_matroska_metadata(path: PathBuf) -> Result<Arc<Option<MatroskaMet
Ok(spawn_blocking(move || matroska_metadata(&path)).await??)
}
-async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> {
+struct InternalStreamInfo {
+ paths: Vec<PathBuf>,
+ metadata: Vec<MatroskaMetadata>,
+ track_to_file: Vec<usize>,
+}
+
+async fn stream_info(info: Arc<SMediaInfo>) -> Result<(InternalStreamInfo, StreamInfo)> {
let mut metadata = Vec::new();
+ let mut paths = Vec::new();
for path in &info.files {
- metadata.extend((*async_matroska_metadata(path.clone()).await?).clone());
+ if let Some(meta) = (*async_matroska_metadata(path.clone()).await?).clone() {
+ metadata.push(meta);
+ paths.push(path.clone());
+ }
}
let mut tracks = Vec::new();
+ let mut track_to_file = Vec::new();
- for m in &metadata {
+ for (i, m) in metadata.iter().enumerate() {
if let Some(t) = &m.tracks {
for t in &t.entries {
let mut formats = Vec::new();
@@ -134,7 +143,8 @@ async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> {
_ => todo!(),
},
formats,
- })
+ });
+ track_to_file.push(i);
}
}
}
@@ -144,14 +154,21 @@ async fn stream_info(info: Arc<SMediaInfo>) -> Result<StreamInfo> {
duration: 0,
tracks,
};
- Ok(StreamInfo {
- name: info.info.title.clone(),
- segments: vec![segment],
- })
+ Ok((
+ InternalStreamInfo {
+ metadata,
+ paths,
+ track_to_file,
+ },
+ StreamInfo {
+ name: info.info.title.clone(),
+ segments: vec![segment],
+ },
+ ))
}
async fn write_stream_info(info: Arc<SMediaInfo>, mut b: DuplexStream) -> Result<()> {
- let info = stream_info(info).await?;
+ let (_, info) = stream_info(info).await?;
b.write_all(&serde_json::to_vec(&info)?).await?;
Ok(())
}
@@ -181,24 +198,25 @@ async fn remux_stream(
}
async fn original_stream(
- local_tracks: Vec<LocalTrack>,
- spec: StreamSpec,
+ info: Arc<SMediaInfo>,
+ track: usize,
range: Range<usize>,
b: DuplexStream,
) -> Result<()> {
- // if spec.track.len() != 1 {
- // bail!("invalid amout of source \"tracks\". original only allows for exactly one.")
- // }
+ let (iinfo, _info) = stream_info(info).await?;
- // let source = local_tracks[spec.track[0]].clone();
- // let mut file = File::open(CONF.media_path.join(source.path))
- // .await
- // .context("opening source")?;
- // file.seek(SeekFrom::Start(range.start as u64))
- // .await
- // .context("seek source")?;
+ let file_index = *iinfo
+ .track_to_file
+ .get(track)
+ .ok_or(anyhow!("unknown track"))?;
+ let mut file = File::open(&iinfo.paths[file_index])
+ .await
+ .context("opening source")?;
+ file.seek(SeekFrom::Start(range.start as u64))
+ .await
+ .context("seek source")?;
- // tokio::task::spawn(copy_stream(file, b, range.end - range.start));
+ tokio::task::spawn(copy_stream(file, b, range.end - range.start));
Ok(())
}