Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_thread_sensitive_outside_async():
    """
    Tests that thread_sensitive SyncToAsync where the outside is async code runs
    in a single, separate thread.

    Two thread-sensitive calls are started together: ``outer`` (a sync
    function that calls the async ``middle``, which awaits ``inner``) and a
    direct call to ``inner``. Each call stores the thread that executed
    ``inner`` in its own result dict so the two can be compared.
    """
    result_1 = {}
    result_2 = {}

    # Outer sync function
    def outer(result):
        middle(result)

    outer = sync_to_async(outer, thread_sensitive=True)

    # Middle async function
    @async_to_sync
    async def middle(result):
        # ``inner`` is resolved when this runs, i.e. after it has been
        # rebound to its sync_to_async wrapper below.
        await inner(result)

    # Inner sync function
    def inner(result):
        result["thread"] = threading.current_thread()

    inner = sync_to_async(inner, thread_sensitive=True)

    # Run it (in supposed parallel!)
    # asyncio.wait() no longer accepts bare coroutines (deprecated in 3.8,
    # rejected from 3.11), so wrap each call in a future first.
    await asyncio.wait(
        [
            asyncio.ensure_future(outer(result_1)),
            asyncio.ensure_future(inner(result_2)),
        ]
    )

    # They should not have run in the main thread, but in the same thread
    assert result_1["thread"] != threading.current_thread()
    assert result_1["thread"] == result_2["thread"]
async def test_websocket_consumer_connect_user_ip(
    headers, client_address, expected, tracked_requests
):
    """
    Send a websocket connect event to the app returned by ``app_with_scout``
    and check the ``user_ip`` tag on the first tracked request.

    The scope is built for path ``/basic-ws/`` with the given ``headers`` and
    a client tuple of ``(client_address, None)``. The test expects the app to
    answer with ``websocket.accept`` and the tag to equal ``expected``.
    """
    connect_event = {"type": "websocket.connect"}

    async with app_with_scout() as app:
        scope = asgi_websocket_scope(
            path="/basic-ws/",
            headers=headers,
            client=(client_address, None),
        )
        communicator = ApplicationCommunicator(app, scope)
        await communicator.send_input(connect_event)
        response = await communicator.receive_output()
        await communicator.wait(timeout=0.001)

    # NOTE(review): the assertions are placed after the context manager has
    # exited; the source's indentation was lost, so confirm this placement.
    assert response["type"] == "websocket.accept"
    first_request = tracked_requests[0]
    assert first_request.tags["user_ip"] == expected
gc.collect()
# Trigger the local creation outside
e1.set()
e2.wait()
# New Locals should be empty
matched = len(
[local for local in locals if getattr(local, "foo", None) == "bar"]
)
t = threading.Thread(target=f)
t.start()
e1.wait()
# Creates locals outside of the inner thread
locals = [Local() for i in range(100)]
e2.set()
t.join()
assert matched == 0
def get_last_autoupdate(self, user=None):
    """
    Return the last autoupdate for the given user as a tuple
    ``(changed_elements, deleted_element_ids)``.

    ``changed_elements`` is a dict whose keys are the element ids produced by
    ``get_element_id(collection, element["id"])`` and whose values are the
    elements themselves.

    When ``user`` is None the cache is queried with ``user_id=None`` (full
    data); otherwise ``user.id`` is used. Per the original note, 0 stands for
    the anonymous user and regular ids for users.
    """
    user_id = user.id if user is not None else None

    # element_cache exposes coroutines; bridge them into this sync method.
    latest_change_id = async_to_sync(element_cache.get_current_change_id)()
    fetch_data_since = async_to_sync(element_cache.get_data_since)
    changes_by_collection, deleted_element_ids = fetch_data_since(
        user_id=user_id, change_id=latest_change_id
    )

    # Flatten {collection: [element, ...]} into {element_id: element}.
    changed_elements = {
        get_element_id(collection, element["id"]): element
        for collection, elements in changes_by_collection.items()
        for element in elements
    }
    return changed_elements, deleted_element_ids
