Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_exchange_delete_AMQPError(self, chan, conn):
    """Verify ``exchange_delete`` logs an error when the channel raises ``AMQPError``.

    ``chan`` and ``conn`` are test doubles supplied to the test (presumably by
    patch decorators outside this view); ``conn`` is not used here.
    """
    # Arrange: route the client through the fake channel and make deletion fail.
    self.hc.channel = chan
    self.hc.channel.exchange_delete.side_effect = AMQPError(self.AMQPError_msg)

    # Act
    self.hc.exchange_delete(self.xname)

    # Assert: exactly one delete was attempted and the failure reached the log.
    expected_calls = [call.exchange_delete(self.xname)]
    self.assertEqual(expected_calls, chan.mock_calls, msg=self.amqp_channel_assert_msg)
    self.assertErrorInLog()
def test_restore_clear_AMQPError(self, hc, chan):
    """Verify ``restore_clear`` resets restore state even when the channel fails.

    Both ``queue_unbind`` and ``exchange_delete`` raise ``AMQPError``; the
    publisher is still expected to attempt both calls, clear its restore
    queue/exchange attributes, and log the error message.
    """
    # Arrange: every cleanup operation on the channel fails with its own error.
    for failing_op in (chan.queue_unbind, chan.exchange_delete):
        failing_op.side_effect = AMQPError(self.AMQPError_msg)
    publisher = self.pub
    publisher.channel = chan
    publisher.restore_queue = self.qname
    publisher.restore_exchange = self.xname

    # Act
    publisher.restore_clear()

    # Assert: both calls were attempted in order, state was reset, error logged.
    unbind_call = call.queue_unbind(self.qname, self.xname, '#')
    delete_call = call.exchange_delete(self.xname)
    self.assertEqual([unbind_call, delete_call], chan.mock_calls, self.amqp_channel_assert_msg)
    self.assertIsNone(publisher.restore_queue)
    self.assertIsNone(publisher.restore_exchange)
    self.assertErrorInLog()
    self.find_in_log(r'{}'.format(self.AMQPError_msg))
def test_restore_set_exchange_declare_AMQPError(self, hc, chan):
    """Verify ``restore_set`` behaviour when ``exchange_declare`` raises ``AMQPError``.

    The test case instance itself is passed to ``restore_set``, so the
    attributes set on ``self`` below are presumably the ones that call reads
    (verify against ``restore_set``). ``os._exit`` is temporarily redirected
    to the publisher's ``logger.error`` so a failure path that calls it does
    not terminate the test run.
    """
    # Prepare test
    chan.exchange_declare.side_effect = AMQPError(self.AMQPError_msg)
    self.pub.channel = chan
    self.restore_queue = self.qname
    self.post_exchange = self.xname
    self.program_name = self.__class__.__name__.lower()
    self.config_name = "myconfig"

    # Execute test
    original_exit = os._exit
    os._exit = self.pub.logger.error
    try:
        self.pub.restore_set(self)
    finally:
        # Always undo the process-wide patch, even if restore_set raises;
        # otherwise every later test would run with os._exit replaced.
        os._exit = original_exit

    # Evaluate results
    expected = [call.exchange_declare(self.pub.restore_exchange, 'topic', auto_delete=True, durable=False)]
    self.assertEqual(expected, chan.mock_calls, self.amqp_channel_assert_msg)
    self.assertEqual(self.restore_queue, self.pub.restore_queue)
    regex = r'{}.{}.{}.restore.\d+'.format(self.xname, self.program_name, self.config_name)
    self.assertRegex(self.pub.restore_exchange, regex)
def test_connect__multiple_amqp_connect_errors(self, sleep, chan, conn):
    """Verify ``connect`` keeps retrying through five different failures.

    The connection double fails five times in a row (one exception type per
    attempt) and succeeds on the sixth. The test expects six construct/connect
    pairs followed by one ``channel()`` call, and sleeps of 2, 4, 8, 16 and 32
    between attempts.
    """
    # Arrange: constructing the connection returns the same double, whose
    # connect() raises once per listed failure and then succeeds.
    conn.return_value = conn
    failures = [
        AMQPError(self.AMQPError_msg),
        SSLError('SSLError stub'),
        IOError('IOError stub'),
        OSError('OSError stub'),
        Exception(self.Exception_msg),
    ]
    conn.connect.side_effect = failures + [DEFAULT]

    # Act
    connected = self.hc.connect()

    # Assert: connection calls
    one_attempt = [
        call(
            self.hc.host,
            userid=self.hc.user,
            password=self.hc.password,
            virtual_host=self.hc.vhost,
            ssl=self.hc.ssl,
        ),
        call.connect(),
    ]
    expected_conn_calls = one_attempt * 6 + [call.channel()]
    self.assertEqual(expected_conn_calls, conn.mock_calls, self.amqp_connection_assert_msg)

    # Assert: the delay doubles after each failed attempt
    expected_sleeps = [call(2 ** power) for power in range(1, 6)]
    self.assertEqual(expected_sleeps, sleep.mock_calls, self.sleep_assert_msg)

    self.assertTrue(connected)
    self.assertIsNotNone(self.hc.connection)
