Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def effect(*args, **kwargs):
if kwargs.get('passive'):
raise ChannelError('some channel error')
return 0, 3, 0
qd.side_effect = effect
def test_connect_errback(self, sleep, connect):
c = self.NoopConsumer()
Transport.connection_errors = (ChannelError,)
connect.on_nth_call_do(ChannelError('error'), n=1)
c.connect()
connect.assert_called_with()
def effect(*args, **kwargs):
if kwargs.get('passive'):
raise ChannelError('some channel error')
return 0, 3, 0
qd.side_effect = effect
def test_connect_errback(self, sleep, connect):
c = self.NoopConsumer()
Transport.connection_errors = (ChannelError,)
connect.on_nth_call_do(ChannelError('error'), n=1)
c.connect()
connect.assert_called_with()
def _ensure_channel_is_bound(entity, channel):
"""Make sure the channel is bound to the entity.
:param entity: generic kombu nomenclature, generally an exchange or queue
:param channel: channel to bind to the entity
:return: the updated entity
"""
is_bound = entity.is_bound
if not is_bound:
if not channel:
raise ChannelError(
"Cannot bind channel {} to entity {}".format(channel, entity))
entity = entity.bind(channel)
return entity
def _fn(messenger, *args, **kwargs):
repair = lambda: None
while not Thread.aborted():
try:
repair()
return fn(messenger, *args, **kwargs)
except ChannelError, le:
# 312: NO_ROUTE, 404: NOT_FOUND
if le.reply_code not in (312, 404):
log.warn(utf8(le))
repair = messenger.repair
sleep(DELAY)
else:
raise NotFound(*le.args)
except CONNECTION_EXCEPTIONS, pe:
log.warn(utf8(pe))
repair = messenger.repair
sleep(DELAY)
return _fn
@synchronized
def close(self):
"""
Close the endpoint.
"""
try:
channel = self.__channel
self.__channel = None
if channel is not None:
channel.close()
except (CONNECTION_EXCEPTIONS, ChannelError), e:
log.exception(str(e))