Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_connection_error():
    """Check pool accounting when a ConnectionError escapes a proxy context.

    With ``ClusterRpcProxy`` patched out, the pool starts with 4 queued
    proxies.  One proxy is checked out and held, a second one is checked out
    and fails with ``ConnectionError`` inside its ``with`` block.  The queue
    must be back at 3 after the failure, the held proxy must still be usable,
    and the queue must be back at 4 once the held proxy is released.
    """
    with patch('django_nameko.rpc.ClusterRpcProxy') as FakeClusterRpcProxy:
        pool = get_pool()
        try:
            assert pool.queue.qsize() == 4, pool.queue.qsize()
            client = pool.next()
            assert pool.queue.qsize() == 3, pool.queue.qsize()
            with tools.assert_raises(ConnectionError):
                with pool.next():
                    # Two proxies are checked out at this point.
                    assert pool.queue.qsize() == 2, pool.queue.qsize()
                    raise ConnectionError("connection closed")
            # The slot used by the failed context is back in the queue.
            # NOTE(review): the original comment said the ConnectionError is
            # expected to break all 4 proxies and have them cleared -- confirm
            # against the pool implementation.
            assert pool.queue.qsize() == 3, pool.queue.qsize()
            with client:
                # Entering the already checked-out proxy takes nothing more
                # from the queue.
                assert pool.queue.qsize() == 3, pool.queue.qsize()
                client.foo.bar()
                assert call().start().foo.bar() in FakeClusterRpcProxy.mock_calls
            assert pool.queue.qsize() == 4, pool.queue.qsize()
        finally:
            # Always tear the pool down so a failed assertion cannot leak it
            # into later tests.
            destroy_pool()
def test_connection_error():
    """Check pool accounting when a ConnectionError escapes a proxy context.

    With ``ClusterRpcProxy`` patched out, the pool starts with 4 queued
    proxies.  One proxy is checked out and held, a second one is checked out
    and fails with ``ConnectionError`` inside its ``with`` block.  The queue
    must be back at 3 after the failure, the held proxy must still be usable,
    and the queue must be back at 4 once the held proxy is released.
    """
    with patch('django_nameko.rpc.ClusterRpcProxy') as FakeClusterRpcProxy:
        pool = get_pool()
        try:
            assert pool.queue.qsize() == 4, pool.queue.qsize()
            client = pool.next()
            assert pool.queue.qsize() == 3, pool.queue.qsize()
            with tools.assert_raises(ConnectionError):
                with pool.next():
                    # Two proxies are checked out at this point.
                    assert pool.queue.qsize() == 2, pool.queue.qsize()
                    raise ConnectionError("connection closed")
            # The slot used by the failed context is back in the queue.
            # NOTE(review): the original comment said the ConnectionError is
            # expected to break all 4 proxies and have them cleared -- confirm
            # against the pool implementation.
            assert pool.queue.qsize() == 3, pool.queue.qsize()
            with client:
                # Entering the already checked-out proxy takes nothing more
                # from the queue.
                assert pool.queue.qsize() == 3, pool.queue.qsize()
                client.foo.bar()
                assert call().start().foo.bar() in FakeClusterRpcProxy.mock_calls
            assert pool.queue.qsize() == 4, pool.queue.qsize()
        finally:
            # Always tear the pool down so a failed assertion cannot leak it
            # into later tests.
            destroy_pool()
