Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_same_address_port(self):
    """
    prepare() must raise TaurusConfigError for the configuration stored in
    yaml/phantom_request_same_address.yml.
    """
    config = RESOURCES_DIR + "yaml/phantom_request_same_address.yml"
    # Close the fixture file deterministically instead of leaving the handle
    # to the garbage collector.
    with open(config) as fds:
        # NOTE(review): yaml.load() without an explicit Loader is deprecated in
        # newer PyYAML releases and can construct arbitrary objects; the fixture
        # is a trusted local file, but consider yaml.safe_load() here.
        settings = yaml.load(fds.read())
    self.configure(settings)
    self.assertRaises(TaurusConfigError, self.obj.prepare)
def test_repetition_exceptions(self):
    """
    With a two-request scenario, concurrency 2 and a one hour ramp-up,
    prepare() must succeed without resource files and startup() must raise
    TaurusConfigError.
    """
    requests = [
        "http://blazedemo.com",
        "http://ya.ru",
    ]
    execution = {
        "concurrency": 2,
        "ramp-up": "1h",
        "scenario": {"requests": requests},
    }
    self.configure({"execution": execution})

    self.obj.prepare()
    resource_files = self.obj.resource_files()
    self.assertEqual(len(resource_files), 0)
    self.assertRaises(TaurusConfigError, self.obj.startup)
# Check a request URL against the shared target and derive connection settings
# (hostname, SSL flag, port) from it.
# NOTE(review): `path` is assembled below but never returned in this excerpt, so
# the definition may be cut off here - confirm against the complete file.
def _get_request_path(self, request, scenario):
parsed_url = parse.urlparse(request.url)
# Record scheme/netloc in self._target when they are not set yet
if not self._target.get("scheme"):
self._target["scheme"] = parsed_url.scheme
if not self._target.get("netloc"):
self._target["netloc"] = parsed_url.netloc
# A request whose scheme or netloc differs from the recorded target is rejected
if parsed_url.scheme != self._target["scheme"] or parsed_url.netloc != self._target["netloc"]:
raise TaurusConfigError("Address port and host must be the same")
path = parsed_url.path
# An explicit query string wins; otherwise the dict body of a GET request is
# urlencoded into one
if parsed_url.query:
path += "?" + parsed_url.query
else:
if request.method == "GET" and isinstance(request.body, dict):
path += "?" + urlencode(request.body)
# URL without netloc: hostname, SSL flag and port are derived from the
# scenario's "default-address" instead
if not parsed_url.netloc:
parsed_url = parse.urlparse(scenario.get("default-address", ""))
# Drop the ":port" suffix from netloc, if there is one
self.hostname = parsed_url.netloc.split(':')[0] if ':' in parsed_url.netloc else parsed_url.netloc
self.use_ssl = parsed_url.scheme == 'https'
# Explicit port from the URL, else 443 for https and 80 otherwise
if parsed_url.port:
self.port = parsed_url.port
else:
self.port = 443 if self.use_ssl else 80
# NOTE(review): no `def` line precedes these statements in this excerpt; the
# super() call suggests they are the body of LocustIOExecutor.prepare - confirm
# against the complete file.
super(LocustIOExecutor, self).prepare()
# Artifact files opened for writing and kept on the instance; nothing in this
# excerpt closes them
self.stdout = open(self.engine.create_artifact("locust", ".out"), 'w')
self.stderr = open(self.engine.create_artifact("locust", ".err"), 'w')
self.install_required_tools()
self.scenario = self.get_scenario()
self.__setup_script()
self.engine.existing_artifact(self.script)
# path to taurus dir. It's necessary for bzt usage inside tools/helpers
self.env.add_path({"PYTHONPATH": get_full_path(__file__, step_up=3)})
# The "master" execution option overrides the current self.is_master value
self.is_master = self.execution.get("master", self.is_master)
if self.is_master:
# The exception object is handed to get() as the default for "slaves" -
# presumably get() raises it when the key is absent; verify
count_error = TaurusConfigError("Slaves count required when starting in master mode")
self.expected_slaves = int(self.execution.get("slaves", count_error))
slaves_ldjson = self.engine.create_artifact("locust-slaves", ".ldjson")
self.reader = SlavesReader(slaves_ldjson, self.expected_slaves, self.log)
# The artifact path is handed to self.env under the SLAVES_LDJSON key
self.env.set({"SLAVES_LDJSON": slaves_ldjson})
else:
# Non-master mode: results are read from a "kpi" .jtl artifact whose path is
# handed to self.env under the JTL key
kpi_jtl = self.engine.create_artifact("kpi", ".jtl")
self.reader = JTLReader(kpi_jtl, self.log)
self.env.set({"JTL": kpi_jtl})
# The reader is attached only when the engine aggregator is a
# ConsolidatingAggregator
if isinstance(self.engine.aggregator, ConsolidatingAggregator):
self.engine.aggregator.add_underling(self.reader)
def _find_project(self, proj_name):
    """
    Look a project up by numeric ID or by name.

    :param proj_name: an int/float is treated as a project ID, any other
        non-None value is used as the project name; None yields None
    :rtype: bzt.bza.Project
    :raise TaurusConfigError: when nothing is found for a numeric ID
    """
    if proj_name is None:
        return None

    is_numeric = isinstance(proj_name, (int, float))  # TODO: what if it's string "123"?
    if not is_numeric:
        return self.workspaces.projects(name=proj_name).first()

    proj_id = int(proj_name)
    self.log.debug("Treating project name as ID: %s", proj_id)
    found = self.workspaces.projects(ident=proj_id).first()
    if not found:
        raise TaurusConfigError("BlazeMeter project not found by ID: %s" % proj_id)
    return found
