Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_exchange_delete_AMQPError(self, chan, conn):
    """Check exchange_delete() when the channel raises AMQPError.

    The call under test is expected to return normally (no exception is
    asserted), to have attempted exactly one ``exchange_delete`` on the
    channel, and to satisfy ``assertErrorInLog()``.

    ``chan`` and ``conn`` are injected by the test harness (presumably mock
    patches applied outside this excerpt); ``conn`` is not used here.
    """
    # Arrange: every exchange_delete on the channel fails with an AMQP error.
    failure = AMQPError(self.AMQPError_msg)
    self.hc.channel = chan
    self.hc.channel.exchange_delete.side_effect = failure

    # Act
    self.hc.exchange_delete(self.xname)

    # Assert: one delete attempt was recorded and the failure was logged.
    attempted_calls = [call.exchange_delete(self.xname)]
    self.assertEqual(attempted_calls, chan.mock_calls, self.amqp_channel_assert_msg)
    self.assertErrorInLog()
def test_restore_clear_AMQPError(self, hc, chan):
    """Check restore_clear() when both channel operations raise AMQPError.

    Even though ``queue_unbind`` and ``exchange_delete`` both fail, the test
    expects both to have been attempted in that order, the publisher's
    ``restore_queue`` / ``restore_exchange`` to be reset to ``None``, and the
    error message to be present in the log.

    ``hc`` and ``chan`` are injected by the test harness (presumably mock
    patches applied outside this excerpt); ``hc`` is not used here.
    """
    # Arrange: both cleanup operations on the channel fail.
    for failing_op in (chan.queue_unbind, chan.exchange_delete):
        failing_op.side_effect = AMQPError(self.AMQPError_msg)
    publisher = self.pub
    publisher.channel = chan
    publisher.restore_queue = self.qname
    publisher.restore_exchange = self.xname

    # Act
    publisher.restore_clear()

    # Assert: unbind then delete were attempted, state was cleared, error logged.
    attempted_calls = [
        call.queue_unbind(self.qname, self.xname, '#'),
        call.exchange_delete(self.xname),
    ]
    self.assertEqual(attempted_calls, chan.mock_calls, self.amqp_channel_assert_msg)
    self.assertIsNone(publisher.restore_queue)
    self.assertIsNone(publisher.restore_exchange)
    self.assertErrorInLog()
    self.find_in_log(r'{}'.format(self.AMQPError_msg))
def test_restore_set_exchange_declare_AMQPError(self, hc, chan):
    """Check restore_set() when ``exchange_declare`` raises AMQPError.

    The test case itself is passed to ``restore_set`` as the configuration
    object, so the attributes it reads (``restore_queue``, ``post_exchange``,
    ``program_name``, ``config_name``) are set on ``self`` first.

    ``hc`` and ``chan`` are injected by the test harness (presumably mock
    patches applied outside this excerpt); ``hc`` is not used here.
    """
    # Prepare test
    chan.exchange_declare.side_effect = AMQPError(self.AMQPError_msg)
    self.pub.channel = chan
    self.restore_queue = self.qname
    self.post_exchange = self.xname
    self.program_name = str(self.__class__.__name__.lower())
    self.config_name = "myconfig"

    # Replace os._exit with the publisher's error logger for the duration of
    # the call (presumably restore_set exits the process on failure -- confirm
    # against the implementation). The try/finally guarantees the real
    # os._exit is put back even if restore_set raises, so a failure here
    # cannot leave the process-wide function patched for later tests.
    original_exit = os._exit
    os._exit = self.pub.logger.error
    try:
        # Execute test
        self.pub.restore_set(self)
    finally:
        os._exit = original_exit

    # Evaluate results
    expected = [
        call.exchange_declare(self.pub.restore_exchange, 'topic', auto_delete=True, durable=False),
    ]
    self.assertEqual(expected, chan.mock_calls, self.amqp_channel_assert_msg)
    self.assertEqual(self.restore_queue, self.pub.restore_queue)
    regex = r'{}.{}.{}.restore.\d+'.format(self.xname, self.program_name, self.config_name)
    self.assertRegex(self.pub.restore_exchange, regex)
