Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_connect__multiple_amqp_init_errors(self, sleep, chan, conn):
# Prepare test
conn.side_effect = [
RecoverableConnectionError('connection already closed'),
ValueError("Must supply authentication or userid/password"),
ValueError("Invalid login method", 'login_method'),
DEFAULT
]
# Execute test
ok = self.hc.connect()
# Evaluate results
expected = [call(self.hc.host, userid=self.hc.user, password=self.hc.password, virtual_host=self.hc.vhost,
ssl=self.hc.ssl)]*4 + [call().connect(), call().channel()]
self.assertEqual(expected, conn.mock_calls, self.amqp_connection_assert_msg)
expected = [call(2), call(4), call(8)]
self.assertEqual(expected, sleep.mock_calls)
self.assertTrue(ok)
self.assertIsNotNone(self.hc.connection)
self.assertIsNotNone(self.hc.channel)
self.assertEqual(1, len(self.hc.toclose))
def test_close_connection_lost(self, chan, conn):
# Prepare test
conn.close.side_effect = RecoverableConnectionError('Connection already closed.')
self.hc.connection = conn
# Execute test
self.hc.close()
# Evaluate results
expected = [call.close()]
self.assertEqual(expected, conn.mock_calls, self.amqp_connection_assert_msg)
self.assertEqual(0, len(self.hc.toclose))
self.assertIsNone(self.hc.connection)
self.assertErrorInLog()
# When: Calling maybe_declare default
maybe_declare(entity, channel)
# Then: It called declare on the entity queue and added it to list
assert entity.declare.call_count == 1
assert hash(entity) in channel.connection.client.declared_entities
# When: Calling maybe_declare default (again)
maybe_declare(entity, channel)
# Then: we did not call declare again because its already in our list
assert entity.declare.call_count == 1
# When: Entity channel connection has gone away
entity.channel.connection = None
# Then: maybe_declare must raise a RecoverableConnectionError
with pytest.raises(RecoverableConnectionError):
maybe_declare(entity)
def test_connect__multiple_new_channel_errors(self, sleep, chan, conn):
# Prepare test
self.hc.new_channel = Mock()
self.hc.new_channel.side_effect = [
RecoverableConnectionError('Connection already closed.'),
ConnectionError('Channel %r already open' % (1,)),
ResourceError('No free channel ids, current={0}, channel_max={1}'.format(101, 100), (20, 10)),
IOError('Socket closed'),
IOError('Server unexpectedly closed connection'),
OSError('Socket timeout related error'), chan]
# Execute test
ok = self.hc.connect()
# Evaluate results
expected = [call(self.hc.host, userid=self.hc.user, password=self.hc.password, virtual_host=self.hc.vhost,
ssl=self.hc.ssl), call().connect()]*7
self.assertEqual(expected, conn.mock_calls, self.amqp_connection_assert_msg)
expected = [call(2), call(4), call(8), call(16), call(32), call(64)]
self.assertEqual(expected, sleep.mock_calls, self.sleep_assert_msg)
self.assertTrue(ok)
self.assertIsNotNone(self.hc.connection)
self.assertIsNotNone(self.hc.channel)
if channel is None:
if not entity.is_bound:
raise ChannelError(
"channel is None and entity {} not bound.".format(entity))
channel = entity.channel
declared = ident = None
if channel.connection and entity.can_cache_declaration:
declared = channel.connection.client.declared_entities
ident = hash(entity)
if ident in declared:
return False
if not channel.connection:
raise RecoverableConnectionError('channel disconnected')
entity.declare(channel=channel)
if declared is not None and ident:
declared.add(ident)
if orig is not None:
orig.name = entity.name
return True
def _maybe_declare(entity, declared, ident, channel):
channel = channel or entity.channel
if not channel.connection:
raise RecoverableConnectionError('channel disconnected')
entity.declare()
if declared is not None and ident:
declared.add(ident)
return True