Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def send_tx(self):
iroha = Iroha('admin@test')
tx = iroha.transaction([iroha.command(
'TransferAsset', src_account_id='admin@test', dest_account_id='test@test', asset_id='coin#test',
amount='0.01', description=HOSTNAME
)])
ic.sign_transaction(tx, ADMIN_PRIVATE_KEY)
self.client.send_tx_await(tx)
def send_tx(self):
iroha = Iroha('admin@test')
tx = iroha.transaction([iroha.command(
'TransferAsset', src_account_id='admin@test', dest_account_id='test@test', asset_id='coin#test',
amount='0.01', description=HOSTNAME
)])
ic.sign_transaction(tx, ADMIN_PRIVATE_KEY)
self.client.send_tx(tx)
from iroha import IrohaCrypto
import os
import sys
import time
import uuid
if sys.version_info[0] < 3:
raise Exception('Python 3 or a more recent version is required.')
IROHA_HOST_ADDR = os.getenv('IROHA_HOST_ADDR', '127.0.0.1')
IROHA_PORT = os.getenv('IROHA_PORT', '50051')
ADMIN_ACCOUNT_ID = os.getenv('ADMIN_ACCOUNT_ID', 'admin@test')
ADMIN_PRIVATE_KEY = os.getenv(
'ADMIN_PRIVATE_KEY', 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70')
iroha = Iroha(ADMIN_ACCOUNT_ID)
net = IrohaGrpc('{}:{}'.format(IROHA_HOST_ADDR, IROHA_PORT))
def send_tx():
rand_name = uuid.uuid4().hex
rand_key = IrohaCrypto.private_key()
domain = ADMIN_ACCOUNT_ID.split('@')[1]
tx = iroha.transaction([
iroha.command('CreateAccount',
account_name=rand_name,
domain_id=domain,
public_key=rand_key)
])
IrohaCrypto.sign_transaction(tx, ADMIN_PRIVATE_KEY)
net.send_tx(tx)
print('tx is sent')
def bob_accepts_exchange_request():
global net
q = ic.sign_query(
Iroha('bob@test').query('GetPendingTransactions'),
bob_private_keys[0]
)
pending_transactions = net.send_query(q)
for tx in pending_transactions.transactions_response.transactions:
if tx.payload.reduced_payload.creator_account_id == 'alice@test':
# we need do this temporarily, otherwise accept will not reach MST engine
del tx.signatures[:]
else:
ic.sign_transaction(tx, *bob_private_keys)
send_batch_and_print_status(
pending_transactions.transactions_response.transactions)
def add_keys_and_set_quorum():
alice_iroha = Iroha('alice@test')
alice_cmds = [
alice_iroha.command('AddSignatory', account_id='alice@test',
public_key=alice_public_keys[1]),
alice_iroha.command('SetAccountQuorum',
account_id='alice@test', quorum=2)
]
alice_tx = alice_iroha.transaction(alice_cmds)
ic.sign_transaction(alice_tx, alice_private_keys[0])
send_transaction_and_print_status(alice_tx)
bob_iroha = Iroha('bob@test')
bob_cmds = [
bob_iroha.command('AddSignatory', account_id='bob@test',
public_key=bob_public_keys[1]),
bob_iroha.command('SetAccountQuorum', account_id='bob@test', quorum=2)
]
def add_keys_and_set_quorum():
alice_iroha = Iroha('alice@test')
alice_cmds = [
alice_iroha.command('AddSignatory', account_id='alice@test',
public_key=alice_public_keys[1]),
alice_iroha.command('SetAccountQuorum',
account_id='alice@test', quorum=2)
]
alice_tx = alice_iroha.transaction(alice_cmds)
ic.sign_transaction(alice_tx, alice_private_keys[0])
send_transaction_and_print_status(alice_tx)
bob_iroha = Iroha('bob@test')
bob_cmds = [
bob_iroha.command('AddSignatory', account_id='bob@test',
public_key=bob_public_keys[1]),
bob_iroha.command('SetAccountQuorum', account_id='bob@test', quorum=2)
]
bob_tx = bob_iroha.transaction(bob_cmds)
ic.sign_transaction(bob_tx, bob_private_keys[0])
send_transaction_and_print_status(bob_tx)
from iroha import Iroha, IrohaGrpc
from iroha import IrohaCrypto
import sys
import os
if sys.version_info[0] < 3:
raise Exception('Python 3 or a more recent version is required.')
IROHA_HOST_ADDR = os.getenv('IROHA_HOST_ADDR', '127.0.0.1')
IROHA_PORT = os.getenv('IROHA_PORT', '50051')
ADMIN_ACCOUNT_ID = os.getenv('ADMIN_ACCOUNT_ID', 'admin@test')
ADMIN_PRIVATE_KEY = os.getenv(
'ADMIN_PRIVATE_KEY', 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70')
iroha = Iroha(ADMIN_ACCOUNT_ID)
net = IrohaGrpc('{}:{}'.format(IROHA_HOST_ADDR, IROHA_PORT))
def trace(func):
"""
A decorator for tracing methods' begin/end execution points
"""
def tracer(*args, **kwargs):
name = func.__name__
print('\tEntering "{}"'.format(name))
result = func(*args, **kwargs)
print('\tLeaving "{}"'.format(name))
return result
return tracer
def bob_declines_exchange_request():
print("""
IT IS EXPECTED HERE THAT THE BATCH WILL FAIL STATEFUL VALIDATION
""")
global net
q = ic.sign_query(
Iroha('bob@test').query('GetPendingTransactions'),
bob_private_keys[0]
)
pending_transactions = net.send_query(q)
for tx in pending_transactions.transactions_response.transactions:
if tx.payload.reduced_payload.creator_account_id == 'alice@test':
# we need do this temporarily, otherwise accept will not reach MST engine
del tx.signatures[:]
else:
# intentionally alice keys were used to fail bob's txs
ic.sign_transaction(tx, *alice_private_keys)
# zeroes as private keys are also acceptable
send_batch_and_print_status(
pending_transactions.transactions_response.transactions)
from iroha.primitive_pb2 import can_set_my_account_detail
import sys
if sys.version_info[0] < 3:
raise Exception('Python 3 or a more recent version is required.')
IROHA_HOST_ADDR = os.getenv('IROHA_HOST_ADDR', '127.0.0.1')
IROHA_PORT = os.getenv('IROHA_PORT', '50051')
ADMIN_ACCOUNT_ID = os.getenv('ADMIN_ACCOUNT_ID', 'admin@test')
ADMIN_PRIVATE_KEY = os.getenv(
'ADMIN_PRIVATE_KEY', 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70')
user_private_key = IrohaCrypto.private_key()
user_public_key = IrohaCrypto.derive_public_key(user_private_key)
iroha = Iroha(ADMIN_ACCOUNT_ID)
net = IrohaGrpc('{}:{}'.format(IROHA_HOST_ADDR, IROHA_PORT))
def trace(func):
"""
A decorator for tracing methods' begin/end execution points
"""
def tracer(*args, **kwargs):
name = func.__name__
print('\tEntering "{}"'.format(name))
result = func(*args, **kwargs)
print('\tLeaving "{}"'.format(name))
return result
return tracer
IROHA_HOST_ADDR = os.getenv('IROHA_HOST_ADDR', '127.0.0.1')
IROHA_PORT = os.getenv('IROHA_PORT', '50051')
ADMIN_ACCOUNT_ID = os.getenv('ADMIN_ACCOUNT_ID', 'admin@test')
ADMIN_PRIVATE_KEY = os.getenv(
'ADMIN_PRIVATE_KEY', 'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506cab70')
print("""
PLEASE ENSURE THAT MST IS ENABLED IN IROHA CONFIG
""")
if sys.version_info[0] < 3:
raise Exception('Python 3 or a more recent version is required.')
iroha = Iroha(ADMIN_ACCOUNT_ID)
net = IrohaGrpc('{}:{}'.format(IROHA_HOST_ADDR, IROHA_PORT))
alice_private_keys = [
'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506caba1',
'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506caba2'
]
alice_public_keys = [ic.derive_public_key(x) for x in alice_private_keys]
bob_private_keys = [
'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506caba3',
'f101537e319568c765b2cc89698325604991dca57b9716b58016b253506caba4'
]
bob_public_keys = [ic.derive_public_key(x) for x in bob_private_keys]
def trace(func):