Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_thread_sensitive_outside_async():
    """
    Tests that thread_sensitive SyncToAsync where the outside is async code runs
    in a single, separate thread.

    Two entry points are scheduled together: ``outer`` (sync -> async -> sync,
    ending in ``inner``) and ``inner`` called directly. Each records the thread
    that executed ``inner`` in its own result dict.
    """
    result_1 = {}
    result_2 = {}

    # Outer sync function
    def outer(result):
        # ``middle`` is bound further down; it is only looked up when outer runs.
        middle(result)

    outer = sync_to_async(outer, thread_sensitive=True)

    # Middle async function
    @async_to_sync
    async def middle(result):
        await inner(result)

    # Inner sync function
    def inner(result):
        result["thread"] = threading.current_thread()

    inner = sync_to_async(inner, thread_sensitive=True)

    # Run it (in supposed parallel!)
    # asyncio.wait() no longer accepts bare coroutine objects (TypeError on
    # Python 3.11+), so each coroutine is wrapped in a future first.
    await asyncio.wait(
        [
            asyncio.ensure_future(outer(result_1)),
            asyncio.ensure_future(inner(result_2)),
        ]
    )

    # They should not have run in the main thread, but in the same thread
    assert result_1["thread"] != threading.current_thread()
    assert result_1["thread"] == result_2["thread"]
async def test_trackingdata_notification(communicator):
    """
    Set up a Shipment (created with its post-save signal handler disconnected)
    and a Device.

    NOTE(review): ``communicator`` is not used in the portion of the test
    visible here; presumably later steps read notifications from it - confirm.
    """
    # Disable Shipment post-save signal
    await sync_to_async(models.signals.post_save.disconnect)(sender=Shipment, dispatch_uid='shipment_post_save')
    try:
        shipment, _ = await sync_to_async(Shipment.objects.get_or_create)(
            id='FAKE_SHIPMENT_ID',
            owner_id=USER_ID,
            storage_credentials_id='FAKE_STORAGE_CREDENTIALS_ID',
            shipper_wallet_id='FAKE_SHIPPER_WALLET_ID',
            carrier_wallet_id='FAKE_CARRIER_WALLET_ID',
            contract_version='1.0.0'
        )
    finally:
        # Re-enable Shipment post-save signal, even when the creation above
        # fails, so a failure here does not leave the handler disconnected.
        await sync_to_async(models.signals.post_save.connect)(shipment_post_save, sender=Shipment,
                                                              dispatch_uid='shipment_post_save')

    device = await sync_to_async(Device.objects.create)(id=random_id())
@sync_to_async
def test_method(self):
    """Block the calling thread for one second, then return 44.

    The function itself is synchronous; the ``sync_to_async`` decorator is
    what makes it awaitable for callers.
    """
    pause_seconds = 1
    time.sleep(pause_seconds)
    outcome = 44
    return outcome