# Build one AST expression statement per "assert-jsonpath" entry of the request
# config; each statement calls response.assert_jsonpath or
# response.assert_not_jsonpath.
# NOTE(review): `stmts` is filled but never returned in this excerpt, so the
# definition may be cut off here - confirm against the complete file.
def _gen_jsonpath_assertions(self, request):
stmts = []
jpath_assertions = request.config.get("assert-jsonpath", [])
for idx, assertion in enumerate(jpath_assertions):
# The loop variable is replaced with whatever ensure_is_dict returns for this index
assertion = ensure_is_dict(jpath_assertions, idx, "jsonpath")
# The exception object is passed as the default for 'jsonpath' - presumably
# get() raises it when the key is missing; verify
exc = TaurusConfigError('JSON Path not found in assertion: %s' % assertion)
query = assertion.get('jsonpath', exc)
expected = assertion.get('expected-value', None)
# A truthy 'invert' selects the negated assertion method
method = "assert_not_jsonpath" if assertion.get('invert', False) else "assert_jsonpath"
stmts.append(ast.Expr(
ast.Call(
func=ast.Attribute(
value=ast.Name(id="response", ctx=ast.Load()),
attr=method,
ctx=ast.Load()
),
args=[self.gen_expr(query)],
keywords=[ast.keyword(arg="expected_value", value=self.gen_expr(expected))],
# NOTE(review): starargs/kwargs are not fields of ast.Call on Python 3.5+ -
# confirm which Python versions this code targets
starargs=None,
kwargs=None
)
))
# NOTE(review): excerpt without its `def` line; `filename` is defined outside
# the visible lines.
# Without cumulative results the report is built from the source named by the
# "data-source" parameter (default "sample-labels")
if self.cumulative_results is None:
test_data_source = self.parameters.get("data-source", "sample-labels")
if test_data_source == "sample-labels":
# No report is written in this case - only a warning is logged
if not self.last_second:
self.log.warning("No last second data to generate XUnit.xml")
else:
writer = XUnitFileWriter(self.engine)
self.process_sample_labels(writer)
writer.save_report(filename)
elif test_data_source == "pass-fail":
writer = XUnitFileWriter(self.engine)
self.process_pass_fail(writer)
writer.save_report(filename)
else:
raise TaurusConfigError("Unsupported data source: %s" % test_data_source)
else:
# cumulative_results is set: the report is produced by process_functional
writer = XUnitFileWriter(self.engine)
self.process_functional(writer)
writer.save_report(filename)
self.report_file_path = filename # TODO: just for backward compatibility, remove later
"""
# NOTE(review): excerpt without its `def` line; `scenario` comes from outside
# the visible lines.
elements = []
authorizations = scenario.get("authorization")
if authorizations:
clear_flag = False
if isinstance(authorizations, dict):
# A dict holding "clear" or "list" is the full form; its "list" value
# replaces `authorizations`
if "clear" in authorizations or "list" in authorizations: # full form
clear_flag = authorizations.get("clear", False)
authorizations = authorizations.get("list", [])
else:
# Any other dict is taken as a single entry and wrapped in a list
authorizations = [authorizations] # short form
# After the normalization above only a list is accepted
if not isinstance(authorizations, list):
raise TaurusConfigError("Wrong authorization format: %s" % authorizations)
auth_manager = JMX.get_auth_manager(authorizations, clear_flag)
# The auth manager element is followed by a "hashTree" element
elements.append(auth_manager)
elements.append(etree.Element("hashTree"))
return elements
def _get_field_functor(self, subject, percentage):
    """
    Build a value getter for a "<host>/<metric>" monitoring subject.

    :param subject: string split on its first '/' into host and metric
    :param percentage: accepted but not used by this implementation
    :return: callable taking a mapping; it returns the mapping's value for the
        metric when its 'source' equals the host and the metric key is
        present, otherwise None
    :raise TaurusConfigError: when subject contains no '/'
    """
    if '/' not in subject:
        raise TaurusConfigError("Wrong syntax for monitoring criteria subject: %s" % subject)

    # Only the first '/' separates host from metric; the metric may contain more
    host, _, metric = subject.partition('/')

    def read_metric(x):
        if x['source'] == host and metric in x:
            return x[metric]
        return None

    return read_metric
def __init__(self, config, owner):
    """
    Set up the criterion from its configuration mapping.

    :param config: mapping with the criterion settings; 'threshold', 'subject'
        and 'condition' must be present, while 'logic', 'stop', 'fail',
        'message' and 'timeframe' are read with defaults
    :param owner: object stored as self.owner
    :raise TaurusConfigError: when 'threshold', 'subject' or 'condition' is
        missing from config
    """
    self.owner = owner
    self.config = config
    self.agg_buffer = OrderedDict()

    if 'threshold' not in config:
        raise TaurusConfigError("Criteria string is malformed in its threshold part.")
    # A trailing '%' on the threshold marks it as a percentage
    self.percentage = str(config['threshold']).endswith('%')

    if 'subject' not in config:
        raise TaurusConfigError("Criteria string is malformed in its subject part.")

    # The 'bytes' subject goes through get_bytes_count, every other subject
    # through dehumanize_time
    if config['subject'] == 'bytes':
        self.threshold = get_bytes_count(config.get('threshold'))
    else:
        self.threshold = dehumanize_time(config.get('threshold'))

    self.get_value = self._get_field_functor(config['subject'], self.percentage)
    self.window_logic = config.get('logic', 'for')
    self.agg_logic = self._get_aggregator_functor(self.window_logic, config['subject'])

    # 'condition' is validated only here, after the functors above are built;
    # the order is kept so the same error surfaces first as before
    if 'condition' not in config:
        raise TaurusConfigError("Criteria string is malformed in its condition part.")
    self.condition = self._get_condition_functor(config.get('condition'))

    self.stop = config.get('stop', True)
    self.fail = config.get('fail', True)
    self.message = config.get('message', None)
    self.window = dehumanize_time(config.get('timeframe', 0))
    self._start = sys.maxsize