Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _compact_hashX(self, hashX, hist_map, hist_list,
                   write_items, keys_to_delete):
    '''Compress history for a hashX.

    hashX: byte-string prefix shared by every row key built here.
    hist_map: mapping of existing row key -> row value.  All of its
        keys are added to keys_to_delete up front.
    hist_list: an ordered list of the histories (byte strings) to be
        compressed; they are concatenated in that order.
    write_items: list that receives a (key, chunk) pair for every
        row that has to be written.
    keys_to_delete: set-like object updated in place.  A key is
        taken back out when hist_map already holds exactly the
        chunk that compaction would store under it.

    Returns the total number of bytes appended to write_items.
    '''
    # History entries (tx numbers) are 4 bytes each.  Distribute
    # them over rows of at most max_hist_row_entries entries.  A
    # fixed row size means future compactions will not need to
    # update the first N - 1 rows.
    max_row_size = self.max_hist_row_entries * 4
    full_hist = b''.join(hist_list)
    # Ceiling division: a partially filled final row still counts.
    nrows = (len(full_hist) + max_row_size - 1) // max_row_size
    if nrows > 4:
        self.logger.info('hashX {} is large: {:,d} entries across '
                         '{:,d} rows'
                         .format(hash_to_hex_str(hashX),
                                 len(full_hist) // 4, nrows))

    # Find what history needs to be written, and what keys need to
    # be deleted.  Start by assuming all keys are to be deleted,
    # and then remove those whose existing value is already the
    # same as the compacted row.
    write_size = 0
    keys_to_delete.update(hist_map)
    for n, chunk in enumerate(util.chunks(full_hist, max_row_size)):
        key = hashX + pack_be_uint16(n)
        if hist_map.get(key) == chunk:
            keys_to_delete.remove(key)
        else:
            write_items.append((key, chunk))
            write_size += len(chunk)

    # Hand the running total back to the caller; previously it was
    # accumulated and then silently discarded.
    return write_size
def test_hash_to_hex_str():
    '''hash_to_hex_str hex-encodes the input bytes in reversed order.'''
    raw = b'hash_to_str'
    # Hex of b'rts_ot_hsah', i.e. the input read back to front.
    expected = '7274735f6f745f68736168'
    assert lib_hash.hash_to_hex_str(raw) == expected
async def hashX_listunspent(self, hashX):
    '''Return the list of UTXOs of a script hash, including mempool
    effects.

    UTXOs from the DB come first, sorted; the mempool's unordered
    UTXOs are appended after them.  Any output whose
    (tx_hash, tx_pos) pair is among the mempool's potential spends
    is left out of the result.
    '''
    confirmed = await self.db.all_utxos(hashX)
    candidates = sorted(confirmed)
    candidates.extend(await self.mempool.unordered_UTXOs(hashX))
    # The cost is based on the candidate count, before spent outputs
    # are filtered away.
    self.bump_cost(1.0 + len(candidates) / 50)
    spent = await self.mempool.potential_spends(hashX)

    unspent = []
    for utxo in candidates:
        if (utxo.tx_hash, utxo.tx_pos) in spent:
            continue
        unspent.append({'tx_hash': hash_to_hex_str(utxo.tx_hash),
                        'tx_pos': utxo.tx_pos,
                        'height': utxo.height, 'value': utxo.value})
    return unspent
return hashX
for arg in args:
hashX = arg_to_hashX(arg)
if not hashX:
continue
n = None
for n, (tx_hash, height) in enumerate(
db.get_history(hashX, limit), start=1):
lines.append(f'History #{n:,d}: height {height:,d} '
f'tx_hash {hash_to_hex_str(tx_hash)}')
if n is None:
lines.append('No history found')
n = None
for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1):
lines.append(f'UTXO #{n:,d}: tx_hash '
f'{hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height '
f'{utxo.height:,d} value {utxo.value:,d}')
if n is None:
lines.append('No UTXOs found')
balance = db.get_balance(hashX)
lines.append(f'Balance: {coin.decimal_value(balance):,f} '
f'{coin.SHORTNAME}')
return lines
async def unconfirmed_history(self, hashX):
    '''Return one dict per mempool transaction summary for hashX.

    Each dict has the keys 'tx_hash', 'height' and 'fee'.
    '''
    # Note unconfirmed history is unordered in electrum-server
    summaries = await self.mempool.transaction_summaries(hashX)
    entries = []
    for summary in summaries:
        entries.append({
            'tx_hash': hash_to_hex_str(summary.hash),
            # height is -1 if it has unconfirmed inputs, otherwise 0
            'height': -summary.has_unconfirmed_inputs,
            'fee': summary.fee,
        })
    self.bump_cost(0.25 + len(entries) / 50)
    return entries
raise RPCError(BAD_REQUEST, f'"merkle" must be a boolean')
if merkle:
branch, tx_hash, cost = await self.session_mgr.merkle_branch_for_tx_pos(
height, tx_pos)
self.bump_cost(cost)
return {"tx_hash": tx_hash, "merkle": branch}
else:
tx_hashes, cost = await self.session_mgr.tx_hashes_at_blockheight(height)
try:
tx_hash = tx_hashes[tx_pos]
except IndexError:
raise RPCError(BAD_REQUEST,
f'no tx at position {tx_pos:,d} in block at height {height:,d}')
self.bump_cost(cost)
return hash_to_hex_str(tx_hash)
def _get_merkle_branch(self, tx_hashes, tx_pos):
    '''Return a merkle branch to a transaction.

    tx_hashes: ordered list of hex strings of tx hashes in a block
    tx_pos: index of transaction in tx_hashes to create branch for

    Every element of the returned branch has been passed through
    hash_to_hex_str; the merkle root computed alongside the branch
    is not returned.
    '''
    binary_hashes = list(map(hex_str_to_hash, tx_hashes))
    binary_branch, _root = self.db.merkle.branch_and_root(
        binary_hashes, tx_pos)
    return [hash_to_hex_str(node) for node in binary_branch]
self.db_tip = state['tip']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']
# These are our state as we move ahead of DB state
self.fs_height = self.db_height
self.fs_tx_count = self.db_tx_count
self.last_flush_tx_count = self.fs_tx_count
# Log some stats
self.logger.info('DB version: {:d}'.format(self.db_version))
self.logger.info('coin: {}'.format(self.coin.NAME))
self.logger.info('network: {}'.format(self.coin.NET))
self.logger.info('height: {:,d}'.format(self.db_height))
self.logger.info('tip: {}'.format(hash_to_hex_str(self.db_tip)))
self.logger.info('tx count: {:,d}'.format(self.db_tx_count))
if self.utxo_db.for_sync:
self.logger.info(f'flushing DB cache at {self.env.cache_MB:,d} MB')
if self.first_sync:
self.logger.info('sync time so far: {}'
.format(util.formatted_time(self.wall_time)))
return coin.hashX_from_script(script)
except ValueError:
pass
hashX = coin.address_to_hashX(arg)
lines.append(f'Address: {arg}')
return hashX
for arg in args:
hashX = arg_to_hashX(arg)
if not hashX:
continue
n = None
for n, (tx_hash, height) in enumerate(
db.get_history(hashX, limit), start=1):
lines.append(f'History #{n:,d}: height {height:,d} '
f'tx_hash {hash_to_hex_str(tx_hash)}')
if n is None:
lines.append('No history found')
n = None
for n, utxo in enumerate(db.get_utxos(hashX, limit), start=1):
lines.append(f'UTXO #{n:,d}: tx_hash '
f'{hash_to_hex_str(utxo.tx_hash)} '
f'tx_pos {utxo.tx_pos:,d} height '
f'{utxo.height:,d} value {utxo.value:,d}')
if n is None:
lines.append('No UTXOs found')
balance = db.get_balance(hashX)
lines.append(f'Balance: {coin.decimal_value(balance):,f} '
f'{coin.SHORTNAME}')
def prev_hex_hash(raw_block):
    '''Return hash_to_hex_str applied to bytes 4 to 35 of raw_block.

    NOTE(review): presumably this 32-byte span is the previous-block
    hash field of the block header -- confirm against the coin's
    header layout.
    '''
    prev_hash = raw_block[4:36]
    return hash_to_hex_str(prev_hash)