Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): this span looks like the body of a test method whose `def`
# line is not visible here, and the original indentation has been lost.
# Queue that receives the parsed packets from mock_send_rtp below.
queue = asyncio.Queue()
# Replacement for the transport's _send_rtp hook: anything that is not RTCP
# is parsed as an RTP packet and queued so the test can inspect it.
async def mock_send_rtp(data):
if not is_rtcp(data):
await queue.put(RtpPacket.parse(data))
self.local_transport._send_rtp = mock_send_rtp
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
# Pin the sender's SSRC and RTX SSRC to fixed values.
sender._ssrc = 1234
sender._rtx_ssrc = 2345
self.assertEqual(sender.kind, "video")
# Start sending with VP8 plus a video/rtx codec on payload type 101.
# presumably "apt": 100 refers to VP8_CODEC's payload type; confirm against
# the VP8_CODEC definition.
run(
sender.send(
RTCRtpParameters(
codecs=[
VP8_CODEC,
RTCRtpCodecParameters(
mimeType="video/rtx",
clockRate=90000,
payloadType=101,
parameters={"apt": 100},
),
]
)
)
)
# wait for one packet to be transmitted, and ask to retransmit
packet = run(queue.get())
run(sender._retransmit(packet.sequence_number))
# NOTE(review): the visible lines end here; any checks on the retransmitted
# packet presumably follow outside this span.
"""
Ask for an RTP packet retransmission.
"""
# NOTE(review): the `def` line owning this docstring is not visible here, and
# the original indentation has been lost.
# Queue that receives the parsed packets from mock_send_rtp below.
queue = asyncio.Queue()
# Replacement for the transport's _send_rtp hook: anything that is not RTCP
# is parsed as an RTP packet and queued so the test can inspect it.
async def mock_send_rtp(data):
if not is_rtcp(data):
await queue.put(RtpPacket.parse(data))
self.local_transport._send_rtp = mock_send_rtp
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
# Pin the sender's SSRC to a fixed value; only VP8 is configured below, with
# no video/rtx codec.
sender._ssrc = 1234
self.assertEqual(sender.kind, "video")
run(sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))
# wait for one packet to be transmitted, and ask to retransmit
packet = run(queue.get())
run(sender._retransmit(packet.sequence_number))
# wait for packet to be retransmitted, then shutdown
run(asyncio.sleep(0.1))
run(sender.stop())
# check packet was retransmitted
# Drain whatever is still queued and keep the first packet that carries the
# same sequence number as the packet captured above.
found_rtx = None
while not queue.empty():
queue_packet = queue.get_nowait()
if queue_packet.sequence_number == packet.sequence_number:
found_rtx = queue_packet
break
# NOTE(review): found_rtx is not checked within the visible lines; the
# assertions presumably follow outside this span.
def test_handle_rtcp_rr(self):
    """
    Hand an RTCP receiver report for the sender's SSRC to the sender.
    """
    sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
    self.assertEqual(sender.kind, "video")

    run(sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))

    # receive RTCP RR
    packet = RtcpRrPacket(
        ssrc=1234,
        reports=[
            RtcpReceiverInfo(
                ssrc=sender._ssrc,
                fraction_lost=0,
                packets_lost=0,
                highest_sequence=630,
                jitter=1906,
                lsr=0,
                dlsr=0,
            )
        ],
    )
    # The report must actually be delivered, otherwise the test exercises
    # nothing; this mirrors the other RTCP handling tests in this file.
    run(sender._handle_rtcp_packet(packet))

    # clean shutdown
    run(sender.stop())
def test_handle_rtcp_pli(self):
    """
    Hand a PLI feedback packet aimed at the sender's SSRC to the sender.
    """
    video_sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
    self.assertEqual(video_sender.kind, "video")

    run(video_sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))

    # receive RTCP feedback PLI
    pli = RtcpPsfbPacket(
        fmt=RTCP_PSFB_PLI,
        ssrc=1234,
        media_ssrc=video_sender._ssrc,
    )
    run(video_sender._handle_rtcp_packet(pli))

    # clean shutdown
    run(video_sender.stop())
