Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def gen_impl_wait(self, timeout, indent=None):
    """Generate a statement setting Selenium's implicit wait to *timeout*."""
    wait_call = "self.driver.implicitly_wait(%s)" % dehumanize_time(timeout)
    return self.gen_statement(wait_call, indent=indent)
def _extract_named_args(self, req):
    """Collect keyword arguments for a generated request call.

    Gathers timeout, redirect policy and merged headers from the request
    and the enclosing scenario; request-level settings win over
    scenario-level ones.

    :param req: request object exposing `timeout`, `priority_option` and `headers`
    :return: OrderedDict mapping keyword-argument name to generated value
    """
    named_args = OrderedDict()

    # Scenario-level timeout applies only when not going through a target object.
    no_target = self._access_method() != ApiritifScriptGenerator.ACCESS_TARGET
    if req.timeout is not None:
        named_args['timeout'] = dehumanize_time(req.timeout)
    elif "timeout" in self.scenario and no_target:
        named_args['timeout'] = dehumanize_time(self.scenario.get("timeout"))

    follow_redirects = req.priority_option('follow-redirects', None)
    if follow_redirects is not None:
        named_args['allow_redirects'] = follow_redirects

    headers = {}
    # scenario.get("headers") may be None when the key is absent; guard
    # against dict.update(None) raising TypeError.
    headers.update(self.scenario.get("headers") or {})
    headers.update(req.headers)
    if headers:
        named_args['headers'] = self.gen_expr(headers)

    # Header names are case-insensitive, so normalize keys for the lookup.
    merged_headers = {key.lower(): value for key, value in iteritems(headers)}
    content_type = merged_headers.get("content-type")
def startup(self):
    """Assemble the ApacheBenchmark (`ab`) command line from the load profile."""
    args = [self.tool.tool_path]

    load = self.get_load()
    iterations = load.iterations or 1
    concurrency = load.concurrency or 1

    if load.hold:
        # time-limited run: -t takes a whole number of seconds
        hold_seconds = int(ceil(dehumanize_time(load.hold)))
        args.extend(['-t', str(hold_seconds)])
    else:
        # ab expects the grand total of requests, not per-connection iterations
        args.extend(['-n', str(iterations * concurrency)])

    timeout = self.get_scenario().get("timeout", None)
    if timeout:
        args.extend(['-s', str(ceil(dehumanize_time(timeout)))])

    args.extend(['-c', str(concurrency)])
    args.append('-d')  # suppress 'Processed *00 requests' progress chatter
    args.append('-r')  # keep going on socket-level errors
    if self.tool.version and LooseVersion(self.tool.version) >= LooseVersion("2.4.7"):
        args.append('-l')  # accept variable-length responses
    args.extend(['-g', str(self._tsv_file)])  # dump per-request stats as TSV
