Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_stop(self):
sender = RTCRtpSender(AudioStreamTrack(), self.local_transport)
self.assertEqual(sender.kind, "audio")
run(sender.send(RTCRtpParameters(codecs=[PCMU_CODEC])))
# clean shutdown
run(sender.stop())
def test_handle_rtcp_pli(self):
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
self.assertEqual(sender.kind, "video")
run(sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))
# receive RTCP feedback NACK
packet = RtcpPsfbPacket(fmt=RTCP_PSFB_PLI, ssrc=1234, media_ssrc=sender._ssrc)
run(sender._handle_rtcp_packet(packet))
# clean shutdown
run(sender.stop())
def test_handle_rtcp_remb(self):
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
self.assertEqual(sender.kind, "video")
run(sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))
# receive RTCP feedback REMB
packet = RtcpPsfbPacket(
fmt=RTCP_PSFB_APP,
ssrc=1234,
media_ssrc=0,
fci=pack_remb_fci(4160000, [sender._ssrc]),
)
run(sender._handle_rtcp_packet(packet))
# receive RTCP feedback REMB (malformed)
packet = RtcpPsfbPacket(fmt=RTCP_PSFB_APP, ssrc=1234, media_ssrc=0, fci=b"JUNK")
run(sender._handle_rtcp_packet(packet))
def test_retransmit_with_rtx(self):
"""
Ask for an RTP packet retransmission.
"""
queue = asyncio.Queue()
async def mock_send_rtp(data):
if not is_rtcp(data):
await queue.put(RtpPacket.parse(data))
self.local_transport._send_rtp = mock_send_rtp
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
sender._ssrc = 1234
sender._rtx_ssrc = 2345
self.assertEqual(sender.kind, "video")
run(
sender.send(
RTCRtpParameters(
codecs=[
VP8_CODEC,
RTCRtpCodecParameters(
mimeType="video/rtx",
clockRate=90000,
payloadType=101,
parameters={"apt": 100},
),
]
def test_track_ended(self):
track = AudioStreamTrack()
sender = RTCRtpSender(track, self.local_transport)
run(sender.send(RTCRtpParameters(codecs=[PCMU_CODEC])))
# stop track and wait for RTP loop to exit
track.stop()
run(asyncio.sleep(0.1))
def test_send_keyframe(self):
"""
Ask for a keyframe.
"""
queue = asyncio.Queue()
async def mock_send_rtp(data):
if not is_rtcp(data):
await queue.put(RtpPacket.parse(data))
self.local_transport._send_rtp = mock_send_rtp
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
self.assertEqual(sender.kind, "video")
run(sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))
# wait for one packet to be transmitted, and ask for keyframe
run(queue.get())
sender._send_keyframe()
# wait for packet to be transmitted, then shutdown
run(asyncio.sleep(0.1))
run(sender.stop())
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# add track and set codec preferences to prefer H264
pc1.addTrack(VideoStreamTrack())
capabilities = RTCRtpSender.getCapabilities('video')
preferences = list(filter(lambda x: x.name == 'H264', capabilities.codecs))
preferences += list(filter(lambda x: x.name == 'VP8', capabilities.codecs))
preferences += list(filter(lambda x: x.name == 'rtx', capabilities.codecs))
transceiver = pc1.getTransceivers()[0]
transceiver.setCodecPreferences(preferences)
# create offer
offer = run(pc1.createOffer())
self.assertEqual(offer.type, 'offer')
self.assertTrue('m=video ' in offer.sdp)
self.assertFalse('a=candidate:' in offer.sdp)
self.assertFalse('a=end-of-candidates' in offer.sdp)
run(pc1.setLocalDescription(offer))
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'complete')
def test_stop_before_send(self):
sender = RTCRtpSender(AudioStreamTrack(), self.local_transport)
run(sender.stop())
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# add track and set codec preferences to prefer PCMA / PCMU
pc1.addTrack(AudioStreamTrack())
capabilities = RTCRtpSender.getCapabilities('audio')
preferences = list(filter(lambda x: x.name == 'PCMA', capabilities.codecs))
preferences += list(filter(lambda x: x.name == 'PCMU', capabilities.codecs))
transceiver = pc1.getTransceivers()[0]
transceiver.setCodecPreferences(preferences)
# create offer
offer = run(pc1.createOffer())
self.assertEqual(offer.type, 'offer')
self.assertTrue('m=audio ' in offer.sdp)
self.assertFalse('a=candidate:' in offer.sdp)
self.assertFalse('a=end-of-candidates' in offer.sdp)
run(pc1.setLocalDescription(offer))
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'complete')
self.assertEqual(mids(pc1), ['0'])
def test_handle_rtcp_nack(self):
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
self.assertEqual(sender.kind, "video")
run(sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))
# receive RTCP feedback NACK
packet = RtcpRtpfbPacket(
fmt=RTCP_RTPFB_NACK, ssrc=1234, media_ssrc=sender._ssrc
)
packet.lost.append(7654)
run(sender._handle_rtcp_packet(packet))
# clean shutdown
run(sender.stop())