Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def redis_steam_id_to_datetime(message_id):
    """Convert a Redis stream message ID into a timezone-aware UTC datetime.

    The ID is expected in the ``"<milliseconds>-<sequence>"`` form. The
    millisecond part supplies the timestamp; the sequence part is folded into
    the sub-millisecond (microsecond) digits so that IDs sharing a millisecond
    still map to increasing datetimes.

    Args:
        message_id: The stream ID. It is first passed through ``decode`` with
            ``"utf8"`` (presumably to turn bytes into ``str`` -- confirm
            against that helper).

    Returns:
        A ``datetime`` with ``tzinfo=timezone.utc``.

    Raises:
        ValueError: If the ID does not consist of exactly two integer parts
            separated by ``-``.
    """
    message_id = decode(message_id, "utf8")
    milliseconds, seq = map(int, message_id.split("-"))
    seconds, millis_remainder = divmod(milliseconds, 1000)
    # Treat the sequence value as additional microseconds to ensure correct
    # sequencing. Only three microsecond digits are free below the millisecond,
    # so clamp the sequence to 999: an unbounded value would either spill into
    # the next millisecond (breaking ordering) or push the microsecond field
    # past 999999 and raise ValueError.
    microseconds = millis_remainder * 1000 + min(seq, 999)
    # fromtimestamp(..., tz=utc) yields the same instant as the deprecated
    # utcfromtimestamp() but is timezone-aware from the start.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=microseconds
    )
async def test_storage(loop):
    """Round-trip one value through ``RedisStorage`` and then clear it."""
    settings = MergeDict(name='1', prefix=str(uuid.uuid4()), format='json')
    redis_address = ('localhost', 6379)
    settings['app.redis_pool'] = await aioredis.create_pool(
        redis_address, loop=loop)

    # The same mapping is handed over both as the config and as the context.
    storage = RedisStorage(settings, context=settings, loop=loop)
    await storage.init()

    await storage.set('g', {'f': 3})
    fetched = await storage.get('g')
    assert {'f': 3} == fetched
    stored_count = await storage.length()
    assert 1 == stored_count
    stored_keys = await storage.list()
    assert ['g'] == stored_keys

    # After storing None under the key, the test expects a falsy length.
    await storage.set('g', None)
    remaining = await storage.length()
    assert not remaining
def setUp(self):
    """Create a dedicated event loop and open a Redis connection on it.

    The loop is stored as ``self.loop`` and the connection as ``self.db``.
    """
    loop = asyncio.new_event_loop()
    self.loop = loop
    # Clear the current event loop so the loop must be passed explicitly.
    asyncio.set_event_loop(None)

    connect_options = {
        'address': ('localhost', 6379),
        'db': 13,
        'loop': loop,
        'encoding': 'utf-8',
    }
    connecting = aioredis.create_redis(**connect_options)
    self.db = loop.run_until_complete(connecting)
async def create_pub_sub_conn():
    """Yield a ``(pub, sub)`` pair of Redis connections and close them afterwards.

    Both connections target ``redis://localhost:6379/0`` with UTF-8 decoding.
    Cleanup runs in a ``finally`` block, so the connections are released even
    when the consumer raises or when opening the second connection fails.

    Yields:
        A tuple ``(pub, sub)`` of connections returned by
        ``aioredis.create_redis``.
    """
    url = "redis://localhost:6379/0?encoding=utf-8"
    pub = None
    sub = None
    try:
        pub = await aioredis.create_redis(url)
        sub = await aioredis.create_redis(url)
        yield pub, sub
    finally:
        opened = [conn for conn in (pub, sub) if conn is not None]
        # close() only schedules the shutdown; wait_closed() blocks until the
        # underlying socket has actually been released.
        for conn in opened:
            conn.close()
        for conn in opened:
            await conn.wait_closed()
async def main(flush):
    """Connect to Redis, load TF2 data and prepare the working collections.

    Args:
        flush: When truthy, the ``items`` key and every key matching
            ``items:*`` and ``item:*`` are deleted from the store first.
    """
    redis_address = ('localhost', 6379)
    store = await create_redis(redis_address, commands_factory=Redis)

    tf2info = await tf2search.gettf2info(
        config.apikey,
        config.backpackkey,
        config.tradekey,
        config.blueprintsfile,
    )

    if flush:
        await store.delete('items')
        for pattern in ('items:*', 'item:*'):
            await store.delete_all(pattern)

    # Three independent, initially empty lists.
    suggestions = [[] for _ in range(3)]

    sitemap = Sitemap()
    sitemap.add(config.homepage)

    all_classes = [name.lower() for name in tf2api.getallclasses()]
def aioredis_pool():
    """Return a pool-creating coroutine together with ``ring.aioredis``.

    On interpreters older than Python 3.5 the test is skipped through
    ``pytest.skip()`` instead.

    Returns:
        A tuple ``(pool_coroutine, ring.aioredis)`` where ``pool_coroutine``
        comes from ``aioredis.create_redis_pool`` with ``minsize=2`` and
        ``maxsize=2``.
    """
    if sys.version_info < (3, 5):
        pytest.skip()
    else:
        # Imported only on the supported branch.
        import aioredis

        address = ('localhost', 6379)
        pool_coroutine = aioredis.create_redis_pool(
            address, minsize=2, maxsize=2)
        return pool_coroutine, ring.aioredis
async def test_connect_pool_aioredis_instance(self):
    """Connecting an ``Instance`` built from an existing pool must not create a pool."""
    patch_target = 'aioredlock.redis.Instance._create_redis_pool'
    with patch(patch_target) as pool_factory_mock:
        existing_pool = await aioredis.create_redis_pool('redis://localhost')
        lock_instance = Instance(existing_pool)

        await lock_instance.connect()

        # The patched pool factory must remain uncalled.
        assert not pool_factory_mock.called
def aioredis_pool(event_loop):
    """Run ``aioredis.create_pool`` to completion on ``event_loop`` and return the result.

    The pool targets ``127.0.0.1:6379`` and is created with ``maxsize=1``.
    """
    address = ("127.0.0.1", 6379)
    pool_coroutine = aioredis.create_pool(address, maxsize=1)
    return event_loop.run_until_complete(pool_coroutine)
async def test_redis_from_create_pool(redis_params):
    """Passing a pool from ``aioredis.create_pool`` to ``create_app`` must warn.

    The call is expected to emit a ``DeprecationWarning``.
    """
    async def handler(request):
        # No-op request handler; only its presence matters here.
        return None

    pool = await aioredis.create_pool(**redis_params)

    with pytest.warns(DeprecationWarning):
        create_app(handler=handler, redis=pool)
async def test_make_subscriber():
    """Check that ``utils.make_subscriber`` yields a usable subscriber and channel.

    After subscribing and starting ``reader`` on the channel, publishing one
    message is expected to report exactly one receiver.
    """
    sub, chan = await utils.make_subscriber("test")

    for candidate in (sub, chan):
        assert candidate is not None

    expected_types = ((sub, aioredis.Redis), (chan, aioredis.Channel))
    for candidate, expected_type in expected_types:
        assert isinstance(candidate, expected_type)

    await sub.subscribe("channel:test")

    # Start the channel reader as a task on the application's loop.
    loop = settings.loop
    loop.create_task(reader(chan))

    delivered = await utils.publish_message("test", {"hello": "world"})
    assert delivered == 1