def _gen_wait_sleep_mngr(self, atype, tag, param, selectors):
    # Builds AST nodes implementing 'wait'/'sleep'-style Selenium actions
    # for the given selectors.
    # NOTE(review): this block appears truncated in this view — the final
    # elements.append(...) expression is not closed here; verify against
    # the full file before relying on its behavior.
    elements = []
    # 'visible' means wait for visibility; anything else waits for presence
    mode = "visibility" if param == 'visible' else 'presence'
    if atype == 'wait':
        # Raised by scenario.get() when no timeout is configured at all.
        exc = TaurusConfigError("wait action requires timeout in scenario: \n%s" % self.scenario)
        timeout = dehumanize_time(self.scenario.get("timeout", exc))
        # Only the first selector is used for the error message below.
        locator_type = list(selectors[0].keys())[0]
        locator_value = selectors[0][locator_type]
        errmsg = "Element %r:%r failed to appear within %ss" % (locator_type, locator_value,
                                                                timeout)
        elements.append(self._gen_get_locators("var_loc_wait", selectors))
        # Emit: WebDriverWait(self.driver, timeout).until(...)
        elements.append(ast_call(
            func=ast_attr(
                fields=(
                    ast_call(
                        func="WebDriverWait",
                        args=[
                            ast_attr("self.driver"),
                            ast.Num(timeout)]),
                    "until")),
def prepare(self):
    """Validate configuration and set up file-based results loading.

    Resolves the errors-file path, reads wait/timeout settings, and makes
    a first attempt at creating the results reader.

    :raises TaurusConfigError: when neither data-file nor data-file-pattern is set
    """
    super(ExternalResultsLoader, self).prepare()
    self._read_options()
    # `assert` is stripped under `python -O`; raise explicitly so this
    # config check always runs.
    if not (self._data_file_pattern or self.data_file):
        raise TaurusConfigError("Option is required: data-file or data-file-pattern")
    self.label = self.data_file
    if self.errors_file:
        self.errors_file = self.engine.find_file(self.errors_file)

    str_wait = self.execution.get("wait-for-file", self.settings.get("wait-for-file", self._file_exists_wait))
    self._file_exists_wait = dehumanize_time(str_wait)

    # Default results timeout scales with the engine's polling interval.
    def_timeout = self.engine.check_interval * 10
    str_to = self.execution.get("results-timeout", self.settings.get("results-timeout", def_timeout))
    self._result_timeout = dehumanize_time(str_to)

    self._file_check_ts = time.time()
    self._try_make_reader()
def connect(self):
    """Validate configured metrics and initialize the local monitor.

    :raises TaurusConfigError: when no metrics are configured or none of
        them is among AVAILABLE_METRICS
    """
    exc = TaurusConfigError('Metric is required in Local monitoring client')
    metric_names = self.config.get('metrics', exc)

    bad_list = set(metric_names) - set(self.AVAILABLE_METRICS)
    if bad_list:
        self.log.warning('Wrong metrics found: %s', bad_list)

    good_list = set(metric_names) & set(self.AVAILABLE_METRICS)
    if not good_list:
        raise exc

    # good_list is already a set; no extra dedup pass needed
    self.metrics = list(good_list)

    self.monitor = LocalMonitor(self.log, self.metrics, self.engine)
    self.interval = dehumanize_time(self.config.get("interval", self.engine.check_interval))

    if self.config.get("logging", False):
        if not PY3:
            self.log.warning("Logging option doesn't work on python2.")
        else:
            self.logs_file = self.engine.create_artifact("local_monitoring_logs", ".csv")
            # Write the CSV header row once, at connect time.
            with open(self.logs_file, "a", newline='') as mon_logs:
                logs_writer = csv.writer(mon_logs, delimiter=',')
                metrics = ['ts'] + sorted(good_list)
                logs_writer.writerow(metrics)
def _get_scenario_props(self):
    """Translate scenario-level options into Gatling system properties."""
    scenario = self.get_scenario()
    props = {}

    timeout = scenario.get('timeout', None)
    if timeout is not None:
        # Gatling wants the request timeout in milliseconds
        props['gatling.http.ahc.requestTimeout'] = int(dehumanize_time(timeout) * 1000)

    # Property names differ across Gatling versions, so set all variants.
    keepalive = 'true' if scenario.get('keepalive', True) else 'false'
    props['gatling.http.ahc.allowPoolingConnections'] = keepalive     # gatling <= 2.2.0
    props['gatling.http.ahc.allowPoolingSslConnections'] = keepalive  # gatling <= 2.2.0
    props['gatling.http.ahc.keepAlive'] = keepalive                   # gatling > 2.2.0

    return props
def __init__(self, config, owner):
    """Set up a pass/fail criterion from its configuration dict.

    :param config: criterion configuration (threshold, subject, condition, ...)
    :param owner: object this criterion reports to
    :raises TaurusConfigError: when threshold, subject or condition is missing
    """
    self.owner = owner
    self.config = config
    self.agg_buffer = OrderedDict()

    # `x not in y` is the idiomatic form (PEP 8 / E713) of `not x in y`
    if 'threshold' not in config:
        raise TaurusConfigError("Criteria string is malformed in its threshold part.")
    self.percentage = str(config['threshold']).endswith('%')

    if 'subject' not in config:
        raise TaurusConfigError("Criteria string is malformed in its subject part.")
    if config['subject'] == 'bytes':
        self.threshold = get_bytes_count(config.get('threshold'))
    else:
        self.threshold = dehumanize_time(config.get('threshold'))

    self.get_value = self._get_field_functor(config['subject'], self.percentage)
    self.window_logic = config.get('logic', 'for')
    self.agg_logic = self._get_aggregator_functor(self.window_logic, config['subject'])

    if 'condition' not in config:
        raise TaurusConfigError("Criteria string is malformed in its condition part.")
    self.condition = self._get_condition_functor(config.get('condition'))

    self.stop = config.get('stop', True)
    self.fail = config.get('fail', True)
    self.message = config.get('message', None)
    self.window = dehumanize_time(config.get('timeframe', 0))
    self._start = sys.maxsize
    self._end = 0
    self.is_candidate = False
    self.is_triggered = False
def __gen_options(self, scenario):
    """Render scenario-wide settings as a Tsung <options> XML element."""
    options = etree.Element("options")

    think_time = scenario.get_think_time()
    if think_time:
        value = str(int(dehumanize_time(think_time)))
        options.append(etree.Element("option", name="thinktime", value=value, random="false"))

    tcp_timeout = scenario.get("timeout")
    if tcp_timeout:
        # Tsung expects the connect timeout in milliseconds
        millis = str(int(dehumanize_time(tcp_timeout) * 1000))
        options.append(etree.Element("option", name="connect_timeout", value=millis))

    retries = scenario.get("max-retries", 1)
    options.append(etree.Element("option", name="max_retries", value=str(retries)))
    return options
def gen_think_time(self, think_time, indent=None):
    """Generate a sleep statement for a think-time value, if any.

    :param think_time: humanized delay (e.g. "500ms") or None
    :param indent: indentation passed through to gen_statement
    :return: list of generated statements (empty when no delay applies)
    """
    test_method = []
    if think_time is not None:
        delay = dehumanize_time(think_time)
        if delay > 0:
            # reuse the already-converted value instead of re-parsing think_time
            test_method.append(self.gen_statement("sleep(%s)" % delay, indent=indent))
            test_method.append(self.gen_new_line())
    return test_method