Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if silent:
return self._cache.get(key)
return self._cache[key]
async def schedule_removal(self, key: str, seconds: int) -> None:
await asyncio.sleep(seconds, loop=self.loop)
try:
del self._cache[key]
except KeyError:
pass
def clear(self) -> None:
self._cache = {}
class WeakrefCache(Cache):
def __init__(self,
loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
super().__init__(loop=loop)
self._cache = weakref.WeakValueDictionary()
self.event_prefix = 'event_'
self.auth = auth
self.auth.initialize(self)
self.http = HTTPClient(self, connector=kwargs.get('connector'))
self.http.add_header('Accept-Language', 'en-EN')
self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
self.party = None
self._listeners = {}
self._events = {}
self._friends = Cache()
self._pending_friends = Cache()
self._users = Cache()
self._blocked_users = Cache()
self._presences = Cache()
self._ready = asyncio.Event(loop=self.loop)
self._leave_lock = asyncio.Lock(loop=self.loop)
self._join_party_lock = LockEvent(loop=self.loop)
self._refresh_task = None
self._start_runner_task = None
self._closed = False
self._closing = False
self._restarting = False
self._first_start = True
self._join_confirmation = False
self.setup_internal()
self.kill_other_sessions = True
self.accept_eula = True
self.event_prefix = 'event_'
self.auth = auth
self.auth.initialize(self)
self.http = HTTPClient(self, connector=kwargs.get('connector'))
self.http.add_header('Accept-Language', 'en-EN')
self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
self.party = None
self._listeners = {}
self._events = {}
self._friends = Cache()
self._pending_friends = Cache()
self._users = Cache()
self._blocked_users = Cache()
self._presences = Cache()
self._ready = asyncio.Event(loop=self.loop)
self._leave_lock = asyncio.Lock(loop=self.loop)
self._join_party_lock = LockEvent(loop=self.loop)
self._refresh_task = None
self._start_runner_task = None
self._closed = False
self._closing = False
self._restarting = False
self._first_start = True
self._join_confirmation = False
self.setup_internal()
self.service_port = kwargs.get('xmpp_port', 5222)
self.kill_other_sessions = True
self.accept_eula = True
self.event_prefix = 'event_'
self.auth = auth
self.auth.initialize(self)
self.http = HTTPClient(self, connector=kwargs.get('connector'))
self.http.add_header('Accept-Language', 'en-EN')
self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
self.party = None
self._listeners = {}
self._events = {}
self._friends = Cache()
self._pending_friends = Cache()
self._users = Cache()
self._blocked_users = Cache()
self._presences = Cache()
self._ready = asyncio.Event(loop=self.loop)
self._leave_lock = asyncio.Lock(loop=self.loop)
self._join_party_lock = LockEvent(loop=self.loop)
self._refresh_task = None
self._start_runner_task = None
self._closed = False
self._closing = False
self._restarting = False
self._first_start = True
self._join_confirmation = False
self.accept_eula = True
self.event_prefix = 'event_'
self.auth = auth
self.auth.initialize(self)
self.http = HTTPClient(self, connector=kwargs.get('connector'))
self.http.add_header('Accept-Language', 'en-EN')
self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
self.party = None
self._listeners = {}
self._events = {}
self._friends = Cache()
self._pending_friends = Cache()
self._users = Cache()
self._blocked_users = Cache()
self._presences = Cache()
self._ready = asyncio.Event(loop=self.loop)
self._leave_lock = asyncio.Lock(loop=self.loop)
self._join_party_lock = LockEvent(loop=self.loop)
self._refresh_task = None
self._start_runner_task = None
self._closed = False
self._closing = False
self._restarting = False
self._first_start = True
self._join_confirmation = False
self.setup_internal()
self.kill_other_sessions = True
self.accept_eula = True
self.event_prefix = 'event_'
self.auth = auth
self.auth.initialize(self)
self.http = HTTPClient(self, connector=kwargs.get('connector'))
self.http.add_header('Accept-Language', 'en-EN')
self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
self.party = None
self._listeners = {}
self._events = {}
self._friends = Cache()
self._pending_friends = Cache()
self._users = Cache()
self._blocked_users = Cache()
self._presences = Cache()
self._ready = asyncio.Event(loop=self.loop)
self._leave_lock = asyncio.Lock(loop=self.loop)
self._join_party_lock = LockEvent(loop=self.loop)
self._refresh_task = None
self._start_runner_task = None
self._closed = False
self._closing = False
self._restarting = False
self._first_start = True
self._join_confirmation = False
self.setup_internal()