Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, app=None, hostname=None, **kwargs):
self.app = app or self.app
self.hostname = default_nodename(hostname)
self.app.loader.init_worker()
self.on_before_init(**kwargs)
self.setup_defaults(**kwargs)
self.on_after_init(**kwargs)
self.setup_instance(**self.prepare_args(**kwargs))
self._finalize = [
Finalize(self, self._send_worker_shutdown, exitpriority=10),
]
# while waiting for unfinished work at shutdown.
if not threads:
self.check_timeouts = self._timeout_handler.handle_event
else:
self._timeout_handler = None
self._timeout_handler_started = False
self._timeout_handler_mutex = None
# Thread processing results in the outqueue.
self._result_handler = self.create_result_handler()
self.handle_result_event = self._result_handler.handle_event
if threads:
self._result_handler.start()
self._terminate = Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue,
self._pool, self._worker_handler, self._task_handler,
self._result_handler, self._cache,
self._timeout_handler,
self._help_stuff_finish_args()),
exitpriority=15,
)
# On process exit we will wait for data to be flushed to pipe.
#
# However, if this process created the queue then all
# processes which use the queue will be descendants of this
# process. Therefore waiting for the queue to be flushed
# is pointless once all the child processes have been joined.
created_by_this_process = (self._opid == os.getpid())
if not self._joincancelled and not created_by_this_process:
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
exitpriority=-5
)
# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
)
# send information to child
prep_data = get_preparation_data(process_obj._name)
os.close(from_parent_fd)
to_child = os.fdopen(to_child_fd, 'wb')
Popen._tls.process_handle = self.pid
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del(Popen._tls.process_handle)
to_child.close()
# `w` will be closed when the child exits, at which point `r`
# will become ready for reading (using e.g. select()).
os.close(w)
util.Finalize(self, os.close, (r,))
def __init__(self, address, backlog=None):
self._address = address
self._handle_queue = [self._new_handle(first=True)]
self._last_accepted = None
sub_debug('listener created with address=%r', self._address)
self.close = Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
# SO_REUSEADDR has different semantics on Windows (issue #2550).
if os.name == 'posix':
self._socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
self._socket.setblocking(True)
self._socket.bind(address)
self._socket.listen(backlog)
self._address = self._socket.getsockname()
except OSError:
self._socket.close()
raise
self._family = family
self._last_accepted = None
if family == 'AF_UNIX':
self._unlink = util.Finalize(
self, os.unlink, args=(address,), exitpriority=0
)
else:
self._unlink = None
try:
# SO_REUSEADDR has different semantics on Windows (Issue #2550).
if os.name == 'posix':
self._socket.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1)
self._socket.bind(address)
self._socket.listen(backlog)
self._address = self._socket.getsockname()
except OSError:
self._socket.close()
raise
self._family = family
self._last_accepted = None
if family == 'AF_UNIX':
self._unlink = Finalize(
self, os.unlink, args=(address,), exitpriority=0
)
else:
self._unlink = None
)
self._thread.daemon = True
debug('doing self._thread.start()')
self._thread.start()
debug('... done self._thread.start()')
# On process exit we will wait for data to be flushed to pipe.
#
# However, if this process created the queue then all
# processes which use the queue will be descendants of this
# process. Therefore waiting for the queue to be flushed
# is pointless once all the child processes have been joined.
created_by_this_process = (self._opid == os.getpid())
if not self._joincancelled and not created_by_this_process:
self._jointhread = Finalize(
self._thread, Queue._finalize_join,
[weakref.ref(self._thread)],
exitpriority=-5
)
# Send sentinel to the thread queue object when garbage collected
self._close = Finalize(
self, Queue._finalize_close,
[self._buffer, self._notempty],
exitpriority=10
)
# send information to child
prep_data = get_preparation_data(process_obj._name)
os.close(from_parent_fd)
to_child = os.fdopen(to_child_fd, 'wb')
Popen._tls.process_handle = self.pid
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del(Popen._tls.process_handle)
to_child.close()
# `w` will be closed when the child exits, at which point `r`
# will become ready for reading (using e.g. select()).
os.close(w)
util.Finalize(self, os.close, (r,))