Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from __future__ import absolute_import, print_function
import math
from amqp import promise
from kombu import Connection, Exchange, Queue, Consumer
from kombu.async import Hub
from kombu.five import monotonic
N = 1000
TEST_QUEUE = Queue('test3', Exchange('test3'))
FREQ = int(math.ceil(math.sqrt(N)))
program_finished = promise()
def on_ack_sent(message):
if not message.delivery_tag % FREQ:
print('acked: {0:4d}/{1}'.format(message.delivery_tag, N))
if message.delivery_tag >= N:
consumer.stop(callback=program_finished, close_channel=True)
def on_message(message):
message.ack(callback=on_ack_sent)
if __name__ == '__main__':
loop = Hub()
connection = Connection('pyamqp://')
def basic_cancel(self, consumer_tag):
# If we are busy reading messages we may experience
# a race condition where a message is consumed after
# cancelling, so we must delay this operation until reading
# is complete (Issue celery/celery#1773).
connection = self.connection
if connection:
if connection.cycle._in_protected_read:
return connection.cycle.after_read.add(
promise(self._basic_cancel, (consumer_tag, )),
)
return self._basic_cancel(consumer_tag)
def autodiscover_tasks(self, packages, related_name='tasks', force=False):
if force:
return self._autodiscover_tasks(packages, related_name)
signals.import_modules.connect(promise(
self._autodiscover_tasks, (packages, related_name),
), weak=False, sender=self)
def call_soon(self, callback, *args):
handle = promise(callback, args)
self._ready.append(handle)
return handle