Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, host):
self.channels = {}
self.input = self.out = None
if ':' in host:
host, port = host.split(':', 1)
port = int(port)
else:
port = AMQP_PORT
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
self.input = _AMQPReader(sock.makefile('r'))
self.out = sock.makefile('w')
self.out.write(AMQP_PROTOCOL_HEADER)
self.out.flush()
self.waiting = True
while self.waiting:
self.wait()
def dispatch_method(self, channel, payload):
if len(payload) < 4:
raise Exception('Method frame too short')
class_id, method_id = unpack('>HH', payload[:4])
args = _AMQPReader(payload[4:])
if class_id == 10:
return self.dispatch_method_connection(method_id, args)
if class_id in [20, 30]:
ch = self.channels[channel]
return ch.dispatch_method(class_id, method_id, args)
def read_table(self):
len = unpack('>I', self.input.read(4))[0]
table_data = _AMQPReader(self.input.read(len))
result = {}
while table_data.input.tell() < len:
name = table_data.read_shortstr()
ftype = table_data.input.read(1)
if ftype == 'S':
val = table_data.read_longstr()
elif ftype == 'I':
val = unpack('i', table_data.input.read(4))[0]
elif ftype == 'D':
d = table_data.read_octet()
n = table_data.read_long()
val = decimal(n) / decimal(10 ** d)
elif ftype == 'T':
val = datetime.fromtimestamp(table_data.read_longlong())
## FIXME: timezone ?
elif ftype == 'F':