diff options
-rw-r--r-- | rtp/src/lib.rs | 78 | ||||
-rw-r--r-- | rtp/src/rtcp.rs | 152 | ||||
-rw-r--r-- | rtp/src/rtp.rs | 18 |
3 files changed, 227 insertions, 21 deletions
diff --git a/rtp/src/lib.rs b/rtp/src/lib.rs index 8c87af8..0c80412 100644 --- a/rtp/src/lib.rs +++ b/rtp/src/lib.rs @@ -1,2 +1,78 @@ -pub mod rtp; +#![feature(random)] +use std::{ + collections::HashSet, + random::random, + time::{Duration, Instant}, +}; + +use rtcp::RtcpPacket; +use rtp::SSRC; + pub mod rtcp; +pub mod rtp; + +pub struct Session { + tp: Instant, + tc: Instant, + tn: Instant, + pmembers: usize, + members: usize, + senders: usize, + rtcp_bw: f32, + we_sent: bool, + avg_rtcp_size: f32, + initial: bool, + + member_table: HashSet<SSRC>, +} + +impl Session { + /// RFC 3550, Section 6.3.1 + pub fn compute_rtcp_transmission_interval(&self) -> Duration { + let n; + let c; + if self.senders * 4 < self.members { + if self.we_sent { + // Senders use a quarter of the bandwidth + c = self.avg_rtcp_size / (self.rtcp_bw * 0.25); + n = self.senders; + } else { + // Receivers use the remaining three quarters + c = self.avg_rtcp_size / (self.rtcp_bw * 0.75); + n = self.members - self.senders + } + } else { + // Equally split between members + c = self.avg_rtcp_size / self.rtcp_bw; + n = self.members; + } + + let tmin = if self.initial { 2.5f32 } else { 5. }; + let td = tmin.max(n as f32 * c); + let t = td * ((random::<u16>() as f32) / (u16::MAX as f32) + 0.5); + let t = t / 1.5f32.exp(); + Duration::from_secs_f32(t) + } + + /// RFC 3550, Section 6.3.2 + pub fn new() -> Self { + let mut se = Self { + tp: Instant::now(), + tc: Instant::now(), + tn: Instant::now(), + senders: 0, + pmembers: 1, + members: 1, + we_sent: false, + initial: true, + member_table: HashSet::new(), // TODO add own ssrc to members + rtcp_bw: 1000., // TODO set rtcp bandwidth from params + avg_rtcp_size: 1000., // TODO set to next likely rtcp packet size + }; + se.tn += se.compute_rtcp_transmission_interval(); + se + } + + /// RFC 3550, Section 6.3.3 + pub fn on_receive(&mut self, packet: RtcpPacket) {} +} diff --git a/rtp/src/rtcp.rs b/rtp/src/rtcp.rs index 0ae8ea7..da0a683 100644 --- a/rtp/src/rtcp.rs +++ b/rtp/src/rtcp.rs @@ -1,26 +1,158 @@ +use crate::rtp::SSRC; + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("packet truncated")] Truncated, + #[error("unsupported version")] + Version, } pub struct RtcpPacket<'a> { - a: &'a [u8], + parts: Vec<RtcpPart<'a>>, +} + +pub enum RtcpPart<'a> { + SenderReport(SenderReport<'a>), + ReceiverReport(ReceiverReport), + SourceDescription(SourceDescription), + Bye(Bye), + Application(Application<'a>), } -pub enum RtcpPart { - SenderReport {}, - ReceiverReport {}, - SourceDescription {}, - Bye {}, - Application {}, +struct SenderReport<'a> { + ssrc: SSRC, + sender_info: SenderInfo, + reports: Vec<ReportBlock>, + extension: &'a [u8], +} +struct ReceiverReport { + sender_ssrc: SSRC, + reports: Vec<ReportBlock>, +} +struct SourceDescription {} +struct Bye {} +struct Application<'a> { + data: &'a [u8], +} +struct SenderInfo { + ntp_ts: u64, + rtp_ts: u32, + packet_count: u32, + octet_count: u32, +} +struct ReportBlock { + ssrc: SSRC, + fraction_lost: u8, + cumulative_packets_lost: u32, + ext_max_seq_num_recv: u32, + interarrical_jitter: u32, + lsr: u32, + dlsr: u32, } impl<'a> RtcpPacket<'a> { - pub fn parse(packet: &'a [u8]) -> Result<RtcpPacket<'a>, Error> { - Ok(Self { a: packet }) + pub fn parse(mut packet: &'a [u8]) -> Result<RtcpPacket<'a>, Error> { + let mut parts = Vec::with_capacity(2); + while packet.len() > 0 { + if packet.len() < 4 { + return Err(Error::Truncated); + } + let version = (packet[0] & 0b11000000) >> 6; + if !matches!(version, 1 | 2) { + return Err(Error::Version); + } + let padding = (packet[0] & 0b00100000) != 0; + let num_reports = (packet[0] & 0b00011111) >> 0; + let packet_type = packet[1]; + let length = u16::from_be_bytes([packet[2], packet[3]]); + + match packet_type { + 200 => { + // Sender Report + if packet.len() < 28 + 24 * num_reports as usize { + return Err(Error::Truncated); + } + let ssrc = SSRC(u32::from_be_bytes(packet[4..8].try_into().unwrap())); + let sender_info = SenderInfo::parse(packet[8..28].try_into().unwrap()); + let mut reports = Vec::with_capacity(num_reports as usize); + for n in 0..num_reports as usize { + reports.push(ReportBlock::parse( + packet[28 + 24 * n..28 + 24 * (n + 1)].try_into().unwrap(), + )); + } + let extension = &packet[28 + 24 * num_reports as usize..]; + parts.push(RtcpPart::SenderReport(SenderReport { + reports, + sender_info, + ssrc, + extension, + })) + } + 201 => { // Receiver Report + } + _ => {} + } + packet = &packet[length as usize..]; + } + Ok(Self { parts }) + } + pub fn write(&self, out: &mut Vec<u8>) { + for part in &self.parts { + let version = 2; + let padding = false; + match part { + RtcpPart::SenderReport(sender_report) => { + out.push( + version << 6 | (padding as u8) << 5 | sender_report.reports.len() as u8, + ); + out.push(200); + + } + RtcpPart::ReceiverReport(receiver_report) => todo!(), + RtcpPart::SourceDescription(source_description) => todo!(), + RtcpPart::Bye(bye) => todo!(), + RtcpPart::Application(application) => todo!(), + } + } + } +} +impl SenderInfo { + pub const SIZE: usize = 5 * 4; + pub fn parse(packet: [u8; 5 * 4]) -> SenderInfo { + Self { + ntp_ts: u64::from_be_bytes(packet[0..8].try_into().unwrap()), + rtp_ts: u32::from_be_bytes(packet[8..12].try_into().unwrap()), + packet_count: u32::from_be_bytes(packet[12..16].try_into().unwrap()), + octet_count: u32::from_be_bytes(packet[16..20].try_into().unwrap()), + } + } + pub fn write(&self, out: &mut Vec<u8>) { + out.extend(self.ntp_ts.to_be_bytes()); + out.extend(self.rtp_ts.to_be_bytes()); + out.extend(self.packet_count.to_be_bytes()); + out.extend(self.octet_count.to_be_bytes()); + } +} +impl ReportBlock { + pub fn parse(packet: [u8; 6 * 4]) -> ReportBlock { + Self { + ssrc: SSRC(u32::from_be_bytes(packet[0..4].try_into().unwrap())), + fraction_lost: packet[4], + cumulative_packets_lost: u32::from_be_bytes([0, packet[5], packet[6], packet[7]]), + ext_max_seq_num_recv: u32::from_be_bytes(packet[8..12].try_into().unwrap()), + interarrical_jitter: u32::from_be_bytes(packet[12..16].try_into().unwrap()), + lsr: u32::from_be_bytes(packet[16..20].try_into().unwrap()), + dlsr: u32::from_be_bytes(packet[20..24].try_into().unwrap()), + } } pub fn write(&self, out: &mut Vec<u8>) { - out.extend(self.a); + out.extend(self.ssrc.0.to_be_bytes()); + out.push(self.fraction_lost); + out.extend(&self.cumulative_packets_lost.to_be_bytes()[1..]); + out.extend(self.ext_max_seq_num_recv.to_be_bytes()); + out.extend(self.interarrical_jitter.to_be_bytes()); + out.extend(self.lsr.to_be_bytes()); + out.extend(self.dlsr.to_be_bytes()); } } diff --git a/rtp/src/rtp.rs b/rtp/src/rtp.rs index a1ed166..581bc40 100644 --- a/rtp/src/rtp.rs +++ b/rtp/src/rtp.rs @@ -8,12 +8,15 @@ pub enum Error { Padding, } +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct SSRC(pub u32); + pub struct RtpPacket<'a> { marker: bool, payload_type: u8, sequence: u16, timestamp: u32, - ssrc: u32, + ssrc: SSRC, csrc_count: u8, csrcs: [u32; 15], extension: Option<(u16, &'a [u8])>, @@ -37,20 +40,15 @@ impl<'a> RtpPacket<'a> { let marker = packet[1] >> 7 != 0; let payload_type = packet[1] & 0x7f; let sequence = u16::from_be_bytes([packet[2], packet[3]]); - let timestamp = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]); - let ssrc = u32::from_be_bytes([packet[8], packet[9], packet[10], packet[11]]); + let timestamp = u32::from_be_bytes(packet[4..8].try_into().unwrap()); + let ssrc = SSRC(u32::from_be_bytes(packet[8..12].try_into().unwrap())); let mut csrcs = [0u32; 15]; if packet.len() < 12 + csrc_count as usize * 4 { return Err(Error::Truncated); } for n in 0..csrc_count as usize { let off = 12 + n * 4; - csrcs[n] = u32::from_be_bytes([ - packet[off + 0], - packet[off + 1], - packet[off + 2], - packet[off + 3], - ]); + csrcs[n] = u32::from_be_bytes(packet[off..off + 4].try_into().unwrap()); } let mut offset = 12 + csrc_count as usize * 4; @@ -110,7 +108,7 @@ impl<'a> RtpPacket<'a> { out.push((self.payload_type & 0x7f) | ((self.marker as u8) << 7)); out.extend(self.sequence.to_be_bytes()); out.extend(self.timestamp.to_be_bytes()); - out.extend(self.ssrc.to_be_bytes()); + out.extend(self.ssrc.0.to_be_bytes()); out.extend( self.csrcs .into_iter() |