Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_createAnswer_without_offer(self):
pc = RTCPeerConnection()
with self.assertRaises(InvalidStateError) as cm:
run(pc.createAnswer())
self.assertEqual(str(cm.exception), 'Cannot create answer in signaling state "stable"')
def test_setRemoteDescription_media_mismatch(self):
pc1 = RTCPeerConnection()
pc2 = RTCPeerConnection()
# apply offer
pc1.addTrack(AudioStreamTrack())
offer = run(pc1.createOffer())
run(pc1.setLocalDescription(offer))
run(pc2.setRemoteDescription(pc1.localDescription))
# apply answer
answer = run(pc2.createAnswer())
run(pc2.setLocalDescription(answer))
mangled = RTCSessionDescription(
sdp=pc2.localDescription.sdp.replace('m=audio', 'm=video'),
type=pc2.localDescription.type)
with self.assertRaises(ValueError) as cm:
run(pc1.setRemoteDescription(mangled))
self.assertEqual(str(cm.exception), 'Media sections in answer do not match offer')
def test_connect_video_codec_preferences_offerer(self):
pc1 = RTCPeerConnection()
pc1_states = track_states(pc1)
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# add track and set codec preferences to prefer H264
def test_connect_audio_offer_sendrecv_answer_sendonly(self):
pc1 = RTCPeerConnection()
pc1_states = track_states(pc1)
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# create offer
def test_connect_video_no_ssrc(self):
pc1 = RTCPeerConnection()
pc1_states = track_states(pc1)
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# create offer
def test_connect_audio_offer_recvonly(self):
pc1 = RTCPeerConnection()
pc1_states = track_states(pc1)
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# create offer
def test_connect_datachannel_modern_sdp(self):
pc1 = RTCPeerConnection()
pc1._sctpLegacySdp = False
pc1_data_messages = []
pc1_states = track_states(pc1)
pc2 = RTCPeerConnection()
pc2_data_channels = []
pc2_data_messages = []
pc2_states = track_states(pc2)
@pc2.on('datachannel')
def on_datachannel(channel):
self.assertEqual(channel.readyState, 'open')
pc2_data_channels.append(channel)
@channel.on('message')
def on_message(message):
def test_addTransceiver_audio_inactive(self):
pc = RTCPeerConnection()
# add transceiver
transceiver = pc.addTransceiver('audio', direction='inactive')
self.assertIsNotNone(transceiver)
self.assertEqual(transceiver.currentDirection, None)
self.assertEqual(transceiver.direction, 'inactive')
self.assertEqual(transceiver.sender.track, None)
self.assertEqual(transceiver.stopped, False)
self.assertEqual(pc.getSenders(), [transceiver.sender])
self.assertEqual(len(pc.getTransceivers()), 1)
# add track
track = AudioStreamTrack()
pc.addTrack(track)
self.assertEqual(transceiver.currentDirection, None)
self.assertEqual(transceiver.direction, 'sendonly')
def test_connect_audio_two_tracks(self):
pc1 = RTCPeerConnection()
pc1_states = track_states(pc1)
pc2 = RTCPeerConnection()
pc2_states = track_states(pc2)
self.assertEqual(pc1.iceConnectionState, 'new')
self.assertEqual(pc1.iceGatheringState, 'new')
self.assertIsNone(pc1.localDescription)
self.assertIsNone(pc1.remoteDescription)
self.assertEqual(pc2.iceConnectionState, 'new')
self.assertEqual(pc2.iceGatheringState, 'new')
self.assertIsNone(pc2.localDescription)
self.assertIsNone(pc2.remoteDescription)
# create offer
pc1.addTrack(AudioStreamTrack())
pc1.addTrack(AudioStreamTrack())
offer = run(pc1.createOffer())
async def offer(request):
params = await request.json()
print(params);
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
client_id = client_manager.create_new_client(pc)
# data = [True]
#recog_worker_thread = threading.Thread(target=recog_worker,args=(client_manager.get_client(client_id),data,))
#recog_worker_thread.start()
@pc.on("iceconnectionstatechange")
async def on_iceconnectionstatechange():
print("ICE connection state is %s" % pc.iceConnectionState)
if pc.iceConnectionState == "failed":
await client_manager.remove_client(client_id) #this already handles pc closing
faceregtrack = FacialRecognitionTrack(face_detect, client_manager.get_client(client_id));
@pc.on("datachannel")
def on_datachannel(channel):