How to use the bzt.utils.dehumanize_time function in bzt

To help you get started, we’ve selected a few examples of bzt.utils.dehumanize_time, based on popular ways the function is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Blazemeter / taurus / bzt / modules / python / generators.py View on Github external
def gen_impl_wait(self, timeout, indent=None):
        """Generate the statement that sets the driver's implicit wait.

        :param timeout: timeout value; converted with ``dehumanize_time``
        :param indent: passed through unchanged to ``self.gen_statement``
        :return: the result of ``self.gen_statement`` for the built statement
        """
        seconds = dehumanize_time(timeout)
        statement = "self.driver.implicitly_wait(%s)" % seconds
        return self.gen_statement(statement, indent=indent)
github Blazemeter / taurus / bzt / modules / python / generators.py View on Github external
def _extract_named_args(self, req):
        """Build an ordered mapping of named arguments from the request and scenario.

        Entries for ``timeout``, ``allow_redirects`` and ``headers`` are added
        only when the corresponding value is available.

        :param req: request object; its ``timeout``, ``headers`` and
            ``priority_option`` members are read here
        """
        named_args = OrderedDict()

        # True when the access method is anything other than ACCESS_TARGET.
        no_target = self._access_method() != ApiritifScriptGenerator.ACCESS_TARGET
        # A request-level timeout wins; the scenario-level timeout is only a
        # fallback, and only when no_target is true.
        if req.timeout is not None:
            named_args['timeout'] = dehumanize_time(req.timeout)
        elif "timeout" in self.scenario and no_target:
            named_args['timeout'] = dehumanize_time(self.scenario.get("timeout"))

        # 'follow-redirects' is emitted as 'allow_redirects' only when the
        # lookup yields a non-None value.
        follow_redirects = req.priority_option('follow-redirects', None)
        if follow_redirects is not None:
            named_args['allow_redirects'] = follow_redirects

        # Scenario headers are applied first so request headers override them.
        headers = {}
        headers.update(self.scenario.get("headers"))
        headers.update(req.headers)

        if headers:
            named_args['headers'] = self.gen_expr(headers)

        # Lower-case the header names so the content-type lookup does not
        # depend on how the header name was capitalised.
        merged_headers = dict([(key.lower(), value) for key, value in iteritems(headers)])
        content_type = merged_headers.get("content-type")
github Blazemeter / taurus / bzt / modules / ab.py View on Github external
def startup(self):
        """Assemble the command-line arguments for the tool.

        The argument list starts with the tool path and is extended from the
        load settings (``hold``, ``iterations``, ``concurrency``), the scenario
        ``timeout`` and the detected tool version.
        """
        args = [self.tool.tool_path]
        load = self.get_load()
        # Missing or zero iterations/concurrency fall back to 1.
        load_iterations = load.iterations or 1
        load_concurrency = load.concurrency or 1

        if load.hold:
            # Durations are rounded up to whole seconds.
            hold = int(ceil(dehumanize_time(load.hold)))
            args += ['-t', str(hold)]
        else:
            args += ['-n', str(load_iterations * load_concurrency)]  # ab waits for total number of iterations

        timeout = self.get_scenario().get("timeout", None)
        if timeout:
            # int() keeps the value a whole number even where ceil() returns a
            # float (Python 2), matching how `hold` is formatted above.
            args += ['-s', str(int(ceil(dehumanize_time(timeout))))]

        args += ['-c', str(load_concurrency)]
        args += ['-d']  # do not print 'Processed *00 requests' every 100 requests or so
        args += ['-r']  # do not crash on socket level errors

        # '-l' is only added for tool versions 2.4.7 and later.
        if self.tool.version and LooseVersion(self.tool.version) >= LooseVersion("2.4.7"):
            args += ['-l']  # accept variable-len responses

        args += ['-g', str(self._tsv_file)]  # dump stats to TSV file
