Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __getstate__(self):
    """Return a copy of the instance ``__dict__`` for pickling.

    The ``_redis`` and ``_buffer`` entries are removed from the copy:
    connection objects cannot be pickled and are recreated upon unpickling.
    The instance itself is left untouched.
    """
    picklable = dict(self.__dict__)
    for transient in ('_redis', '_buffer'):
        picklable.pop(transient)
    return picklable
def __setstate__(self, state):
    """Adopt ``state`` as the instance ``__dict__`` and reconnect.

    ``__init__`` is re-run with the restored ``config`` and ``_name``
    attributes.
    """
    self.__dict__ = state
    # Reconnect here, using the settings that were just restored.
    restored_config, restored_name = self.config, self._name
    self.__init__(restored_config, name=restored_name)
class RedisListStorage(OrderedStorage, RedisStorage):
    """Ordered storage built on top of ``RedisStorage``."""

    def __init__(self, config, name=None):
        """Delegate construction to ``RedisStorage`` with the same arguments."""
        RedisStorage.__init__(self, config, name=name)

    def keys(self):
        """Return the result of ``HKEYS`` on the hash named ``self._name``."""
        redis_client = self._redis
        return redis_client.hkeys(self._name)

    def redis_keys(self):
        """Return the result of ``HVALS`` on the hash named ``self._name``."""
        redis_client = self._redis
        return redis_client.hvals(self._name)

    def status(self):
        """Return the parsed ``'redis'`` config merged with ``Storage.status``.

        Entries from ``Storage.status`` overwrite parsed configuration
        entries that share a key.
        """
        report = self._parse_config(self.config['redis'])
        report.update(Storage.status(self))
        return report

    def get(self, key):
        """Fetch the items for ``key`` after translating it with ``redis_key``."""
        translated = self.redis_key(key)
        return self._get_items(self._redis, translated)
return cfg
def __getstate__(self):
    """Return a copy of the instance ``__dict__`` for pickling.

    ``_mongo_client``, ``_collection`` and ``_buffer`` are removed from the
    copy, and ``_initialized`` is recorded as ``False``. The instance itself
    is left untouched.
    """
    snapshot = dict(self.__dict__)
    for transient in ('_mongo_client', '_collection', '_buffer'):
        snapshot.pop(transient)
    snapshot['_initialized'] = False
    return snapshot
def __setstate__(self, state):
    """Adopt ``state`` as the instance ``__dict__`` and re-run ``__init__``.

    ``__init__`` receives the restored ``_config`` and ``_name`` attributes.
    """
    self.__dict__ = state
    restored_config, restored_name = self._config, self._name
    self.__init__(restored_config, name=restored_name)
class AsyncMongoListStorage(OrderedStorage, AsyncMongoStorage):
    """Ordered storage that reads documents with ``key`` and ``vals`` fields."""

    async def keys(self):
        """Return the ``key`` field of each document from an unfiltered find."""
        cursor = self._collection.find(projection={'_id': False, 'vals': False})
        return [doc['key'] async for doc in cursor]

    async def get(self, key: str):
        """Return the values stored under ``key`` as one flat list.

        The ``vals`` field of every matching document is collected and the
        collected sequences are concatenated in the order returned.
        """
        cursor = self._collection.find(filter={'key': key},
                                       projection={'_id': False, 'key': False})
        value_groups = [doc['vals'] async for doc in cursor]
        return list(chain.from_iterable(value_groups))

    async def insert(self, key, *vals, **kwargs):
        """Insert ``vals`` under ``key`` via ``self._insert``.

        A truthy ``buffer`` keyword targets ``self._buffer``; otherwise the
        target is ``self._collection``. Other keywords are ignored.
        """
        use_buffer = kwargs.pop('buffer', False)
        target = self._buffer if use_buffer else self._collection
        await self._insert(target, key, *vals)
def empty_buffer(self):
    """No-op implementation; always returns ``None``."""
    return None
class OrderedStorage(Storage):
    """Marker subclass of ``Storage``; it defines no members of its own."""
class UnorderedStorage(Storage):
    """Marker subclass of ``Storage``; it defines no members of its own."""
class DictListStorage(OrderedStorage):
    '''Wrap a ``defaultdict(list)`` so that it offers an API consistent
    with `Storage`.
    '''

    def __init__(self, config):
        '''Create the empty backing mapping; ``config`` is accepted but unused.'''
        self._dict = defaultdict(list)

    def keys(self):
        '''Return the keys view of the backing mapping.'''
        backing = self._dict
        return backing.keys()

    def get(self, key):
        '''Return the list stored under ``key``, or a new empty list.

        ``dict.get`` is used so a missing key does not create an entry in
        the backing ``defaultdict``.
        '''
        fallback = []
        return self._dict.get(key, fallback)

    def remove(self, *keys):
        '''Delete each given key in order; a missing key raises ``KeyError``.'''
        backing = self._dict
        for stale_key in keys:
            del backing[stale_key]
"""
state = self.__dict__.copy()
state.pop('_client')
return state
def __setstate__(self, state):
    """
    Restore the instance from ``state`` and reconnect ephemeral objects.

    :param dict[str, any] state: the state to restore
    """
    self.__dict__ = state
    # Re-run the constructor with the restored settings to reconnect.
    init_kwargs = {'name': self._name, 'buffer_size': self._buffer_size}
    self.__init__(self._config, **init_kwargs)
class CassandraListStorage(OrderedStorage, CassandraStorage):
    """
    Cassandra-backed implementation of OrderedStorage.

    Note: values must be selectable and deletable both by 'key' and by
    'key and value', and duplicate values must be allowed, so a monotonically
    increasing timestamp is stored as an additional value.
    """

    def keys(self):
        """Implement interface: return what the client's ``get_keys`` reports."""
        client = self._client
        return client.get_keys()

    def get(self, key):
        """Implement interface: return the values selected for ``key``.

        An empty list is returned when the selection has no entry for ``key``.
        """
        selection = self._client.select([key])
        return selection.get(key, [])