Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def get_gbasf2_env(gbasf2_install_directory=None):
    """
    Return the gbasf2 environment dict which can be used to run gbasf2 commands.

    The gbasf2 setup script is sourced in a fresh ``bash`` that starts from an
    empty environment (``env -i``), and the environment it produces is captured.

    :param gbasf2_install_directory: Directory into which gbasf2 has been
        installed. When set to the default value ``None``, it looks for the
        value of the ``gbasf2_install_directory`` setting and when that is not
        set, it uses the default of most installation instructions, which is
        ``~/gbasf2KEK``.
    :return: Dictionary containing the environment that you get from sourcing
        the gbasf2 setup script.
    :raises FileNotFoundError: If ``BelleDIRAC/gbasf2/tools/setup`` does not
        exist below the installation directory.
    :raises subprocess.CalledProcessError: If sourcing the setup script or
        dumping the environment exits with a non-zero status.
    """
    if gbasf2_install_directory is None:
        gbasf2_install_directory = get_setting("gbasf2_install_directory", default="~/gbasf2KEK")
    gbasf2_setup_path = os.path.join(gbasf2_install_directory, "BelleDIRAC/gbasf2/tools/setup")
    # Expand ``~`` here, where HOME is known, rather than relying on tilde
    # expansion inside the subshell, which runs with an emptied environment.
    expanded_setup_path = os.path.expanduser(gbasf2_setup_path)
    if not os.path.isfile(expanded_setup_path):
        raise FileNotFoundError(
            f"Could not find gbasf2 setup files in ``{gbasf2_install_directory}``.\n"
            "Make sure that gbasf2 is installed at that location."
        )
    # Source the setup script silently (only the environment dump may reach stdout),
    # then print the environment NUL-separated so that values containing newlines
    # are not split into bogus entries.
    bash_command = f"source {shlex.quote(expanded_setup_path)} > /dev/null && env -0"
    # Pass an argv list directly so paths with spaces or quotes survive intact.
    echo_gbasf2_env_command = ["env", "-i", "bash", "-c", bash_command]
    gbasf2_env_string = subprocess.run(
        echo_gbasf2_env_command, check=True, stdout=subprocess.PIPE, encoding="utf-8"
    ).stdout
    # Every non-empty NUL-terminated entry has the form NAME=VALUE.
    return dict(entry.split("=", 1) for entry in gbasf2_env_string.split("\0") if entry)