from amqp import Connection as RealConnection
from amqp import ConnectionError
from gofer.compat import str
from gofer.common import ThreadSingleton
from gofer.messaging.adapter.model import Connector, BaseConnection
from gofer.messaging.adapter.connect import retry
# Module-level logger, named after this module.
log = getLogger(__name__)
# Broker virtual host and credentials.
# NOTE(review): presumably the fallbacks used when the connector does not
# supply its own values -- confirm against the connection code.
VIRTUAL_HOST = '/'
USERID = 'guest'
PASSWORD = 'guest'
# Exception types grouped together as connection failures.
# NOTE(review): AttributeError sits alongside the I/O errors; the reason is
# not visible here -- confirm before narrowing this tuple.
CONNECTION_EXCEPTIONS = (IOError, SocketError, ConnectionError, AttributeError)
class Connection(with_metaclass(ThreadSingleton, BaseConnection)):
"""
An AMQP broker connection.
"""
@staticmethod
def ssl_domain(connector):
"""
Get SSL properties
:param connector: A broker object.
:type connector: Connector
:return: The SSL properties
:rtype: dict
:raise: ValueError
def catch_error(func):
    """Decorate a method so a connection failure triggers one reconnect and retry.

    The exception types treated as connection failures are collected once,
    when the decorator is applied: ``select.error``, ``socket.error`` and
    ``amqp.ConnectionError`` always, plus pika's ``ConnectionClosed`` and
    ``AMQPConnectionError`` when ``pika`` can be imported.

    :param func: the method to wrap; the instance it is called on must
        provide a ``reconnect()`` method.
    :return: a wrapper with the same call signature and metadata as ``func``.
    """
    import functools

    import amqp
    try:
        import pika.exceptions
        connect_exceptions = (
            pika.exceptions.ConnectionClosed,
            pika.exceptions.AMQPConnectionError,
        )
    except ImportError:
        # pika is optional; without it only the generic errors are handled.
        connect_exceptions = ()
    connect_exceptions += (
        select.error,
        socket.error,
        amqp.ConnectionError,
    )

    # Preserve the wrapped method's __name__/__doc__ for logs and introspection.
    @functools.wraps(func)
    def wrap(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except connect_exceptions as e:
            logging.error('RabbitMQ error: %r, reconnect.', e)
            self.reconnect()
            # Single retry: a second failure propagates to the caller.
            return func(self, *args, **kwargs)
    return wrap
class VersionMismatch(KombuError):
    """Raised on a library dependency version mismatch."""
class SerializerNotInstalled(KombuError):
    """Raised when support for the requested serialization type is not installed."""
class ContentDisallowed(SerializerNotInstalled):
    """Raised when the consumer does not allow this content-type."""
class InconsistencyError(ConnectionError):
    """Raised when data or the environment is found to be inconsistent.

    Depending on the cause, retrying the operation may be possible.
    """
def catch_error(func):
    """Decorate a method so a connection failure triggers one reconnect and retry.

    The exception types treated as connection failures are collected once,
    when the decorator is applied: ``select.error``, ``socket.error`` and
    ``amqp.ConnectionError`` always, plus pika's ``ConnectionClosed`` and
    ``AMQPConnectionError`` when ``pika`` can be imported.

    :param func: the method to wrap; the instance it is called on must
        provide a ``reconnect()`` method.
    :return: a wrapper with the same call signature and metadata as ``func``.
    """
    import functools

    import amqp
    try:
        import pika.exceptions
        connect_exceptions = (
            pika.exceptions.ConnectionClosed,
            pika.exceptions.AMQPConnectionError,
        )
    except ImportError:
        # pika is optional; without it only the generic errors are handled.
        connect_exceptions = ()
    connect_exceptions += (
        select.error,
        socket.error,
        amqp.ConnectionError,
    )

    # Preserve the wrapped method's __name__/__doc__ for logs and introspection.
    @functools.wraps(func)
    def wrap(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except connect_exceptions as e:
            logging.error('RabbitMQ error: %r, reconnect.', e)
            self.reconnect()
            # Single retry: a second failure propagates to the caller.
            return func(self, *args, **kwargs)
    return wrap
"""Maximum number of simultaneous channels exceeded."""
class VersionMismatch(KombuError):
    """Raised on a library dependency version mismatch."""
class SerializerNotInstalled(KombuError):
    """Raised when support for the requested serialization type is not installed."""
class ContentDisallowed(SerializerNotInstalled):
    """Raised when the consumer does not allow this content-type."""
class InconsistencyError(ConnectionError):
    """Raised when data or the environment is found to be inconsistent.

    Depending on the cause, retrying the operation may be possible.
    """
@python_2_unicode_compatible
class HttpError(Exception):
    """HTTP client error.

    Exposes ``code``, ``message`` and ``response`` as attributes and passes
    the same three values to ``Exception`` as its ``args``.
    """

    def __init__(self, code, message=None, response=None):
        # Fill Exception.args first; the attribute assignments do not depend on it.
        super(HttpError, self).__init__(code, message, response)
        self.code, self.message, self.response = code, message, response
import time
import uuid
from datetime import datetime
import simplejson as json
import amqp
from amqp import Message, AMQPError, ConnectionError as AMQPConnectionError
# NOTE(review): presumably the maximum number of attempts for a queue
# operation -- confirm at the use sites.
MAX_TRIES = 3
# NOTE(review): looks like the key under which message metadata is stored --
# confirm at the use sites.
META_FIELD = "_meta"
# This class is mostly taken from https://github.com/sievetech/hived/blob/master/hived/queue.py
# and adapted to my needs.
class ConnectionError(AMQPConnectionError):
    """AMQP connection error whose string form is its ``message`` attribute."""

    def __str__(self):
        text = self.message
        return '%s' % text
class SerializationError(Exception):
    """Wrap an underlying exception together with an optional body.

    The wrapper's ``args`` are copied from the wrapped exception.

    :param exc: the underlying exception being wrapped.
    :param body: optional payload kept alongside the exception.
    """

    def __init__(self, exc, body=None):
        # Name this class in super(): super(Exception, self) would start the
        # MRO lookup *after* Exception and skip a level of the hierarchy.
        super(SerializationError, self).__init__(*exc.args)
        self.exc = exc
        self.body = body

    def __repr__(self):
        """Return ``"<wrapped exception>: <repr of body>"``."""
        return '%s: %s' % (self.exc, repr(self.body))
class RabbitQueue(object):
"""
from time import sleep
from logging import getLogger
from socket import error as SocketError
from amqp import Connection, ConnectionError
from gofer.messaging.adapter.model import BaseBroker
# Module-level logger, named after this module.
log = getLogger(__name__)
# Broker virtual host and credentials.
# NOTE(review): presumably the fallbacks used when the broker URL does not
# supply its own values -- confirm against the connect code.
VIRTUAL_HOST = '/'
USERID = 'guest'
PASSWORD = 'guest'
# Exception types grouped together as connection failures.
# NOTE(review): AttributeError sits alongside the I/O errors; the reason is
# not visible here -- confirm before narrowing this tuple.
CONNECTION_EXCEPTIONS = (IOError, SocketError, ConnectionError, AttributeError)
class Broker(BaseBroker):
"""
A generic AMQP broker.
"""
def __init__(self, url):
    """
    Set up the broker by delegating to ``BaseBroker.__init__``.

    :param url: The broker url.
        Format: <adapter>+<scheme>://<userid>:<password>@<host>:<port>.
        NOTE(review): the placeholder names were lost from the original
        docstring and are reconstructed from the remaining punctuation --
        confirm against the project's URL documentation.
    :type url: str
    """
    BaseBroker.__init__(self, url)
def connect(self):