Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Tags VASP Fworker(s) of a Workflow; e.g. it can be used to run large-memory jobs on a separate queue
Args:
original_wf (Workflow):
tag (string): user-defined tag to be added under fw.spec._fworker (e.g. "large memory", "big", etc)
fw_name_constraint (string): name of the fireworks to be modified (all if None is passed)
Returns:
modified workflow with tagged Fworkers
"""
wf_dict = original_wf.to_dict()
for idx_fw, idx_t in get_fws_and_tasks(original_wf, fw_name_constraint=fw_name_constraint,
task_name_constraint="RunVasp"):
wf_dict["fws"][idx_fw]["spec"]["_fworker"] = tag
return Workflow.from_dict(wf_dict)
def use_scratch_dir(original_wf, scratch_dir):
"""
For all RunVaspCustodian tasks, add the desired scratch dir.
:param original_wf:
:param scratch_dir: The scratch dir to use. Supports env_chk
"""
wf_dict = original_wf.to_dict()
for idx_fw, idx_t in get_fws_and_tasks(original_wf, task_name_constraint="RunVaspCustodian"):
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["scratch_dir"] = scratch_dir
return Workflow.from_dict(wf_dict)
Every FireWork begins by writing an empty file with the name
"FW--". This makes it easy to figure out what jobs are in what
launcher directories, e.g. "ls -l launch*/FW--*" from within a "block" dir.
Args:
original_wf (Workflow)
use_slug (bool): whether to replace whitespace-type chars with a slug
"""
wf_dict = original_wf.to_dict()
for idx, fw in enumerate(wf_dict["fws"]):
fname = "FW--{}".format(fw["name"])
if use_slug:
fname = get_slug(fname)
wf_dict["fws"][idx]["spec"]["_tasks"].insert(0, FileWriteTask(
files_to_write=[{"filename": fname, "contents": ""}]).to_dict())
return Workflow.from_dict(wf_dict)
Args:
original_wf (Workflow)
ref_dirs (dict): key=firework name, value=path to the reference vasp calculation directory
params_to_check (list): optional list of incar parameters to check.
"""
if not params_to_check:
params_to_check = ["ISPIN", "ENCUT", "ISMEAR", "SIGMA", "IBRION", "LORBIT", "NBANDS", "LMAXMIX"]
wf_dict = original_wf.to_dict()
for idx_fw, fw in enumerate(original_wf.fws):
for job_type in ref_dirs.keys():
if job_type in fw.name:
for idx_t, t in enumerate(fw.tasks):
if "RunVasp" in str(t):
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t] = \
RunVaspFake(ref_dir=ref_dirs[job_type], params_to_check=params_to_check).to_dict()
return Workflow.from_dict(wf_dict)
def use_scratch_dir(original_wf, scratch_dir):
"""
For all RunVaspCustodian tasks, add the desired scratch dir.
:param original_wf:
:param scratch_dir: The scratch dir to use. Supports env_chk
"""
wf_dict = original_wf.to_dict()
for idx_fw, idx_t in get_fws_and_tasks(original_wf, task_name_constraint="RunVaspCustodian"):
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["scratch_dir"] = scratch_dir
return Workflow.from_dict(wf_dict)
# FW metadata
for idx_fw in range(len(original_wf.fws)):
if "tags" in wf_dict["fws"][idx_fw]["spec"]:
wf_dict["fws"][idx_fw]["spec"]["tags"].extend(tags_list)
else:
wf_dict["fws"][idx_fw]["spec"]["tags"] = tags_list
# DB insertion tasks
for constraint in ["VaspToDbTask", "BoltztrapToDbTask"]:
for idx_fw, idx_t in get_fws_and_tasks(original_wf, task_name_constraint=constraint):
if "tags" in wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["additional_fields"]:
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["additional_fields"]["tags"].extend(tags_list)
else:
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["additional_fields"]["tags"] = tags_list
return Workflow.from_dict(wf_dict)
"""
Replaces all tasks with "RunVasp*" (e.g. RunVaspCustodian) to be
RunVaspDirect.
Args:
original_wf (Workflow): original workflow
fw_name_constraint (str): Only apply changes to FWs where fw_name
contains this substring.
"""
wf_dict = original_wf.to_dict()
vasp_fws_and_tasks = get_fws_and_tasks(original_wf, fw_name_constraint=fw_name_constraint,
task_name_constraint="RunVasp")
for idx_fw, idx_t in vasp_fws_and_tasks:
vasp_cmd = wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["vasp_cmd"]
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t] = RunVaspDirect(vasp_cmd=vasp_cmd).to_dict()
return Workflow.from_dict(wf_dict)
modify_incar_params = modify_incar_params or {"incar_update": {"LSORBIT": "T", "NBANDS": nbands, "MAGMOM": magmom,
"ISPIN": 1, "LMAXMIX": 4, "ISYM": 0}}
for idx_fw, idx_t in get_fws_and_tasks(original_wf, fw_name_constraint=fw_name_constraint,
task_name_constraint="RunVasp"):
if "nscf" in wf_dict["fws"][idx_fw]["name"]:
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["vasp_cmd"] = ">>vasp_ncl<<"
wf_dict["fws"][idx_fw]["spec"]["_tasks"].insert(idx_t, ModifyIncar(**modify_incar_params).to_dict())
wf_dict["fws"][idx_fw]["name"] += " soc"
for idx_fw, idx_t in get_fws_and_tasks(original_wf, fw_name_constraint=fw_name_constraint,
task_name_constraint="RunBoltztrap"):
wf_dict["fws"][idx_fw]["name"] += " soc"
return Workflow.from_dict(wf_dict)
def add_additional_fields_to_taskdocs(original_wf, update_dict=None):
"""
For all VaspToDbTasks in a given workflow, add information
to "additional_fields" to be placed in the task doc.
Args:
original_wf (Workflow)
update_dict (Dict): dictionary to add additional_fields
"""
wf_dict = original_wf.to_dict()
for idx_fw, idx_t in get_fws_and_tasks(original_wf, task_name_constraint="VaspToDbTask"):
wf_dict["fws"][idx_fw]["spec"]["_tasks"][idx_t]["additional_fields"].update(update_dict)
return Workflow.from_dict(wf_dict)
Args:
original_wf (Workflow)
tracked_files (list) : list of files to be tracked
nlines (int): number of lines at the end of files to be tracked
"""
if tracked_files is None:
tracked_files = ["OUTCAR", "OSZICAR"]
trackers = [Tracker(f, nlines=nlines, allow_zipped=True) for f in tracked_files]
wf_dict = original_wf.to_dict()
for idx_fw, idx_t in get_fws_and_tasks(original_wf, task_name_constraint="RunVasp"):
if "_trackers" in wf_dict["fws"][idx_fw]["spec"]:
wf_dict["fws"][idx_fw]["spec"]["_trackers"].extend(trackers)
else:
wf_dict["fws"][idx_fw]["spec"]["_trackers"] = trackers
return Workflow.from_dict(wf_dict)