Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Checks the ``queue`` helper available on ``add`` after ``rq.job(add)``:
# positional args, a description, an explicit job id, a dependency and a
# named target queue are each passed through and inspected on the result.
def test_queue_job(app):
rq = RQ(app, is_async=True)
# flushdb() runs before any job is queued -- presumably to start from an
# empty Redis database; confirm against the test fixtures.
rq.connection.flushdb()
rq.job(add)
# Plain call: the returned object must be an instance of the configured job
# class and carry the arguments unchanged.
job1 = add.queue(1, 2)
assert isinstance(job1, import_attribute(rq.job_class))
assert job1.args == (1, 2)
assert job1.kwargs == {}
# No timeout was given, so the job, the helper and the RQ instance must all
# report the same value.
assert job1.timeout == add.helper.timeout == rq.default_timeout
# ``description`` is expected to end up on the job itself.
job2 = add.queue(3, 4, description='job 2')
assert job2.description == 'job 2'
# A caller-supplied ``job_id`` must become the job's id.
job3_id = uuid.uuid4().hex
job3 = add.queue(5, 6, job_id=job3_id)
assert job3.id == job3_id
# ``depends_on`` must link the new job to the job passed in.
job4 = add.queue(7, 8, depends_on=job3)
assert job4.dependency.id == job3.id
other_queue = 'other_queue'
# NOTE(review): job5 is created but nothing is asserted on it in the visible
# lines -- the snippet looks cut off here; check the full test upstream.
job5 = add.queue(9, 10, queue=other_queue)
Return queue class from config or from RQ settings, otherwise return DjangoRQ.
If ``queue_class`` is provided, it takes priority.
The full priority list for queue class sources:
1. ``queue_class`` argument
2. ``QUEUE_CLASS`` in ``config`` argument
3. ``QUEUE_CLASS`` in base settings (``RQ``)
"""
RQ = getattr(settings, 'RQ', {})
if queue_class is None:
queue_class = RQ.get('QUEUE_CLASS', DjangoRQ)
if config:
queue_class = config.get('QUEUE_CLASS', queue_class)
if isinstance(queue_class, str):
queue_class = import_attribute(queue_class)
return queue_class
Returns an RQ queue instance with the given name, e.g.::
default_queue = rq.get_queue()
low_queue = rq.get_queue('low')
:param name: Name of the queue to return, defaults to
:attr:`~flask_rq2.RQ.default_queue`.
:type name: str
:return: An RQ queue instance.
:rtype: ``rq.queue.Queue``
"""
if not name:
name = self.default_queue
queue = self._queue_instances.get(name)
if queue is None:
queue_cls = import_attribute(self.queue_class)
queue = queue_cls(
name=name,
default_timeout=self.default_timeout,
is_async=self._is_async,
connection=self.connection,
job_class=self.job_class
)
self._queue_instances[name] = queue
return queue
def get_job_class(job_class=None):
    """
    Resolve the job class to use.

    An explicit ``job_class`` argument wins; without one, the ``JOB_CLASS``
    entry of the ``RQ`` settings dict is used, and ``Job`` is the final
    fallback. A string value (from either source) is treated as a python
    import path and resolved with ``import_attribute``.
    """
    # The settings lookup stays first so it happens on every call, exactly
    # as before, even when an override is supplied.
    rq_settings = getattr(settings, 'RQ', {})
    chosen = rq_settings.get('JOB_CLASS', Job) if job_class is None else job_class
    return import_attribute(chosen) if isinstance(chosen, str) else chosen
def get_worker(self, *queues):
    """
    Build an RQ worker for the given queue names, e.g.::

        configured_worker = rq.get_worker()
        default_worker = rq.get_worker('default')
        default_low_worker = rq.get_worker('default', 'low')

    :param \\*queues: Queue names the worker should act on; when none are
                     passed, ``self.queues`` is used instead.
    :return: An instance of the class named by ``self.worker_class``, with
             every entry of ``self._exception_handlers`` pushed onto it.
    """
    queue_names = queues or self.queues

    # Names are turned into queue objects through get_queue().
    worker_queues = []
    for queue_name in queue_names:
        worker_queues.append(self.get_queue(queue_name))

    worker_factory = import_attribute(self.worker_class)
    new_worker = worker_factory(
        worker_queues,
        connection=self.connection,
        job_class=self.job_class,
        queue_class=self.queue_class,
    )

    # Handlers are stored as import paths and resolved only here.
    for handler_path in self._exception_handlers:
        new_worker.push_exc_handler(import_attribute(handler_path))
    return new_worker
def func(self):
    """
    Return the callable named by ``self.func_name``.

    * ``None`` when no function name is set.
    * The attribute of that name on ``self.instance`` when ``self.instance``
      is truthy.
    * Otherwise whatever ``import_attribute`` returns for the name.
    """
    name = self.func_name
    if name is None:
        return None
    if not self.instance:
        # No truthy instance: the name is passed to import_attribute instead.
        return import_attribute(self.func_name)
    return getattr(self.instance, name)
def _connect(self):
    """
    Create a connection for ``self.redis_url``.

    ``self.connection_class`` is resolved with ``import_attribute`` and the
    resulting class's ``from_url`` constructor is called with the URL.
    """
    from_url = import_attribute(self.connection_class).from_url
    return from_url(self.redis_url)
is_async=True, job_class=None, **kwargs):
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
self.name = name
self._key = '{0}{1}'.format(prefix, name)
self._default_timeout = parse_timeout(default_timeout) or self.DEFAULT_TIMEOUT
self._is_async = is_async
if 'async' in kwargs:
self._is_async = kwargs['async']
warnings.warn('The `async` keyword is deprecated. Use `is_async` instead', DeprecationWarning)
# override class attribute job_class if one was passed
if job_class is not None:
if isinstance(job_class, string_types):
job_class = import_attribute(job_class)
self.job_class = job_class
def get_exception_handlers():
    """
    Return the configured exception handlers as a list of resolved objects.

    Custom exception handlers can be declared in settings.py::

        RQ = {
            'EXCEPTION_HANDLERS': ['path.to.handler'],
        }

    Each entry of ``EXCEPTION_HANDLERS`` is passed to ``import_attribute``
    and the results are returned in the same order.
    """
    from .settings import EXCEPTION_HANDLERS

    handlers = []
    for dotted_path in EXCEPTION_HANDLERS:
        handlers.append(import_attribute(dotted_path))
    return handlers