Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_scenario_queue_with_no_list_ng(self):
    """
    Invalid scenario.yml

    Writes a scenario file whose "scenario" value is not a list of step
    mappings and expects create_scenario_queue() to reject it with
    ScenarioFileInvalid.
    """
    os.makedirs(self._pj_dir)
    try:
        # The value under "scenario" is a set literal, i.e. not a list of
        # step mappings.
        pj_yaml_dict = {"scenario": {"arguments", "spam"}}
        with open(self._pj_scenario_file, "w") as f:
            f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
        with pytest.raises(ScenarioFileInvalid) as excinfo:
            manager = YamlScenarioManager(self._cmd_args)
            manager.create_scenario_queue()
    finally:
        # Always remove the directory, even when the expected exception is
        # not raised; otherwise a later os.makedirs() on the same path fails.
        shutil.rmtree(self._pj_dir)
    assert "invalid" in str(excinfo.value)
# NOTE(review): excerpt from a test body -- the enclosing `def` and the
# definition of pj_yaml_dict are outside this view, and indentation is lost.
# Dump pj_yaml_dict to the project scenario file.
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml_dict, default_flow_style=False))
# Common scenario content: a list holding one entry with the keys
# "arguments", "class" and "step".
cmn_yaml_dict = {
"scenario": [
{
"arguments": {"retry_count": 10},
"class": "SftpDownload",
"step": "sftp_download",
}
]
}
# Dump cmn_yaml_dict to the common scenario file.
with open(self._cmn_scenario_file, "w") as f:
f.write(yaml.dump(cmn_yaml_dict, default_flow_style=False))
# ScenarioFileInvalid is expected while parsing the two files together.
with pytest.raises(ScenarioFileInvalid) as excinfo:
parser = YamlScenarioParser(self._pj_scenario_file, self._cmn_scenario_file)
yaml_scenario = parser.parse()
# The result of any(...) is discarded.
# NOTE(review): presumably never reached when parse() raises -- confirm.
any("step" in y for y in yaml_scenario)
# Remove self._pj_dir and self._cmn_dir before checking the message.
shutil.rmtree(self._pj_dir)
shutil.rmtree(self._cmn_dir)
assert "invalid" in str(excinfo.value)
# NOTE(review): excerpt from a test body -- the enclosing `def` is outside
# this view, and indentation is lost.
# Project scenario: one step whose "auth" argument holds only "form_id".
pj_yaml = {
"scenario": [
{
"step": "spam",
"class": "HttpDownload",
"arguments": {
"auth": {"form_id": "spam"},
"src_url": "https://spam/",
},
}
]
}
# Dump pj_yaml to the project scenario file.
with open(self._pj_scenario_file, "w") as f:
f.write(yaml.dump(pj_yaml, default_flow_style=False))
# ScenarioFileInvalid is expected while building the scenario queue.
with pytest.raises(ScenarioFileInvalid) as excinfo:
manager = YamlScenarioManager(self._cmd_args)
manager.create_scenario_queue()
# Remove self._pj_dir before checking the message.
shutil.rmtree(self._pj_dir)
# NOTE(review): presumably raised because the "auth" mapping has no "class"
# key -- confirm against the validator.
assert "class: is not specified" in str(excinfo.value)
def test_scenario_yaml_key_ng_with_no_scenario_key(self):
    """
    scenario.yml essential key does not exist.

    Writes a YAML file that has no "scenario" key, loads it back and
    expects ScenarioYamlKey to raise ScenarioFileInvalid mentioning
    "scenario".
    """
    os.makedirs(self._pj_dir)
    try:
        test_data = {"spam": ""}
        with open(self._scenario_file, "w") as f:
            f.write(yaml.dump(test_data, default_flow_style=False))
        # Context manager so the read handle is closed before the directory
        # is removed (the handle used to be left open).
        with open(self._scenario_file, "r") as pj_f:
            pj_yaml_dict = yaml.safe_load(pj_f)
        with pytest.raises(ScenarioFileInvalid) as excinfo:
            valid_instance = ScenarioYamlKey(pj_yaml_dict)
            valid_instance()
    finally:
        # Always clean up, even when the expected exception is not raised.
        shutil.rmtree(self._pj_dir)
    assert "scenario" in str(excinfo.value)
def __replace_vars(self, yaml_v, var_name):
    """
    Replace {{ var }} in string

    Runs the shell command registered for ``var_name`` in
    ``self._dynamic_key_and_val`` and substitutes its stripped output for
    the ``{{ ... }}`` placeholders in ``yaml_v``.

    Args:
        yaml_v: replace target value
        var_name: means {{ var }} itself

    Returns:
        ``yaml_v`` with the placeholders replaced by the command output.

    Raises:
        ScenarioFileInvalid: no command is defined for ``var_name``.
    """
    # .get() so that a missing key is reported as ScenarioFileInvalid
    # instead of escaping as a bare KeyError.
    # NOTE(review): assumes _dynamic_key_and_val is dict-like -- confirm.
    cmd = self._dynamic_key_and_val.get(var_name)
    if not cmd:
        raise ScenarioFileInvalid(
            "scenario.yml is invalid. 'with_vars' definition against %s does not exist."
            % var_name
        )
    # NOTE(review): cmd is executed through the shell (shell=True); it must
    # only ever come from a trusted scenario.yml.
    raw_output = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, shell=True
    ).communicate()[0]
    # Decode the bytes properly. The previous str(bytes) + regex stripping
    # dropped every single quote and left escape sequences such as "\\n" or
    # "\\xe3" in the text. errors="replace" keeps this step from raising.
    shell_output = raw_output.strip().decode("utf-8", errors="replace")
    # A callable replacement inserts the output literally, so backslashes in
    # the command output are not interpreted as regex escapes.
    # NOTE(review): every {{ ... }} placeholder in yaml_v is replaced, not
    # only the one named var_name -- unchanged from before.
    return re.sub(r"{{(.*?)}}", lambda _match: shell_output, yaml_v)
def __exists_step(self, dict):
    """
    Check that the given mapping has a "step" key.

    Args:
        dict: mapping to inspect (name kept as-is for callers)

    Raises:
        ScenarioFileInvalid: "step" is not among the keys.
    """
    if "step" in dict.keys():
        return
    raise ScenarioFileInvalid("scenario.yml is invalid. 'step:' does not exist.")
def __call__(self):
    """
    Check that the stored io value is "output".

    Raises:
        ScenarioFileInvalid: the io value is anything other than "output".
    """
    if self.__io == "output":
        return
    raise ScenarioFileInvalid("io: output is not specified.")
def __call__(self):
    """
    Validate parsed scenario.yml instance type

    Raises:
        ScenarioFileInvalid: the stored value is not a dict.
    """
    if isinstance(self._val, dict):
        return
    raise ScenarioFileInvalid("scenario.yml is invalid. Check scenario.yml format.")
def __call__(self):
    """
    Validate the dependency injection parameters.

    Raises:
        ScenarioFileInvalid: the parameters are empty, or they carry no
            truthy "class" entry.
    """
    di_params = self.__di_params
    if not di_params:
        raise ScenarioFileInvalid(
            "Dependeny Injection parameters are essential after %s" % self.__di_key
        )
    if di_params.get("class"):
        return
    raise ScenarioFileInvalid("class: is not specified after %s" % self.__di_key)