From beab67b2e38808f91ecd390190d527fc7db5499d Mon Sep 17 00:00:00 2001 From: metamuffin Date: Sun, 23 Feb 2025 12:14:27 +0100 Subject: call receive tool --- tools/src/main.rs | 259 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) create mode 100644 tools/src/main.rs (limited to 'tools/src/main.rs') diff --git a/tools/src/main.rs b/tools/src/main.rs new file mode 100644 index 0000000..c24f366 --- /dev/null +++ b/tools/src/main.rs @@ -0,0 +1,259 @@ +#![feature(random)] +use anyhow::Result; +use log::{info, warn}; +use rtp::{ + rtcp::{Application, ReceiverReport, RtcpPacket, RtcpPart, SdesType, SourceDescription}, + rtp::SSRC, +}; +use sdp::{MediaDescription, SessionDescription, TimeDescription}; +use sip::{ + encoding::{ + headermap::HeaderMap, + headers::{ + Allow, Authorization, CallID, Contact, ContentType, From, MaxForwards, To, UserAgent, + Via, + }, + method::Method, + request::Request, + response::Response, + status::Status, + uri::Uri, + }, + transaction::TransactionUser, + transport::{InetTransport, tcp::TcpTransport, udp::UdpTransport}, +}; +use std::{ + env::args, + net::{SocketAddr, ToSocketAddrs}, + random::random, + str::FromStr, + sync::Arc, + time::{Duration, SystemTime}, +}; +use tokio::{ + net::{TcpStream, UdpSocket}, + signal::ctrl_c, + spawn, + time::sleep, +}; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init_from_env("LOG"); + + let mut args = args().skip(1); + let tport = args.next().unwrap(); + let to_addr = args.next().unwrap(); + let username = args.next().unwrap(); + let password = args.next().unwrap(); + // let callee = args.next().unwrap(); // TODO support calling + + let (transport, local_addr) = match tport.as_str() { + "tcp" => { + let sock = TcpStream::connect((to_addr.as_str(), 5060)).await?; + let local_addr = sock.local_addr()?; + let transport = TcpTransport::new(sock).await?; + (InetTransport::Tcp(transport), local_addr) + } + "udp" => { + let sock = UdpSocket::bind("0.0.0.0:0").await?; + sock.connect((to_addr.as_str(), 5060)).await?; + let local_addr = sock.local_addr()?; + let transport = UdpTransport::new(sock).await?; + (InetTransport::Udp(transport), local_addr) + } + _ => unreachable!(), + }; + let tu = Arc::new(TransactionUser::new(transport)); + + { + let tu = tu.clone(); + spawn(async move { + loop { + if let Err(e) = tu.process_incoming().await { + warn!("error processing responses: {e}"); + } + } + }); + } + + let branch = hex::encode([(); 12].map(|()| random::())); + let call_id = hex::encode([(); 12].map(|()| random::())); + let tag = hex::encode([(); 12].map(|()| random::())); + let ctag = hex::encode([(); 8].map(|()| random::())); + + let self_contact = Contact { + display_name: None, + uri: Uri::from_str(&format!("sip:{username}-{ctag}@{local_addr}"))?, + params: ";expires=3600".to_string(), + }; + + let mut req = Request { + method: Method::Register, + uri: Uri::from_str(&format!("sip:{to_addr}:5060;transport={tport}"))?, + headers: HeaderMap::new() + .add(Via(format!( + "SIP/2.0/{} {local_addr};branch={branch};rport", + tport.to_uppercase() + ))) + .add(MaxForwards(70)) + .add(To(format!(""))) + .add(From(format!(";tag={tag}"))) + .add(CallID(call_id.clone())) + .add(UserAgent("sip-rs test tool".to_string())) + .add(self_contact.clone()) + .add(Allow( + "INVITE,ACK,BYE,CANCEL,OPTIONS,NOTIFY,INFO,MESSAGE,UPDATE,REFER".to_string(), + )), + body: String::new(), + }; + + let fail_resp = tu.transact(req.clone()).await?.recv().await.unwrap(); + req.headers.insert(Authorization::construct( + &req, &fail_resp, &username, &password, + )?); + let _resp = tu.transact(req.clone()).await?.recv().await.unwrap(); + + loop { + let req = tu.process_incoming().await?; + if req.method == Method::Invite { + handle_call(req, tu, local_addr, &username, &self_contact).await?; + break; + } + } + + ctrl_c().await?; + Ok(()) +} + +async fn handle_call( + req: Request, + tu: Arc>, + local_addr: SocketAddr, + username: &str, + self_contact: &Contact, +) -> Result<()> { + let remote_sd = SessionDescription::from_str(&req.body)?; + eprintln!("{remote_sd:#?}"); + + let remote_media_addr = { + let a = remote_sd.connection_information.unwrap(); + let addr = a.split(" ").nth(2).unwrap(); + let port = remote_sd.media_descriptions[0] + .name + .split(" ") + .nth(1) + .unwrap() + .parse::() + .unwrap(); + (addr, port).to_socket_addrs()?.next().unwrap() + }; + let remote_control_addr = SocketAddr::new(remote_media_addr.ip(), remote_media_addr.port() + 1); + + let rtp_sock = UdpSocket::bind("0.0.0.0:6002").await?; + rtp_sock.connect(remote_media_addr).await?; + let rtcp_sock = UdpSocket::bind("0.0.0.0:6003").await?; + rtcp_sock.connect(remote_control_addr).await?; + + let own_ssrc = SSRC(random()); + + let sess_id = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let sess_ver = 321113608; + + let media_port = rtp_sock.local_addr().unwrap().port(); + let local_sd = SessionDescription { + version: format!("0"), + originator: format!("- {sess_id} {sess_ver} IN IP4 {}", local_addr.ip()), + session_name: format!("-"), + connection_information: Some(format!("IN IP4 {}", local_addr.ip())), + time_descriptions: vec![TimeDescription { + time: "0 0".to_string(), + repeat_times: vec![], + }], + media_descriptions: vec![MediaDescription { + name: format!("audio {media_port} RTP/AVP 0 8 101"), + attributes: vec![ + "rtpmap:8 PCMA/8000".to_string(), + "rtpmap:0 PCMU/8000".to_string(), + "rtpmap:101 telephone-event/8000".to_string(), + "fmtp:101 0-15".to_string(), + "sendrecv".to_string(), + format!("ssrc:{} cname:blub", own_ssrc.0), + "minptime:20".to_string(), + "ptime:20".to_string(), + "label:1".to_string(), + ], + ..Default::default() + }], + ..Default::default() + }; + + info!("own sd: {local_sd}"); + + let resp = Response { + status: Status::Ok, // answering + headers: HeaderMap::new() + .add(req.headers.get::().unwrap()?) + .add(req.headers.get::().unwrap()?) + .add(req.headers.get::().unwrap()?) + .add(ContentType("application/sdp".to_string())) + .add(self_contact.to_owned()) + .add(Allow( + "INVITE,ACK,BYE,CANCEL,OPTIONS,NOTIFY,INFO,MESSAGE,UPDATE,REFER".to_string(), + )), + body: local_sd.to_string(), + }; + + tu.respond(&req, Response { + status: Status::Ringing, + body: String::new(), + headers: resp.headers.clone(), + }) + .await?; + + sleep(Duration::from_secs(1)).await; + + tu.respond(&req, resp).await?; + + let mut buf = vec![]; + RtcpPacket { + parts: vec![ + RtcpPart::ReceiverReport(ReceiverReport { + sender_ssrc: own_ssrc, + reports: vec![], + }), + RtcpPart::SourceDescription(SourceDescription { + sources: vec![(own_ssrc, vec![( + SdesType::CanonicalName, + format!("sip:{username}@{local_addr}"), + )])], + }), + RtcpPart::Application(Application { + ssrc: own_ssrc, + subtype: 0, + name: [b'P', b'I', b'N', b'G'], + data: b"PONG", + }), + ], + } + .write(&mut buf); + rtcp_sock.send(&buf).await?; + + let mut buf = [0u8; 16_000]; + for _ in 0.. { + let size = rtp_sock.recv(&mut buf).await?; + let packet = &buf[..size]; + eprintln!("{packet:?}"); + } + + // let fail_resp = tu.transact(req.clone()).await?.recv().await.unwrap(); + // req.headers.insert(Authorization::construct( + // &req, &fail_resp, username, &username, + // )?); + // let mut resp = tu.transact(req.clone()).await?; + + Ok(()) +} -- cgit v1.2.3-70-g09d2