# Off-load the heavy deletion work to the worker pool:
self.pool.schedule(self._concurrent_delete_many, args=(query, details))

def _concurrent_delete_many(self, query, details):
    _hash = self._generate_hash(query)
    count = self._get_count(_hash, query)
    phindex = self.redis.incr("placeholder_del:index")
    placeholder_hash = f"{_hash}:placeholder:{phindex}"
    placeholder_hash_del = f"{_hash}:placeholder_del:{phindex}"
    push_key = f"{_hash}:list"
    rlock = f"{_hash}:lock"
    with self.redis.lock(rlock):
        all_matching_redis_items = self.back_to_dict(self.redis.lrange(push_key, 0, -1))
        if isinstance(all_matching_redis_items, dict):
            # A single item came back: if _search_one rejects it, leave the current list untouched.
            is_true = self._search_one(all_matching_redis_items, details)
            if not is_true:
                return
            self.redis.rpush(placeholder_hash, orjson.dumps(all_matching_redis_items))
        else:
            # Copy every item accepted by _search_one into the placeholder list.
            for match in all_matching_redis_items:
                is_true = self._search_one(match, details)
                if is_true:
                    self.redis.rpush(placeholder_hash, orjson.dumps(match))
        # Swap: the old list becomes the deletion target, the placeholder becomes the live list.
        self.redis.rename(push_key, placeholder_hash_del)
        self.redis.rename(placeholder_hash, push_key)
    # Delete while unlocked.
    self.pool.schedule(self._concurrent_delete_list, args=(placeholder_hash_del,))
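The _concurrent_delete_list worker scheduled above is not part of this snippet; a minimal sketch, assuming its only job is to drop the renamed list once the lock has been released:

def _concurrent_delete_list(self, key: str):
    # Sketch only: deleting the renamed list outside the lock keeps the
    # expensive DEL off the path that readers of the live list contend on.
    self.redis.delete(key)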
async def get_chaindata(messages, bulk_threshold=2000):
    """ Returns content ready to be broadcast on-chain (aka chaindata).

    If the serialized content is longer than bulk_threshold (default 2000
    chars), store the message list in IPFS and embed the object hash instead
    of the raw list.
    """
    chaindata = {
        'protocol': 'aleph',
        'version': 1,
        'content': {
            'messages': messages
        }
    }
    content = json.dumps(chaindata)
    if len(content) > bulk_threshold:
        ipfs_id = await add_json(chaindata)
        return json.dumps({'protocol': 'aleph-offchain',
                           'version': 1,
                           'content': ipfs_id})
    else:
        return content
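A quick usage sketch (the message dicts are placeholders, and the second call only works if the IPFS-backed add_json is reachable):

import asyncio

async def demo():
    # Small batch: stays under bulk_threshold, so the full message list is inlined.
    inline = await get_chaindata([{"item_hash": "abc", "type": "POST"}])
    print(inline)    # {"protocol": "aleph", "version": 1, "content": {"messages": [...]}}

    # Large batch: the serialized payload exceeds bulk_threshold, so only the
    # hash of the stored list goes on-chain.
    offchain = await get_chaindata([{"payload": "x" * 100}] * 100)
    print(offchain)  # {"protocol": "aleph-offchain", "version": 1, "content": "<hash>"}

asyncio.run(demo())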
def orjson_dumps(v, *, default):
    # orjson.dumps returns bytes; decode so the result matches json.dumps, which returns str.
    return orjson.dumps(v, default=default).decode()
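This helper matches the signature pydantic v1 expects for its json_dumps hook, so a typical (assumed) wiring looks like the following; the model and its fields are illustrative only:

import orjson
from pydantic import BaseModel

class Message(BaseModel):
    item_hash: str
    time: float

    class Config:
        # Let pydantic serialize and parse JSON with orjson instead of the stdlib json module.
        json_loads = orjson.loads
        json_dumps = orjson_dumps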
async def add_json(value, engine='ipfs'):
    # TODO: determine which storage engine to use
    loop = asyncio.get_event_loop()
    content = await loop.run_in_executor(None, json.dumps, value)
    if engine == 'ipfs':
        chash = await add_ipfs_bytes(content)
    elif engine == 'storage':
        if isinstance(content, str):
            content = content.encode('utf-8')
        chash = sha256(content).hexdigest()
    else:
        raise NotImplementedError('storage engine %s not supported' % engine)

    await set_value(chash, content)
    return chash
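For the 'storage' engine the returned hash is simply the sha256 of the serialized JSON; a small check, using an in-memory stand-in for the real set_value backend:

import asyncio
import json
from hashlib import sha256

_store = {}

async def set_value(key, value):
    # Hypothetical stand-in for the real storage backend that add_json awaits.
    _store[key] = value

async def check():
    chash = await add_json({"msg": "hello"}, engine='storage')
    expected = sha256(json.dumps({"msg": "hello"}).encode('utf-8')).hexdigest()
    assert chash == expected and chash in _store

asyncio.run(check())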
def _save_redis(self, _hash: str, data: dict):
    serialized = orjson.dumps(data)
    rlock = f"{_hash}:lock"
    with self.redis.lock(rlock):
        push_key = f"{_hash}:list"
        self.redis.rpush(push_key, serialized)

def _bulk_save_redis(self, _hash: str, data: list):
    serialized_list = [orjson.dumps(x) for x in data]
    rlock = f"{_hash}:lock"
    with self.redis.lock(rlock):
        push_key = f"{_hash}:list"
        self.redis.rpush(push_key, *serialized_list)
def dataclass_asjson(instance: Any) -> bytes:
    fields = SerializeFields.get_fields(type(instance))
    if not fields:
        # No explicit field list registered for this class: serialize the whole dataclass.
        return orjson.dumps(
            instance, default=OrjsonDefaultTypes.default_function
        )
    # Otherwise serialize only the registered fields.
    instance = dataclasses.asdict(instance)
    return orjson.dumps(
        {f.name: instance[f.name] for f in fields},
        default=OrjsonDefaultTypes.default_function,
    )
async def create_connections(self, peer_id):
    peer_streams = self.peers.get(peer_id, list())
    for i in range(self.streams_per_host - len(peer_streams)):
        try:
            stream: INetStream = await self.host.new_stream(peer_id, [PROTOCOL_ID])
        except SwarmException as error:
            LOGGER.debug("failed to add new peer %s, error %s", peer_id, error)
            return

        try:
            # The stream API expects bytes, so encode the serialized hello packet.
            await stream.write(json.dumps(HELLO_PACKET).encode("utf-8"))
            await stream.read(MAX_READ_LEN)
        except Exception as error:
            LOGGER.debug("failed to add new peer %s, error %s", peer_id, error)
            return

        peer_streams.append((stream, asyncio.Semaphore(1)))
        # await asyncio.sleep(.1)

    self.peers[peer_id] = peer_streams
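Each stream is stored next to an asyncio.Semaphore(1) so that concurrent senders never interleave traffic on the same stream; a hypothetical send helper (not part of the snippet above) could use the pair like this:

async def send_on_peer_stream(self, peer_id, payload: bytes):
    # Hypothetical helper: take any of the peer's streams and serialize access
    # to it with its semaphore, so a write and its reply never interleave.
    for stream, semaphore in self.peers.get(peer_id, []):
        async with semaphore:
            await stream.write(payload)
            return await stream.read(MAX_READ_LEN)
    return None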