def test_build__multiple_bindings__bind_Exception_loophole(self, sleep, hc, chan):
# Prepare test
xname = self.xname_fmt.format(self.test_build.__name__)
xkey = self.xkey_fmt.format(self.test_build.__name__)
self.q.bindings.append((xname, xkey))
self.q.bindings.append((xname + '_1', xkey + '_1'))
chan.queue_bind.side_effect = [AMQPError(self.AMQPError_msg), AMQPError(self.AMQPError_msg), # both bindings fail
AMQPError(self.AMQPError_msg), DEFAULT, # xname fails, xname_1 succeeds
# DEFAULT # this is for pulse
]*2
hc.new_channel.return_value = chan
self.q.declare = Mock(return_value=self.msg_count)
hc.user = 'test_user'
hc.host = 'test_host'
self.q.hc = hc
# Execute test
self.q.build()
# Evaluate results
expected = [call.new_channel()]
self.assertEqual(expected, hc.mock_calls, self.hc_assert_msg)
expected = [call.queue_bind(self.q.name, xname, xkey),
call.queue_bind(self.q.name, xname + '_1', xkey + '_1')
# ,call.queue_bind(self.q.name, xname + '_1', self.pulse_key)
def _run_mq_listener(self):
    """Register the message callback and wait on the consumer until it stops.

    A ``socket.timeout`` while waiting is logged as a warning; ``AMQPError``
    and ``IOError`` are logged as errors. In every case ``self.__del__()`` is
    then invoked explicitly and a final info message is logged.
    """
    try:
        self.consumer.register(self._mq_callback)
        self.consumer.wait(self.mq_timeout_seconds)
    except socket.timeout as timeout_exc:
        # Must be handled before the IOError clause below: socket.timeout is
        # itself an OSError subclass and would otherwise be logged as an error.
        log_warning = self.logger.warning
        log_warning(f'Queue {self.consumer.queue} is likely empty. Worker exits due to: {timeout_exc}')
    except (AMQPError, IOError) as broker_exc:
        log_error = self.logger.error
        log_error(f'AMQPError: {broker_exc}')
    finally:
        # Teardown runs regardless of how the wait ended.
        self.__del__()
        self.logger.info('Exiting main thread. All auxiliary threads stopped.')
def stop(self):
# kill ioloops
for j in xrange(len(self.ioloops)):
self.ioloops[j].unlink(self._ioloop_failure)
self.ioloops[j].kill()
self.ioloops[j] = None
self.ioloops = None
# close channels
for rpcc in self.rpc_server_channels.values():
try:
rpcc.disconnect()
except amqp.AMQPError:
LOG.debug("exception in disconnect: ", exc_info=True)
for pc in self.pub_channels.values():
try:
pc.disconnect()
except amqp.AMQPError:
LOG.debug("exception in disconnect: ", exc_info=True)
for sc in self.sub_channels.values():
try:
sc.disconnect()
except amqp.AMQPError:
LOG.debug("exception in disconnect: ", exc_info=True)
for rfc in self.rpc_fanout_clients_channels:
try:
try:
producer.publish(
{
'experiment_uuid': experiment_uuid,
'job_uuid': job_uuid,
'log_line': log_line,
'status': status,
'task_type': task_type,
'task_idx': task_idx
},
routing_key='{}.{}.{}'.format(RoutingKeys.LOGS_SIDECARS,
experiment_uuid,
job_uuid),
exchange=settings.INTERNAL_EXCHANGE,
)
except (TimeoutError, AMQPError):
pass
elif self.performance_tracker.tracker.success.per_tick % 3 == 0:
document.os = 'Windows'
document.browser = 'IE-60'
document.language = 'ge_de'
document.country = 'germany'
else:
document.os = 'Android'
document.browser = 'FireMini-20'
document.language = 'es'
document.country = 'eu'
document.is_page_view = True
self.publisher.publish(document.document)
self.performance_tracker.tracker.increment_success()
time.sleep(SLEEP_TIME)
except (AMQPError, IOError) as e:
self.thread_is_running = False
self.performance_tracker.cancel()
self.logger.error(f'AMQPError: {e}')
except Exception as e:
self.performance_tracker.tracker.increment_failure()
self.logger.info(f'safety fuse: {e}')
def _try(self, method, _tries=1, **kwargs):
    """Call ``method`` on the current channel, reconnecting and retrying on failure.

    Args:
        method: Name of the channel attribute to call.
        _tries: Number of the first attempt (internal counter; callers
            normally leave the default of 1).
        **kwargs: Keyword arguments forwarded to the channel method.

    Returns:
        Whatever the channel method returns.

    Raises:
        ConnectionError: If the call still fails with ``AMQPError`` or
            ``IOError`` once the attempt counter has reached ``MAX_TRIES``.
            The last underlying error is attached as ``__cause__``.
    """
    attempt = _tries
    while True:
        # Connect lazily; this is re-checked before every attempt in case a
        # reconnect left the channel unset.
        if self.channel is None:
            self._connect()
        try:
            return getattr(self.channel, method)(**kwargs)
        except (AMQPError, IOError) as e:
            if attempt >= MAX_TRIES:
                raise ConnectionError(e) from e
            # Re-establish the connection before the next attempt.
            self._connect()
            attempt += 1