aboutsummaryrefslogtreecommitdiff
path: root/stream/src/fragment.rs
blob: dfe101ea17488f6bde07a3d7d3e79702757aa49e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
    This file is part of jellything (https://codeberg.org/metamuffin/jellything)
    which is licensed under the GNU Affero General Public License (version 3); see /COPYING.
    Copyright (C) 2025 metamuffin <metamuffin.org>
*/
use crate::{stream_info, SMediaInfo};
use anyhow::{anyhow, bail, Result};
use jellybase::common::stream::StreamContainer;
use jellyremuxer::{matroska_to_mpeg4, matroska_to_webm::matroska_to_webm};
use jellytranscoder::fragment::transcode;
use log::warn;
use std::sync::Arc;
use tokio::{fs::File, io::DuplexStream};
use tokio_util::io::SyncIoBridge;

pub async fn fragment_stream(
    mut b: DuplexStream,
    info: Arc<SMediaInfo>,
    track: usize,
    segment: usize,
    index: usize,
    format_num: usize,
    container: StreamContainer,
) -> Result<()> {
    let (iinfo, info) = stream_info(info).await?;
    let (file_index, track_num) = *iinfo
        .track_to_file
        .get(track)
        .ok_or(anyhow!("track not found"))?;
    let path = iinfo.paths[file_index].clone();
    let seg = info
        .segments
        .get(segment)
        .ok_or(anyhow!("segment not found"))?;
    let track = seg.tracks.get(track).ok_or(anyhow!("track not found"))?;
    let format = track
        .formats
        .get(format_num)
        .ok_or(anyhow!("format not found"))?;

    if format.remux {
        tokio::task::spawn_blocking(move || {
            if let Err(err) = jellyremuxer::write_fragment_into(
                SyncIoBridge::new(b),
                &path,
                track_num,
                container == StreamContainer::WebM,
                &info.name.unwrap_or_default(),
                index,
            ) {
                warn!("segment stream error: {err}");
            }
        });
    } else {
        let location = transcode(
            &format!("{path:?} {track_num} {index} {format_num} {container}"), // TODO maybe not use the entire source
            track.kind,
            format,
            move |b| {
                tokio::task::spawn_blocking(move || {
                    if let Err(err) = jellyremuxer::write_fragment_into(
                        SyncIoBridge::new(b),
                        &path,
                        track_num,
                        false,
                        &info.name.unwrap_or_default(),
                        index,
                    ) {
                        warn!("segment stream error: {err}");
                    }
                });
            },
        )
        .await?;

        let mut frag = File::open(location.abs()).await?;
        match container {
            StreamContainer::WebM => {
                tokio::task::spawn_blocking(move || {
                    if let Err(err) =
                        matroska_to_webm(SyncIoBridge::new(frag), SyncIoBridge::new(b))
                    {
                        warn!("webm transmux failed: {err}");
                    }
                });
            }
            StreamContainer::Matroska => {
                tokio::task::spawn(async move {
                    if let Err(err) = tokio::io::copy(&mut frag, &mut b).await {
                        warn!("cannot write stream: {err}")
                    }
                });
            }
            StreamContainer::MPEG4 => {
                tokio::task::spawn_blocking(move || {
                    if let Err(err) =
                        matroska_to_mpeg4(SyncIoBridge::new(frag), SyncIoBridge::new(b))
                    {
                        warn!("mpeg4 transmux failed: {err}");
                    }
                });
            }
            _ => bail!("unsupported"),
        }
    }

    Ok(())
}