Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
options = {'passive': passive,
'durable': durable,
'exclusive': exclusive,
'auto-delete': auto_delete,
'arguments': arguments}
if queue.startswith('celeryev') or queue.endswith('pidbox'):
options['qpid.policy_type'] = 'ring'
try:
self._broker.addQueue(queue, options=options)
except Exception as exc:
if OBJECT_ALREADY_EXISTS_STRING not in str(exc):
raise exc
queue_to_check = self._broker.getQueue(queue)
message_count = queue_to_check.values['msgDepth']
consumer_count = queue_to_check.values['consumerCount']
return amqp.protocol.queue_declare_ok_t(queue, message_count,
consumer_count)
def queue_declare(self, queue=None, passive=False, **kwargs):
"""Declare queue."""
queue = queue or 'amq.gen-%s' % uuid()
if passive and not self._has_queue(queue, **kwargs):
raise ChannelError(
'NOT_FOUND - no queue {0!r} in vhost {1!r}'.format(
queue, self.connection.client.virtual_host or '/'),
(50, 10), 'Channel.queue_declare', '404',
)
else:
self._new_queue(queue, **kwargs)
return queue_declare_ok_t(queue, self._size(queue), 0)
def queue_declare(self, queue=None, passive=False, **kwargs):
"""Declare queue."""
queue = queue or 'amq.gen-%s' % uuid()
if passive and not self._has_queue(queue, **kwargs):
raise ChannelError(
'NOT_FOUND - no queue {0!r} in vhost {1!r}'.format(
queue, self.connection.client.virtual_host or '/'),
(50, 10), 'Channel.queue_declare', '404',
)
else:
self._new_queue(queue, **kwargs)
return queue_declare_ok_t(queue, self._size(queue), 0)
def queue_declare(self, queue='', passive=False, durable=False,
exclusive=False, auto_delete=False, arguments=None,
nowait=False):
return queue_declare_ok_t(
*self.connection._queue_declare(
self.channel_id, queue, passive, durable,
exclusive, auto_delete, arguments or {},
)