result['aborted'] = str(job_object.tests.filter(status=6).count())
return result
if created:
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"job_tests_details" + "-" + instance.job.uuid,
{
"type": "message",
"message": data()
}
)
if instance:
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"job_tests_details" + "-" + instance.job.uuid,
{
"type": "message",
"message": data()
}
async def test_trackingdata_notification(communicator):
    """
    Prepare a Shipment and a Device for a tracking-data notification test.

    The Shipment is fetched or created while the ``shipment_post_save``
    receiver is disconnected, and the receiver is reconnected afterwards.
    Only this setup is visible here; ``communicator`` is not used in the
    visible portion.
    """
    post_save = models.signals.post_save
    signal_uid = 'shipment_post_save'

    # Disable Shipment post-save signal
    await sync_to_async(post_save.disconnect)(sender=Shipment, dispatch_uid=signal_uid)

    shipment_fields = {
        'id': 'FAKE_SHIPMENT_ID',
        'owner_id': USER_ID,
        'storage_credentials_id': 'FAKE_STORAGE_CREDENTIALS_ID',
        'shipper_wallet_id': 'FAKE_SHIPPER_WALLET_ID',
        'carrier_wallet_id': 'FAKE_CARRIER_WALLET_ID',
        'contract_version': '1.0.0',
    }
    get_or_create_shipment = sync_to_async(Shipment.objects.get_or_create)
    shipment, _created = await get_or_create_shipment(**shipment_fields)

    # Re-enable Shipment post-save signal
    await sync_to_async(post_save.connect)(
        shipment_post_save, sender=Shipment, dispatch_uid=signal_uid
    )

    create_device = sync_to_async(Device.objects.create)
    device = await create_device(id=random_id())
@sync_to_async
def test_method(self):
    """Sleep for one second, then return 44 (wrapped by sync_to_async)."""
    delay_seconds = 1
    time.sleep(delay_seconds)
    answer = 44
    return answer
shipper_wallet_id='FAKE_SHIPPER_WALLET_ID',
carrier_wallet_id='FAKE_CARRIER_WALLET_ID',
contract_version='1.0.0'
)
# Re-enable Shipment post-save signal
await sync_to_async(models.signals.post_save.connect)(shipment_post_save, sender=Shipment,
dispatch_uid='shipment_post_save')
job = await sync_to_async(AsyncJob.rpc_job_for_listener)(rpc_method=DummyRPCClient.do_whatever, rpc_parameters=[],
signing_wallet_id='FAKE_WALLET_ID', shipment=shipment)
# Disable Shipment job update signal
await sync_to_async(job_update.disconnect)(shipment_job_update, sender=Shipment, dispatch_uid='shipment_job_update')
await sync_to_async(job.message_set.create)(type=MessageType.ETH_TRANSACTION, body=json.dumps({'foo': 'bar'}))
# Enable Shipment job update signal
await sync_to_async(job_update.connect)(shipment_job_update, sender=Shipment, dispatch_uid='shipment_job_update')
response = await communicator.receive_json_from()
assert response['event'] == EventTypes.asyncjob_update.name
assert response['data']['data']['id'] == job.id
await communicator.disconnect()
threads = [TestThread() for _ in range(5)]
for thread in threads:
thread.start()
threads[0].join()
await sync_to_async(sync_function)(3)
assert test_local.counter == 4
await async_function(4)
assert test_local.counter == 5
for thread in threads[1:]:
thread.join()
await sync_to_async(sync_function)(5)
assert test_local.counter == 6
id='FAKE_SHIPMENT_ID',
owner_id=USER_ID,
storage_credentials_id='FAKE_STORAGE_CREDENTIALS_ID',
shipper_wallet_id='FAKE_SHIPPER_WALLET_ID',
carrier_wallet_id='FAKE_CARRIER_WALLET_ID',
contract_version='1.0.0'
)
# Re-enable Shipment post-save signal
await sync_to_async(models.signals.post_save.connect)(shipment_post_save, sender=Shipment,
dispatch_uid='shipment_post_save')
device = await sync_to_async(Device.objects.create)(id=random_id())
telemetry_id = random_id()
telemetry_data = await sync_to_async(TelemetryData.objects.create)(
id=telemetry_id,
device=device,
shipment=shipment,
hardware_id='hardware_id',
sensor_id='sensor_id',
value=867.5309,
version='1.1.0',
timestamp=datetime.datetime.now(datetime.timezone.utc)
)
response = await communicator.receive_json_from()
assert response['event'] == EventTypes.telemetrydata_update.name
assert response['data'] == {
'sensor_id': telemetry_data.sensor_id,
'timestamp': telemetry_data.timestamp.isoformat().replace('+00:00', 'Z'),
'hardware_id': telemetry_data.hardware_id,