How to use the fortnitepy.cache.Cache function in fortnitepy

To help you get started, we’ve selected a few fortnitepy examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Terbau / fortnitepy / fortnitepy / cache.py View on Github external
if silent:
            return self._cache.get(key)
        return self._cache[key]

    async def schedule_removal(self, key: str, seconds: int) -> None:
        await asyncio.sleep(seconds, loop=self.loop)
        try:
            del self._cache[key]
        except KeyError:
            pass

    def clear(self) -> None:
        self._cache = {}


class WeakrefCache(Cache):
    def __init__(self,
                 loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        super().__init__(loop=loop)
        self._cache = weakref.WeakValueDictionary()
github Terbau / fortnitepy / fortnitepy / client.py View on Github external
self.event_prefix = 'event_'

        self.auth = auth
        self.auth.initialize(self)
        self.http = HTTPClient(self, connector=kwargs.get('connector'))
        self.http.add_header('Accept-Language', 'en-EN')
        self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
        self.party = None

        self._listeners = {}
        self._events = {}
        self._friends = Cache()
        self._pending_friends = Cache()
        self._users = Cache()
        self._blocked_users = Cache()
        self._presences = Cache()
        self._ready = asyncio.Event(loop=self.loop)
        self._leave_lock = asyncio.Lock(loop=self.loop)
        self._join_party_lock = LockEvent(loop=self.loop)
        self._refresh_task = None
        self._start_runner_task = None
        self._closed = False
        self._closing = False
        self._restarting = False
        self._first_start = True

        self._join_confirmation = False

        self.setup_internal()
github Terbau / fortnitepy / fortnitepy / client.py View on Github external
self.kill_other_sessions = True
        self.accept_eula = True
        self.event_prefix = 'event_'

        self.auth = auth
        self.auth.initialize(self)
        self.http = HTTPClient(self, connector=kwargs.get('connector'))
        self.http.add_header('Accept-Language', 'en-EN')
        self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
        self.party = None

        self._listeners = {}
        self._events = {}
        self._friends = Cache()
        self._pending_friends = Cache()
        self._users = Cache()
        self._blocked_users = Cache()
        self._presences = Cache()
        self._ready = asyncio.Event(loop=self.loop)
        self._leave_lock = asyncio.Lock(loop=self.loop)
        self._join_party_lock = LockEvent(loop=self.loop)
        self._refresh_task = None
        self._start_runner_task = None
        self._closed = False
        self._closing = False
        self._restarting = False
        self._first_start = True

        self._join_confirmation = False

        self.setup_internal()
github Terbau / fortnitepy / fortnitepy / client.py View on Github external
self.service_port = kwargs.get('xmpp_port', 5222)

        self.kill_other_sessions = True
        self.accept_eula = True
        self.event_prefix = 'event_'

        self.auth = auth
        self.auth.initialize(self)
        self.http = HTTPClient(self, connector=kwargs.get('connector'))
        self.http.add_header('Accept-Language', 'en-EN')
        self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
        self.party = None

        self._listeners = {}
        self._events = {}
        self._friends = Cache()
        self._pending_friends = Cache()
        self._users = Cache()
        self._blocked_users = Cache()
        self._presences = Cache()
        self._ready = asyncio.Event(loop=self.loop)
        self._leave_lock = asyncio.Lock(loop=self.loop)
        self._join_party_lock = LockEvent(loop=self.loop)
        self._refresh_task = None
        self._start_runner_task = None
        self._closed = False
        self._closing = False
        self._restarting = False
        self._first_start = True

        self._join_confirmation = False
github Terbau / fortnitepy / fortnitepy / client.py View on Github external
self.accept_eula = True
        self.event_prefix = 'event_'

        self.auth = auth
        self.auth.initialize(self)
        self.http = HTTPClient(self, connector=kwargs.get('connector'))
        self.http.add_header('Accept-Language', 'en-EN')
        self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
        self.party = None

        self._listeners = {}
        self._events = {}
        self._friends = Cache()
        self._pending_friends = Cache()
        self._users = Cache()
        self._blocked_users = Cache()
        self._presences = Cache()
        self._ready = asyncio.Event(loop=self.loop)
        self._leave_lock = asyncio.Lock(loop=self.loop)
        self._join_party_lock = LockEvent(loop=self.loop)
        self._refresh_task = None
        self._start_runner_task = None
        self._closed = False
        self._closing = False
        self._restarting = False
        self._first_start = True

        self._join_confirmation = False

        self.setup_internal()
github Terbau / fortnitepy / fortnitepy / client.py View on Github external
self.kill_other_sessions = True
        self.accept_eula = True
        self.event_prefix = 'event_'

        self.auth = auth
        self.auth.initialize(self)
        self.http = HTTPClient(self, connector=kwargs.get('connector'))
        self.http.add_header('Accept-Language', 'en-EN')
        self.xmpp = XMPPClient(self, ws_connector=kwargs.get('ws_connector'))
        self.party = None

        self._listeners = {}
        self._events = {}
        self._friends = Cache()
        self._pending_friends = Cache()
        self._users = Cache()
        self._blocked_users = Cache()
        self._presences = Cache()
        self._ready = asyncio.Event(loop=self.loop)
        self._leave_lock = asyncio.Lock(loop=self.loop)
        self._join_party_lock = LockEvent(loop=self.loop)
        self._refresh_task = None
        self._start_runner_task = None
        self._closed = False
        self._closing = False
        self._restarting = False
        self._first_start = True

        self._join_confirmation = False

        self.setup_internal()