Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
broker_use_ssl = self.capp.conf.BROKER_USE_SSL
broker = Broker(app.capp.connection().as_uri(include_password=True),
http_api=http_api, broker_options=broker_options, broker_use_ssl=broker_use_ssl)
queue_names = ControlHandler.get_active_queue_names()
if not queue_names:
queue_names = set([self.capp.conf.CELERY_DEFAULT_QUEUE]) |\
set([q.name for q in self.capp.conf.CELERY_QUEUES or [] if q.name])
queues = yield broker.queues(sorted(queue_names))
self.write({'active_queues': queues})
class ListTasks(BaseTaskHandler):
@web.authenticated
def get(self):
"""
List tasks
**Example request**:
.. sourcecode:: http
GET /api/tasks HTTP/1.1
Host: localhost:5555
User-Agent: HTTPie/0.8.0
**Example response**:
.. sourcecode:: http
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 404: unknown task
"""
args, kwargs, options = self.get_task_args()
logger.debug("Invoking task '%s' with '%s' and '%s'",
taskname, args, kwargs)
result = self.capp.send_task(
taskname, args=args, kwargs=kwargs, **options)
response = {'task-id': result.task_id}
if self.backend_configured(result):
response.update(state=result.state)
self.write(response)
class TaskResult(BaseTaskHandler):
@web.authenticated
def get(self, taskid):
"""
Get a task result
**Example request**:
.. sourcecode:: http
GET /api/task/result/c60be250-fe52-48df-befb-ac66174076e6 HTTP/1.1
Host: localhost:5555
**Example response**:
.. sourcecode:: http
expires = float(expires)
except ValueError:
expires = datetime.strptime(expires, self.DATE_FORMAT)
options['expires'] = expires
def safe_result(self, result):
"returns json encodable result"
try:
json.dumps(result)
except TypeError:
return repr(result)
else:
return result
class TaskApply(BaseTaskHandler):
@web.authenticated
@gen.coroutine
def post(self, taskname):
"""
Execute a task by name and wait results
**Example request**:
.. sourcecode:: http
POST /api/task/apply/tasks.add HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:5555
:statuscode 200: no error
:statuscode 401: unauthorized request
:statuscode 503: result backend is not configured
"""
logger.info("Aborting task '%s'", taskid)
result = AbortableAsyncResult(taskid)
if not self.backend_configured(result):
raise HTTPError(503)
result.abort()
self.write(dict(message="Aborted '%s'" % taskid))
class GetQueueLengths(BaseTaskHandler):
@web.authenticated
@gen.coroutine
def get(self):
"""
Return length of all active queues
**Example request**:
.. sourcecode:: http
GET /api/queues/length
Host: localhost:5555
**Example response**:
.. sourcecode:: http
"tasks.sleep"
]
}
:reqheader Authorization: optional OAuth token to authenticate
:statuscode 200: no error
:statuscode 401: unauthorized request
"""
seen_task_types = self.application.events.state.task_types()
response = {}
response['task-types'] = seen_task_types
self.write(response)
class TaskInfo(BaseTaskHandler):
@web.authenticated
def get(self, taskid):
"""
Get a task info
**Example request**:
.. sourcecode:: http
GET /api/task/info/91396550-c228-4111-9da4-9d88cfd5ddc6 HTTP/1.1
Accept: */*
Accept-Encoding: gzip, deflate, compress
Host: localhost:5555
**Example response**:
response = yield IOLoop.current().run_in_executor(
None, self.wait_results, result, response)
self.write(response)
def wait_results(self, result, response):
# Wait until task finished and do not raise anything
result.get(propagate=False)
# Write results and finish async function
self.update_response_result(response, result)
if self.backend_configured(result):
response.update(state=result.state)
return response
class TaskAsyncApply(BaseTaskHandler):
DATE_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
@web.authenticated
def post(self, taskname):
"""
Execute a task
**Example request**:
.. sourcecode:: http
POST /api/task/async-apply/tasks.add HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
except KeyError:
raise HTTPError(404, "Unknown task '%s'" % taskname)
try:
self.normalize_options(options)
except ValueError:
raise HTTPError(400, 'Invalid option')
result = task.apply_async(args=args, kwargs=kwargs, **options)
response = {'task-id': result.task_id}
if self.backend_configured(result):
response.update(state=result.state)
self.write(response)
class TaskSend(BaseTaskHandler):
@web.authenticated
def post(self, taskname):
"""
Execute a task by name (doesn't require task sources)
**Example request**:
.. sourcecode:: http
POST /api/task/send-task/tasks.add HTTP/1.1
Accept: application/json
Accept-Encoding: gzip, deflate, compress
Content-Length: 16
Content-Type: application/json; charset=utf-8
Host: localhost:5555
timeout = float(timeout) if timeout is not None else None
result = AsyncResult(taskid)
if not self.backend_configured(result):
raise HTTPError(503)
response = {'task-id': taskid, 'state': result.state}
if timeout:
result.get(timeout=timeout, propagate=False)
self.update_response_result(response, result)
elif result.ready():
self.update_response_result(response, result)
self.write(response)
class TaskAbort(BaseTaskHandler):
@web.authenticated
def post(self, taskid):
"""
Abort a running task
**Example request**:
.. sourcecode:: http
POST /api/task/abort/c60be250-fe52-48df-befb-ac66174076e6 HTTP/1.1
Host: localhost:5555
**Example response**:
.. sourcecode:: http
type = type if type != 'All' else None
state = state if state != 'All' else None
result = []
for task_id, task in tasks.iter_tasks(
app.events, limit=limit, type=type,
worker=worker, state=state,
received_start=received_start,
received_end=received_end):
task = tasks.as_dict(task)
task.pop('worker', None)
result.append((task_id, task))
self.write(dict(result))
class ListTaskTypes(BaseTaskHandler):
@web.authenticated
def get(self):
"""
List (seen) task types
**Example request**:
.. sourcecode:: http
GET /api/task/types HTTP/1.1
Host: localhost:5555
**Example response**:
.. sourcecode:: http