Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# create rpc out channel
self.rpc_out_channel = \
self._connections[-1].channel()
self.rpc_out_queue = self.rpc_out_channel.queue_declare(
exclusive=False
)
self.rpc_out_channel.basic_consume(
callback=self._rpc_callback,
no_ack=True,
exclusive=True
)
self.amqp_config['on_blocked'] = self._blocked
self.amqp_config['on_unblocked'] = self._unblocked
self._rpc_in_connection = amqp.connection.Connection(
**self.amqp_config
)
self._rpc_in_connection.sock._read_event.priority = self.priority
self._rpc_in_connection.sock._write_event.priority = self.priority
for rpcc in self.rpc_server_channels.values():
rpcc.connect(self._rpc_in_connection)
if start_dispatching:
self.start_dispatching()
# Consumer setup: opens an AMQP connection, declares a fanout exchange and a
# queue, and starts binding the queue.
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the queue_bind() call below; the rest of the try body and its except
# handler are not visible here. The nesting (while -> try -> body) is
# inferred from statement order - confirm against the real file.
def _amqp_consumer(self):
while True:
try:
# Only the credentials are passed explicitly; every other connection
# setting is whatever the amqp library defaults to.
conn = amqp.connection.Connection(
userid=self.rabbitmq_username,
password=self.rabbitmq_password
)
channel = conn.channel()
# Declare self.exchange as a 'fanout' exchange, non-durable and not
# auto-deleted.
channel.exchange_declare(
self.exchange,
'fanout',
durable=False,
auto_delete=False
)
# No queue name is given, only exclusive=False; the name actually used is
# read back from the result as q.queue (presumably broker-generated -
# verify against the amqp library's queue_declare documentation).
q = channel.queue_declare(
exclusive=False
)
# Bind the freshly declared queue; the remaining arguments of this call are
# cut off in this excerpt.
channel.queue_bind(
queue=q.queue,
# Establish the AMQP connection and channel from the settings in self.kwargs,
# then declare the configured exchange unless its name is empty.
# NOTE(review): this excerpt has lost its indentation and is cut off inside
# the exchange_declare() call; the closing parenthesis, any further setup and
# the except handler of the try block are not visible here.
def setupConnectivity(self):
# Repeats for as long as self.loop() returns a truthy value; presumably this
# is a retry loop around the connection attempt - confirm once the except
# branch is visible.
while self.loop():
try:
# Host, port, vhost and credentials all come from self.kwargs.
self.connection = amqp_connection(
host=self.kwargs.host,
port=self.kwargs.port,
virtual_host=self.kwargs.vhost,
userid=self.kwargs.user,
password=self.kwargs.password
)
self.channel = self.connection.channel()
# An empty exchange name skips the declaration entirely (presumably the
# broker's default exchange is intended in that case - TODO confirm).
if self.kwargs.exchange != "":
# Exchange type, durability, auto-delete, passive flag and extra arguments
# are all taken from configuration rather than hard-coded.
self.channel.exchange_declare(
self.kwargs.exchange,
self.kwargs.exchange_type,
durable=self.kwargs.exchange_durable,
auto_delete=self.kwargs.exchange_auto_delete,
passive=self.kwargs.exchange_passive,
arguments=self._exchange_arguments
# Open the AMQP connection and a channel using the values held in
# self.kwargs, and declare the configured exchange when a non-empty exchange
# name is set.
# NOTE(review): indentation is missing from this excerpt and the text stops
# part-way through the exchange_declare() call, so the end of the try block
# and its except handler cannot be seen from here.
def setupConnectivity(self):
# Keeps iterating while self.loop() is truthy (looks like a reconnect/retry
# loop - verify against the part of the method that is not shown).
while self.loop():
try:
# Connection parameters are read from self.kwargs, not hard-coded.
self.connection = amqp_connection(
host=self.kwargs.host,
port=self.kwargs.port,
virtual_host=self.kwargs.vhost,
userid=self.kwargs.user,
password=self.kwargs.password
)
self.channel = self.connection.channel()
# Skip exchange declaration when the configured exchange name is "".
if self.kwargs.exchange != "":
# All exchange properties (type, durable, auto_delete, passive, arguments)
# are supplied by configuration.
self.channel.exchange_declare(
self.kwargs.exchange,
self.kwargs.exchange_type,
durable=self.kwargs.exchange_durable,
auto_delete=self.kwargs.exchange_auto_delete,
passive=self.kwargs.exchange_passive,
arguments=self._exchange_arguments
def __init__(self, ftlist, config):
super(AMQPMaster, self).__init__()
self.ftlist = ftlist
self.config = config
self.status_glet = None
self._status = {}
self._connection = amqp.connection.Connection(**self.config)
self._out_channel = self._connection.channel()
self._out_channel.exchange_declare(
AMQP_BUS_EXCHANGE,
'fanout',
auto_delete=False
)
self._in_channel = self._connection.channel()
self._in_channel.queue_declare(
queue=AMQP_PREFIX+'master',
exclusive=True,
auto_delete=True
)
self._in_channel.basic_consume(
callback=self._in_callback,