def test_track_ended(self):
    """
    Stop the source track while the sender is running.
    """
    audio_track = AudioStreamTrack()
    rtp_sender = RTCRtpSender(audio_track, self.local_transport)

    run(rtp_sender.send(RTCRtpParameters(codecs=[PCMU_CODEC])))

    # stop track and wait for RTP loop to exit
    audio_track.stop()
    run(asyncio.sleep(0.1))
0x74,
0x78, # RtpStreamId
0xD5,
0x73,
0x74,
0x72,
0x65,
0x61,
0x6D, # RepairedRtpStreamId
0x00,
0x00, # Padding to 32bit boundary.
]
)
extensions_map = rtp.HeaderExtensionsMap()
extensions_map.configure(
RTCRtpParameters(
headerExtensions=[
RTCRtpHeaderExtensionParameters(
id=2, uri="urn:ietf:params:rtp-hdrext:toffset"
),
RTCRtpHeaderExtensionParameters(
id=4, uri="urn:ietf:params:rtp-hdrext:ssrc-audio-level"
),
RTCRtpHeaderExtensionParameters(
id=6,
uri="http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
),
RTCRtpHeaderExtensionParameters(
id=8,
uri="http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01",
),
RTCRtpHeaderExtensionParameters(
def test_handle_rtcp_remb(self):
    """
    Hand a well-formed REMB packet, then a malformed one, to the sender.
    """
    video_sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
    self.assertEqual(video_sender.kind, "video")

    run(video_sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))

    # receive RTCP feedback REMB
    remb_fci = pack_remb_fci(4160000, [video_sender._ssrc])
    valid_remb = RtcpPsfbPacket(
        fmt=RTCP_PSFB_APP, ssrc=1234, media_ssrc=0, fci=remb_fci
    )
    run(video_sender._handle_rtcp_packet(valid_remb))

    # receive RTCP feedback REMB (malformed)
    junk_remb = RtcpPsfbPacket(
        fmt=RTCP_PSFB_APP,
        ssrc=1234,
        media_ssrc=0,
        fci=b"JUNK",
    )
    run(video_sender._handle_rtcp_packet(junk_remb))

    # clean shutdown
    run(video_sender.stop())
def test_handle_rtcp_nack(self):
    """
    Hand a NACK feedback packet listing one lost sequence number to the sender.
    """
    video_sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
    self.assertEqual(video_sender.kind, "video")

    run(video_sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))

    # receive RTCP feedback NACK
    nack = RtcpRtpfbPacket(
        fmt=RTCP_RTPFB_NACK,
        ssrc=1234,
        media_ssrc=video_sender._ssrc,
    )
    nack.lost.append(7654)
    run(video_sender._handle_rtcp_packet(nack))

    # clean shutdown
    run(video_sender.stop())
def test_padding_only_with_header_extensions(self):
    """
    Parse rtp_only_padding_with_header_extensions.bin with abs-send-time
    mapped to extension id 2 and check the parsed header values.
    """
    header_map = rtp.HeaderExtensionsMap()
    abs_send_time = RTCRtpHeaderExtensionParameters(
        id=2,
        uri="http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time",
    )
    header_map.configure(RTCRtpParameters(headerExtensions=[abs_send_time]))

    raw = load("rtp_only_padding_with_header_extensions.bin")
    parsed = RtpPacket.parse(raw, header_map)

    # checked in this order; the first mismatch fails the test
    expected_values = (
        ("version", 2),
        ("marker", 0),
        ("payload_type", 98),
        ("sequence_number", 22138),
        ("timestamp", 3171065731),
    )
    for field_name, expected in expected_values:
        self.assertEqual(getattr(parsed, field_name), expected)
def test_send_keyframe(self):
    """
    Ask for a keyframe.
    """
    sent_packets = asyncio.Queue()

    async def capture_rtp(data):
        # RTCP traffic is ignored; only RTP packets are recorded
        if is_rtcp(data):
            return
        await sent_packets.put(RtpPacket.parse(data))

    self.local_transport._send_rtp = capture_rtp

    video_sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
    self.assertEqual(video_sender.kind, "video")

    run(video_sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))

    # wait for one packet to be transmitted, and ask for keyframe
    run(sent_packets.get())
    video_sender._send_keyframe()

    # wait for packet to be transmitted, then shutdown
    run(asyncio.sleep(0.1))
    run(video_sender.stop())