Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_keyboard_interrupt(cli, cli_args, base_url, mocker, flask_app, swagger_20, workers):
# When a Schemathesis run in interrupted by keyboard or via SIGINT
endpoint = Endpoint("/success", "GET", {}, swagger_20, base_url=base_url)
if len(cli_args) == 2:
endpoint.app = flask_app
original = Case(endpoint).call_wsgi
else:
original = Case(endpoint).call
counter = 0
def mocked(*args, **kwargs):
nonlocal counter
counter += 1
if counter > 1:
# For threaded case it emulates SIGINT for the worker thread
raise KeyboardInterrupt
return original(*args, **kwargs)
if len(cli_args) == 2:
mocker.patch("schemathesis.Case.call_wsgi", wraps=mocked)
else:
mocker.patch("schemathesis.Case.call", wraps=mocked)
def test_keyboard_interrupt(cli, cli_args, base_url, mocker, flask_app, swagger_20, workers):
# When a Schemathesis run in interrupted by keyboard or via SIGINT
endpoint = Endpoint("/success", "GET", {}, swagger_20, base_url=base_url)
if len(cli_args) == 2:
endpoint.app = flask_app
original = Case(endpoint).call_wsgi
else:
original = Case(endpoint).call
counter = 0
def mocked(*args, **kwargs):
nonlocal counter
counter += 1
if counter > 1:
# For threaded case it emulates SIGINT for the worker thread
raise KeyboardInterrupt
return original(*args, **kwargs)
if len(cli_args) == 2:
mocker.patch("schemathesis.Case.call_wsgi", wraps=mocked)
else:
mocker.patch("schemathesis.Case.call", wraps=mocked)
result = cli.run(*cli_args, f"--workers={workers}")
assert result.exit_code == ExitCode.OK
def test_not_wsgi(schema):
case = Case(schema.endpoints["/api/success"]["GET"])
case.endpoint.app = None
with pytest.raises(
RuntimeError,
match="WSGI application instance is required. "
"Please, set `app` argument in the schema constructor or pass it to `call_wsgi`",
):
case.call_wsgi()