github Blazemeter / taurus / bzt / modules / apiritif / generator.py View on Github external
def _gen_wait_sleep_mngr(self, atype, tag, param, selectors):
        """Generate AST elements for a wait-style action.

        :param atype: action type; the branch shown here handles ``'wait'``
        :param tag: not referenced in the portion of the method shown here
        :param param: ``'visible'`` selects "visibility" mode, anything else
            selects "presence" mode
        :param selectors: sequence of locator mappings; the first key/value of
            the first mapping is used in the error message
        """
        elements = []
        mode = "visibility" if param == 'visible' else 'presence'

        if atype == 'wait':
            # exc is handed to scenario.get() as the default value.
            # NOTE(review): presumably it is raised when "timeout" is missing;
            # confirm against the scenario's get() implementation.
            exc = TaurusConfigError("wait action requires timeout in scenario: \n%s" % self.scenario)
            timeout = dehumanize_time(self.scenario.get("timeout", exc))
            # Only the first key of the first selector is used for the message.
            locator_type = list(selectors[0].keys())[0]
            locator_value = selectors[0][locator_type]
            errmsg = "Element %r:%r failed to appear within %ss" % (locator_type, locator_value,
                                                                    timeout)

            elements.append(self._gen_get_locators("var_loc_wait", selectors))

            # Builds a call on WebDriverWait(self.driver, <timeout>).until
            elements.append(ast_call(
                func=ast_attr(
                    fields=(
                        ast_call(
                            func="WebDriverWait",
                            args=[
                                ast_attr("self.driver"),
                                ast.Num(timeout)]),
                        "until")),
github Blazemeter / taurus / bzt / modules / external.py View on Github external
def prepare(self):
        """Read options, resolve the errors file and set up wait/timeout values.

        ``wait-for-file`` and ``results-timeout`` are looked up in the execution
        first, then in the settings, then fall back to a default; the result is
        converted with ``dehumanize_time``.

        :raises AssertionError: when neither a data file nor a data file
            pattern is configured
        """
        super(ExternalResultsLoader, self).prepare()
        self._read_options()
        # NOTE: assert statements are stripped when Python runs with -O.
        assert self._data_file_pattern or self.data_file, "Option is required: data-file or data-file-pattern"
        self.label = self.data_file
        if self.errors_file:
            self.errors_file = self.engine.find_file(self.errors_file)

        # The current attribute value is the last-resort default.
        settings_wait = self.settings.get("wait-for-file", self._file_exists_wait)
        wait_option = self.execution.get("wait-for-file", settings_wait)
        self._file_exists_wait = dehumanize_time(wait_option)

        # Default results timeout is ten engine check intervals.
        default_timeout = self.engine.check_interval * 10
        settings_timeout = self.settings.get("results-timeout", default_timeout)
        timeout_option = self.execution.get("results-timeout", settings_timeout)
        self._result_timeout = dehumanize_time(timeout_option)

        self._file_check_ts = time.time()
        self._try_make_reader()
github Blazemeter / taurus / bzt / modules / monitoring.py View on Github external
def connect(self):
        """Validate the configured metrics, create the monitor and set up logging.

        Metrics not listed in ``self.AVAILABLE_METRICS`` are reported with a
        warning and ignored. When the ``logging`` option is enabled (Python 3
        only) a CSV artifact is created and a header row is appended to it.

        :raises TaurusConfigError: when none of the configured metrics is
            available
        """
        exc = TaurusConfigError('Metric is required in Local monitoring client')
        metric_names = self.config.get('metrics', exc)

        unknown_metrics = set(metric_names) - set(self.AVAILABLE_METRICS)
        if unknown_metrics:
            self.log.warning('Wrong metrics found: %s', unknown_metrics)

        known_metrics = set(metric_names) & set(self.AVAILABLE_METRICS)
        if not known_metrics:
            raise exc

        self.metrics = list(set(known_metrics))
        self.monitor = LocalMonitor(self.log, self.metrics, self.engine)

        # The engine's check interval is the default polling interval.
        interval_option = self.config.get("interval", self.engine.check_interval)
        self.interval = dehumanize_time(interval_option)

        if not self.config.get("logging", False):
            return

        if not PY3:
            self.log.warning("Logging option doesn't work on python2.")
            return

        self.logs_file = self.engine.create_artifact("local_monitoring_logs", ".csv")
        with open(self.logs_file, "a", newline='') as mon_logs:
            logs_writer = csv.writer(mon_logs, delimiter=',')
            # Header row: timestamp column followed by the metric names, sorted.
            logs_writer.writerow(['ts'] + sorted(known_metrics))
github Blazemeter / taurus / bzt / modules / gatling.py View on Github external
def _get_scenario_props(self):
        """Translate scenario settings into Gatling property values.

        :return: dict with the request timeout in milliseconds (only when the
            scenario defines ``timeout``) and the keep-alive flags as
            ``'true'``/``'false'`` strings
        """
        scenario = self.get_scenario()
        props = {}

        timeout = scenario.get('timeout', None)
        if timeout is not None:
            # seconds -> whole milliseconds
            props['gatling.http.ahc.requestTimeout'] = int(dehumanize_time(timeout) * 1000)

        # Keep-alive defaults to enabled; every related key gets the same flag.
        keepalive_flag = 'true' if scenario.get('keepalive', True) else 'false'
        keepalive_keys = (
            # gatling <= 2.2.0
            'gatling.http.ahc.allowPoolingConnections',
            'gatling.http.ahc.allowPoolingSslConnections',
            # gatling > 2.2.0
            'gatling.http.ahc.keepAlive',
        )
        for key in keepalive_keys:
            props[key] = keepalive_flag
        return props
github Blazemeter / taurus / bzt / modules / passfail.py View on Github external
def __init__(self, config, owner):
        """Initialise a criterion from its parsed configuration.

        :param config: mapping holding the required ``threshold``, ``subject``
            and ``condition`` keys, plus optional ``logic``, ``stop``, ``fail``,
            ``message`` and ``timeframe``
        :param owner: stored as ``self.owner``
        :raises TaurusConfigError: when ``threshold``, ``subject`` or
            ``condition`` is missing from ``config``
        """
        self.owner = owner
        self.config = config
        self.agg_buffer = OrderedDict()

        if 'threshold' not in config:
            raise TaurusConfigError("Criteria string is malformed in its threshold part.")
        # A trailing '%' marks the threshold as a percentage.
        self.percentage = str(config['threshold']).endswith('%')

        if 'subject' not in config:
            raise TaurusConfigError("Criteria string is malformed in its subject part.")
        subject = config['subject']

        # Byte subjects are parsed as sizes, everything else as a duration.
        parse_threshold = get_bytes_count if subject == 'bytes' else dehumanize_time
        self.threshold = parse_threshold(config.get('threshold'))

        self.get_value = self._get_field_functor(subject, self.percentage)
        self.window_logic = config.get('logic', 'for')
        self.agg_logic = self._get_aggregator_functor(self.window_logic, subject)

        if 'condition' not in config:
            raise TaurusConfigError("Criteria string is malformed in its condition part.")
        self.condition = self._get_condition_functor(config.get('condition'))

        self.stop = config.get('stop', True)
        self.fail = config.get('fail', True)
        self.message = config.get('message', None)
        self.window = dehumanize_time(config.get('timeframe', 0))

        # Tracking state starts out empty/untriggered.
        self._start = sys.maxsize
        self._end = 0
        self.is_candidate = False
        self.is_triggered = False
github Blazemeter / taurus / bzt / modules / tsung.py View on Github external
def __gen_options(self, scenario):
        """Build the ``options`` XML element from scenario settings.

        :param scenario: object providing ``get_think_time()`` and ``get()``
        :return: ``options`` element holding an optional ``thinktime`` option,
            an optional ``connect_timeout`` option (milliseconds) and a
            ``max_retries`` option (default 1)
        """
        options = etree.Element("options")

        think_time_setting = scenario.get_think_time()
        if think_time_setting:
            # Think time is written as whole seconds.
            seconds = int(dehumanize_time(think_time_setting))
            think_option = etree.Element("option", name="thinktime", value=str(seconds), random="false")
            options.append(think_option)

        tcp_timeout_setting = scenario.get("timeout")
        if tcp_timeout_setting:
            # The connect timeout is written as whole milliseconds.
            millis = int(dehumanize_time(tcp_timeout_setting) * 1000)
            timeout_option = etree.Element("option", name="connect_timeout", value=str(millis))
            options.append(timeout_option)

        max_retries = scenario.get("max-retries", 1)
        retries_option = etree.Element("option", name="max_retries", value=str(max_retries))
        options.append(retries_option)
        return options
github Blazemeter / taurus / bzt / modules / python / generators.py View on Github external
def gen_think_time(self, think_time, indent=None):
        """Generate a ``sleep`` statement for a think time, if one applies.

        :param think_time: think-time value, or ``None`` for no think time;
            converted with ``dehumanize_time``
        :param indent: passed through unchanged to ``self.gen_statement``
        :return: list with the sleep statement followed by a new line, or an
            empty list when ``think_time`` is ``None`` or not positive
        """
        test_method = []
        if think_time is not None:
            delay = dehumanize_time(think_time)
            if delay > 0:
                # Reuse the converted value instead of converting a second time.
                test_method.append(self.gen_statement("sleep(%s)" % delay, indent=indent))
                test_method.append(self.gen_new_line())
        return test_method