diff options
-rw-r--r-- | Cargo.lock | 26 | ||||
-rw-r--r-- | Cargo.toml | 3 | ||||
-rw-r--r-- | readme.md | 8 | ||||
-rw-r--r-- | sip/examples/server.rs | 19 | ||||
-rw-r--r-- | sip/src/encoding/headers.rs | 2 | ||||
-rw-r--r-- | sip/src/transport/mod.rs | 21 | ||||
-rw-r--r-- | tools/Cargo.toml | 15 | ||||
-rw-r--r-- | tools/src/main.rs | 259 |
8 files changed, 329 insertions, 24 deletions
@@ -77,9 +77,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.93" +version = "1.0.96" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" +checksum = "6b964d184e89d9b6b67dd2715bc8e74cf3107fb2b529990c90cf517326150bf4" [[package]] name = "autocfg" @@ -150,9 +150,9 @@ dependencies = [ [[package]] name = "env_logger" -version = "0.11.5" +version = "0.11.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0" dependencies = [ "anstream", "anstyle", @@ -220,9 +220,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.22" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" [[package]] name = "md5" @@ -520,6 +520,20 @@ dependencies = [ ] [[package]] +name = "tools" +version = "0.1.0" +dependencies = [ + "anyhow", + "env_logger", + "hex", + "log", + "rtp", + "sdp", + "sip", + "tokio", +] + +[[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1,7 +1,8 @@ [workspace] resolver = "2" -members = ["rtp", "sdp", "sip"] +members = ["rtp", "sdp", "sip", "tools"] [workspace.dependencies] sdp = { path = "sdp" } rtp = { path = "rtp" } +sip = { path = "sip" } @@ -1,6 +1,6 @@ # Multimedia Streaming Libraries for Rust -Current implementation target: +## Feature target - RTP ([RFC3550](https://www.rfc-editor.org/rfc/rfc3550#section-5)) - Packet types, parsers and serializers @@ -14,5 +14,11 @@ Current implementation target: - TURN - maybe DTLS +## Current state + +Mostly non-conformant SIP implementation with some parts it RTP/RTCP +implemented. + ## License + AGPL-3.0-only; See [COPYING](./COPYING) diff --git a/sip/examples/server.rs b/sip/examples/server.rs index 02c06b4..859c35e 100644 --- a/sip/examples/server.rs +++ b/sip/examples/server.rs @@ -47,7 +47,10 @@ async fn handle_client(stream: TcpStream, addr: SocketAddr) -> Result<()> { let to: To = req.headers.get_res()?; let via: Via = req.headers.get_res()?; let contact: Contact = req.headers.get_res()?; - info!("Registered {}", contact.uri.localpart.as_ref().unwrap()); + info!( + "({addr}) Registered {}", + contact.uri.localpart.as_ref().unwrap() + ); tu.respond( &req, @@ -67,17 +70,3 @@ async fn handle_client(stream: TcpStream, addr: SocketAddr) -> Result<()> { if req.method == Method::Invite {} } } - -/* -[2024-07-05T22:22:40Z DEBUG sip::transport::udp] SIP/2.0 200 OK - Via: SIP/2.0/UDP 198.18.1.135:52125;branch=Uz7r7ysvrS91q9j9;rport=52125 - Contact: <sip:metatest-0x5e11f89f7900@198.18.1.135:40708>;expires=379;+sip.instance="<urn:uuid:9b2421ef-43ed-3b3b-9181-607ba5c13fb9>" - Contact: <sip:metatest-0x5e11f89f7900@198.18.1.135:47412>;expires=411 - Contact: <sip:metatest-0x5e11f89f7900@198.18.1.135:52125>;expires=600 - To: <sip:metatest@198.18.0.220>;tag=81756f4e - From: <sip:metatest@198.18.0.220>;tag=-AK4OkphTFVZv50h - Call-ID: U3Fyb6vT1BhWxiKG - CSeq: 1 REGISTER - User-Agent: AGFEO SIP V3.00.15 n (MAC=00094070069C) - Content-Length: 0 -*/ diff --git a/sip/src/encoding/headers.rs b/sip/src/encoding/headers.rs index e880739..30587c7 100644 --- a/sip/src/encoding/headers.rs +++ b/sip/src/encoding/headers.rs @@ -140,7 +140,7 @@ pub fn unquote(v: &str) -> Result<String> { .to_string()) } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Contact { pub display_name: Option<String>, pub uri: Uri, diff --git a/sip/src/transport/mod.rs b/sip/src/transport/mod.rs index b4c512b..b9cdaeb 100644 --- a/sip/src/transport/mod.rs +++ b/sip/src/transport/mod.rs @@ -1,5 +1,7 @@ use crate::encoding::Message; use anyhow::Result; +use tcp::TcpTransport; +use udp::UdpTransport; pub mod tcp; pub mod udp; @@ -9,3 +11,22 @@ pub trait Transport { async fn recv(&self) -> Result<Message>; async fn send(&self, message: Message) -> Result<()>; } + +pub enum InetTransport { + Udp(UdpTransport), + Tcp(TcpTransport), +} +impl Transport for InetTransport { + async fn recv(&self) -> Result<Message> { + match self { + InetTransport::Udp(t) => t.recv().await, + InetTransport::Tcp(t) => t.recv().await, + } + } + async fn send(&self, message: Message) -> Result<()> { + match self { + InetTransport::Udp(t) => t.send(message).await, + InetTransport::Tcp(t) => t.send(message).await, + } + } +} diff --git a/tools/Cargo.toml b/tools/Cargo.toml new file mode 100644 index 0000000..8f0f5f8 --- /dev/null +++ b/tools/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tools" +version = "0.1.0" +edition = "2024" + +[dependencies] +hex = "0.4.3" +env_logger = "0.11.6" +log = "0.4.26" +anyhow = "1.0.96" +tokio = { version = "1.38.0", features = ["full"] } + +sip = { workspace = true } +sdp = { workspace = true } +rtp = { workspace = true } diff --git a/tools/src/main.rs b/tools/src/main.rs new file mode 100644 index 0000000..c24f366 --- /dev/null +++ b/tools/src/main.rs @@ -0,0 +1,259 @@ +#![feature(random)] +use anyhow::Result; +use log::{info, warn}; +use rtp::{ + rtcp::{Application, ReceiverReport, RtcpPacket, RtcpPart, SdesType, SourceDescription}, + rtp::SSRC, +}; +use sdp::{MediaDescription, SessionDescription, TimeDescription}; +use sip::{ + encoding::{ + headermap::HeaderMap, + headers::{ + Allow, Authorization, CallID, Contact, ContentType, From, MaxForwards, To, UserAgent, + Via, + }, + method::Method, + request::Request, + response::Response, + status::Status, + uri::Uri, + }, + transaction::TransactionUser, + transport::{InetTransport, tcp::TcpTransport, udp::UdpTransport}, +}; +use std::{ + env::args, + net::{SocketAddr, ToSocketAddrs}, + random::random, + str::FromStr, + sync::Arc, + time::{Duration, SystemTime}, +}; +use tokio::{ + net::{TcpStream, UdpSocket}, + signal::ctrl_c, + spawn, + time::sleep, +}; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::init_from_env("LOG"); + + let mut args = args().skip(1); + let tport = args.next().unwrap(); + let to_addr = args.next().unwrap(); + let username = args.next().unwrap(); + let password = args.next().unwrap(); + // let callee = args.next().unwrap(); // TODO support calling + + let (transport, local_addr) = match tport.as_str() { + "tcp" => { + let sock = TcpStream::connect((to_addr.as_str(), 5060)).await?; + let local_addr = sock.local_addr()?; + let transport = TcpTransport::new(sock).await?; + (InetTransport::Tcp(transport), local_addr) + } + "udp" => { + let sock = UdpSocket::bind("0.0.0.0:0").await?; + sock.connect((to_addr.as_str(), 5060)).await?; + let local_addr = sock.local_addr()?; + let transport = UdpTransport::new(sock).await?; + (InetTransport::Udp(transport), local_addr) + } + _ => unreachable!(), + }; + let tu = Arc::new(TransactionUser::new(transport)); + + { + let tu = tu.clone(); + spawn(async move { + loop { + if let Err(e) = tu.process_incoming().await { + warn!("error processing responses: {e}"); + } + } + }); + } + + let branch = hex::encode([(); 12].map(|()| random::<u8>())); + let call_id = hex::encode([(); 12].map(|()| random::<u8>())); + let tag = hex::encode([(); 12].map(|()| random::<u8>())); + let ctag = hex::encode([(); 8].map(|()| random::<u8>())); + + let self_contact = Contact { + display_name: None, + uri: Uri::from_str(&format!("sip:{username}-{ctag}@{local_addr}"))?, + params: ";expires=3600".to_string(), + }; + + let mut req = Request { + method: Method::Register, + uri: Uri::from_str(&format!("sip:{to_addr}:5060;transport={tport}"))?, + headers: HeaderMap::new() + .add(Via(format!( + "SIP/2.0/{} {local_addr};branch={branch};rport", + tport.to_uppercase() + ))) + .add(MaxForwards(70)) + .add(To(format!("<sip:{username}@{to_addr}:5060>"))) + .add(From(format!("<sip:{username}@{to_addr}:5060>;tag={tag}"))) + .add(CallID(call_id.clone())) + .add(UserAgent("sip-rs test tool".to_string())) + .add(self_contact.clone()) + .add(Allow( + "INVITE,ACK,BYE,CANCEL,OPTIONS,NOTIFY,INFO,MESSAGE,UPDATE,REFER".to_string(), + )), + body: String::new(), + }; + + let fail_resp = tu.transact(req.clone()).await?.recv().await.unwrap(); + req.headers.insert(Authorization::construct( + &req, &fail_resp, &username, &password, + )?); + let _resp = tu.transact(req.clone()).await?.recv().await.unwrap(); + + loop { + let req = tu.process_incoming().await?; + if req.method == Method::Invite { + handle_call(req, tu, local_addr, &username, &self_contact).await?; + break; + } + } + + ctrl_c().await?; + Ok(()) +} + +async fn handle_call( + req: Request, + tu: Arc<TransactionUser<InetTransport>>, + local_addr: SocketAddr, + username: &str, + self_contact: &Contact, +) -> Result<()> { + let remote_sd = SessionDescription::from_str(&req.body)?; + eprintln!("{remote_sd:#?}"); + + let remote_media_addr = { + let a = remote_sd.connection_information.unwrap(); + let addr = a.split(" ").nth(2).unwrap(); + let port = remote_sd.media_descriptions[0] + .name + .split(" ") + .nth(1) + .unwrap() + .parse::<u16>() + .unwrap(); + (addr, port).to_socket_addrs()?.next().unwrap() + }; + let remote_control_addr = SocketAddr::new(remote_media_addr.ip(), remote_media_addr.port() + 1); + + let rtp_sock = UdpSocket::bind("0.0.0.0:6002").await?; + rtp_sock.connect(remote_media_addr).await?; + let rtcp_sock = UdpSocket::bind("0.0.0.0:6003").await?; + rtcp_sock.connect(remote_control_addr).await?; + + let own_ssrc = SSRC(random()); + + let sess_id = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + let sess_ver = 321113608; + + let media_port = rtp_sock.local_addr().unwrap().port(); + let local_sd = SessionDescription { + version: format!("0"), + originator: format!("- {sess_id} {sess_ver} IN IP4 {}", local_addr.ip()), + session_name: format!("-"), + connection_information: Some(format!("IN IP4 {}", local_addr.ip())), + time_descriptions: vec![TimeDescription { + time: "0 0".to_string(), + repeat_times: vec![], + }], + media_descriptions: vec![MediaDescription { + name: format!("audio {media_port} RTP/AVP 0 8 101"), + attributes: vec![ + "rtpmap:8 PCMA/8000".to_string(), + "rtpmap:0 PCMU/8000".to_string(), + "rtpmap:101 telephone-event/8000".to_string(), + "fmtp:101 0-15".to_string(), + "sendrecv".to_string(), + format!("ssrc:{} cname:blub", own_ssrc.0), + "minptime:20".to_string(), + "ptime:20".to_string(), + "label:1".to_string(), + ], + ..Default::default() + }], + ..Default::default() + }; + + info!("own sd: {local_sd}"); + + let resp = Response { + status: Status::Ok, // answering + headers: HeaderMap::new() + .add(req.headers.get::<Via>().unwrap()?) + .add(req.headers.get::<From>().unwrap()?) + .add(req.headers.get::<To>().unwrap()?) + .add(ContentType("application/sdp".to_string())) + .add(self_contact.to_owned()) + .add(Allow( + "INVITE,ACK,BYE,CANCEL,OPTIONS,NOTIFY,INFO,MESSAGE,UPDATE,REFER".to_string(), + )), + body: local_sd.to_string(), + }; + + tu.respond(&req, Response { + status: Status::Ringing, + body: String::new(), + headers: resp.headers.clone(), + }) + .await?; + + sleep(Duration::from_secs(1)).await; + + tu.respond(&req, resp).await?; + + let mut buf = vec![]; + RtcpPacket { + parts: vec![ + RtcpPart::ReceiverReport(ReceiverReport { + sender_ssrc: own_ssrc, + reports: vec![], + }), + RtcpPart::SourceDescription(SourceDescription { + sources: vec![(own_ssrc, vec![( + SdesType::CanonicalName, + format!("sip:{username}@{local_addr}"), + )])], + }), + RtcpPart::Application(Application { + ssrc: own_ssrc, + subtype: 0, + name: [b'P', b'I', b'N', b'G'], + data: b"PONG", + }), + ], + } + .write(&mut buf); + rtcp_sock.send(&buf).await?; + + let mut buf = [0u8; 16_000]; + for _ in 0.. { + let size = rtp_sock.recv(&mut buf).await?; + let packet = &buf[..size]; + eprintln!("{packet:?}"); + } + + // let fail_resp = tu.transact(req.clone()).await?.recv().await.unwrap(); + // req.headers.insert(Authorization::construct( + // &req, &fail_resp, username, &username, + // )?); + // let mut resp = tu.transact(req.clone()).await?; + + Ok(()) +} |