Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@pytest.mark.asyncio
async def test_crud_virtuoso(virtuoso_client):
    """Run put, delete, post and get against the Virtuoso client, in that order.

    The final GET must answer with status 200 and its text must contain
    the ``@prefix`` marker.
    """
    # Write phase: put, delete, then post the same sample payload.
    await virtuoso_client.put(sample_data, format=sample_format)
    await virtuoso_client.delete()
    await virtuoso_client.post(sample_data, format=sample_format)

    # Read phase: fetch in the same format and inspect the response.
    async with virtuoso_client.get(format=sample_format) as response:
        assert response.status == 200
        body = await response.text()
        assert "@prefix" in body
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "url,method,status,action,args,kwargs",
    [
        ("/api/users", "GET", 200, "list", None, {}),
        ("/api/users", "POST", 201, "create", None, {"body": {"success": True}}),
        ("/api/users/2", "GET", 200, "retrieve", 2, {"body": {"success": True}}),
        ("/api/users/2", "PUT", 200, "update", 2, {"body": {"success": True}}),
        ("/api/users/2", "PATCH", 200, "partial_update", 2, {"body": {"success": True}}),
        ("/api/users/2", "DELETE", 204, "destroy", 2, {"body": {"success": True}}),
    ],
)
async def test_reqres_async_api_users_actions(
    httpserver, url, method, status, action, args, kwargs, reqres_async_api
):
    """Register a canned JSON reply on the test HTTP server for one users-API case.

    Each parametrized case supplies the URL, HTTP method and the status code
    the server should answer with; the reply body is always
    ``{"success": True}``.
    """
    # NOTE(review): action, args, kwargs and reqres_async_api are not used by
    # the statements visible here - confirm whether the test body is complete.
    expectation = httpserver.expect_request(url, method=method)
    expectation.respond_with_json({"success": True}, status=status)