def test_connect__multiple_amqp_connect_errors(self, sleep, chan, conn):
    """Check connect() retrying through five different errors before success.

    ``conn.connect`` fails five times in a row (AMQP, SSL, IO, OS and generic
    errors) and succeeds on the sixth call. The test expects six
    construct-and-connect attempts followed by one ``channel()`` call, sleeps
    of 2, 4, 8, 16 and 32 between attempts, a truthy return value and a
    non-``None`` connection.

    ``sleep``, ``chan`` and ``conn`` are injected by the test harness
    (presumably mock patches applied outside this excerpt).
    """
    # Arrange: calling the connection mock returns the same mock, so the
    # constructor call and the connect() call are recorded in one call list.
    conn.return_value = conn
    failures = [
        AMQPError(self.AMQPError_msg),
        SSLError('SSLError stub'),
        IOError('IOError stub'),
        OSError('OSError stub'),
        Exception(self.Exception_msg),
    ]
    conn.connect.side_effect = failures + [DEFAULT]

    # Act
    connected = self.hc.connect()

    # Assert: one constructor + connect pair per attempt, then the channel.
    single_attempt = [
        call(self.hc.host, userid=self.hc.user, password=self.hc.password,
             virtual_host=self.hc.vhost, ssl=self.hc.ssl),
        call.connect(),
    ]
    expected_conn_calls = single_attempt * 6 + [call.channel()]
    self.assertEqual(expected_conn_calls, conn.mock_calls, self.amqp_connection_assert_msg)

    # The delay doubles after each failed attempt: 2, 4, 8, 16, 32.
    expected_sleeps = [call(2 ** exponent) for exponent in range(1, 6)]
    self.assertEqual(expected_sleeps, sleep.mock_calls, self.sleep_assert_msg)

    self.assertTrue(connected)
    self.assertIsNotNone(self.hc.connection)
