Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_contextmanager(self):
    """Exercise the lock through ``async with`` from two tasks.

    The parent takes the lock, spawns a child that also wants it, and
    records ``'1'`` before leaving the ``async with`` block.  The child
    asserts the lock is already held when it starts and records ``'2'``
    once it has acquired the lock itself.  Afterwards the lock must be
    free and the markers must read ``['1', '2']``.
    """
    order = []
    mutex = create_lock()

    async def contender():
        # The parent still owns the lock when this task starts running.
        assert mutex.locked()
        async with mutex:
            order.append('2')

    async with create_task_group() as group:
        async with mutex:
            await group.spawn(contender)
            await wait_all_tasks_blocked()
            order.append('1')

    assert not mutex.locked()
    assert order == ['1', '2']
async def test_manual_acquire(self):
    """Exercise the lock through explicit ``acquire()`` / ``release()`` calls.

    Same scenario as the context-manager variant, but both the parent and
    the spawned task pair ``acquire()`` with ``release()`` in a
    ``try``/``finally``.  The parent records ``'1'`` while holding the
    lock, the child records ``'2'`` after acquiring it; the lock must be
    free at the end and the markers must read ``['1', '2']``.
    """
    order = []
    mutex = create_lock()

    async def contender():
        # The parent still owns the lock when this task starts running.
        assert mutex.locked()
        await mutex.acquire()
        try:
            order.append('2')
        finally:
            await mutex.release()

    async with create_task_group() as group:
        await mutex.acquire()
        try:
            await group.spawn(contender)
            await wait_all_tasks_blocked()
            order.append('1')
        finally:
            await mutex.release()

    assert not mutex.locked()
    assert order == ['1', '2']
async def test_cancel(self):
    """Check that a cancelled waiter never ends up holding the lock.

    The parent holds the lock, spawns a child that tries to take it, and
    then cancels the task group's cancel scope.  The child must have
    started (first flag set) but must not have entered its own
    ``async with`` body (second flag still unset).
    """
    child_ran = False
    child_locked = False
    mutex = create_lock()

    async def contender():
        nonlocal child_ran, child_locked
        child_ran = True
        async with mutex:
            child_locked = True

    async with create_task_group() as group:
        async with mutex:
            await group.spawn(contender)
            await group.cancel_scope.cancel()

    assert child_ran
    assert not child_locked
# Fragment of an initializer body: stores the supplied arguments on the
# instance and creates the anyio primitives the object works with.
# NOTE(review): the enclosing ``def`` line is not visible in this excerpt —
# confirm parameter meanings against the full signature.
self._stream = stream
self._wsproto = wsproto
self._host = host
self._path = path
self._subprotocols = subprotocols
# Start as None; presumably filled in during the handshake — TODO confirm.
self._connection_subprotocol = None
self._handshake_headers = None
self._headers = headers
# Public attributes copied straight from the arguments.
self.message_queue_size = message_queue_size
self.max_message_size = max_message_size
# Rejection details start out empty (no status, no headers, empty body).
self._reject_status = None
self._reject_headers = None
self._reject_body = b''
# Presumably serialises access to the stream — verify against its users.
self._stream_lock = anyio.create_lock()
# Running size and collected parts of a message; presumably used to
# reassemble fragmented messages — TODO confirm.
self._message_size = 0
self._message_parts = []
# Queue created with ``message_queue_size`` as its argument.
self._event_queue = anyio.create_queue(self.message_queue_size)
# Insertion-ordered mapping for pings; key/value schema not visible here.
self._pings = OrderedDict()
# One event each for the opening and closing handshake.
self._open_handshake = anyio.create_event()
self._close_handshake = anyio.create_event()
def get_ratelimit_lock(self, bucket: object) -> "anyio.Lock":
    """
    Return the lock stored for ``bucket``, creating and storing one on a miss.

    :param bucket: The key to look up in ``self._rate_limits``.
    :return: The existing lock for the bucket, or a newly created one.
    """
    rate_limits = self._rate_limits
    try:
        lock = rate_limits[bucket]
    except KeyError:
        # First request for this bucket: create the lock and remember it.
        lock = rate_limits[bucket] = anyio.create_lock()
    return lock
def __init__(self, client):
    """Set up the HTTP state for ``client``.

    Stores the client and its token, fixes the retry count at 5, prepares
    a lock-per-bucket mapping and a global event, builds the user agent
    string, and opens an ``asks`` session carrying the authorization and
    user-agent headers.

    :param client: Object providing ``token`` and ``is_bot``.
    """
    self.client = client
    self.token = client.token
    self.retries = 5

    # A lock is created the first time a bucket key is accessed.
    self.buckets = defaultdict(anyio.create_lock)
    self.global_event = anyio.create_event()

    # User agent is built from project metadata and the Python version.
    self.user_agent = 'DiscordBot ({0} {1}) Python/{2[0]}.{2[1]}'.format(
        __github__, __version__, sys.version_info)

    # Bot clients send their token with a "Bot " prefix; others send it raw.
    if self.client.is_bot:
        authorization = 'Bot {.token}'.format(self)
    else:
        authorization = self.token

    self.session = asks.Session(headers={
        "Authorization": authorization,
        "User-Agent": self.user_agent,
    })
# Initializer taking the token and a keyword-only ``max_connections``
# (default 10).
# NOTE(review): ``max_connections`` is not used in the lines visible here —
# the body may continue beyond this excerpt; confirm against the full file.
def __init__(self, token: str, *, max_connections: int = 10):
#: The token used for all requests.
self.token = token
# Calculated headers: the user agent plus the token prefixed with "Bot ".
# The import is local to the function — presumably to avoid a circular
# import; TODO confirm.
from curious import USER_AGENT
headers = {"User-Agent": USER_AGENT, "Authorization": f"Bot {token}"}
self.endpoints = Endpoints()
self.headers = headers
#: The global ratelimit lock.
self.global_lock = anyio.create_lock()
# Weak-valued mapping: an entry disappears once nothing else references
# its value.
self._rate_limits = weakref.WeakValueDictionary()
# Created via ``lru(1024)``; presumably a cache capped at 1024 entries —
# verify against the ``lru`` helper.
self._ratelimit_remaining = lru(1024)