diff options
author | metamuffin <metamuffin@disroot.org> | 2024-03-18 09:51:38 +0100 |
---|---|---|
committer | metamuffin <metamuffin@disroot.org> | 2024-03-18 09:51:38 +0100 |
commit | d3e3cfdaa53ae354b1de4f540426af3e6af8096d (patch) | |
tree | 20fc2addda444373f4c50377a4179503883d3cee /client-native-rift/src/main.rs | |
parent | c75b423e461010c722503fb9792f84ebc415c7d8 (diff) | |
download | keks-meet-d3e3cfdaa53ae354b1de4f540426af3e6af8096d.tar keks-meet-d3e3cfdaa53ae354b1de4f540426af3e6af8096d.tar.bz2 keks-meet-d3e3cfdaa53ae354b1de4f540426af3e6af8096d.tar.zst |
reworking rift: part two
Diffstat (limited to 'client-native-rift/src/main.rs')
-rw-r--r-- | client-native-rift/src/main.rs | 244 |
1 files changed, 93 insertions, 151 deletions
diff --git a/client-native-rift/src/main.rs b/client-native-rift/src/main.rs index 57a4947..c44f8c2 100644 --- a/client-native-rift/src/main.rs +++ b/client-native-rift/src/main.rs @@ -8,18 +8,22 @@ pub mod file; pub mod port; use clap::{ColorChoice, Parser}; -use file::FileSender; +use file::{DownloadHandler, FileSender}; use libkeks::{ instance::Instance, peer::{Peer, TransportChannel}, protocol::ProvideInfo, + webrtc::data_channel::RTCDataChannel, Config, DynFut, EventHandler, }; -use log::{info, warn}; -use port::PortExposer; +use log::{error, info, warn}; +use port::{ForwardHandler, PortExposer}; use rustyline::{error::ReadlineError, DefaultEditor}; -use std::{collections::HashMap, os::unix::prelude::MetadataExt, path::PathBuf, sync::Arc}; -use tokio::{fs, sync::RwLock}; +use std::{ + collections::HashMap, future::Future, os::unix::prelude::MetadataExt, path::PathBuf, pin::Pin, + sync::Arc, +}; +use tokio::{fs, net::TcpListener, sync::RwLock}; use users::get_current_username; fn main() { @@ -67,6 +71,22 @@ pub enum Command { Forward { id: String, port: Option<u16> }, } +struct State { + requested: HashMap<String, Box<dyn RequestHandler>>, +} +pub trait RequestHandler: Send + Sync + 'static { + fn on_connect( + &self, + resource: ProvideInfo, + channel: Arc<RTCDataChannel>, + ) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + Sync>>; +} + +#[derive(Clone)] +struct Handler { + state: Arc<RwLock<State>>, +} + fn get_username() -> String { get_current_username() .map(|u| u.to_str().unwrap().to_string()) @@ -141,6 +161,11 @@ async fn run() -> anyhow::Result<()> { for (rid, r) in p.remote_provided.read().await.iter() { if rid == &id { if r.kind == "file" { + state + .write() + .await + .requested + .insert(id.clone(), Box::new(DownloadHandler { path })); p.request_resource(id).await; } else { warn!("not a file"); @@ -163,10 +188,52 @@ async fn run() -> anyhow::Result<()> { })) .await; } - Command::Forward { id, port } => {} + Command::Forward { id, port } => { + let peers = inst.peers.read().await; + 'outer: for peer in peers.values() { + for (rid, r) in peer.remote_provided.read().await.iter() { + if rid == &id { + if r.kind == "port" { + let peer = peer.to_owned(); + let state = state.clone(); + tokio::task::spawn(async move { + let Ok(listener) = + TcpListener::bind(("127.0.0.1", port.unwrap_or(0))) + .await + else { + error!("cannot bind tcp listener"); + return; + }; + info!( + "tcp listener bound to {}", + listener.local_addr().unwrap() + ); + while let Ok((stream, addr)) = listener.accept().await { + info!("new connection from {addr:?}"); + state.write().await.requested.insert( + id.clone(), + Box::new(ForwardHandler { + stream: Arc::new(RwLock::new(Some(stream))), + }), + ); + peer.request_resource(id.clone()).await; + } + }); + } else { + warn!("not a port"); + } + break 'outer; + } + } + } + } }, Err(err) => err.print().unwrap(), }, + Err(ReadlineError::Eof) => { + info!("exit"); + break; + } Err(ReadlineError::Interrupted) => { info!("interrupted; exiting..."); break; @@ -175,64 +242,25 @@ async fn run() -> anyhow::Result<()> { } } - // match &args.action { - // Action::Send { filename } => { - // inst.add_local_resource(Box::new(FileSender { - // info: ProvideInfo { - // id: "the-file".to_string(), // we only share a single file so its fine - // kind: "file".to_string(), - // track_kind: None, - // label: Some(filename.clone().unwrap_or("stdin".to_string())), - // size: if let Some(filename) = &filename { - // Some(fs::metadata(filename).await.unwrap().size() as usize) - // } else { - // None - // }, - // }, - // reader_factory: args.action, - // })) - // .await; - // } - // _ => (), - // } Ok(()) } -struct State { - requested: HashMap<String, Box<dyn RequestHandler>>, -} -pub trait RequestHandler: Send + Sync + 'static { - -} - -#[derive(Clone)] -struct Handler { - state: Arc<RwLock<State>>, -} - impl EventHandler for Handler { fn peer_join(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn peer_leave(&self, _peer: Arc<Peer>) -> libkeks::DynFut<()> { Box::pin(async move {}) } - fn resource_added(&self, peer: Arc<Peer>, info: libkeks::protocol::ProvideInfo) -> DynFut<()> { - let id = info.id.clone(); - Box::pin(async move { - // match &args.action { - // Action::Receive { .. } => { - // if info.kind == "file" { - // peer.request_resource(id).await; - // } - // } - // _ => (), - // } - }) + fn resource_added( + &self, + _peer: Arc<Peer>, + _info: libkeks::protocol::ProvideInfo, + ) -> DynFut<()> { + Box::pin(async move {}) } fn resource_removed(&self, _peer: Arc<Peer>, _id: String) -> DynFut<()> { - Box::pin(async {}) + Box::pin(async move {}) } fn resource_connected( @@ -242,106 +270,20 @@ impl EventHandler for Handler { channel: TransportChannel, ) -> libkeks::DynFut<()> { let resource = resource.clone(); - let s = self.clone(); + let k = self.clone(); Box::pin(async move { - // match channel { - // TransportChannel::Track(_) => warn!("wrong type"), - // TransportChannel::DataChannel(dc) => { - // if resource.kind != "file" { - // return error!("we got a non-file resource for some reason…"); - // } - // let pos = Arc::new(AtomicUsize::new(0)); - // let writer: Arc<RwLock<Option<Pin<Box<dyn AsyncWrite + Send + Sync>>>>> = - // Arc::new(RwLock::new(None)); - // { - // let writer = writer.clone(); - // let s = s.clone(); - // dc.on_open(Box::new(move || { - // let s = s.clone(); - // let writer = writer.clone(); - // Box::pin(async move { - // info!("channel opened"); - // *writer.write().await = Some(s.args.action.create_writer().await) - // }) - // })); - // } - // { - // let writer = writer.clone(); - // dc.on_close(Box::new(move || { - // let writer = writer.clone(); - // Box::pin(async move { - // info!("channel closed"); - // *writer.write().await = None; - // exit(0); - // }) - // })); - // } - // { - // let writer = writer.clone(); - // dc.on_message(Box::new(move |mesg| { - // let writer = writer.clone(); - // let pos = pos.clone(); - // Box::pin(async move { - // // TODO - // if mesg.is_string { - // let s = String::from_utf8((&mesg.data).to_vec()).unwrap(); - // if &s == "end" { - // info!("EOF reached") - // } - // } else { - // let pos = pos.fetch_add(mesg.data.len(), Ordering::Relaxed); - // info!( - // "recv {:?} ({} of {})", - // mesg.data.len(), - // humansize::format_size(pos, DECIMAL), - // humansize::format_size(resource.size.unwrap_or(0), DECIMAL), - // ); - // writer - // .write() - // .await - // .as_mut() - // .unwrap() - // .write_all(&mesg.data) - // .await - // .unwrap(); - // } - // }) - // })) - // } - // dc.on_error(Box::new(move |err| { - // Box::pin(async move { - // error!("data channel errored: {err}"); - // }) - // })); - // } - // } + if let Some(handler) = k.state.write().await.requested.get(&resource.id) { + match channel { + TransportChannel::Track(_) => warn!("wrong type"), + TransportChannel::DataChannel(channel) => { + if let Err(e) = handler.on_connect(resource, channel).await { + warn!("request handler error: {e}"); + } + } + } + } else { + warn!("got {:?}, which was not requested", resource.id); + } }) } } - -// impl Action { -// pub async fn create_writer(&self) -> Pin<Box<dyn AsyncWrite + Send + Sync + 'static>> { -// match self { -// Action::Receive { filename } => { -// if let Some(filename) = filename { -// Box::pin(File::create(filename).await.unwrap()) -// } else { -// Box::pin(stdout()) -// } -// } -// _ => unreachable!(), -// } -// } -// pub async fn create_reader(&self) -> Pin<Box<dyn AsyncRead + Send + Sync + 'static>> { -// match self { -// Action::Send { filename } => { -// if let Some(filename) = filename { -// Box::pin(File::open(filename).await.unwrap()) -// } else { -// Box::pin(stdin()) -// } -// } -// _ => unreachable!(), -// } -// } -// } |