def test_build__multiple_bindings__bind_Exception_loophole(self, sleep, hc, chan):
# Prepare test
xname = self.xname_fmt.format(self.test_build.__name__)
xkey = self.xkey_fmt.format(self.test_build.__name__)
self.q.bindings.append((xname, xkey))
self.q.bindings.append((xname + '_1', xkey + '_1'))
chan.queue_bind.side_effect = [AMQPError(self.AMQPError_msg), AMQPError(self.AMQPError_msg), # both bindings fail
AMQPError(self.AMQPError_msg), DEFAULT, # xname fails, xname_1 succeeds
# DEFAULT # this is for pulse
]*2
hc.new_channel.return_value = chan
self.q.declare = Mock(return_value=self.msg_count)
hc.user = 'test_user'
hc.host = 'test_host'
self.q.hc = hc
# Execute test
self.q.build()
# Evaluate results
expected = [call.new_channel()]
self.assertEqual(expected, hc.mock_calls, self.hc_assert_msg)
expected = [call.queue_bind(self.q.name, xname, xkey),
call.queue_bind(self.q.name, xname + '_1', xkey + '_1')
# ,call.queue_bind(self.q.name, xname + '_1', self.pulse_key)
def effect(*args, **kwargs):
    """Side-effect callable: fail passive calls, otherwise return ``(0, 3, 0)``.

    Positional arguments are accepted and ignored; only the ``passive``
    keyword is inspected.

    Returns:
        The tuple ``(0, 3, 0)`` when ``passive`` is absent or falsy.

    Raises:
        ChannelError: when called with a truthy ``passive`` keyword argument.
    """
    if kwargs.get('passive'):
        raise ChannelError('some channel error')
    return 0, 3, 0
# Install `effect` as the side effect of `qd` (assumes `qd` is a mock object
# defined outside this excerpt -- confirm), so calls to it run `effect`.
qd.side_effect = effect
def test_connect_errback(self, sleep, connect):
    """Check that the consumer still connects after one ChannelError.

    ``connect`` is made to raise ``ChannelError`` on its first call only, and
    ``Transport.connection_errors`` is set so that this error type counts as
    a connection error. The test then expects ``connect`` to have been called
    with no arguments.

    ``sleep`` and ``connect`` are injected by the test harness (presumably
    mock patches applied outside this excerpt); ``sleep`` is not used here.
    """
    c = self.NoopConsumer()
    # Transport.connection_errors is a class attribute shared by every test
    # in the process, so remember the previous value and always put it back.
    previous_errors = Transport.connection_errors
    Transport.connection_errors = (ChannelError,)
    try:
        connect.on_nth_call_do(ChannelError('error'), n=1)
        c.connect()
        connect.assert_called_with()
    finally:
        Transport.connection_errors = previous_errors
def effect(*args, **kwargs):
    """Side-effect callable: fail passive calls, otherwise return ``(0, 3, 0)``.

    Positional arguments are accepted and ignored; only the ``passive``
    keyword is inspected.

    Returns:
        The tuple ``(0, 3, 0)`` when ``passive`` is absent or falsy.

    Raises:
        ChannelError: when called with a truthy ``passive`` keyword argument.
    """
    if kwargs.get('passive'):
        raise ChannelError('some channel error')
    return 0, 3, 0
# Install `effect` as the side effect of `qd` (assumes `qd` is a mock object
# defined outside this excerpt -- confirm), so calls to it run `effect`.
qd.side_effect = effect
def test_connect_errback(self, sleep, connect):
    """Check that the consumer still connects after one ChannelError.

    ``connect`` is made to raise ``ChannelError`` on its first call only, and
    ``Transport.connection_errors`` is set so that this error type counts as
    a connection error. The test then expects ``connect`` to have been called
    with no arguments.

    ``sleep`` and ``connect`` are injected by the test harness (presumably
    mock patches applied outside this excerpt); ``sleep`` is not used here.
    """
    c = self.NoopConsumer()
    # Transport.connection_errors is a class attribute shared by every test
    # in the process, so remember the previous value and always put it back.
    previous_errors = Transport.connection_errors
    Transport.connection_errors = (ChannelError,)
    try:
        connect.on_nth_call_do(ChannelError('error'), n=1)
        c.connect()
        connect.assert_called_with()
    finally:
        Transport.connection_errors = previous_errors
def test_connection_error():
    """Check pool bookkeeping when a checked-out proxy hits a ConnectionError.

    With ``ClusterRpcProxy`` patched, the test tracks the pool's queue size:
    4 initially, 3 after one ``next()``, 2 inside a second checkout, back to
    3 after that checkout fails with ``ConnectionError``, and 4 once the
    first client has been used and released.
    """
    try:
        with patch('django_nameko.rpc.ClusterRpcProxy') as FakeClusterRpcProxy:
            pool = get_pool()
            assert pool.queue.qsize() == 4, pool.queue.qsize()
            client = pool.next()
            assert pool.queue.qsize() == 3, pool.queue.qsize()
            with tools.assert_raises(ConnectionError):
                with pool.next():
                    # The message must call qsize(); passing the bound method
                    # would print its repr instead of the actual size.
                    assert pool.queue.qsize() == 2, pool.queue.qsize()
                    raise ConnectionError("connection closed")
            assert pool.queue.qsize() == 3, pool.queue.qsize()
            # Original author's note: the error above is expected to have
            # broken (and cleared) all 4 pooled proxies.
            with client:
                assert pool.queue.qsize() == 3, pool.queue.qsize()
                client.foo.bar()
                assert call().start().foo.bar() in FakeClusterRpcProxy.mock_calls
            assert pool.queue.qsize() == 4, pool.queue.qsize()
    finally:
        # Always tear the pool down so a failed assertion cannot leak it
        # into other tests.
        destroy_pool()