Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
    """A ``redis+socket`` URL must produce a RedisSocket broker, not RabbitMQ."""
    b = Broker('redis+socket:///path/to/socket')
    # Dedicated isinstance assertions report the actual type on failure.
    self.assertNotIsInstance(b, RabbitMQ)
    self.assertIsInstance(b, RedisSocket)
def test_url(self):
    """A socket URL leaves host and port unset and exposes the path as vhost."""
    b = Broker('redis+socket:///path/to/socket')
    # Identity checks against None, matching the style used in test_url_defaults.
    self.assertIsNone(b.host)
    self.assertIsNone(b.port)
    self.assertEqual('path/to/socket', b.vhost)
def test_redis_client_args(self):
    """SSL options given to a ``rediss://`` broker show up in its client args."""
    ssl_options = self.BROKER_USE_SSL_OPTIONS
    broker = Broker('rediss://:pass@host:4444/5', broker_use_ssl=ssl_options)

    # The URL components must still be parsed as usual.
    parsed_expectations = (
        ('host', 'host'),
        ('port', 4444),
        ('vhost', 5),
        ('password', 'pass'),
    )
    for attribute, wanted in parsed_expectations:
        self.assertEqual(wanted, getattr(broker, attribute))

    # Every SSL option must be forwarded unchanged.
    client_args = broker._get_redis_client_args()
    for option_name in ssl_options:
        self.assertIn(option_name, client_args)
        self.assertEqual(ssl_options[option_name], client_args[option_name])
def test_url_with_password(self):
    """Host, port, vhost and password are all extracted from a redis URL."""
    broker = Broker('redis://:pass@host:4444/5')
    parsed_expectations = (
        ('host', 'host'),
        ('port', 4444),
        ('vhost', 5),
        ('password', 'pass'),
    )
    for attribute, wanted in parsed_expectations:
        self.assertEqual(wanted, getattr(broker, attribute))
def test_priority_steps(self):
    """``priority_steps`` falls back to the default unless overridden in options."""
    custom_steps = list(range(10))
    option_sets = ({}, {'priority_steps': custom_steps})
    expected_steps = (RedisBase.DEFAULT_PRIORITY_STEPS, custom_steps)

    for broker_options, wanted in zip(option_sets, expected_steps):
        broker = Broker('redis://localhost:6379/0', broker_options=broker_options)
        self.assertEqual(wanted, broker.priority_steps)
def test_url_defaults(self):
    """A bare ``redis://`` URL resolves to localhost:6379, db 0, no credentials."""
    broker = Broker('redis://')

    defaults = (('host', 'localhost'), ('port', 6379), ('vhost', 0))
    for attribute, wanted in defaults:
        self.assertEqual(wanted, getattr(broker, attribute))

    for attribute in ('username', 'password'):
        self.assertIsNone(getattr(broker, attribute))
def test_init_without_broker_use_ssl(self):
    """Creating a ``rediss://`` broker without SSL options raises ValueError."""
    self.assertRaises(ValueError, Broker, 'rediss://localhost:6379/0')
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
"""
app = self.application
broker_options = self.capp.conf.BROKER_TRANSPORT_OPTIONS
http_api = None
if app.transport == 'amqp' and app.options.broker_api:
http_api = app.options.broker_api
broker_use_ssl = None
if self.capp.conf.BROKER_USE_SSL:
broker_use_ssl = self.capp.conf.BROKER_USE_SSL
broker = Broker(app.capp.connection().as_uri(include_password=True),
http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl)
queue_names = ControlHandler.get_active_queue_names()
if not queue_names:
queue_names = set([self.capp.conf.CELERY_DEFAULT_QUEUE]) |\
set([q.name for q in self.capp.conf.CELERY_QUEUES or [] if q.name])
queues = yield broker.queues(sorted(queue_names))
self.write({'active_queues': queues})
def main():
    """Print the broker's info for one queue, then stop the IO loop.

    Optional positional command-line arguments:

    1. broker URL (default ``'amqp://'``)
    2. queue name (default ``'celery'``)
    3. broker HTTP API URL
       (default ``'http://guest:guest@localhost:15672/api/'``)

    This is a generator: it yields whatever ``broker.queues`` returns and
    uses the value sent back as the queue list.
    NOTE(review): presumably run under a Tornado coroutine decorator that is
    not visible in this snippet -- confirm.
    """
    broker_url = sys.argv[1] if len(sys.argv) > 1 else 'amqp://'
    queue_name = sys.argv[2] if len(sys.argv) > 2 else 'celery'
    http_api = (sys.argv[3] if len(sys.argv) > 3
                else 'http://guest:guest@localhost:15672/api/')

    try:
        broker = Broker(broker_url, http_api=http_api)
        queues = yield broker.queues([queue_name])
        if queues:
            print(queues)
    finally:
        # Stop the loop even if creating or querying the broker raised, so
        # an error does not skip the stop call.
        io_loop.stop()
def get(self):
    """Write a ``{queue name: message count}`` mapping for the broker's queues.

    Writes an empty object and returns early when building the broker
    raises ``NotImplementedError``. The queue names come from
    ``ControlHandler.get_active_queue_names()``; if that is empty, the
    configured default queue plus any named ``CELERY_QUEUES`` are used.
    A queue entry without a ``'messages'`` key is reported as 0.

    This is a generator: it yields the value returned by ``broker.queues``
    and iterates over the result sent back in.
    """
    application = self.application
    celery_app = application.capp
    try:
        ssl_setting = self.capp.conf.BROKER_USE_SSL
        broker = Broker(
            celery_app.connection().as_uri(include_password=True),
            http_api=application.options.broker_api,
            broker_use_ssl=ssl_setting if ssl_setting else None,
        )
    except NotImplementedError:
        self.write({})
        return

    queue_names = ControlHandler.get_active_queue_names()
    if not queue_names:
        # Nothing reported as active: fall back to the configured queues.
        default_queue = {self.capp.conf.CELERY_DEFAULT_QUEUE}
        configured = {
            q.name for q in self.capp.conf.CELERY_QUEUES or [] if q.name
        }
        queue_names = default_queue | configured

    queues = yield broker.queues(queue_names)

    data = defaultdict(int)
    data.update((entry['name'], entry.get('messages', 0)) for entry in queues)
    self.write(data)