@pytest.mark.asyncio
# Each case below is a tuple of (old_game_state, new_game_state, expected_calls):
# the game lists before and after the update, and the calls expected as a result.
@pytest.mark.parametrize("old_game_state, new_game_state, expected_calls", [
# not owned -> not owned
([], [], [])
# not owned -> owned
, ([], [_db_owned_game(_GAME_ID, _GAME_TITLE)], ["add"])
# owned -> owned
, (
[_db_owned_game(_GAME_ID, _GAME_TITLE)]
, [_db_owned_game(_GAME_ID, _GAME_TITLE)]
, []
)
# owned -> not owned
, ([_db_owned_game(_GAME_ID, _GAME_TITLE)], [], ["remove"])
])
# NOTE(review): the signature below is cut off in the visible source - the
# remaining parameters and the test body are not shown here.
async def test_owned_game_update(
old_game_state
import asyncio
import uuid
import pytest
import aiormq
# Module-level pytest marker: applies the asyncio mark to every test in this
# module, so individual tests need no @pytest.mark.asyncio decorator.
pytestmark = pytest.mark.asyncio
async def test_simple(amqp_channel: aiormq.Channel):
    """Declare an auto-delete queue, attach a consumer and publish one message.

    The consumer callback is the ``put`` method of a local ``asyncio.Queue``;
    the published body is ``b"foo"`` with message id ``"123"``.
    """
    await amqp_channel.basic_qos(prefetch_count=1)
    assert amqp_channel.number

    inbox = asyncio.Queue()

    declare_ok = await amqp_channel.queue_declare(auto_delete=True)
    queue_name = declare_ok.queue
    consume_ok = await amqp_channel.basic_consume(queue_name, inbox.put)

    message_properties = aiormq.spec.Basic.Properties(message_id="123")
    await amqp_channel.basic_publish(
        b"foo",
        routing_key=queue_name,
        properties=message_properties,
    )
@pytest.mark.asyncio
# Runs once per strategy listed below.
@pytest.mark.parametrize(
'strategy', [Strategies.default, Strategies.latest, Strategies.any_flavour]
)
async def test_resolve_curse_simple_pkgs(manager, request, strategy):
# Resolve four 'curse' definitions in one call, all with the same strategy.
results = await manager.resolve(
[
Defn('curse', 'tomcats', strategy),
Defn('curse', 'mythic-dungeon-tools', strategy),
Defn('curse', 'classiccastbars', strategy),
Defn('curse', 'elkbuffbars', strategy),
]
)
# Unpack the four result values; presumably they come back in the order the
# definitions were passed - TODO confirm against manager.resolve.
separate, retail_only, classic_only, flavour_explosion = results.values()
assert isinstance(separate, Pkg)
# NOTE(review): the branch below has no body in the visible source - the rest
# of the test is cut off here.
if manager.config.is_classic:
@pytest.mark.asyncio
async def test_get_info(prepare_kernel, get_headers):
    """Create a kernel, GET its info endpoint and check the reported image.

    The request carries an empty JSON object as its body and headers built
    by the ``get_headers`` fixture; the reply must be a 200 whose ``image``
    field is ``lablup/lua:5.3-alpine``.
    """
    app, client, create_kernel = prepare_kernel
    created = await create_kernel()

    # Build the request: kernel-specific URL, empty JSON payload, and headers
    # derived from the method, URL and payload.
    url = '/v3/kernel/' + created['kernelId']
    payload = json.dumps({}).encode()
    request_headers = get_headers('GET', url, payload)

    response = await client.get(url, data=payload, headers=request_headers)
    assert response.status == 200

    info = await response.json()
    assert info['image'] == 'lablup/lua:5.3-alpine'
@pytest.mark.asyncio
async def test_async_example_job():
    """Running the async example job must call ``warning`` on the patched logger."""
    # Replace examples.jobs.logger with a mock for the duration of the job.
    with mock.patch('examples.jobs.logger') as patched_logger:
        await async_example_job()

    # The mock keeps its call record after the patch is undone.
    warning_was_logged = patched_logger.warning.called
    assert warning_was_logged
@pytest.mark.asyncio
async def test_httpx_exception_handling(client):
    """A ValueError raised while dispatching must still be recorded by the mock.

    ``dispatcher_for_url`` is patched to raise ``ValueError("mock")``; the
    request is expected to propagate that error, and the mock must have
    recorded one call with a request and no response.
    """
    failing_dispatcher = asynctest.mock.patch(
        "httpx._client.AsyncClient.dispatcher_for_url",
        side_effect=ValueError("mock"),
    )

    async with respx.HTTPXMock() as httpx_mock:
        with failing_dispatcher:
            target = "https://foo.bar/"
            route = httpx_mock.get(target)
            with pytest.raises(ValueError):
                await client.get(target)

        # The failed call must still show up in the mock's bookkeeping.
        assert route.called is True
        assert httpx_mock.stats.call_count == 1

        last_request, last_response = httpx_mock.calls[-1]
        assert last_request is not None
        assert last_response is None
@pytest.mark.asyncio
async def test_single_channel(pn):
    """Publish one message, then check message_counts reports one for the channel.

    Written as a native coroutine (``async def`` / ``await``): a plain ``def``
    using ``yield from`` is a generator-based coroutine, which Python removed
    in 3.11 and which the asyncio test marker does not treat as a coroutine
    function.

    Args:
        pn: fixture providing the client under test (exposes ``publish()``
            and ``message_counts()`` builders whose ``future()`` is awaited).
    """
    chan = 'unique_asyncio'

    published = await pn.publish().channel(chan).message('bla').future()

    # Query from a timetoken 10 units before the published message's own.
    since_timetoken = published.result.timetoken - 10
    envelope = await (
        pn.message_counts()
        .channel(chan)
        .channel_timetokens([since_timetoken])
        .future()
    )

    assert isinstance(envelope, AsyncioEnvelope)
    assert not envelope.status.is_error()
    assert envelope.result.channels[chan] == 1
    assert isinstance(envelope.result, PNMessageCountResult)
    assert isinstance(envelope.status, PNStatus)
@pytest.mark.asyncio
@pytest.mark.ttftt_engine(
name="coercion",
resolvers={"Query.nonNullBooleanField": resolve_unwrapped_field},
)
@pytest.mark.parametrize(
"query,variables,expected",
[
(
"""query { nonNullBooleanField }""",
None,
{
"data": None,
"errors": [
{
"message": "Missing mandatory argument < param > in field < Query.nonNullBooleanField >.",
"path": ["nonNullBooleanField"],