Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def wrapped_run_function(task):
    """Run ``task`` in-process or hand it over for remote execution.

    The ``_dispatch_local_execution`` setting (deprecated key:
    ``local_execution``, default ``False``) decides which path is taken.

    NOTE(review): ``run_function`` is not defined in this block; it is
    presumably captured from an enclosing decorator -- confirm.
    """
    run_locally = get_setting(
        "_dispatch_local_execution",
        default=False,
        deprecated_keys=["local_execution"],
    )

    if not run_locally:
        run_task_remote(task)
        return

    # Local path: the output directories are created before the wrapped
    # function runs.
    create_output_dirs(task)
    run_function(task)
def _create_task_process(self, task):
    """Build the process object that will execute ``task``.

    The ``batch_system`` setting (looked up for ``task``, default
    ``BatchSystems.lsf``) selects the process class. For
    ``BatchSystems.local`` the output directories are created and the
    parent class implementation is used instead.

    Raises:
        NotImplementedError: the configured batch system is a valid
            ``BatchSystems`` value but has no process class mapped here.
    """
    selected_system = BatchSystems(
        get_setting("batch_system", default=BatchSystems.lsf, task=task)
    )

    # Local execution does not use a batch process class at all.
    if selected_system == BatchSystems.local:
        create_output_dirs(task)
        return super()._create_task_process(task)

    if selected_system == BatchSystems.lsf:
        process_class = LSFProcess
    elif selected_system == BatchSystems.htcondor:
        process_class = HTCondorProcess
    elif selected_system == BatchSystems.gbasf2:
        process_class = Gbasf2Process
    elif selected_system == BatchSystems.test:
        process_class = TestProcess
    else:
        raise NotImplementedError

    return process_class(
        task=task,
        scheduler=self._scheduler,
        result_queue=self._task_result_queue,
        worker_timeout=self._config.timeout,
    )
def run_as_batch_worker(task_list, cli_args, kwargs):
    """Execute the one task whose id equals ``cli_args.task_id``.

    Every root task in ``task_list`` is walked with ``task_iterator``. The
    first task with a matching ``task_id`` is run in this process, after
    which the function returns.

    Args:
        task_list: iterable of root tasks to search.
        cli_args: object exposing the requested id as ``task_id``.
        kwargs: accepted for interface compatibility; not used here.

    Raises:
        ValueError: no task in the walked graph has the requested id
            (including the case of an empty ``task_list``).
        BaseException: whatever the task raised, re-raised after
            ``task.on_failure`` has been called.
    """
    for root_task in task_list:
        for task in task_iterator(root_task):
            if task.task_id != cli_args.task_id:
                continue

            # Flip the dispatch setting to local execution before the
            # task runs.
            set_setting("_dispatch_local_execution", True)

            # TODO: We do not process the information if (a) we have a new dependency and (b) why the task has failed.
            # TODO: Would be also nice to run the event handlers
            try:
                create_output_dirs(task)
                task.run()
                task.on_success()
            except BaseException as ex:
                # BaseException (not Exception) so that interrupts and
                # SystemExit are also reported through on_failure.
                task.on_failure(ex)
                raise

            return

    # Report the *requested* id. The loop variable would name the last task
    # iterated, and is unbound when task_list is empty.
    raise ValueError(f"The task id {cli_args.task_id} to be executed by this batch worker "
                     f"does not exist in the locally reproduced task graph.")
def process(self):
    """Run ``self.cmd`` once for each group of input files.

    After creating the output directories, each ``(key, files)`` pair from
    ``get_input_file_names()`` is turned into the command
    ``self.cmd + [output file for key] + files`` and executed with
    ``subprocess.check_call``. If the instance has a ``keys`` attribute,
    entries whose key is not in it are skipped.

    Raises:
        subprocess.CalledProcessError: a command exited with a non-zero
            return code.
    """
    create_output_dirs(self)

    for name, input_files in self.get_input_file_names().items():
        # Without a ``keys`` attribute every entry is processed.
        if not hasattr(self, "keys") or name in self.keys:
            subprocess.check_call(
                self.cmd + [self.get_output_file_name(name)] + input_files
            )