Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def encode(self, packet):
    """Serialize *packet* into a list of frames.

    The first element of the result is a text header laid out as
    ``<type>[<count>-][<namespace>][,][<id>][<json data>]``. It is followed
    by the attachments returned by ``self.deconstruct_data(packet.data)``,
    unchanged and in order.

    Side effect: when there are attachments, ``packet.type`` is rewritten in
    place from ``Packet.EVENT``/``Packet.ACK`` to the matching
    ``Packet.BINARY_EVENT``/``Packet.BINARY_ACK``.

    Returns:
        ``[header] + attachments``.
    """
    data, attachments = self.deconstruct_data(packet.data)

    # Attachment-count marker ("<n>-"); stays empty when nothing was extracted.
    marker = six.text_type()
    if attachments:
        marker = six.text_type(len(attachments)) + '-'
        if packet.type == Packet.EVENT:
            packet.type = Packet.BINARY_EVENT
        elif packet.type == Packet.ACK:
            packet.type = Packet.BINARY_ACK

    header = six.text_type(packet.type) + marker

    # Presence is decided with ``is not None`` so that an id of 0 or falsy
    # data ([], {}, 0, "", False) still counts as present.
    has_id = packet.id is not None
    has_data = data is not None

    if packet.namespace and packet.namespace != '/':
        header += packet.namespace
        # The comma ends the namespace. It must be written whenever anything
        # follows, otherwise the id/data would run into the namespace text.
        if has_id or has_data:
            header += ','
    if has_id:
        header += six.text_type(packet.id)
    if has_data:
        header += json.dumps(data, separators=(',', ':'))

    return [header] + attachments
def encode(self, packet):
    """Serialize *packet* into a list of frames.

    The first element of the result is a text header laid out as
    ``<type>[<count>-][<namespace>][,][<id>][<json data>]``. It is followed
    by the attachments returned by ``self.deconstruct_data(packet.data)``,
    unchanged and in order.

    Side effect: when there are attachments, ``packet.type`` is rewritten in
    place from ``Packet.EVENT``/``Packet.ACK`` to the matching
    ``Packet.BINARY_EVENT``/``Packet.BINARY_ACK``.

    Returns:
        ``[header] + attachments``.
    """
    data, attachments = self.deconstruct_data(packet.data)

    # Attachment-count marker ("<n>-"); stays empty when nothing was extracted.
    marker = six.text_type()
    if attachments:
        marker = six.text_type(len(attachments)) + '-'
        if packet.type == Packet.EVENT:
            packet.type = Packet.BINARY_EVENT
        elif packet.type == Packet.ACK:
            packet.type = Packet.BINARY_ACK

    header = six.text_type(packet.type) + marker

    # Presence is decided with ``is not None`` so that an id of 0 or falsy
    # data ([], {}, 0, "", False) still counts as present.
    has_id = packet.id is not None
    has_data = data is not None

    if packet.namespace and packet.namespace != '/':
        header += packet.namespace
        # The comma ends the namespace. It must be written whenever anything
        # follows, otherwise the id/data would run into the namespace text.
        if has_id or has_data:
            header += ','
    if has_id:
        header += six.text_type(packet.id)
    if has_data:
        header += json.dumps(data, separators=(',', ':'))

    return [header] + attachments
def encode(self, packet):
    """Serialize *packet* into a list of frames.

    The first element of the result is a text header laid out as
    ``<type>[<count>-][<namespace>][,][<id>][<json data>]``. It is followed
    by the attachments returned by ``self.deconstruct_data(packet.data)``,
    unchanged and in order.

    Side effect: when there are attachments, ``packet.type`` is rewritten in
    place from ``Packet.EVENT``/``Packet.ACK`` to the matching
    ``Packet.BINARY_EVENT``/``Packet.BINARY_ACK``.

    Returns:
        ``[header] + attachments``.
    """
    data, attachments = self.deconstruct_data(packet.data)

    # Attachment-count marker ("<n>-"); stays empty when nothing was extracted.
    marker = six.text_type()
    if attachments:
        marker = six.text_type(len(attachments)) + '-'
        if packet.type == Packet.EVENT:
            packet.type = Packet.BINARY_EVENT
        elif packet.type == Packet.ACK:
            packet.type = Packet.BINARY_ACK

    header = six.text_type(packet.type) + marker

    # Presence is decided with ``is not None`` so that an id of 0 or falsy
    # data ([], {}, 0, "", False) still counts as present.
    has_id = packet.id is not None
    has_data = data is not None

    if packet.namespace and packet.namespace != '/':
        header += packet.namespace
        # The comma ends the namespace. It must be written whenever anything
        # follows, otherwise the id/data would run into the namespace text.
        if has_id or has_data:
            header += ','
    if has_id:
        header += six.text_type(packet.id)
    if has_data:
        header += json.dumps(data, separators=(',', ':'))

    return [header] + attachments
def encode(self, packet):
    """Serialize *packet* into a list of frames.

    The first element of the result is a text header laid out as
    ``<type>[<count>-][<namespace>][,][<id>][<json data>]``. It is followed
    by the attachments returned by ``self.deconstruct_data(packet.data)``,
    unchanged and in order.

    Side effect: when there are attachments, ``packet.type`` is rewritten in
    place from ``Packet.EVENT``/``Packet.ACK`` to the matching
    ``Packet.BINARY_EVENT``/``Packet.BINARY_ACK``.

    Returns:
        ``[header] + attachments``.
    """
    data, attachments = self.deconstruct_data(packet.data)

    # Attachment-count marker ("<n>-"); stays empty when nothing was extracted.
    marker = six.text_type()
    if attachments:
        marker = six.text_type(len(attachments)) + '-'
        if packet.type == Packet.EVENT:
            packet.type = Packet.BINARY_EVENT
        elif packet.type == Packet.ACK:
            packet.type = Packet.BINARY_ACK

    header = six.text_type(packet.type) + marker

    # Presence is decided with ``is not None`` so that an id of 0 or falsy
    # data ([], {}, 0, "", False) still counts as present.
    has_id = packet.id is not None
    has_data = data is not None

    if packet.namespace and packet.namespace != '/':
        header += packet.namespace
        # The comma ends the namespace. It must be written whenever anything
        # follows, otherwise the id/data would run into the namespace text.
        if has_id or has_data:
            header += ','
    if has_id:
        header += six.text_type(packet.id)
    if has_data:
        header += json.dumps(data, separators=(',', ':'))

    return [header] + attachments
def decode(self, bytes):
    """Consume one incoming frame and return a packet once it is complete.

    If no packet is in progress (``self.packet`` is falsy), the frame is
    parsed with ``self.decode_packet`` and a new ``Packet`` is started.
    Otherwise the frame is appended to ``self.attachments``.

    Returns:
        ``None`` while the number of collected attachments differs from
        ``self.num_attachments``. Otherwise returns the in-progress packet
        with ``data`` set from ``self.construct_data``; ``self.reset()`` is
        called before returning.
    """
    if self.packet:
        # A packet is already being assembled, so this frame is an attachment.
        self.attachments.append(bytes)
    else:
        # First frame: parse the header and remember what is still expected.
        (packet_type, num_attachments, namespace,
         packet_id, data) = self.decode_packet(bytes)
        self.packet = Packet(
            type=packet_type,
            namespace=namespace,
            id=packet_id,
        )
        self.raw_data = data
        self.num_attachments = num_attachments

    if self.num_attachments != len(self.attachments):
        # The collected attachment count does not match the expected count yet.
        return None

    completed = self.packet
    completed.data = self.construct_data(self.raw_data, self.attachments)
    self.reset()
    return completed