Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def start(self, args):
    """Handle the server's Start method and answer with ``start_ok``.

    Reads from ``args``, in order: two version octets, a properties
    table, and two long strings that are split on single spaces
    (mechanisms, then locales). The decoded values are printed, after
    which ``self.start_ok`` is called with fixed client properties, the
    'AMQPLAIN' mechanism, a login table holding the hard-coded
    guest/guest credentials, and the 'en_US' locale.
    """
    major = args.read_octet()
    minor = args.read_octet()
    server_properties = args.read_table()
    mechanisms = args.read_longstr().split(' ')
    locales = args.read_longstr().split(' ')

    summary = 'Start from server, version: %d.%d, properties: %s, mechanisms: %s, locales: %s' % (
        major, minor, str(server_properties), mechanisms, locales)
    print(summary)

    credentials = _AMQPWriter()
    credentials.write_table({"LOGIN": "guest", "PASSWORD": "guest"})
    # Drop the first four bytes of the encoded table: the length prefix
    # written at the beginning is not part of the response.
    response = credentials.getvalue()[4:]

    client_properties = {'product': 'Python AMQP', 'version': '0.1'}
    self.start_ok(client_properties, 'AMQPLAIN', response, 'en_US')
def open(self, virtual_host, capabilities='', insist=False):
    """Send method 40 of class 10 on channel 0 for ``virtual_host``.

    The arguments are packed as two short strings (``virtual_host`` and
    ``capabilities``) followed by one octet that is 1 when ``insist`` is
    truthy and 0 otherwise.
    """
    payload = _AMQPWriter()
    payload.write_shortstr(virtual_host)
    payload.write_shortstr(capabilities)
    if insist:
        insist_flag = 1
    else:
        insist_flag = 0
    payload.write_octet(insist_flag)
    self.send_method_frame(0, 10, 40, payload.getvalue())
def send_method_frame(self, channel, class_id, method_id, packed_args):
    """Build one frame around ``packed_args`` and write it to ``self.out``.

    Layout written, in order: an octet of 1, ``channel`` as a short, the
    payload size as a long, ``class_id`` and ``method_id`` as shorts,
    the already-packed arguments, and a closing 0xce octet. The output
    stream is flushed after the write.
    """
    # The payload is the class id and method id (4 bytes together, as
    # two shorts) followed by the packed arguments.
    payload_size = 4 + len(packed_args)

    frame = _AMQPWriter()
    frame.write_octet(1)  # presumably the AMQP "method" frame type -- confirm against the spec
    frame.write_short(channel)
    frame.write_long(payload_size)
    frame.write_short(class_id)
    frame.write_short(method_id)
    frame.write(packed_args)
    frame.write_octet(0xce)  # presumably the AMQP frame-end marker -- confirm against the spec

    encoded = frame.getvalue()
    self.out.write(encoded)
    self.out.flush()
def tune_ok(self, channel_max, frame_max, heartbeat):
    """Send method 31 of class 10 on channel 0, then open vhost '/'.

    The arguments are packed as a short (``channel_max``), a long
    (``frame_max``) and a short (``heartbeat``). Once the frame has been
    sent, ``self.open('/')`` is called straight away.
    """
    payload = _AMQPWriter()
    payload.write_short(channel_max)
    payload.write_long(frame_max)
    payload.write_short(heartbeat)
    packed = payload.getvalue()
    self.send_method_frame(0, 10, 31, packed)
    # The virtual host is hard-coded to the root path.
    self.open('/')
def serialize(self):
    """Return ``(packed_properties, body)`` for this object.

    The packed properties are always a single short with value 0; the
    body is ``self.body`` returned unchanged.
    """
    writer = _AMQPWriter()
    writer.write_short(0)
    return writer.getvalue(), self.body
def write_table(self, d):
self.flushbits()
table_data = _AMQPWriter()
for k, v in d.items():
table_data.write_shortstr(k)
if isinstance(v, basestring):
if isinstance(v, unicode):
v = v.encode('utf-8')
table_data.write('S')
table_data.write_longstr(v)
elif isinstance(v, [int, long]):
table_data.write('I')
table_data.write(pack('>i', v))
elif isinstance(v, decimal):
table_data.write('D')
table_data.write_octet(4)
table_data.write_long(int(v * 10))
elif isinstance(v, datetime):
table_data.write('T')