Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def expand_spec_no_study(filepath, override_vars=None):
    """
    Return the expanded text of a spec without building a MerlinStudy.

    Only user variables are expanded, i.e. the ones defined inside the
    yaml spec or supplied at the command line.

    :param `filepath`: path of the yaml spec to load
    :param `override_vars`: optional overrides, checked with
        error_override_vars and applied by dump_with_overrides
    :return: the result of expand_by_line on the dumped spec text
    """
    error_override_vars(override_vars, filepath)

    # Dump the spec with overrides applied, then reload it from that text so
    # the environment below reflects the overridden values.
    original_spec = MerlinSpec.load_specification(filepath)
    full_spec = dump_with_overrides(original_spec, override_vars)
    environment = MerlinSpec.load_spec_from_string(full_spec).environment

    # Collect whichever user-variable sections exist, "variables" first.
    uvars = [
        environment[section]
        for section in ("variables", "labels")
        if section in environment
    ]

    evaluated_uvars = determine_user_variables(*uvars)
    return expand_by_line(full_spec, evaluated_uvars)
def write_expanded_spec(self, dest):
    """
    Write a new yaml spec file with defaults and variable expansions.

    Useful for provenance.

    :param `dest`: destination for fully expanded yaml file
    """
    # Spec text including defaults and overridden user variables.
    spec_text = dump_with_overrides(self.spec, self.override_vars)
    with open(dest, "w") as out_file:
        out_file.write(spec_text)

    # Reload from the file just written so that user_vars will be accurate.
    self.spec = MerlinSpec.load_specification(dest)

    # Expand user variables first, then reserved words. Each attribute is
    # looked up only right before its own pass, in that order.
    for vars_attr in ("user_vars", "special_vars"):
        self.write_expand_by_line(dest, getattr(self, vars_attr))
def expanded_spec(self):
    """
    Determine, write to yaml, and load into memory an expanded
    specification.

    Sets self.expanded_filepath to "<spec name with spaces replaced by
    underscores>.yaml" joined onto self.info.

    :return: the result of MerlinSpec.load_specification on that path
    """
    info_dir = self.info
    yaml_name = self.spec.name.replace(" ", "_") + ".yaml"
    self.expanded_filepath = os.path.join(info_dir, yaml_name)

    # On a restart the previously expanded spec is simply read back in;
    # only a fresh run needs to write the expansion.
    fresh_run = self.restart_dir is None
    if fresh_run:
        self.write_expanded_spec(self.expanded_filepath)

    return MerlinSpec.load_specification(
        self.expanded_filepath, suppress_warning=False
    )
def stop_workers(args):
    """
    CLI command for stopping all workers.

    When a spec is given, its worker names are passed along to the router,
    and any name still containing "$" triggers a warning.

    :param `args`: parsed CLI arguments
    """
    print(banner_small)

    worker_names = []
    if args.spec:
        spec = MerlinSpec.load_specification(verify_filepath(args.spec))
        worker_names = spec.get_worker_names()

    # A "$" left in a name means the variable was never expanded.
    unexpanded = (name for name in worker_names if "$" in name)
    for worker_name in unexpanded:
        LOG.warning(
            f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?"
        )

    router.stop_workers(args.task_server, worker_names, args.queues, args.workers)
def get_adapter_config(self, override_type=None):
spec = MerlinSpec.load_specification(self.spec.path)
adapter_config = dict(spec.batch)
if "type" not in adapter_config.keys():
adapter_config["type"] = "local"
# The type may be overriden, preserve the batch type
adapter_config["batch_type"] = adapter_config["type"]
if override_type is not None:
adapter_config["type"] = override_type
# if a dry run was ordered by the yaml spec OR the cli flag, do a dry run.
adapter_config["dry_run"] = self.dry_run or adapter_config["dry_run"]
# Add the version if using flux to switch the command in the step
if adapter_config["batch_type"] == "flux":