Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
max_row_size = self.max_hist_row_entries * 4
full_hist = b''.join(hist_list)
nrows = (len(full_hist) + max_row_size - 1) // max_row_size
if nrows > 4:
self.logger.info('hashX {} is large: {:,d} entries across '
'{:,d} rows'
.format(hash_to_hex_str(hashX),
len(full_hist) // 4, nrows))
# Find what history needs to be written, and what keys need to
# be deleted. Start by assuming all keys are to be deleted,
# and then remove those that are the same on-disk as when
# compacted.
write_size = 0
keys_to_delete.update(hist_map)
for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
key = hashX + pack_be_uint16(n)
if hist_map.get(key) == chunk:
keys_to_delete.remove(key)
else:
write_items.append((key, chunk))
write_size += len(chunk)
assert n + 1 == nrows
self.comp_flush_count = max(self.comp_flush_count, n)
return write_size
def db_class(name):
'''Returns a DB engine class.'''
for db_class in util.subclasses(Storage):
if db_class.__name__.lower() == name.lower():
db_class.import_module()
return db_class
raise RuntimeError('unrecognised DB engine "{}"'.format(name))
def header_hash(cls, header):
timestamp = util.unpack_le_uint32_from(header, 68)[0]
if timestamp > 1550246400:
import x16rt_hash
return x16rt_hash.getPoWHash(header)
elif timestamp > 1525651200:
import lyra2z_hash
return lyra2z_hash.getPoWHash(header)
import neoscrypt
return neoscrypt.getPoWHash(header)
def lookup_coin_class(cls, name, net):
'''Return a coin class given name and network.
Raise an exception if unrecognised.'''
req_attrs = ['TX_COUNT', 'TX_COUNT_HEIGHT', 'TX_PER_BLOCK']
for coin in util.subclasses(Coin):
if (coin.NAME.lower() == name.lower() and
coin.NET.lower() == net.lower()):
coin_req_attrs = req_attrs.copy()
missing = [attr for attr in coin_req_attrs
if not hasattr(coin, attr)]
if missing:
raise CoinError('coin {} missing {} attributes'
.format(name, missing))
return coin
raise CoinError('unknown coin {} and network {} combination'
.format(name, net))
def __init__(self, db, prefix, reverse):
self.prefix = prefix
if reverse:
self.iterator = reversed(db.iteritems())
nxt_prefix = util.increment_byte_string(prefix)
if nxt_prefix:
self.iterator.seek(nxt_prefix)
try:
next(self.iterator)
except StopIteration:
self.iterator.seek(nxt_prefix)
else:
self.iterator.seek_to_last()
else:
self.iterator = db.iteritems()
self.iterator.seek(prefix)