Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __enter__(self):
    """Enter the RPC context and return a weak proxy to it.

    The RPC proxy is started lazily: if ``self._rpc`` is still ``None``
    (first RPC call of this connection), ``self._proxy.start()`` is called
    and its result stored in ``self._rpc``.

    Returns:
        ``weakref.proxy(self)``, so the ``with`` target does not hold a
        strong reference to this context.

    Raises:
        RuntimeError: if ``self._proxy`` is ``None``, i.e. this context
            has already been stopped.
        IOError, ConnectionError: re-raised unchanged when starting the
            proxy fails.
    """
    if self._proxy is None:
        # A stopped context cannot be re-entered: ask the pool to reload
        # one worker, run this context's own ``__del__`` and fail loudly.
        self._pool._reload(1)
        self.__del__()
        raise RuntimeError("This RpcContext has been stopped already")

    if self._rpc is None:
        # Try to start the RPC proxy if it hasn't been started yet
        # (first RPC call of this connection).
        try:
            self._rpc = self._proxy.start()
        except (IOError, ConnectionError):
            # Starting failed: reload one worker, clean up, and re-raise
            # the original error for the caller.
            self._pool._reload(1)
            self.__del__()
            raise

    self._enable_rpc_call = True
    return weakref.proxy(self)
def requeue_message(self, message):
    """Requeue ``message`` if its connection is still alive.

    Args:
        message: object exposing ``channel.connection`` and ``requeue()``.

    Returns:
        None. A ``ConnectionError`` raised by ``message.requeue()`` is
        deliberately ignored.
    """
    # Only attempt to requeue if the message connection is alive;
    # otherwise the message will already have been reclaimed by the broker.
    if not message.channel.connection:
        return
    try:
        message.requeue()
    except ConnectionError:  # pragma: no cover
        # Best-effort: the connection closed between the check above and
        # the requeue call, so there is nothing left to do.
        pass
def requeue_message(self, message):
    """Requeue ``message`` if its connection is still alive.

    Args:
        message: object exposing ``channel.connection`` and ``requeue()``.

    Returns:
        None. A ``ConnectionError`` raised by ``message.requeue()`` is
        deliberately ignored.
    """
    # Only attempt to requeue if the message connection is alive;
    # otherwise the message will already have been reclaimed by the broker.
    if not message.channel.connection:
        return
    try:
        message.requeue()
    except ConnectionError:  # pragma: no cover
        # Best-effort: the connection closed between the check above and
        # the requeue call, so there is nothing left to do.
        pass
# NOTE(review): fragment of a consumer main loop. The enclosing function header and the
# original indentation are not visible here, so the statements are left exactly as found.
# The loop keeps running while `self.keep_running` is truthy and calls `drain_events`
# on the consumer's current connection on every pass.
while self.keep_running:
try:
connection = consumer.connection
# Do not assume the consumer still has the connection, it may have been already closed, we don't know.
# Unfortunately, the only way to check it is to invoke the method and catch AttributeError
# if connection is already None.
try:
connection.drain_events(timeout=timeout)
except AttributeError:
# The connection was gone, so obtain a fresh consumer for the next pass.
consumer = self._get_consumer()
# Special-case AMQP-level connection errors and recreate the connection if any is caught.
# NOTE(review): `logger.warn` is a deprecated alias of `logger.warning` in the stdlib
# logging module - confirm `logger` is a stdlib logger before switching.
except AMQPConnectionError:
logger.warn('Caught AMQP connection error in mainloop e:`%s`', format_exc())
# `connection` may be None at this point (see the AttributeError handling above).
if connection:
connection.close()
consumer = self._get_consumer()
# Regular network-level errors - assume the AMQP connection is still fine and treat it
# as an opportunity to perform the heartbeat.
except conn_errors:
try:
connection.heartbeat_check()
except Exception:
# Count heartbeat failures and only log every `log_every`-th one.
hb_errors_so_far += 1
if hb_errors_so_far % log_every == 0:
logger.warn('Exception in heartbeat (%s so far), e:`%s`', hb_errors_so_far, format_exc())
def do(self, *args, **kwargs):
    """Call the wrapped ``fn`` until it succeeds or ``self.block()`` ends the loop.

    ``fn`` is a free variable supplied by the enclosing scope (not visible
    in this block). Its return value is discarded; this wrapper always
    returns ``None``.

    Failure handling per attempt:

    * ``NotFound``: logged; when ``self.auto_create`` is true the queue
      named by ``kwargs["consume_queue"]`` is created via
      ``self.brokerCreateQueue``. The retry delay is not increased.
      ``consume_queue`` must have been passed by keyword, otherwise the
      lookup raises ``KeyError``.
    * ``ConnectionError`` / any other ``Exception``: logged and the retry
      delay is doubled.

    After every failed attempt the wrapper announces the delay and then
    sleeps for ``sleep_seconds`` before trying again.
    """
    sleep_seconds = 1
    # NOTE(review): the explicit ``== True`` comparisons are kept on purpose;
    # the return type of ``self.block()`` / ``self.auto_create`` is not
    # visible here, and plain truthiness could change behaviour.
    while self.block() == True:  # noqa: E712
        try:
            fn(self, *args, **kwargs)
            break
        except NotFound as err:
            self.logging.error("AMQP error. Function: %s Reason: %s" % (fn.__name__, err))
            if self.auto_create == True:  # noqa: E712
                self.brokerCreateQueue(kwargs["consume_queue"])
        except ConnectionError as err:
            sleep_seconds *= 2
            self.logging.error("AMQP error. Function: %s Reason: %s" % (fn.__name__, err))
        except Exception as err:
            sleep_seconds *= 2
            self.logging.error("AMQP error. Function: %s Reason: %s" % (fn.__name__, err))
        # Announce the pause before it starts; previously the message was
        # only emitted once the sleep had already finished.
        self.logging.info("Sleeping for %s seconds." % sleep_seconds)
        sleep(sleep_seconds)
return do
# NOTE(review): fragment - the enclosing `def` and the `try:` that the `except`
# clauses below belong to are not visible here; statements are left exactly as found.
# Take the stored reply for this correlation id and hand it to the provider.
body, message = self.replies.pop(correlation_id)
self.provider.handle_message(body, message)
except socket.timeout:
# TODO: this conflates an rpc timeout with a socket read timeout.
# a better rpc proxy implementation would recover from a socket
# timeout if the rpc timeout had not yet been reached
timeout_error = RpcTimeout(self.timeout)
# Deliver the timeout to whoever is waiting on this correlation id.
event = self.provider._reply_events.pop(correlation_id)
event.send_exception(timeout_error)
# timeout is implemented using socket timeout, so when it
# fires the connection is closed and must be re-established
self._setup_consumer()
# NOTE(review): `exc` is bound but not used in this handler.
except (IOError, ConnectionError) as exc:
# in case this was a temporary error, attempt to reconnect
# and try again. if we fail to reconnect, the error will bubble
self._setup_consumer()
# NOTE(review): the result of this retry call is discarded and no retry
# limit is visible in this fragment - confirm against the full method.
self.get_message(correlation_id)
except KeyboardInterrupt as exc:
# Forward the interrupt to the reply event registered for this correlation id.
event = self.provider._reply_events.pop(correlation_id)
event.send_exception(exc)
# exception may have killed the connection
self._setup_consumer()