Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_delete(self):
    """``AbstractChannel._delete`` must raise ``NotImplementedError``."""
    with pytest.raises(NotImplementedError):
        channel = virtual.AbstractChannel()
        channel._delete('queue')
conn.connect()
return conn
def close(self):
    """Close the channel and, if one was opened, its client connection.

    The parent class teardown always runs. It runs first so that it can
    still use the client while shutting down; the client is closed
    afterwards, even when the parent teardown raised.

    Returns:
        Whatever the client's ``close()`` returns, or ``None`` when no
        client was ever opened.
    """
    client = self._client
    try:
        # An early return used to skip this call whenever a client
        # existed, leaving the channel itself un-closed.
        super(Channel, self).close()
    finally:
        # Release the connection regardless of how the teardown went.
        result = client.close() if client is not None else None
    return result
@property
def client(self):
    """Return this channel's client, opening it on first access.

    The result of ``self._open()`` is stored on ``self._client`` and
    reused by later accesses.
    """
    if self._client is not None:
        return self._client
    self._client = self._open()
    return self._client
class Transport(virtual.Transport):
    """Virtual transport that pairs with this module's ``Channel``."""

    Channel = Channel

    interval = 1
    default_port = DEFAULT_PORT

    # Exception types listed as connection errors for this transport.
    connection_errors = (socket.error, SocketError, IOError)
    # Exception types listed as channel errors for this transport.
    channel_errors = (socket.error, IOError, SocketError, BeanstalkcException)
q = self._queue_for(queue)
size = q.qsize()
q.queue.clear()
return size
def close(self):
    """Close the channel, then replace ``self.queues`` with an empty dict."""
    super(Channel, self).close()
    for pending in values(self.queues):
        # NOTE(review): if these are stdlib-style queues, ``empty()`` only
        # reports emptiness and removes nothing -- confirm this is intended.
        pending.empty()
    self.queues = {}
def after_reply_message_received(self, queue):
    """Intentionally do nothing for ``queue``; always returns ``None``."""
class Transport(virtual.Transport):
    """In-memory Transport."""

    Channel = Channel

    driver_type = 'memory'
    driver_name = 'memory'
    implements = base.Transport.implements

    #: memory backend state is global.
    state = virtual.BrokerState()

    def driver_version(self):
        """Return the placeholder version string ``'N/A'``."""
        return 'N/A'
if m:
return loads(m)
raise Empty()
def _size(self, queue):
    """Return the size that the ``Queue`` manager reports for ``queue``."""
    manager = Queue.objects
    return manager.size(queue)
def _purge(self, queue):
    """Purge ``queue`` through the ``Queue`` manager and return its result."""
    manager = Queue.objects
    return manager.purge(queue)
def refresh_connection(self):
    """Close the current Django database connection.

    NOTE(review): ``django.db.close_connection`` does not exist in newer
    Django releases -- confirm which Django versions must be supported.
    """
    import django.db
    django.db.close_connection()
class Transport(virtual.Transport):
    """Transport for the ``'django'`` driver (driver type ``'sql'``)."""

    Channel = Channel

    driver_type = 'sql'
    driver_name = 'django'

    default_port = 0
    polling_interval = POLLING_INTERVAL

    # Inherited channel errors plus Django's object-lookup failures.
    channel_errors = virtual.Transport.channel_errors + (
        errors.ObjectDoesNotExist,
        errors.MultipleObjectsReturned,
    )

    def driver_version(self):
        """Return the installed Django version as a dotted string."""
        import django
        return '.'.join(str(part) for part in django.VERSION)
# This is for logging requests to the SQS API.
from utils.redis_utils import default_connection
from statistic.log_methods import LogNativeMethodsMetaclass, RedisLogBackend
# SQSConnection subclass declared with LogNativeMethodsMetaclass and a
# Redis-backed logger backend.
class SQSLoggingConnection(SQSConnection):
    # Python 2 style metaclass declaration; presumably it wraps the
    # connection's methods with logging -- confirm against
    # LogNativeMethodsMetaclass.
    __metaclass__ = LogNativeMethodsMetaclass
    # Log backend built from the imported ``default_connection``.
    logger_backend = RedisLogBackend(default_connection)
# Use the logging connection class only when LOG_AMAZON_BROKER is truthy.
DEFAULT_CONNECTION = (
    SQSLoggingConnection if LOG_AMAZON_BROKER else SQSConnection
)
class Channel(virtual.Channel):
    # Configuration for LogExceptionsMetaclass: the exception types listed
    # below are to be logged, under the 'celery' logger name, in any method
    # of this class.
    # NOTE(review): double-underscore names are mangled inside a class body
    # (to ``_Channel__log_exceptions...``) -- confirm the metaclass looks
    # them up under the mangled names.
    __log_exceptions_logger_name = 'celery'
    __log_exceptions = (
        boto_exceptions.BotoClientError,
        boto_exceptions.SDBPersistenceError,
        boto_exceptions.BotoServerError,
        AttributeError,
        LookupError,
        EnvironmentError,
        RuntimeError,
        SystemError,
        ValueError
    )
    # Python 2 style metaclass declaration.
    __metaclass__ = LogExceptionsMetaclass
