Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def on_pause():
    """Switch over to ``transport`` and announce the upgrade.

    ``self`` and ``transport`` are not parameters; they are taken from an
    enclosing scope that is not shown here.
    """
    self.set_transport(transport)
    # The UPGRADE packet is sent through self.transport, i.e. after
    # set_transport() has run.
    upgrade_packet = Packet(Packet.UPGRADE, '')
    self.transport.send([upgrade_packet])
    self.transport_ready_event.set()
def close():
    """Send a CLOSE packet, then ask the client to stop the read loop.

    ``self`` is taken from an enclosing scope that is not shown here.
    """
    close_packet = Packet(Packet.CLOSE)
    self.send([close_packet])
    self.client.stop_loop(self.read_loop)
def on_transport_open():
    """Send a ``'probe'`` PING over ``transport`` and listen for a packet.

    ``transport`` and ``on_packet`` are taken from an enclosing scope that
    is not shown here.
    """
    probe = Packet(Packet.PING, 'probe')
    transport.send([probe])
    # Register on_packet via once() only after the probe has been sent.
    transport.once('packet', on_packet)
def send(self, message, binary=False):
    """Wrap ``message`` in a MESSAGE packet and hand it to ``send_packet``.

    :param message: payload placed in the packet.
    :param binary: forwarded unchanged as the packet's third argument.
    """
    message_packet = Packet(Packet.MESSAGE, message, binary)
    self.send_packet(message_packet)
elif packet_type >= 48:
packet_type -= 48
binary = False
else:
binary = True
packet_data = None
if len(encoded_packet) > 1:
if binary:
if b64:
packet_data = base64.b64decode(encoded_packet[1:])
else:
packet_data = encoded_packet[1:]
else:
packet_data = encoded_packet[1:].decode('utf-8')
return Packet(packet_type, packet_data, binary)