diff options
author | metamuffin <metamuffin@disroot.org> | 2022-12-11 14:33:17 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2022-12-11 14:33:17 +0100 |
commit | 43bd492ffe61dfec82c172f3eeda869809aa0932 (patch) | |
tree | 301b8da07ef08da9dffb239c24e30775e97d230d /client-native-gui | |
parent | 62172003909956850879dee1252980b2547f8451 (diff) | |
download | keks-meet-43bd492ffe61dfec82c172f3eeda869809aa0932.tar keks-meet-43bd492ffe61dfec82c172f3eeda869809aa0932.tar.bz2 keks-meet-43bd492ffe61dfec82c172f3eeda869809aa0932.tar.zst |
stream playback with libmpv (native client)
Diffstat (limited to 'client-native-gui')
-rw-r--r-- | client-native-gui/Cargo.toml | 5 | ||||
-rw-r--r-- | client-native-gui/mpv.conf | 13 | ||||
-rw-r--r-- | client-native-gui/src/main.rs | 194 |
3 files changed, 200 insertions, 12 deletions
diff --git a/client-native-gui/Cargo.toml b/client-native-gui/Cargo.toml index 3912bc9..e917276 100644 --- a/client-native-gui/Cargo.toml +++ b/client-native-gui/Cargo.toml @@ -10,6 +10,11 @@ async-std = "1.12.0" tokio = { version = "1.21.2", features = ["full"] } env_logger = "0.8" log = "0.4" +anyhow = "1.0.66" +crossbeam-channel = "0.5.6" egui = "0.19.0" eframe = "0.19.0" + +libmpv = { git = "https://github.com/sirno/libmpv-rs.git", branch = "upgrade-libmpv" } + diff --git a/client-native-gui/mpv.conf b/client-native-gui/mpv.conf new file mode 100644 index 0000000..3669492 --- /dev/null +++ b/client-native-gui/mpv.conf @@ -0,0 +1,13 @@ +# configuration for the libmpv used in keks-meet +# this is equivalent to --profile=low-latency +audio-buffer=0 +vd-lavc-threads=1 +cache-pause=no +demuxer-lavf-o-add=fflags=+nobuffer +demuxer-lavf-probe-info=nostreams +demuxer-lavf-analyzeduration=0.1 +video-sync=audio +interpolation=no +video-latency-hacks=yes +stream-buffer-size=4k +correct-pts=yes diff --git a/client-native-gui/src/main.rs b/client-native-gui/src/main.rs index 69d1657..f92fa7a 100644 --- a/client-native-gui/src/main.rs +++ b/client-native-gui/src/main.rs @@ -5,13 +5,28 @@ use client_native_lib::{ instance::Instance, peer::Peer, protocol::{ProvideInfo, RelayMessage}, + webrtc::{ + rtcp::payload_feedbacks::picture_loss_indication::PictureLossIndication, + rtp::{codecs::h264::H264Packet, packetizer::Depacketizer}, + track::track_remote::TrackRemote, + }, Config, EventHandler, }; +use crossbeam_channel::Sender; use eframe::egui; use egui::{Ui, Visuals}; +use log::{debug, error, warn}; use std::{ - collections::HashMap, - sync::{Arc, RwLock}, + collections::{HashMap, VecDeque}, + fs::File, + io::Write, + ops::Deref, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, RwLock, + }, + thread, + time::Duration, }; use tokio::task::JoinHandle; @@ -44,8 +59,8 @@ enum App { } struct Ingame { - instance: Arc<Instance>, - handler: Arc<Handler>, + pub instance: Arc<Instance>, + pub handler: Arc<Handler>, } struct Handler { @@ -60,13 +75,15 @@ struct GuiPeer { struct GuiResource { info: ProvideInfo, - state: GuiResourceState, + state: Arc<RwLock<GuiResourceState>>, } +#[derive(Debug, Clone)] enum GuiResourceState { Available, Connecting, Connected, + Disconnecting, } impl App { @@ -136,12 +153,13 @@ impl GuiResource { "{} {} {:?}", self.info.id, self.info.kind, self.info.label )); - match self.state { + let state = self.state.read().unwrap().to_owned(); + match state { GuiResourceState::Available => { if ui.button("Enable").clicked() { let id = self.info.id.clone(); let peer = peer.clone(); - self.state = GuiResourceState::Connecting; + *self.state.write().unwrap() = GuiResourceState::Connecting; tokio::spawn(async move { peer.request_resource(id).await }); } } @@ -151,11 +169,17 @@ impl GuiResource { ui.label("Connecting...") }); } + GuiResourceState::Disconnecting => { + ui.horizontal(|ui| { + ui.spinner(); + ui.label("Disconnecting...") + }); + } GuiResourceState::Connected => { if ui.button("Disable").clicked() { let id = self.info.id.clone(); let peer = peer.clone(); - self.state = GuiResourceState::Available; + *self.state.write().unwrap() = GuiResourceState::Disconnecting; tokio::spawn(async move { peer.request_stop_resource(id).await }); } } @@ -213,7 +237,7 @@ impl EventHandler for Handler { info.id.clone(), GuiResource { info, - state: GuiResourceState::Available, + state: Arc::new(RwLock::new(GuiResourceState::Available)), }, ); } @@ -239,9 +263,20 @@ impl EventHandler for Handler { ) -> client_native_lib::DynFut<()> { if let Some(gp) = self.peers.write().unwrap().get_mut(&peer.id) { if let Some(gr) = gp.resources.get_mut(&resource.id) { - gr.state = GuiResourceState::Connected - - // TODO + *gr.state.write().unwrap() = GuiResourceState::Connected; + match channel { + client_native_lib::peer::TransportChannel::Track(track) => { + let peer = gp.peer.clone(); + let state = gr.state.clone(); + tokio::task::spawn_blocking(move || { + play(peer, track); + *state.write().unwrap() = GuiResourceState::Available; + }); + } + client_native_lib::peer::TransportChannel::DataChannel(_) => { + warn!("cant handle data channel yet") + } + } } } Box::pin(async move {}) @@ -261,3 +296,138 @@ impl EventHandler for Handler { Box::pin(async move {}) } } + +pub fn play(peer: Arc<Peer>, track: Arc<TrackRemote>) { + let rid = block_on(track.stream_id()); + let (exit_tx, exit_rx) = crossbeam_channel::unbounded(); + let has_exited = Arc::new(AtomicBool::new(false)); + let buffer = Arc::new(RwLock::new(VecDeque::new())); + + { + let buffer = buffer.clone(); + let track = track.clone(); + tokio::spawn(async move { + if let Err(e) = track_to_raw(track, buffer).await { + error!("export error: {e}"); + } + }); + } + { + let media_ssrc = track.ssrc(); + let peer = peer.clone(); + let has_exited = has_exited.clone(); + tokio::spawn(async move { + loop { + tokio::time::sleep(Duration::from_secs(3)).await; + debug!("sending pli"); + let r = peer + .peer_connection + .write_rtcp(&[Box::new(PictureLossIndication { + sender_ssrc: 0, + media_ssrc, + })]) + .await; + if r.is_err() { + break; + } + if has_exited.load(Ordering::Relaxed) { + break; + } + } + debug!("pli send loop exited"); + }); + } + type State = (Arc<RwLock<VecDeque<u8>>>, Sender<()>, Arc<AtomicBool>); + fn open(state: &mut State, uri: &str) -> State { + debug!("mpv open: {uri}"); + state.clone() + } + fn close(state: Box<State>) { + let _ = state.1.send(()); + debug!("mpv close"); + } + fn read(state: &mut State, buf: &mut [i8]) -> i64 { + let mut i = 0; + debug!("mpv request {} bytes", buf.len()); + // TODO this is horrible + loop { + if state.2.load(Ordering::Relaxed) { + return 0; + } + let mut state = state.0.write().unwrap(); + while let Some(e) = state.pop_front() { + if i >= buf.len() { + break; + } + buf[i] = e as i8; + i += 1; + } + if i != 0 { + break; + } + drop(state); + thread::sleep(Duration::from_millis(10)); + } + debug!("mpv read {i} bytes"); + i as i64 + } + + let mpv = libmpv::Mpv::new().unwrap(); + File::create("/tmp/keks-meet-temp.conf") + .unwrap() + .write_all(include_bytes!("../mpv.conf")) + .unwrap(); + mpv.load_config("/tmp/keks-meet-temp.conf").unwrap(); + + let proto = unsafe { + libmpv::protocol::Protocol::new( + "keks-meet-track".to_owned(), + (buffer, exit_tx.clone(), has_exited.clone()), + open, + close, + read, + None, + None, + ) + }; + let proto_ctx = mpv.create_protocol_context(); + proto_ctx.register(proto).unwrap(); + mpv.playlist_load_files(&[( + &format!("keks-meet-track://{}", rid), + libmpv::FileState::AppendPlay, + None, + )]) + .unwrap(); + + block_on(track.onmute(move || { + debug!("mute"); + let _ = exit_tx.send(()); + Box::pin(async move {}) + })); + exit_rx.recv().unwrap(); + has_exited.store(true, Ordering::Relaxed); + block_on(peer.request_stop_resource(rid)) +} + +async fn track_to_raw( + track: Arc<TrackRemote>, + target: Arc<RwLock<VecDeque<u8>>>, +) -> anyhow::Result<()> { + let mut cached_packet = H264Packet::default(); + loop { + let (packet, _) = track.read_rtp().await?; + if !packet.payload.is_empty() { + let raw_payload = cached_packet.depacketize(&packet.payload)?; + if raw_payload.len() != 0 { + debug!("writing {} bytes", raw_payload.len()); + + let mut target = target.write().unwrap(); + if target.len() < 10_000_000 { + target.extend(raw_payload.into_iter()); + } else { + warn!("buffer is getting too big, dropping some data"); + } + } + } + } +} |