aboutsummaryrefslogtreecommitdiff
path: root/rtp/src
diff options
context:
space:
mode:
Diffstat (limited to 'rtp/src')
-rw-r--r--rtp/src/lib.rs78
-rw-r--r--rtp/src/rtcp.rs152
-rw-r--r--rtp/src/rtp.rs18
3 files changed, 227 insertions, 21 deletions
diff --git a/rtp/src/lib.rs b/rtp/src/lib.rs
index 8c87af8..0c80412 100644
--- a/rtp/src/lib.rs
+++ b/rtp/src/lib.rs
@@ -1,2 +1,78 @@
-pub mod rtp;
+#![feature(random)]
+use std::{
+ collections::HashSet,
+ random::random,
+ time::{Duration, Instant},
+};
+
+use rtcp::RtcpPacket;
+use rtp::SSRC;
+
pub mod rtcp;
+pub mod rtp;
+
+pub struct Session {
+ tp: Instant,
+ tc: Instant,
+ tn: Instant,
+ pmembers: usize,
+ members: usize,
+ senders: usize,
+ rtcp_bw: f32,
+ we_sent: bool,
+ avg_rtcp_size: f32,
+ initial: bool,
+
+ member_table: HashSet<SSRC>,
+}
+
+impl Session {
+ /// RFC 3550, Section 6.3.1
+ pub fn compute_rtcp_transmission_interval(&self) -> Duration {
+ let n;
+ let c;
+ if self.senders * 4 < self.members {
+ if self.we_sent {
+ // Senders use a quarter of the bandwidth
+ c = self.avg_rtcp_size / (self.rtcp_bw * 0.25);
+ n = self.senders;
+ } else {
+ // Receivers use the remaining three quarters
+ c = self.avg_rtcp_size / (self.rtcp_bw * 0.75);
+ n = self.members - self.senders
+ }
+ } else {
+ // Equally split between members
+ c = self.avg_rtcp_size / self.rtcp_bw;
+ n = self.members;
+ }
+
+ let tmin = if self.initial { 2.5f32 } else { 5. };
+ let td = tmin.max(n as f32 * c);
+ let t = td * ((random::<u16>() as f32) / (u16::MAX as f32) + 0.5);
+ let t = t / 1.5f32.exp();
+ Duration::from_secs_f32(t)
+ }
+
+ /// RFC 3550, Section 6.3.2
+ pub fn new() -> Self {
+ let mut se = Self {
+ tp: Instant::now(),
+ tc: Instant::now(),
+ tn: Instant::now(),
+ senders: 0,
+ pmembers: 1,
+ members: 1,
+ we_sent: false,
+ initial: true,
+ member_table: HashSet::new(), // TODO add own ssrc to members
+ rtcp_bw: 1000., // TODO set rtcp bandwidth from params
+ avg_rtcp_size: 1000., // TODO set to next likely rtcp packet size
+ };
+ se.tn += se.compute_rtcp_transmission_interval();
+ se
+ }
+
+ /// RFC 3550, Section 6.3.3
+ pub fn on_receive(&mut self, packet: RtcpPacket) {}
+}
diff --git a/rtp/src/rtcp.rs b/rtp/src/rtcp.rs
index 0ae8ea7..da0a683 100644
--- a/rtp/src/rtcp.rs
+++ b/rtp/src/rtcp.rs
@@ -1,26 +1,158 @@
+use crate::rtp::SSRC;
+
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("packet truncated")]
Truncated,
+ #[error("unsupported version")]
+ Version,
}
pub struct RtcpPacket<'a> {
- a: &'a [u8],
+ parts: Vec<RtcpPart<'a>>,
+}
+
+pub enum RtcpPart<'a> {
+ SenderReport(SenderReport<'a>),
+ ReceiverReport(ReceiverReport),
+ SourceDescription(SourceDescription),
+ Bye(Bye),
+ Application(Application<'a>),
}
-pub enum RtcpPart {
- SenderReport {},
- ReceiverReport {},
- SourceDescription {},
- Bye {},
- Application {},
+struct SenderReport<'a> {
+ ssrc: SSRC,
+ sender_info: SenderInfo,
+ reports: Vec<ReportBlock>,
+ extension: &'a [u8],
+}
+struct ReceiverReport {
+ sender_ssrc: SSRC,
+ reports: Vec<ReportBlock>,
+}
+struct SourceDescription {}
+struct Bye {}
+struct Application<'a> {
+ data: &'a [u8],
+}
+struct SenderInfo {
+ ntp_ts: u64,
+ rtp_ts: u32,
+ packet_count: u32,
+ octet_count: u32,
+}
+struct ReportBlock {
+ ssrc: SSRC,
+ fraction_lost: u8,
+ cumulative_packets_lost: u32,
+ ext_max_seq_num_recv: u32,
+ interarrical_jitter: u32,
+ lsr: u32,
+ dlsr: u32,
}
impl<'a> RtcpPacket<'a> {
- pub fn parse(packet: &'a [u8]) -> Result<RtcpPacket<'a>, Error> {
- Ok(Self { a: packet })
+ pub fn parse(mut packet: &'a [u8]) -> Result<RtcpPacket<'a>, Error> {
+ let mut parts = Vec::with_capacity(2);
+ while packet.len() > 0 {
+ if packet.len() < 4 {
+ return Err(Error::Truncated);
+ }
+ let version = (packet[0] & 0b11000000) >> 6;
+ if !matches!(version, 1 | 2) {
+ return Err(Error::Version);
+ }
+ let padding = (packet[0] & 0b00100000) != 0;
+ let num_reports = (packet[0] & 0b00011111) >> 0;
+ let packet_type = packet[1];
+ let length = u16::from_be_bytes([packet[2], packet[3]]);
+
+ match packet_type {
+ 200 => {
+ // Sender Report
+ if packet.len() < 28 + 24 * num_reports as usize {
+ return Err(Error::Truncated);
+ }
+ let ssrc = SSRC(u32::from_be_bytes(packet[4..8].try_into().unwrap()));
+ let sender_info = SenderInfo::parse(packet[8..28].try_into().unwrap());
+ let mut reports = Vec::with_capacity(num_reports as usize);
+ for n in 0..num_reports as usize {
+ reports.push(ReportBlock::parse(
+ packet[28 + 24 * n..28 + 24 * (n + 1)].try_into().unwrap(),
+ ));
+ }
+ let extension = &packet[28 + 24 * num_reports as usize..];
+ parts.push(RtcpPart::SenderReport(SenderReport {
+ reports,
+ sender_info,
+ ssrc,
+ extension,
+ }))
+ }
+ 201 => { // Receiver Report
+ }
+ _ => {}
+ }
+ packet = &packet[length as usize..];
+ }
+ Ok(Self { parts })
+ }
+ pub fn write(&self, out: &mut Vec<u8>) {
+ for part in &self.parts {
+ let version = 2;
+ let padding = false;
+ match part {
+ RtcpPart::SenderReport(sender_report) => {
+ out.push(
+ version << 6 | (padding as u8) << 5 | sender_report.reports.len() as u8,
+ );
+ out.push(200);
+
+ }
+ RtcpPart::ReceiverReport(receiver_report) => todo!(),
+ RtcpPart::SourceDescription(source_description) => todo!(),
+ RtcpPart::Bye(bye) => todo!(),
+ RtcpPart::Application(application) => todo!(),
+ }
+ }
+ }
+}
+impl SenderInfo {
+ pub const SIZE: usize = 5 * 4;
+ pub fn parse(packet: [u8; 5 * 4]) -> SenderInfo {
+ Self {
+ ntp_ts: u64::from_be_bytes(packet[0..8].try_into().unwrap()),
+ rtp_ts: u32::from_be_bytes(packet[8..12].try_into().unwrap()),
+ packet_count: u32::from_be_bytes(packet[12..16].try_into().unwrap()),
+ octet_count: u32::from_be_bytes(packet[16..20].try_into().unwrap()),
+ }
+ }
+ pub fn write(&self, out: &mut Vec<u8>) {
+ out.extend(self.ntp_ts.to_be_bytes());
+ out.extend(self.rtp_ts.to_be_bytes());
+ out.extend(self.packet_count.to_be_bytes());
+ out.extend(self.octet_count.to_be_bytes());
+ }
+}
+impl ReportBlock {
+ pub fn parse(packet: [u8; 6 * 4]) -> ReportBlock {
+ Self {
+ ssrc: SSRC(u32::from_be_bytes(packet[0..4].try_into().unwrap())),
+ fraction_lost: packet[4],
+ cumulative_packets_lost: u32::from_be_bytes([0, packet[5], packet[6], packet[7]]),
+ ext_max_seq_num_recv: u32::from_be_bytes(packet[8..12].try_into().unwrap()),
+ interarrical_jitter: u32::from_be_bytes(packet[12..16].try_into().unwrap()),
+ lsr: u32::from_be_bytes(packet[16..20].try_into().unwrap()),
+ dlsr: u32::from_be_bytes(packet[20..24].try_into().unwrap()),
+ }
}
pub fn write(&self, out: &mut Vec<u8>) {
- out.extend(self.a);
+ out.extend(self.ssrc.0.to_be_bytes());
+ out.push(self.fraction_lost);
+ out.extend(&self.cumulative_packets_lost.to_be_bytes()[1..]);
+ out.extend(self.ext_max_seq_num_recv.to_be_bytes());
+ out.extend(self.interarrical_jitter.to_be_bytes());
+ out.extend(self.lsr.to_be_bytes());
+ out.extend(self.dlsr.to_be_bytes());
}
}
diff --git a/rtp/src/rtp.rs b/rtp/src/rtp.rs
index a1ed166..581bc40 100644
--- a/rtp/src/rtp.rs
+++ b/rtp/src/rtp.rs
@@ -8,12 +8,15 @@ pub enum Error {
Padding,
}
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
+pub struct SSRC(pub u32);
+
pub struct RtpPacket<'a> {
marker: bool,
payload_type: u8,
sequence: u16,
timestamp: u32,
- ssrc: u32,
+ ssrc: SSRC,
csrc_count: u8,
csrcs: [u32; 15],
extension: Option<(u16, &'a [u8])>,
@@ -37,20 +40,15 @@ impl<'a> RtpPacket<'a> {
let marker = packet[1] >> 7 != 0;
let payload_type = packet[1] & 0x7f;
let sequence = u16::from_be_bytes([packet[2], packet[3]]);
- let timestamp = u32::from_be_bytes([packet[4], packet[5], packet[6], packet[7]]);
- let ssrc = u32::from_be_bytes([packet[8], packet[9], packet[10], packet[11]]);
+ let timestamp = u32::from_be_bytes(packet[4..8].try_into().unwrap());
+ let ssrc = SSRC(u32::from_be_bytes(packet[8..12].try_into().unwrap()));
let mut csrcs = [0u32; 15];
if packet.len() < 12 + csrc_count as usize * 4 {
return Err(Error::Truncated);
}
for n in 0..csrc_count as usize {
let off = 12 + n * 4;
- csrcs[n] = u32::from_be_bytes([
- packet[off + 0],
- packet[off + 1],
- packet[off + 2],
- packet[off + 3],
- ]);
+ csrcs[n] = u32::from_be_bytes(packet[off..off + 4].try_into().unwrap());
}
let mut offset = 12 + csrc_count as usize * 4;
@@ -110,7 +108,7 @@ impl<'a> RtpPacket<'a> {
out.push((self.payload_type & 0x7f) | ((self.marker as u8) << 7));
out.extend(self.sequence.to_be_bytes());
out.extend(self.timestamp.to_be_bytes());
- out.extend(self.ssrc.to_be_bytes());
+ out.extend(self.ssrc.0.to_be_bytes());
out.extend(
self.csrcs
.into_iter()