Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_empty_close_message(self):
    """
    Feed a close frame with an empty body to a fresh stream and check
    that the stream records a closing message.

    The frame is built with a random 4-byte masking key. Before the
    frame is sent to the parser, ``closing`` must still be ``None``;
    afterwards it must hold a ``CloseControlMessage``.
    """
    f = Frame(opcode=OPCODE_CLOSE, body=b'', fin=1, masking_key=os.urandom(4)).build()
    s = Stream()
    # Nothing has been parsed yet, so no close message may be pending.
    self.assertIsNone(s.closing)
    s.parser.send(f)
    # assertIsInstance reports the offending object on failure instead
    # of just two mismatching type objects.
    self.assertIsInstance(s.closing, CloseControlMessage)
elif frame.opcode == OPCODE_PING:
self.pings.append(PingControlMessage(some_bytes))
elif frame.opcode == OPCODE_PONG:
self.pongs.append(PongControlMessage(some_bytes))
else:
self.errors.append(CloseControlMessage(code=1003))
break
except ProtocolException:
self.errors.append(CloseControlMessage(code=1002))
break
except FrameTooLargeException:
self.errors.append(CloseControlMessage(code=1002, reason="Frame was too large"))
break
frame._cleanup()
frame.body = None
frame = None
if self.message is not None and self.message.completed:
utf8validator.reset()
utf8validator.reset()
utf8validator = None
self._cleanup()
if not py3k: reason = bytearray(reason)
is_valid, end_on_code_point, _, _ = utf8validator.validate(reason)
if not is_valid or not end_on_code_point:
self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes'))
break
reason = bytes(reason)
self.closing = CloseControlMessage(code=code, reason=reason)
elif frame.opcode == OPCODE_PING:
self.pings.append(PingControlMessage(some_bytes))
elif frame.opcode == OPCODE_PONG:
self.pongs.append(PongControlMessage(some_bytes))
else:
self.errors.append(CloseControlMessage(code=1003))
break
except ProtocolException:
self.errors.append(CloseControlMessage(code=1002))
break
except FrameTooLargeException:
self.errors.append(CloseControlMessage(code=1002, reason="Frame was too large"))
break
frame._cleanup()
frame.body = None
frame = None
if self.message is not None and self.message.completed:
utf8validator.reset()
try:
some_bytes = (yield next(frame.parser))
frame.parser.send(some_bytes)
except GeneratorExit:
running = False
break
except StopIteration:
frame._cleanup()
some_bytes = frame.body
# Let's avoid unmasking when there is no payload
if some_bytes:
if frame.masking_key and self.expect_masking:
some_bytes = frame.unmask(some_bytes)
elif not frame.masking_key and self.expect_masking:
msg = CloseControlMessage(code=1002, reason='Missing masking when expected')
self.errors.append(msg)
break
elif frame.masking_key and not self.expect_masking:
msg = CloseControlMessage(code=1002, reason='Masked when not expected')
self.errors.append(msg)
break
else:
# If we reach this stage, it's because
# the frame wasn't masked and we didn't expect
# it anyway. Therefore, on py2k, the bytes
# are actually a str object and can't be used
# in the utf8 validator as we need integers
# when we get each byte one by one.
# Our only solution here is to convert our
# string to a bytearray.
some_bytes = bytearray(some_bytes)
def close(self, code=1000, reason=''):
    """
    Build the control message used to signal a close.

    The given status ``code`` and ``reason`` text are handed over,
    unchanged, to a new
    :class:`ws4py.messaging.CloseControlMessage` instance, which
    is then returned to the caller.
    """
    closing_message = CloseControlMessage(code=code, reason=reason)
    return closing_message
m.extend(some_bytes)
m.completed = (frame.fin == 1)
if m.opcode == OPCODE_TEXT:
if some_bytes:
is_valid, end_on_code_point, _, _ = utf8validator.validate(some_bytes)
if not is_valid or (m.completed and not end_on_code_point):
self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes'))
break
elif frame.opcode == OPCODE_CLOSE:
code = 1000
reason = ""
if frame.payload_length == 0:
self.closing = CloseControlMessage(code=1000)
elif frame.payload_length == 1:
self.closing = CloseControlMessage(code=1002, reason='Payload has invalid length')
else:
try:
# at this stage, some_bytes have been unmasked
# so actually are held in a bytearray
code = int(unpack("!H", bytes(some_bytes[0:2]))[0])
except struct.error:
code = 1002
reason = 'Failed at decoding closing code'
else:
# Those codes are reserved or plainly forbidden
if code not in VALID_CLOSING_CODES and not (2999 < code < 5000):
reason = 'Invalid Closing Frame Code: %d' % code
code = 1002
elif frame.payload_length > 1:
reason = 'Failed at decoding closing code'
else:
# Those codes are reserved or plainly forbidden
if code not in VALID_CLOSING_CODES and not (2999 < code < 5000):
reason = 'Invalid Closing Frame Code: %d' % code
code = 1002
elif frame.payload_length > 1:
reason = some_bytes[2:] if frame.masking_key else frame.body[2:]
if not py3k: reason = bytearray(reason)
is_valid, end_on_code_point, _, _ = utf8validator.validate(reason)
if not is_valid or not end_on_code_point:
self.errors.append(CloseControlMessage(code=1007, reason='Invalid UTF-8 bytes'))
break
reason = bytes(reason)
self.closing = CloseControlMessage(code=code, reason=reason)
elif frame.opcode == OPCODE_PING:
self.pings.append(PingControlMessage(some_bytes))
elif frame.opcode == OPCODE_PONG:
self.pongs.append(PongControlMessage(some_bytes))
else:
self.errors.append(CloseControlMessage(code=1003))
break
except ProtocolException:
self.errors.append(CloseControlMessage(code=1002))
break
except FrameTooLargeException: