from billiard import current_process  # the snippet's Celery context uses billiard

def get_worker_name():
    # Worker hostnames have the form 'name@host'; return the host part
    p = current_process()
    return p.initargs[1].split('@')[1]
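Celery's prefork pool passes (app, hostname) as initargs to each child, with the
hostname in 'name@host' form, so the parse above is a plain string split (the
sample value below is illustrative):

>>> 'celery@worker-01.example.com'.split('@')[1]
'worker-01.example.com'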
import time

from billiard import current_process
from django.conf import settings
from django.db import connections

this_process = None  # module-level cache of this worker's process number

def init(self, taskstate_id=None, apikey_id=None):
    """
    Tasks should call this in their run() method to initialise state.
    Returns False if anything goes wrong.
    """
    # Set our process ID if it hasn't been set yet
    global this_process
    if this_process is None:
        this_process = int(current_process()._name.split('-')[1])

    # Sleep for staggered worker startup
    if settings.STAGGER_APITASK_STARTUP:
        sleep_for = (this_process - 1) * 2
        self._logger.warning('Worker #%d staggered startup: sleeping for %d seconds', this_process, sleep_for)
        time.sleep(sleep_for)

    # Clear the stored query information so it doesn't bloat in DEBUG mode
    if settings.DEBUG:
        for db in settings.DATABASES.keys():
            connections[db].queries = []

    self._started = time.time()
    self._api_log = []
    self._cache_delta = None
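A usage sketch of the contract described in the docstring; the APITask base
class and the task body here are assumptions, not the original project's code:

class FetchAssets(APITask):
    def run(self, taskstate_id, apikey_id):
        # init() returns False when anything goes wrong, so bail out early
        if self.init(taskstate_id, apikey_id) is False:
            return
        ...  # proceed with the actual API work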
import atexit

from billiard import current_process
from celery.five import monotonic
from celery.utils.debug import memdump, sample_mem

# task_reserved, task_ready and C_BENCH_EVERY are defined earlier in
# celery.worker.state; the hooks below wrap the originals to collect timings.
all_count = 0
bench_first = None
bench_start = None
bench_last = None
bench_every = C_BENCH_EVERY
bench_sample = []
__reserved = task_reserved
__ready = task_ready

if current_process()._name == 'MainProcess':
    @atexit.register
    def on_shutdown():
        if bench_first is not None and bench_last is not None:
            print('- Time spent in benchmark: {0!r}'.format(
                bench_last - bench_first))
            print('- Avg: {0}'.format(
                sum(bench_sample) / len(bench_sample)))
            memdump()

def task_reserved(request):  # noqa
    global bench_start
    global bench_first
    now = None
    if bench_start is None:
        bench_start = now = monotonic()
    if bench_first is None:
        bench_first = now  # remember when the very first task arrived
    return __reserved(request)
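The matching task_ready hook is cut off in the snippet above; a reconstruction
consistent with the counters it sets up (close to Celery's code, but not
guaranteed verbatim):

def task_ready(request):  # noqa
    global all_count, bench_start, bench_last
    all_count += 1
    if not all_count % bench_every:
        now = monotonic()
        diff = now - bench_start
        print('- Time spent processing {0} tasks (since first '
              'task received): ~{1:.4f}s'.format(bench_every, diff))
        bench_start = bench_last = now
        bench_sample.append(diff)
        sample_mem()
    return __ready(request)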
import os

def rebuild_handle(pickled_data):
    # current_process, sub_debug and recv_handle are module-level imports in
    # the original reduction module; Client is imported lazily below.
    from ..connection import Client
    address, handle, inherited = pickled_data
    if inherited:
        # The child inherited the handle directly; nothing to rebuild
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=current_process().authkey)
    conn.send((handle, os.getpid()))  # ask the owner to duplicate it for us
    new_handle = recv_handle(conn)
    conn.close()
    return new_handle
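The sending side of this handshake lives in the same module. For a
self-contained illustration, here is a minimal Unix sketch using the standard
library's multiprocessing.reduction helpers rather than billiard's internals:

import os
from multiprocessing import Pipe, Process
from multiprocessing.reduction import recv_handle, send_handle

def child(conn):
    fd = recv_handle(conn)  # rebuild the descriptor in this process
    print(os.read(fd, 5))

if __name__ == '__main__':
    r, w = os.pipe()
    parent_conn, child_conn = Pipe()
    p = Process(target=child, args=(child_conn,))
    p.start()
    send_handle(parent_conn, r, p.pid)  # duplicate r into the child
    os.write(w, b'hello')
    p.join()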
def makeRecord(self, *args, **kwds):
    # Stamp each log record with the multiprocessing process name
    record = Logger.makeRecord(self, *args, **kwds)
    record.processName = current_process()._name
    return record
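Recent Python versions stamp processName on records automatically, but the
same idea can be wired in explicitly via logging.setLoggerClass; a sketch with
an illustrative class name:

import logging
from multiprocessing import current_process

class ProcessAwareLogger(logging.Logger):
    def makeRecord(self, *args, **kwds):
        record = logging.Logger.makeRecord(self, *args, **kwds)
        record.processName = current_process()._name
        return record

logging.setLoggerClass(ProcessAwareLogger)
log = logging.getLogger('worker')  # loggers created from here on use the subclass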
def initfun():
    # Give each pool worker a readable process title (visible in ps/top)
    setproctitle("Pool: %s" % mp.current_process()._name)
Requests are proxied through US Proxies to prevent MailChimp
blocks. This is an accepted technique among integrators and
does not violate MailChimp's Terms of Service.
"""
# Don't use a proxy if the environment variable is set, e.g. in development
if os.environ.get('NO_PROXY'):
    self.logger.info(
        'NO_PROXY environment variable set. Not using a proxy.')
    return

# Get the worker number for this Celery worker.
# We want each worker to control its corresponding proxy process.
# Note that workers are zero-indexed, proxy processes are not.
process = current_process()

# Fall back to proxy #1 if we can't ascertain the worker index,
# e.g. for anyone hacking on this app on Windows.
try:
    proxy_process_number = str(process.index + 1)
except AttributeError:
    proxy_process_number = '1'

# Use the US Proxies API to get the proxy info
proxy_request_uri = 'http://us-proxies.com/api.php'
proxy_params = (
    ('api', ''),
    ('uid', '9557'),
    ('pwd', os.environ.get('PROXY_AUTH_PWD')),
    ('cmd', 'rotate'),
    ('process', proxy_process_number),
)
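A sketch of issuing the rotate call, assuming the requests library; the
response format isn't shown in the snippet, so parsing is left out:

import requests

response = requests.get(proxy_request_uri, params=proxy_params, timeout=10)
response.raise_for_status()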
import os

import billiard
import psutil

def setup(**kwargs):
    """
    Set CPU affinity for the worker on startup (works on TOSS3 nodes).

    :param `**kwargs`: keyword arguments
    """
    if "CELERY_AFFINITY" in os.environ and int(os.environ["CELERY_AFFINITY"]) > 1:
        # Number of CPUs between workers.
        cpu_skip = int(os.environ["CELERY_AFFINITY"])
        npu = psutil.cpu_count()
        p = psutil.Process()
        current = billiard.current_process()
        # _identity[0] is the 1-based prefork child index; shift to 0-based
        prefork_id = current._identity[0] - 1  # range 0:nworkers-1
        cpu_slot = (prefork_id * cpu_skip) % npu
        p.cpu_affinity(list(range(cpu_slot, cpu_slot + cpu_skip)))
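A sketch of wiring setup into worker start-up, using Celery's
worker_process_init signal, which fires once in each prefork child; signal
handlers receive keyword arguments, which the function's **kwargs absorbs:

from celery.signals import worker_process_init

worker_process_init.connect(setup)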
def _get_listener():
    # _lock is a module-level threading.Lock and _listener starts as None;
    # double-checked locking ensures only one Listener is ever created.
    global _listener
    if _listener is None:
        _lock.acquire()
        try:
            if _listener is None:
                from ..connection import Listener
                debug('starting listener and thread for sending handles')
                _listener = Listener(authkey=current_process().authkey)
                t = threading.Thread(target=_serve)
                t.daemon = True
                t.start()
        finally:
            _lock.release()
    return _listener
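The same double-checked locking reads well with a context manager; a
behavior-preserving sketch, with the module-level state the function assumes
spelled out (_make_listener is a hypothetical factory):

import threading

_lock = threading.Lock()
_listener = None

def _get_listener():
    global _listener
    if _listener is None:          # fast path: skip the lock once initialised
        with _lock:                # slow path: serialise first-time setup
            if _listener is None:  # re-check now that we hold the lock
                _listener = _make_listener()  # hypothetical factory
    return _listener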