Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_psfb_invalid(self):
    # The rtcp_psfb_invalid.bin fixture is expected to make RtcpPacket.parse
    # raise ValueError with the exact message asserted below.
    payload = load("rtcp_psfb_invalid.bin")
    with self.assertRaises(ValueError) as caught:
        RtcpPacket.parse(payload)
    # The exception is only available once the context manager has exited.
    message = str(caught.exception)
    self.assertEqual(message, "RTCP payload-specific feedback length is invalid")
def test_sdes_item_truncated(self):
    # Parsing the rtcp_sdes_item_truncated.bin fixture must fail with a
    # ValueError whose text matches the expected message exactly.
    expected = "RTCP SDES item is truncated"
    raw = load("rtcp_sdes_item_truncated.bin")
    with self.assertRaises(ValueError) as ctx:
        RtcpPacket.parse(raw)
    self.assertEqual(str(ctx.exception), expected)
def test_sdes(self):
    # Parse the SDES fixture, check the fields of the single decoded packet,
    # then confirm that serializing it reproduces the input bytes.
    data = load("rtcp_sdes.bin")
    packets = RtcpPacket.parse(data)
    self.assertEqual(len(packets), 1)

    packet = packets[0]
    # assertIsInstance reports the actual type on failure, whereas
    # assertTrue(isinstance(...)) only reports "False is not true".
    self.assertIsInstance(packet, RtcpSdesPacket)

    chunk = packet.chunks[0]
    self.assertEqual(chunk.ssrc, 1831097322)
    self.assertEqual(
        chunk.items, [(1, b"{63f459ea-41fe-4474-9d33-9707c9ee79d1}")]
    )

    # Round trip: bytes(packet) must equal the bytes that were parsed.
    self.assertEqual(bytes(packet), data)
def test_sdes_source_truncated(self):
    # The rtcp_sdes_source_truncated.bin fixture must be rejected by
    # RtcpPacket.parse with the ValueError message asserted below.
    blob = load("rtcp_sdes_source_truncated.bin")
    with self.assertRaises(ValueError) as raised:
        RtcpPacket.parse(blob)
    error_text = str(raised.exception)
    self.assertEqual(error_text, "RTCP SDES source is truncated")
def test_rr_invalid(self):
    # Parsing the rtcp_rr_invalid.bin fixture must raise ValueError with the
    # receiver-report length message.
    expected = "RTCP receiver report length is invalid"
    raw = load("rtcp_rr_invalid.bin")
    with self.assertRaises(ValueError) as ctx:
        RtcpPacket.parse(raw)
    self.assertEqual(str(ctx.exception), expected)
def test_sr_invalid(self):
    # Parsing the rtcp_sr_invalid.bin fixture must raise ValueError with the
    # sender-report length message.
    payload = load("rtcp_sr_invalid.bin")
    with self.assertRaises(ValueError) as caught:
        RtcpPacket.parse(payload)
    message = str(caught.exception)
    self.assertEqual(message, "RTCP sender report length is invalid")
def test_rr(self):
    # Parse the receiver report fixture, check the decoded header and the
    # first report entry, then confirm serialization reproduces the input.
    data = load("rtcp_rr.bin")
    packets = RtcpPacket.parse(data)
    self.assertEqual(len(packets), 1)

    packet = packets[0]
    # assertIsInstance reports the actual type on failure, whereas
    # assertTrue(isinstance(...)) only reports "False is not true".
    self.assertIsInstance(packet, RtcpRrPacket)
    self.assertEqual(packet.ssrc, 817267719)

    # Check each field of the first entry in packet.reports.
    report = packet.reports[0]
    self.assertEqual(report.ssrc, 1200895919)
    self.assertEqual(report.fraction_lost, 0)
    self.assertEqual(report.packets_lost, 0)
    self.assertEqual(report.highest_sequence, 630)
    self.assertEqual(report.jitter, 1906)
    self.assertEqual(report.lsr, 0)
    self.assertEqual(report.dlsr, 0)

    # Round trip: bytes(packet) must equal the bytes that were parsed.
    self.assertEqual(bytes(packet), data)
def test_rr_truncated(self):
    # Prefixes of the receiver report fixture that are 1..31 bytes long must
    # be rejected: fewer than 4 bytes yields the "less than 4 bytes" error,
    # 4..31 bytes yields the "truncated" error.
    data = load("rtcp_rr.bin")

    cases = (
        (range(1, 4), "RTCP packet length is less than 4 bytes"),
        (range(4, 32), "RTCP packet is truncated"),
    )
    for lengths, expected in cases:
        for length in lengths:
            # subTest tags any failure with the offending length and lets
            # the remaining lengths run instead of stopping at the first.
            with self.subTest(length=length):
                with self.assertRaises(ValueError) as cm:
                    RtcpPacket.parse(data[0:length])
                self.assertEqual(str(cm.exception), expected)
# NOTE(review): the `def` line of the test these statements belong to is not
# visible in this excerpt, and the original indentation has been lost.
# The statements build an audio RTCRtpReceiver on self.local_transport, feed it
# RTP and RTCP packets, then check its stats report and synchronization sources.
receiver = RTCRtpReceiver("audio", self.local_transport)
self.assertEqual(receiver.transport, self.local_transport)
# Attach a remote audio track directly via the private attribute and check
# that its readyState is "live".
receiver._track = RemoteStreamTrack(kind="audio")
self.assertEqual(receiver._track.readyState, "live")
# Start receiving with PCMU_CODEC as the only codec.
run(receiver.receive(RTCRtpReceiveParameters(codecs=[PCMU_CODEC])))
# receive RTP
# Ten packets are derived from the same fixture: the sequence number advances
# by 1, the timestamp by 160 and the arrival time by 20 ms per packet.
# NOTE(review): 160 timestamp units per 20 ms presumably matches an 8 kHz
# clock rate; confirm against PCMU_CODEC.
for i in range(10):
packet = RtpPacket.parse(load("rtp.bin"))
packet.sequence_number += i
packet.timestamp += i * 160
run(receiver._handle_rtp_packet(packet, arrival_time_ms=i * 20))
# receive RTCP SR
# Every packet parsed from rtcp_sr.bin is passed to the receiver.
for packet in RtcpPacket.parse(load("rtcp_sr.bin")):
run(receiver._handle_rtcp_packet(packet))
# check stats
# The report must contain exactly these three stat types (compared sorted).
report = run(receiver.getStats())
self.assertTrue(isinstance(report, RTCStatsReport))
self.assertEqual(
sorted([s.type for s in report.values()]),
["inbound-rtp", "remote-outbound-rtp", "transport"],
)
# check sources
# Exactly one synchronization source is expected, with the source value below.
sources = receiver.getSynchronizationSources()
self.assertEqual(len(sources), 1)
self.assertTrue(isinstance(sources[0], RTCRtpSynchronizationSource))
self.assertEqual(sources[0].source, 4028317929)