Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Worker main loop: receive one task over the worker socket, execute it, and
# send back a pickled result package holding either the serialized result or
# a serialized RemoteExceptionWrapper.
while True:
    # This task receiver socket is blocking.
    try:
        b_task_id, *buf = funcx_worker_socket.recv_multipart()
    except Exception as e:
        # Nothing was received, so there is no task id or buffer to work on.
        # Falling through would hit unbound (or stale) names below.
        logger.debug(e)
        continue

    logger.debug("Got buffer : {}".format(buf))
    # The first frame carries the task id as a little-endian integer.
    task_id = int.from_bytes(b_task_id, "little")
    logger.info("Received task {}".format(task_id))

    try:
        result = execute_task(buf)
        serialized_result = serialize_object(result)
    except Exception:
        # Wrap the active exception (with traceback) so it can be shipped.
        result_package = {'task_id': task_id, 'exception': serialize_object(RemoteExceptionWrapper(*sys.exc_info()))}
        logger.debug("Got exception something")
    else:
        result_package = {'task_id': task_id, 'result': serialized_result}

    logger.info("Completed task {}".format(task_id))
    pkl_package = pickle.dumps(result_package)
    funcx_worker_socket.send_multipart([pkl_package])

    if no_reuse:
        logger.info("Exiting worker. Container will not be reused, breaking...")
        funcx_worker_socket.close()
        context.term()
        # The socket is closed now; leave the loop instead of receiving again.
        break
