# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_StepDict_can_load_a_step_from_a_function():
    u"lettuce.STEP_REGISTRY.load_func(func) append item(step, func) to STEP_REGISTRY"
    registry = StepDict()

    # A bare function whose name should become the step sentence.
    def a_step_to_test():
        pass

    registry.load_func(a_step_to_test)

    # load_func derives "A step to test" from the function name and maps the
    # sentence back to the function itself.
    sentence = "A step to test"
    assert_in(sentence, registry)
    assert_equal(registry[sentence], a_step_to_test)
def help_all_output_test(subcommand=''):
    """test that `ipython [subcommand] --help-all` works"""
    full_cmd = ' '.join(get_ipython_cmd() + [subcommand, '--help-all'])
    out, err, rc = get_output_error_code(full_cmd)

    # The command must exit cleanly and not dump a traceback to stderr.
    nt.assert_equal(rc, 0, err)
    nt.assert_not_in("Traceback", err)

    # --help-all output always contains these two section headers.
    for header in ("Options", "Class parameters"):
        nt.assert_in(header, out)

    return out, err
# NOTE(review): fragment — `Pulse`, `SquareShape`, `root` and the assert
# helpers are defined outside this view.
# Build three pulses exercising different definition styles: literal values,
# references to a sequence-local variable ('{a}'), and a reference to another
# pulse's stop time ('{2_stop}'); the third is analogical with a square shape.
pulse1 = Pulse(def_1='1.0', def_2='{a}')
pulse2 = Pulse(def_1='{a} + 1.0', def_2='3.0')
pulse3 = Pulse(def_1='{2_stop} + 0.5', def_2='10',
               kind='Analogical', shape=SquareShape())
root.items.extend([pulse1, pulse2, pulse3])

# Serializing the sequence must produce one key per member plus one key per
# contained item (item_0..item_2).
pref = root.preferences_from_members()
assert_items_equal(pref.keys(),
                   ['name', 'local_vars', 'time_constrained',
                    'enabled', 'item_class', 'sequence_duration',
                    'item_0', 'item_1', 'item_2', 'external_vars',
                    'context', 'def_1', 'def_2', 'def_mode'])

# The analogical pulse must also persist its shape, including the class name
# needed to rebuild it on load.
assert_in('shape', pref['item_2'])
assert_in('shape_class', pref['item_2']['shape'])
assert_equal(pref['item_2']['shape']['shape_class'], 'SquareShape')
def test_model_parameters():
    """Round-trip model parameters through the server and verify the stored
    artifacts, input-artifact list, and artifact types."""
    project_id = connection.post_projects("model-parameters-project")
    model_id = connection.post_project_models(project_id, "generic", "parameters-model")

    # Store three input parameters plus one explicitly non-input output value.
    connection.put_model_parameter(model_id, "foo", "bar")
    connection.put_model_parameter(model_id, "baz", [1, 2, 3])
    connection.put_model_parameter(model_id, "blah", {"cat":"dog"})
    connection.put_model_parameter(model_id, "output", True, input=False)
    connection.post_model_finish(model_id)
    connection.join_model(model_id)
    model = connection.get_model(model_id)

    # Every stored parameter must come back unchanged as an artifact.
    expected = {"foo": "bar", "baz": [1, 2, 3], "blah": {"cat":"dog"}, "output": True}
    for name, value in expected.items():
        key = "artifact:" + name
        nose.tools.assert_in(key, model)
        nose.tools.assert_equal(model[key], value)

    # Only the three input parameters appear in input-artifacts; all four are
    # recorded as json-typed artifacts.
    nose.tools.assert_in("input-artifacts", model)
    nose.tools.assert_equal(set(model["input-artifacts"]), set(["foo", "baz", "blah"]))
    nose.tools.assert_in("artifact-types", model)
    nose.tools.assert_equal(model["artifact-types"],
                            {name: "json" for name in expected})

    connection.delete_model(model_id)
    connection.delete_project(project_id)
# NOTE(review): fragment — `process_app_events`, `measure`, `engine`, `counter`
# and `workspace` are bound earlier, outside this view.
process_app_events()
assert_equal(measure.status, 'COMPLETED')

# Poll until the engine shuts down, bailing out after ~30 s (300 * 0.1 s).
while engine.active:
    sleep(0.1)
    counter += 1
    if counter > 300:
        raise Exception('Engine took too long to exit.')

# Check the engine exited properly (it stopped on its own, not force-stopped).
assert_false(engine._force_stop.is_set())

# Check log: the message must appear only in the subprocess log panel, not in
# the main workspace log.
process_app_events()
assert_not_in('test', workspace.log_model.text)
assert_in('test',
          workspace.dock_area.find('subprocess_log').model.text)