shipper_wallet_id='FAKE_SHIPPER_WALLET_ID',
carrier_wallet_id='FAKE_CARRIER_WALLET_ID',
contract_version='1.0.0'
)
# Re-enable Shipment post-save signal
await sync_to_async(models.signals.post_save.connect)(shipment_post_save, sender=Shipment,
dispatch_uid='shipment_post_save')
# Build an AsyncJob for this shipment using the dummy RPC method and no RPC parameters.
job = await sync_to_async(AsyncJob.rpc_job_for_listener)(rpc_method=DummyRPCClient.do_whatever, rpc_parameters=[],
signing_wallet_id='FAKE_WALLET_ID', shipment=shipment)
# Disable Shipment job update signal
await sync_to_async(job_update.disconnect)(shipment_job_update, sender=Shipment, dispatch_uid='shipment_job_update')
# Attach an ETH_TRANSACTION message to the job while the shipment_job_update handler is disconnected.
await sync_to_async(job.message_set.create)(type=MessageType.ETH_TRANSACTION, body=json.dumps({'foo': 'bar'}))
# Enable Shipment job update signal
await sync_to_async(job_update.connect)(shipment_job_update, sender=Shipment, dispatch_uid='shipment_job_update')
# The next JSON frame from the communicator must be an asyncjob_update event
# whose nested payload carries the id of the job created above.
response = await communicator.receive_json_from()
assert response['event'] == EventTypes.asyncjob_update.name
assert response['data']['data']['id'] == job.id
# Close the test connection once the expected event has been checked.
await communicator.disconnect()
# Create and start five TestThread instances.
threads = [TestThread() for _ in range(5)]
for thread in threads:
thread.start()
# Wait for the first thread only; the remaining four are joined further down.
threads[0].join()
# After each call below, test_local.counter is expected to be one more than
# the argument that was passed in.
await sync_to_async(sync_function)(3)
assert test_local.counter == 4
await async_function(4)
assert test_local.counter == 5
# Join the other threads before the final check.
for thread in threads[1:]:
thread.join()
await sync_to_async(sync_function)(5)
assert test_local.counter == 6
id='FAKE_SHIPMENT_ID',
owner_id=USER_ID,
storage_credentials_id='FAKE_STORAGE_CREDENTIALS_ID',
shipper_wallet_id='FAKE_SHIPPER_WALLET_ID',
carrier_wallet_id='FAKE_CARRIER_WALLET_ID',
contract_version='1.0.0'
)
# Re-enable Shipment post-save signal
await sync_to_async(models.signals.post_save.connect)(shipment_post_save, sender=Shipment,
dispatch_uid='shipment_post_save')
# Create a Device with a random id to attach the telemetry record to.
device = await sync_to_async(Device.objects.create)(id=random_id())
telemetry_id = random_id()
# Create one TelemetryData row tied to the device and shipment above.
telemetry_data = await sync_to_async(TelemetryData.objects.create)(
id=telemetry_id,
device=device,
shipment=shipment,
hardware_id='hardware_id',
sensor_id='sensor_id',
value=867.5309,
version='1.1.0',
# Timezone-aware "now" in UTC.
timestamp=datetime.datetime.now(datetime.timezone.utc)
)
# The next JSON frame from the communicator must be a telemetrydata_update event.
response = await communicator.receive_json_from()
assert response['event'] == EventTypes.telemetrydata_update.name
assert response['data'] == {
'sensor_id': telemetry_data.sensor_id,
'timestamp': telemetry_data.timestamp.isoformat().replace('+00:00', 'Z'),
'hardware_id': telemetry_data.hardware_id,
@sync_to_async
def _hash_password(hasher: GenericHandler, password: str) -> str:
    """Hash ``password`` with ``hasher`` and return the result.

    The body is a plain synchronous call to ``hasher.hash``; the
    ``sync_to_async`` decorator is what makes this awaitable for callers.
    """
    hashed = hasher.hash(password)
    return hashed
# Build and send the rt_connect handshake carrying client_data.
message = json.dumps({
'msg_type': 'rt_connect',
'client_data': client_data
})
await websocket.send(message)
# The reply is decoded before comparison and must match the acknowledgement
# string exactly, including its whitespace.
# NOTE(review): assert statements are stripped under `python -O`, so this
# check disappears in optimized runs - confirm that is acceptable.
response = await websocket.recv()
assert response.decode() == '{ "msg_type": "rt_connect_ok" }'
# Subscribe, tagging the request with id "A".
await websocket.send('{ "msg_type": "rt_subscribe", "request_id": "A" }')
# Receive messages indefinitely; there is no break or return in the visible
# code, so the loop ends only if an exception other than those caught below
# propagates.
while True:
response = await websocket.recv()
try:
# Parse the frame as JSON and hand it to the synchronous self.handle_data
# via sync_to_async.
await sync_to_async(self.handle_data)(json.loads(response))
except (Error, ValueError) as e:
# Print the error and continue with the next message.
# NOTE(review): `Error` is defined outside this excerpt - confirm which
# exception class it refers to.
print(e)
async def disconnect(self, code):
    """
    Websocket disconnect event handler, which closes session with guacd on websocket disconnect

    :param code: value passed in with the disconnect event; not used by this handler
    """
    try:
        # socket disconnected - inform server that we are out
        await self.log_ticket_action(self.ticket, 'disconnect', self.scope)
    finally:
        # Always close the guacd client, even when logging the action raises,
        # so a logging failure does not leave the session open.
        await sync_to_async(self.gclient.close)()