# NOTE(review): excerpt from the middle of a gbasf2 submit-command builder. The
# enclosing def, the initial value of ``gbasf2_command_str`` and the lookup of
# ``gbasf2_input_dataset`` are not visible in this span.
# Add the input dataset with ``-i`` only when the setting was given.
if gbasf2_input_dataset is not False:
gbasf2_command_str += f" -i {gbasf2_input_dataset} "
# NOTE(review): the setting key is spelled "gbasf2_n_repition_job" (sic); confirm it
# matches the documented setting name, otherwise a correctly spelled key is ignored.
gbasf2_n_repition_jobs = get_setting("gbasf2_n_repition_job", default=False, task=self.task)
if gbasf2_n_repition_jobs is not False:
gbasf2_command_str += f" --repetition {gbasf2_n_repition_jobs} "
# now add some additional optional options to the gbasf2 job submission string
# whether to ask user for confirmation before submitting job
# (defaults to True, i.e. ``--force`` is added unless the setting is falsy)
force_submission = get_setting("gbasf2_force_submission", default=True, task=self.task)
if force_submission:
gbasf2_command_str += " --force "
# estimated cpu time per sub-job in minutes
cpu_minutes = get_setting("gbasf2_cputime", default=False, task=self.task)
if cpu_minutes is not False:
gbasf2_command_str += f" --cputime {cpu_minutes} "
# estimated number of processed events per second
evtpersec = get_setting("gbasf2_evtpersec", default=False, task=self.task)
if evtpersec is not False:
gbasf2_command_str += f" --evtpersec {evtpersec} "
# gbasf2 job priority
priority = get_setting("gbasf2_priority", default=False, task=self.task)
if priority is not False:
# NOTE(review): ``assert`` is skipped under ``python -O``, and only the range is
# checked, not that the value is an integer as the message claims.
assert 0 <= priority <= 10, "Priority should be integer between 0 and 10."
gbasf2_command_str += f" --priority {priority} "
# gbasf2 job type (e.g. User, Production, ...)
jobtype = get_setting("gbasf2_jobtype", default=False, task=self.task)
# NOTE(review): the lines below repeat the handling above. As written, --force,
# --cputime, --evtpersec and --priority are appended to ``gbasf2_command_str`` a
# second time, and ``gbasf2_jobtype`` is read twice. This may be a copy/paste
# artifact of the excerpt; verify against the real function.
force_submission = get_setting("gbasf2_force_submission", default=True, task=self.task)
if force_submission:
gbasf2_command_str += " --force "
# estimated cpu time per sub-job in minutes
cpu_minutes = get_setting("gbasf2_cputime", default=False, task=self.task)
if cpu_minutes is not False:
gbasf2_command_str += f" --cputime {cpu_minutes} "
# estimated number of processed events per second
evtpersec = get_setting("gbasf2_evtpersec", default=False, task=self.task)
if evtpersec is not False:
gbasf2_command_str += f" --evtpersec {evtpersec} "
# gbasf2 job priority
priority = get_setting("gbasf2_priority", default=False, task=self.task)
if priority is not False:
assert 0 <= priority <= 10, "Priority should be integer between 0 and 10."
gbasf2_command_str += f" --priority {priority} "
# gbasf2 job type (e.g. User, Production, ...)
jobtype = get_setting("gbasf2_jobtype", default=False, task=self.task)
if jobtype is not False:
gbasf2_command_str += f" --jobtype {jobtype} "
# additional basf2 options to use on grid
# NOTE(review): the value is wrapped in single quotes; a value that itself contains
# ``'`` would break that quoting, depending on how the command string is parsed
# later (not visible here).
basf2opt = get_setting("gbasf2_basf2opt", default=False, task=self.task)
if basf2opt is not False:
gbasf2_command_str += f" --basf2opt='{basf2opt}' "
# optional string of additional parameters to append to gbasf2 command
# (where this value is appended is not visible in this excerpt)
gbasf2_additional_params = get_setting("gbasf2_additional_params", default=False, task=self.task)
# NOTE(review): excerpt from an HTCondor submit-file builder; ``log_file_dir``,
# ``submit_file_content`` and ``self`` are defined outside this span.
# Send the job's stdout, stderr and HTCondor job log to absolute paths in log_file_dir.
stdout_log_file = os.path.abspath(os.path.join(log_file_dir, "stdout"))
submit_file_content.append(f"output = {stdout_log_file}")
stderr_log_file = os.path.abspath(os.path.join(log_file_dir, "stderr"))
submit_file_content.append(f"error = {stderr_log_file}")
job_log_file = os.path.abspath(os.path.join(log_file_dir, "job.log"))
submit_file_content.append(f"log = {job_log_file}")
# Specify the executable
# (only its basename is written to the submit file)
executable_file = create_executable_wrapper(self.task)
submit_file_content.append(f"executable = {os.path.basename(executable_file)}")
# Specify additional settings
# NOTE(review): if get_setting returns the stored settings dict itself rather than a
# copy, the ``update``/``setdefault`` calls below mutate the global settings; verify.
general_settings = get_setting("htcondor_settings", dict())
# Task-level ``htcondor_settings`` are merged over the global ones; tasks without
# that attribute are tolerated.
try:
general_settings.update(self.task.htcondor_settings)
except AttributeError:
pass
transfer_files = get_setting("transfer_files", task=self.task, default=[])
if transfer_files:
working_dir = get_setting("working_dir", task=self.task, default="")
# Equivalent to ``working_dir != "."``: only an explicit "." is accepted.
if not working_dir or working_dir != ".":
raise ValueError("If using transfer_files, the working_dir must be explicitely set to '.'")
# setdefault keeps any value the user already configured for these keys.
general_settings.setdefault("should_transfer_files", "YES")
general_settings.setdefault("when_to_transfer_output", "ON_EXIT")
# Deduplicate the requested files.
transfer_files = set(transfer_files)
def create_executable_wrapper(task):
"""
To incorporate all settings (environment, working paths, remote or locally)
we create an executable bash script which is called instead of the application
and which will set up everything accordingly before doing the actual work.
"""
# NOTE(review): only the beginning of this function is visible here; the rest of
# the wrapper construction, the file writing and the return value lie outside
# this excerpt.
shell = get_setting("shell", task=task, default="bash")
# The shebang assumes the configured shell lives in /bin; ``set -e`` makes the
# wrapper stop at the first failing command.
executable_wrapper_content = [f"#!/bin/{shell}", "set -e"]
# 1. First part is the folder we need to change if given
# (defaults to the absolute directory of get_filename(), presumably the steering
# script; TODO confirm)
working_dir = get_setting("working_dir", task=task, default=os.path.abspath(os.path.dirname(get_filename())))
# NOTE(review): working_dir is interpolated unquoted, so a path containing spaces or
# shell metacharacters would break this ``cd`` line.
executable_wrapper_content.append(f"cd {working_dir}")
executable_wrapper_content.append("echo 'Working in the folder:'; pwd")
# 2. Second part of the executable wrapper, the environment.
executable_wrapper_content.append("echo 'Setting up the environment'")
# (a) If given, use the environment script
env_setup_script = get_setting("env_script", task=task, default="")
if env_setup_script:
# The script will be called from the directory of the script. So we have to make sure the
# env_script is reachable from there (not from where we are currently)
# NOTE(review): existence is checked on ``map_folder(env_setup_script)``, while the
# unmapped ``env_setup_script`` is what gets sourced after ``cd``. Confirm that
# map_folder resolves relative paths against the same directory.
if not os.path.isfile(map_folder(env_setup_script)):
raise FileNotFoundError(f"Environment setup script {env_setup_script} does not exist.")
executable_wrapper_content.append(f"source {env_setup_script}")
# (b) Now override with any environment from the task or settings
def _build_gbasf2_submit_command(self):
"""
Function to create the gbasf2 submit command to pass to run_with_gbasf2
from the task options and attributes.
"""
# NOTE(review): only the beginning of this method is visible here; the remaining
# options and the return statement lie outside this excerpt.
# The gbasf2 release defaults to the value returned by get_basf2_git_hash().
gbasf2_release = get_setting("gbasf2_release", default=get_basf2_git_hash(), task=self.task)
gbasf2_additional_files = get_setting("gbasf2_additional_files", default=[], task=self.task)
# Reject a bare string early, since ``list + str`` on the next line would fail anyway.
# NOTE(review): a tuple passes this check but ``list + tuple`` below raises
# TypeError, despite the message allowing tuples; ``assert`` is also skipped under -O.
assert not isinstance(gbasf2_additional_files, str), "gbasf2_additional_files should be a list or tuple, not a string."
# The pickle file (by basename) is always the first entry of the input sandbox.
gbasf2_input_sandbox_files = [os.path.basename(self.pickle_file_path)] + gbasf2_additional_files
# All values are interpolated unquoted into a single command string.
gbasf2_command_str = (f"gbasf2 {self.wrapper_file_path} -f {' '.join(gbasf2_input_sandbox_files)} " +
f"-p {self.gbasf2_project_name} -s {gbasf2_release} ")
gbasf2_input_dataset = get_setting("gbasf2_input_dataset", default=False, task=self.task)
if gbasf2_input_dataset is not False:
gbasf2_command_str += f" -i {gbasf2_input_dataset} "
# NOTE(review): the setting key is spelled "gbasf2_n_repition_job" (sic); confirm it
# matches the documented setting name.
gbasf2_n_repition_jobs = get_setting("gbasf2_n_repition_job", default=False, task=self.task)
if gbasf2_n_repition_jobs is not False:
gbasf2_command_str += f" --repetition {gbasf2_n_repition_jobs} "
# now add some additional optional options to the gbasf2 job submission string
# whether to ask user for confirmation before submitting job
# NOTE(review): excerpt from an HTCondor submit-file builder; ``log_file_dir``,
# ``submit_file_content`` and ``self`` are defined outside this span.
job_log_file = os.path.abspath(os.path.join(log_file_dir, "job.log"))
submit_file_content.append(f"log = {job_log_file}")
# Specify the executable
# (only its basename is written to the submit file)
executable_file = create_executable_wrapper(self.task)
submit_file_content.append(f"executable = {os.path.basename(executable_file)}")
# Specify additional settings
general_settings = get_setting("htcondor_settings", dict())
# Task-level ``htcondor_settings`` are merged over the global ones; tasks without
# that attribute are tolerated.
try:
general_settings.update(self.task.htcondor_settings)
except AttributeError:
pass
transfer_files = get_setting("transfer_files", task=self.task, default=[])
if transfer_files:
working_dir = get_setting("working_dir", task=self.task, default="")
# Equivalent to ``working_dir != "."``: only an explicit "." is accepted.
if not working_dir or working_dir != ".":
raise ValueError("If using transfer_files, the working_dir must be explicitely set to '.'")
# setdefault keeps any value the user already configured for these keys.
general_settings.setdefault("should_transfer_files", "YES")
general_settings.setdefault("when_to_transfer_output", "ON_EXIT")
# Deduplicate the requested files.
transfer_files = set(transfer_files)
# os.path.abspath also normalizes, so absolute paths containing ``..`` or
# redundant/trailing separators are rejected as well.
for transfer_file in transfer_files:
if os.path.abspath(transfer_file) != transfer_file:
raise ValueError(f"You should only give absolute file names in transfer_files! {os.path.abspath(transfer_file)} != {transfer_file}")
# NOTE(review): the body of the ``if`` below is not visible in this excerpt.
env_setup_script = get_setting("env_script", task=self.task, default="")
if env_setup_script:
def _build_gbasf2_submit_command(self):
"""
Function to create the gbasf2 submit command to pass to run_with_gbasf2
from the task options and attributes.
"""
gbasf2_release = get_setting("gbasf2_release", default=get_basf2_git_hash(), task=self.task)
gbasf2_additional_files = get_setting("gbasf2_additional_files", default=[], task=self.task)
assert not isinstance(gbasf2_additional_files, str), "gbasf2_additional_files should be a list or tuple, not a string."
gbasf2_input_sandbox_files = [os.path.basename(self.pickle_file_path)] + gbasf2_additional_files
gbasf2_command_str = (f"gbasf2 {self.wrapper_file_path} -f {' '.join(gbasf2_input_sandbox_files)} " +
f"-p {self.gbasf2_project_name} -s {gbasf2_release} ")
gbasf2_input_dataset = get_setting("gbasf2_input_dataset", default=False, task=self.task)
if gbasf2_input_dataset is not False:
gbasf2_command_str += f" -i {gbasf2_input_dataset} "
gbasf2_n_repition_jobs = get_setting("gbasf2_n_repition_job", default=False, task=self.task)
if gbasf2_n_repition_jobs is not False:
gbasf2_command_str += f" --repetition {gbasf2_n_repition_jobs} "
# now add some additional optional options to the gbasf2 job submission string
# whether to ask user for confirmation before submitting job
force_submission = get_setting("gbasf2_force_submission", default=True, task=self.task)
if force_submission:
gbasf2_command_str += " --force "
# estimated cpu time per sub-job in minutes
cpu_minutes = get_setting("gbasf2_cputime", default=False, task=self.task)