ssl = False
ttl = False
connect_timeout = None
capped_queue_size = 100000
calc_queue_size = True
default_hostname = '127.0.0.1'
default_port = 27017
default_database = 'kombu_default'
messages_collection = 'messages'
routing_collection = 'messages.routing'
broadcast_collection = 'messages.broadcast'
queues_collection = 'messages.queues'
from_transport_options = (virtual.Channel.from_transport_options + (
'connect_timeout', 'ssl', 'ttl', 'capped_queue_size',
'default_hostname', 'default_port', 'default_database',
'messages_collection', 'routing_collection',
'broadcast_collection', 'queues_collection',
'calc_queue_size',
))
def __init__(self, *vargs, **kwargs):
    """Initialise the channel and immediately evaluate its client.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser.
    """
    super(Channel, self).__init__(*vargs, **kwargs)
    self._broadcast_cursors = dict()
    # Touch ``client`` now so the connection is evaluated during
    # construction instead of on first use.
    _ = self.client
# AbstractChannel/Channel interface implementation
from redis import exceptions
# This exception suddenly changed name between redis-py versions
if hasattr(exceptions, 'InvalidData'):
DataError = exceptions.InvalidData
else:
DataError = exceptions.DataError
return error_classes_t(
(virtual.Transport.connection_errors + (
InconsistencyError,
socket.error,
IOError,
OSError,
exceptions.ConnectionError,
exceptions.AuthenticationError,
exceptions.TimeoutError)),
(virtual.Transport.channel_errors + (
DataError,
exceptions.InvalidResponse,
exceptions.ResponseError)),
)
from Queue import Empty
from anyjson import serialize, deserialize
from kombu.transport import virtual
from django.conf import settings
from django.core import exceptions as errors
from djkombu.models import Queue
# Polling interval read from the ``DJKOMBU_POLLING_INTERVAL`` Django setting,
# falling back to 5.0 when it is not defined (presumably seconds -- confirm).
POLLING_INTERVAL = getattr(settings, "DJKOMBU_POLLING_INTERVAL", 5.0)
class Channel(virtual.Channel):
def _new_queue(self, queue, **kwargs):
    """Get or create the ``Queue`` object named ``queue``.

    Extra keyword arguments are accepted but not used.
    """
    manager = Queue.objects
    manager.get_or_create(name=queue)
def _put(self, queue, message, **kwargs):
    """Serialize ``message`` and publish it to ``queue``.

    Extra keyword arguments are accepted but not used.
    """
    publish = Queue.objects.publish
    publish(queue, serialize(message))
def basic_consume(self, queue, *args, **kwargs):
    """Start consuming from ``queue`` unless it is bound to a fanout exchange.

    The exchange is the first element of the queue's entry in
    ``self.state.bindings``. For a ``"fanout"`` exchange nothing is done;
    otherwise the call is delegated to the parent class. Always returns
    ``None``.

    Raises:
        KeyError: if ``queue`` has no entry in ``self.state.bindings``.
    """
    exchange = self.state.bindings[queue][0]
    if self.typeof(exchange).type != "fanout":
        super(Channel, self).basic_consume(queue, *args, **kwargs)
def _get(self, queue):
#self.refresh_connection()
from . import virtual
try:
from softlayer_messaging import get_client
from softlayer_messaging.errors import ResponseError
except ImportError: # pragma: no cover
get_client = ResponseError = None # noqa
# every punctuation character except the underscore itself (dots and
# dashes included) is replaced by an underscore.
CHARS_REPLACE_TABLE = {
    ord(char): ord('_') for char in string.punctuation if char != '_'
}
class Channel(virtual.Channel):
"""SLMQ Channel."""
default_visibility_timeout = 1800 # 30 minutes.
domain_format = 'kombu%(vhost)s'
_slmq = None
_queue_cache = {}
_noack_queues = set()
def __init__(self, *args, **kwargs):
if get_client is None:
raise ImportError(
'SLMQ transport requires the softlayer_messaging library',
)
super(Channel, self).__init__(*args, **kwargs)
queues = self.slmq.queues()
for queue in queues:
def refresh_connection(self):
    """Close the current Django database connection.

    NOTE(review): ``django.db.close_connection`` does not exist in newer
    Django releases -- confirm which Django versions must be supported.
    """
    import django.db
    django.db.close_connection()
@cached_property
def Queue(self):
    """Resolve ``self.queue_model`` with ``symbol_by_name`` and return it."""
    model_path = self.queue_model
    return symbol_by_name(model_path)
class Transport(virtual.Transport):
    """Transport for the ``'django'`` driver (driver type ``'sql'``)."""

    Channel = Channel

    driver_type = 'sql'
    driver_name = 'django'

    default_port = 0
    polling_interval = POLLING_INTERVAL

    # Inherited channel errors plus Django's object-lookup failures.
    channel_errors = virtual.Transport.channel_errors + (
        errors.ObjectDoesNotExist,
        errors.MultipleObjectsReturned,
    )

    def driver_version(self):
        """Return the installed Django version as a dotted string."""
        import django
        return '.'.join(str(part) for part in django.VERSION)