def test_function_names_for_unloaded_libraries():
    """CFG recovery without auto-loaded libs must still name both the PLT
    stubs (plt_*) and the plain imported symbols."""
    binary = os.path.join(test_location, 'i386', 'fauxware_pie')
    project = angr.Project(binary, load_options={'auto_load_libs': False})
    cfg = project.analyses.CFGFast()

    names = []
    for func in cfg.functions.values():
        names.append('plt_' + func.name if func.is_plt else func.name)

    # Each imported symbol must be present both as a PLT stub and unprefixed.
    for symbol in ('puts', 'read', '__stack_chk_fail', 'exit'):
        nose.tools.assert_in('plt_' + symbol, names)
        nose.tools.assert_in(symbol, names)
def check_second_symbolic_fork(state):
    """Breakpoint callback: assert that the symbolic branch at this point
    forks into exactly the two expected successor addresses."""
    succs = state.inspect.sim_successors.successors
    succ_addrs = [s.addr for s in succs]
    nose.tools.assert_equal(len(succ_addrs), 2)
    # Compare integer addresses instead of hex() strings: Python 2's hex()
    # appended an 'L' suffix to longs ('0x4006dfL') while Python 3's never
    # does, so the old string comparison failed on Python 3 even when the
    # fork targets were correct.
    nose.tools.assert_in(0x4006df, succ_addrs)
    nose.tools.assert_in(0x4006e6, succ_addrs)
    print('Fork after:', hex(state.addr))
    print('Successors:', [hex(a) for a in succ_addrs])
# NOTE(review): fragment — the call these keyword arguments complete, plus
# `self` and `tile_indices`, are defined outside this view.
channel_idx=self.channel_idx,
pos_idx=7,
slice_idx=16
)
# Expected [row_start, row_end, col_start, col_end] bounds for each tile.
exp_tile_indices = [[0, 5, 0, 5], [0, 5, 4, 9], [0, 5, 6, 11],
                    [10, 15, 0, 5], [10, 15, 4, 9], [10, 15, 6, 11],
                    [4, 9, 0, 5], [4, 9, 4, 9], [4, 9, 6, 11],
                    [8, 13, 0, 5], [8, 13, 4, 9], [8, 13, 6, 11]]
exp_tile_indices = np.asarray(exp_tile_indices, dtype='uint8')
row_ids = list(range(len(exp_tile_indices)))
# Every returned tile must match one of the expected rows; membership is
# checked row-by-row so the returned order does not matter.
for ret_idx in tile_indices:
    row_idx = np.where((exp_tile_indices[:, 0] == ret_idx[0]) &
                       (exp_tile_indices[:, 1] == ret_idx[1]) &
                       (exp_tile_indices[:, 2] == ret_idx[2]) &
                       (exp_tile_indices[:, 3] == ret_idx[3]))
    nose.tools.assert_in(row_idx[0], row_ids)
def test_file_read_missing_content():
    """Reading unbacked file contents must yield a plain symbolic bitvector
    (a BVS with no Reverse wrapper) whose name includes the file name."""
    # test in tracing mode since the Reverse operator will not be optimized away
    state = angr.SimState(arch='AMD64', mode="tracing")
    fd = state.posix.open(b"/tmp/oops", Flags.O_RDWR)
    read_len = state.posix.get_fd(fd).read(0xc00000, 100)
    contents = state.memory.load(0xc00000, read_len, endness="Iend_BE")

    nose.tools.assert_not_equal(contents.op, 'Reverse',
                                "Byte strings read directly out of a file should not have Reverse "
                                "operators.")
    nose.tools.assert_equal(contents.op, "BVS")
    nose.tools.assert_equal(len(contents.variables), 1)
    # file name should be part of the variable name
    nose.tools.assert_in("oops", next(iter(contents.variables)))
# NOTE(review): fragment — `result`, `expected_logs`, `assert_in` and
# `get_hyperdash_logs_home_path_for_job` are bound earlier, outside this view.
# Verify that variables declared in previous cells can be affected
third_cell_output = result[0]["cells"][2]["outputs"]
assert third_cell_output[0].text == "a=1\n"

# Make sure logs were persisted: pick the most recently modified file in the
# job's log directory and check every expected line appears in it.
log_dir = get_hyperdash_logs_home_path_for_job("test_jupyter")
latest_log_file = max([
    os.path.join(log_dir, filename) for
    filename in
    os.listdir(log_dir)
], key=os.path.getmtime)
with open(latest_log_file, 'r') as log_file:
    data = log_file.read()
    for log in expected_logs:
        assert_in(log, data)
# Clean up so later runs don't match this run's log file.
os.remove(latest_log_file)