Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_encoder_stereo_48khz(self):
    """
    Encode ten stereo 48 kHz audio frames with the PCMU encoder.

    Each frame must produce exactly one payload made of 160 ``0xff`` bytes,
    and a timestamp equal to the frame's ``pts`` floor-divided by 6.
    """
    encoder = get_encoder(PCMU_CODEC)
    # assertIsInstance names the offending type when it fails, whereas
    # assertTrue(isinstance(...)) can only say "False is not true".
    self.assertIsInstance(encoder, PcmuEncoder)

    frames = self.create_audio_frames(
        layout="stereo", sample_rate=48000, count=10
    )
    for frame in frames:
        payloads, timestamp = encoder.encode(frame)
        self.assertEqual(payloads, [b"\xff" * 160])
        # presumably 48 kHz input mapped onto an 8 kHz clock, hence // 6
        # TODO confirm against the encoder implementation
        self.assertEqual(timestamp, frame.pts // 6)
def test_encoder_stereo_48khz(self):
    """
    Encode ten stereo 48 kHz audio frames with the PCMA encoder.

    Each frame must produce exactly one payload made of 160 ``0xd5`` bytes,
    and a timestamp equal to the frame's ``pts`` floor-divided by 6.
    """
    # NOTE(review): a test with this exact name is also defined earlier in
    # this file view; if both live in the same class this definition
    # replaces the earlier one - confirm they sit in different test classes.
    encoder = get_encoder(PCMA_CODEC)
    # assertIsInstance names the offending type when it fails, whereas
    # assertTrue(isinstance(...)) can only say "False is not true".
    self.assertIsInstance(encoder, PcmaEncoder)

    frames = self.create_audio_frames(
        layout="stereo", sample_rate=48000, count=10
    )
    for frame in frames:
        payloads, timestamp = encoder.encode(frame)
        self.assertEqual(payloads, [b"\xd5" * 160])
        # presumably 48 kHz input mapped onto an 8 kHz clock, hence // 6
        # TODO confirm against the encoder implementation
        self.assertEqual(timestamp, frame.pts // 6)
def test_send_data_with_gap_1_retransmit(self):
    """
    Queue eight chunks of user data while only three may be sent.

    After the send call, TSNs 0-2 must have been handed to the transport
    and be outstanding, while TSNs 3-7 remain queued.
    """
    sent_tsns = []

    async def mock_send_chunk(chunk):
        # record the TSN of every chunk the transport tries to send
        sent_tsns.append(chunk.tsn)

    client = RTCSctpTransport(self.client_transport)
    client._last_sacked_tsn = 4294967295
    client._local_tsn = 0
    client._ssthresh = 131072
    client._send_chunk = mock_send_chunk

    # queue 8 chunks, but cwnd only allows 3
    with self.assertTimerRestarted(client):
        run(client._send(123, 456, b"M" * USERDATA_MAX_LENGTH * 8))

    self.assertEqual(client._cwnd, 3600)
    # identity check is the idiomatic way to assert on None
    self.assertIsNone(client._fast_recovery_exit)
    self.assertEqual(client._flight_size, 3600)
    self.assertEqual(sent_tsns, [0, 1, 2])
    self.assertEqual(outstanding_tsns(client), [0, 1, 2])
    self.assertEqual(queued_tsns(client), [3, 4, 5, 6, 7])
def test_t3_expired(self):
# Checks the transport state around a direct call to _t3_expired():
# the T3 handle is set after sending one chunk and cleared after expiry,
# while the single chunk stays outstanding and nothing is queued.
# stub: swallow every outgoing chunk
async def mock_send_chunk(chunk):
pass
# stub: installed as client._transmit just before the timer expiry below
async def mock_transmit():
pass
client = RTCSctpTransport(self.client_transport)
client._local_tsn = 0
client._send_chunk = mock_send_chunk
# 1 chunk
run(client._send(123, 456, b"M" * USERDATA_MAX_LENGTH))
self.assertIsNotNone(client._t3_handle)
self.assertEqual(outstanding_tsns(client), [0])
self.assertEqual(queued_tsns(client), [])
# t3 expires
client._transmit = mock_transmit
client._t3_expired()
self.assertIsNone(client._t3_handle)
self.assertEqual(outstanding_tsns(client), [0])
self.assertEqual(queued_tsns(client), [])
# NOTE(review): the body of the loop below is not present in this view;
# the statement is incomplete as shown - restore it from the full test.
for chunk in client._outbound_queue:
# NOTE(review): the `def` line enclosing the statements below is not visible
# in this view, and the excerpt stops right after the retransmission request,
# before any assertion on the retransmitted packet - treat it as incomplete.
queue = asyncio.Queue()
# stub: parse and enqueue every outgoing packet that is_rtcp() does not flag
async def mock_send_rtp(data):
if not is_rtcp(data):
await queue.put(RtpPacket.parse(data))
# route the transport's outgoing packets through the capturing stub
self.local_transport._send_rtp = mock_send_rtp
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
# pin the sender's SSRC values to fixed numbers
sender._ssrc = 1234
sender._rtx_ssrc = 2345
self.assertEqual(sender.kind, "video")
# start sending with VP8 plus a "video/rtx" codec entry
# (payload type 101, "apt" parameter 100)
run(
sender.send(
RTCRtpParameters(
codecs=[
VP8_CODEC,
RTCRtpCodecParameters(
mimeType="video/rtx",
clockRate=90000,
payloadType=101,
parameters={"apt": 100},
),
]
)
)
)
# wait for one packet to be transmitted, and ask to retransmit
packet = run(queue.get())
run(sender._retransmit(packet.sequence_number))
"""
Ask for an RTP packet retransmission.
"""
# NOTE(review): the `def` line that the docstring above and the statements
# below belong to is not visible in this view - treat this as an excerpt.
queue = asyncio.Queue()
# stub: parse and enqueue every outgoing packet that is_rtcp() does not flag
async def mock_send_rtp(data):
if not is_rtcp(data):
await queue.put(RtpPacket.parse(data))
# route the transport's outgoing packets through the capturing stub
self.local_transport._send_rtp = mock_send_rtp
sender = RTCRtpSender(VideoStreamTrack(), self.local_transport)
# pin the sender's SSRC to a fixed number
sender._ssrc = 1234
self.assertEqual(sender.kind, "video")
# start sending with VP8 as the only codec
run(sender.send(RTCRtpParameters(codecs=[VP8_CODEC])))
# wait for one packet to be transmitted, and ask to retransmit
packet = run(queue.get())
run(sender._retransmit(packet.sequence_number))
# wait for packet to be retransmitted, then shutdown
run(asyncio.sleep(0.1))
run(sender.stop())
# check packet was retransmitted
# scan the captured packets for one carrying the same sequence number
found_rtx = None
while not queue.empty():
queue_packet = queue.get_nowait()
if queue_packet.sequence_number == packet.sequence_number:
found_rtx = queue_packet
break
# NOTE(review): found_rtx is never asserted on within this view; the
# follow-up checks are presumably cut off - confirm against the full test.
def test_createAnswer_without_offer(self):
    """
    Calling createAnswer() on a fresh peer connection must raise
    InvalidStateError with the expected message.
    """
    connection = RTCPeerConnection()
    expected_message = 'Cannot create answer in signaling state "stable"'

    with self.assertRaises(InvalidStateError) as raised:
        run(connection.createAnswer())

    self.assertEqual(str(raised.exception), expected_message)
def test_setRemoteDescription_media_mismatch(self):
    """
    Applying an answer whose media line was changed from audio to video
    must raise ValueError on the offering side.
    """
    offerer = RTCPeerConnection()
    answerer = RTCPeerConnection()

    # offerer adds an audio track, creates the offer, both sides apply it
    offerer.addTrack(AudioStreamTrack())
    offer = run(offerer.createOffer())
    run(offerer.setLocalDescription(offer))
    run(answerer.setRemoteDescription(offerer.localDescription))

    # answerer creates the answer and applies it locally
    answer = run(answerer.createAnswer())
    run(answerer.setLocalDescription(answer))

    # rewrite the media line so the answer no longer matches the offer
    mangled_sdp = answerer.localDescription.sdp.replace('m=audio', 'm=video')
    mangled = RTCSessionDescription(
        sdp=mangled_sdp,
        type=answerer.localDescription.type)

    expected_message = 'Media sections in answer do not match offer'
    with self.assertRaises(ValueError) as raised:
        run(offerer.setRemoteDescription(mangled))
    self.assertEqual(str(raised.exception), expected_message)
def test_connect_video_codec_preferences_offerer(self):
    """
    Create two peer connections and check their pristine initial state.
    """
    pc1 = RTCPeerConnection()
    pc1_states = track_states(pc1)

    pc2 = RTCPeerConnection()
    pc2_states = track_states(pc2)

    # both connections start out with no ICE activity and no descriptions
    for pc in (pc1, pc2):
        self.assertEqual(pc.iceConnectionState, 'new')
        self.assertEqual(pc.iceGatheringState, 'new')
        self.assertIsNone(pc.localDescription)
        self.assertIsNone(pc.remoteDescription)

    # add track and set codec preferences to prefer H264
    # NOTE(review): the statements for the step above are not present in
    # this view - confirm against the full test.
def test_connect_audio_offer_sendrecv_answer_sendonly(self):
pc1 = RTCPeerConnection()
pc1_states = track_states(pc1)
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# create offer