"""
logger.debug("[WORKER_WATCHDOG_THREAD] Starting thread")

# Poll the worker processes until the kill event is set.  A dead worker has
# its in-flight task (if any) failed with WorkerLost, and is then replaced.
while not kill_event.is_set():
    for worker_id, p in self.procs.items():
        if not p.is_alive():
            logger.info("[WORKER_WATCHDOG_THREAD] Worker {} has died".format(worker_id))
            try:
                task = self._tasks_in_progress.pop(worker_id)
            except KeyError:
                logger.info("[WORKER_WATCHDOG_THREAD] Worker {} was not busy when it died".format(worker_id))
            else:
                logger.info("[WORKER_WATCHDOG_THREAD] Worker {} was busy when it died".format(worker_id))
                try:
                    # Raise and catch so sys.exc_info() is populated for the
                    # RemoteExceptionWrapper below.
                    raise WorkerLost(worker_id, platform.node())
                except Exception:
                    logger.info("[WORKER_WATCHDOG_THREAD] Putting exception for task {} in the pending result queue".format(task['task_id']))
                    result_package = {'task_id': task['task_id'], 'exception': serialize_object(RemoteExceptionWrapper(*sys.exc_info()))}
                    pkl_package = pickle.dumps(result_package)
                    self.pending_result_queue.put(pkl_package)

            p = multiprocessing.Process(
                target=worker,
                args=(
                    worker_id,
                    self.uid,
                    self.worker_count,
                    self.pending_task_queue,
                    self.pending_result_queue,
                    self.ready_worker_queue,
                    self._tasks_in_progress,
                ),
                name="HTEX-Worker-{}".format(worker_id),
            )
            # A Process object does nothing until started; without this the
            # replacement is never alive and is "restarted" on every poll.
            p.start()
            # Rebinding an existing key does not resize the dict, so this is
            # safe while iterating over self.procs.items().
            self.procs[worker_id] = p
            logger.info("[WORKER_WATCHDOG_THREAD] Worker {} has been restarted".format(worker_id))
        # NOTE(review): indentation was lost in this excerpt; the sleep is
        # assumed to run once per inspected worker - confirm against the file.
        time.sleep(self.poll_period)
logger.debug("Synced")

task_request = b'TREQ'

# Worker loop: ask rank 0 for a task, run it, and send back a pickled package
# carrying either the serialized result or the serialized exception.
while True:
    comm.send(task_request, dest=0, tag=TASK_REQUEST_TAG)
    # The worker will receive {'task_id':, 'buffer':}
    req = comm.recv(source=0, tag=rank)
    logger.debug("Got req: {}".format(req))
    tid = req['task_id']
    logger.debug("Got task: {}".format(tid))

    try:
        result = execute_task(req['buffer'])
        # Serialize inside the try: an unserializable result must be reported
        # as a task failure rather than crash the worker with no reply.
        serialized_result = serialize_object(result)
    except Exception as e:
        result_package = {'task_id': tid, 'exception': serialize_object(e)}
        logger.debug("No result due to exception: {} with result package {}".format(e, result_package))
    else:
        result_package = {'task_id': tid, 'result': serialized_result}
        logger.debug("Result: {}".format(result))

    pkl_package = pickle.dumps(result_package)
    comm.send(pkl_package, dest=0, tag=RESULT_TAG)
# Tail of the runner's message loop.  The condition paired with this
# `break`/`else` and the enclosing loop begin before this excerpt.
break
else:
# Received a valid message, handle it
logger.debug("[RUNNER] Got a valid task with ID {}".format(msg["task_id"]))
try:
# Run the task and package its serialized result under the task's id.
response_obj = execute_task(msg['buffer'])
response = {"task_id": msg["task_id"],
"result": serialize_object(response_obj)}
# NOTE(review): the result is deserialized again only to build this debug
# message, and that happens inside the `try`, so a failure here is reported
# as a task exception.  "Returing" in the message is a typo for "Returning".
logger.debug("[RUNNER] Returing result: {}".format(
deserialize_object(response["result"])))
except Exception as e:
# Any failure above replaces the response with the serialized exception.
logger.debug("[RUNNER] Caught task exception: {}".format(e))
response = {"task_id": msg["task_id"],
"exception": serialize_object(e)}
# Hand the response (result or exception) to the outgoing queue.
outgoing_q.put(response)
logger.debug("[RUNNER] Terminating")
def serialize(self, obj):
    """Convert ``obj`` into its serialized form.

    The value returned must be a list of sendable buffers.  This hook can be
    extended to serialize numpy arrays and similar objects more efficiently
    or without copying.
    """
    buffers = serialize_object(obj)
    return buffers
# Fragment of the main loop's handling of messages from a manager.  The
# conditions selecting each branch begin before this excerpt.
logger.debug("Setting kill event")
self._kill_event.set()
# Report the failure on the results channel as a pickled package whose
# task_id is -1 and whose exception is a serialized ManagerLost.
e = ManagerLost(manager)
result_package = {'task_id': -1, 'exception': serialize_object(e)}
pkl_package = pickle.dumps(result_package)
self.results_outgoing.send(pkl_package)
logger.warning("[MAIN] Sent failure reports, unregistering manager")
else:
logger.debug("[MAIN] Suppressing shutdown due to version incompatibility")
else:
# Registration has failed.
if self.suppress_failure is False:
# Same reporting path as above, but with a critical BadRegistration.
self._kill_event.set()
e = BadRegistration(manager, critical=True)
result_package = {'task_id': -1, 'exception': serialize_object(e)}
pkl_package = pickle.dumps(result_package)
self.results_outgoing.send(pkl_package)
else:
logger.debug("[MAIN] Suppressing bad registration from manager:{}".format(
manager))
else:
# Record when this manager was last heard from.
self._ready_manager_queue[manager]['last'] = time.time()
if message[1] == b'HEARTBEAT':
# Answer a heartbeat with the heartbeat code on the task channel.
logger.debug("[MAIN] Manager {} sends heartbeat".format(manager))
self.task_outgoing.send_multipart([manager, b'', PKL_HEARTBEAT_CODE])
else:
# NOTE(review): pickle.loads runs on bytes received from the manager; this
# is only safe if managers are trusted - confirm.
manager_adv = pickle.loads(message[1])
logger.debug("[MAIN] Manager {} requested {}".format(manager, manager_adv))
# Merge the advertised values into the manager's free capacity and store
# their sum as total_workers.
self._ready_manager_queue[manager]['free_capacity'].update(manager_adv)
self._ready_manager_queue[manager]['free_capacity']['total_workers'] = sum(manager_adv.values())
# Fetch the next task; the payload has the form {'task_id':, 'buffer':}.
# Record it as in progress for this worker before doing anything else.
req = task_queue.get()
tasks_in_progress[worker_id] = req
tid = req['task_id']
logger.info("Received task {}".format(tid))

# Take one entry off the ready-worker queue now that this worker is busy.
try:
    worker_queue.get()
except queue.Empty:
    logger.warning("Worker ID: {} failed to remove itself from ready_worker_queue".format(worker_id))

# Execute and serialize together so that either failure is reported as a
# wrapped exception (with traceback) instead of a result.
try:
    result = execute_task(req['buffer'])
    serialized_result = serialize_object(result)
except Exception as e:
    logger.info('Caught an exception: {}'.format(e))
    wrapped_exc = RemoteExceptionWrapper(*sys.exc_info())
    result_package = {'task_id': tid, 'exception': serialize_object(wrapped_exc)}
else:
    result_package = {'task_id': tid, 'result': serialized_result}

logger.info("Completed task {}".format(tid))
pkl_package = pickle.dumps(result_package)
result_queue.put(pkl_package)
# The task has been reported, so it is no longer in progress.
tasks_in_progress.pop(worker_id)
logger.info("Got task : {}".format(tid))

# Take one entry off the ready-worker queue now that this worker is busy.
# Queue.get's first positional parameter is `block`, not an item selector:
# passing worker_id made worker 0 non-blocking and every other worker
# blocking.  Use a plain blocking get for all workers.
try:
    worker_queue.get()
except queue.Empty:
    logger.warning("Worker ID: {} failed to remove itself from ready_worker_queue".format(worker_id))

try:
    result = execute_task(req['buffer'])
    # Serialize inside the try: an unserializable result must be reported as
    # a task failure rather than crash the worker with no reply.
    serialized_result = serialize_object(result)
except Exception as e:
    result_package = {'task_id': tid, 'exception': serialize_object(e)}
    logger.debug("No result due to exception: {} with result package {}".format(e, result_package))
else:
    result_package = {'task_id': tid, 'result': serialized_result}
    logger.debug("Result : {}".format(result))

logger.info("Completed task : {}".format(tid))
pkl_package = pickle.dumps(result_package)
result_queue.put(pkl_package)
# Body of the worker's receive/execute/reply cycle.  The enclosing loop and
# function headers begin before this excerpt.
try:
# The first frame is the task id; the remaining frames are the task buffer.
b_task_id, *buf = funcx_worker_socket.recv_multipart()
except Exception as e:
# NOTE(review): after a failed receive only a debug message is logged and
# execution continues below, where b_task_id and buf are unbound or left
# over from the previous task - confirm and skip the iteration instead.
logger.debug(e)
# msg = task_socket.recv_pyobj()
logger.debug("Got buffer : {}".format(buf))
# The task id is encoded as a little-endian integer.
task_id = int.from_bytes(b_task_id, "little")
logger.info("Received task {}".format(task_id))
try:
result = execute_task(buf)
serialized_result = serialize_object(result)
except Exception:
# Ship the active exception, wrapped together with its traceback info.
result_package = {'task_id': task_id, 'exception': serialize_object(RemoteExceptionWrapper(*sys.exc_info()))}
logger.debug("Got exception something")
else:
result_package = {'task_id': task_id, 'result': serialized_result}
logger.info("Completed task {}".format(task_id))
# Reply with the pickled package as a single-frame message.
pkl_package = pickle.dumps(result_package)
funcx_worker_socket.send_multipart([pkl_package])
if no_reuse:
# Release the socket and terminate the context before returning.
logger.info("Exiting worker. Container will not be reused, breaking...")
funcx_worker_socket.close()
context.term()
return None
# Fragment of an apply-style request handler.  The function header and the
# outer `try` paired with the `except` below begin before this excerpt.
# Build temporary names, derived from `prefix`, for the arguments and result.
argname = prefix+"args"
kwargname = prefix+"kwargs"
resultname = prefix+"result"
ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
# print ns
# Publish the callable, its arguments and an empty result slot in `working`.
working.update(ns)
# Source text of the call: <result> = <f>(*<args>, **<kwargs>).
code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname)
try:
# Run the call in the shell's namespaces, then read the result slot back.
exec(code, shell.user_global_ns, shell.user_ns)
result = working.get(resultname)
finally:
# Always remove the temporary names again, even if the call raised.
for key in ns:
working.pop(key)
# Serialize the result using the session's buffer and item thresholds.
result_buf = serialize_object(result,
buffer_threshold=self.session.buffer_threshold,
item_threshold=self.session.item_threshold,
)
except BaseException as e:
# invoke IPython traceback formatting
shell.showtraceback()
# Error reply: exception type name and value, traceback filled in below.
reply_content = {
'traceback': [],
'ename': unicode_type(type(e).__name__),
'evalue': safe_unicode(e),
}
# get formatted traceback, which ipykernel recorded
if hasattr(shell, '_last_traceback'):
# ipykernel 4.4
reply_content['traceback'] = shell._last_traceback or []