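# Note: the snippets below come from several different test suites and are
# shown without their module-level imports. They all rely on nose's assertion
# helpers, which would typically be imported as:
from nose.tools import ok_, eq_, assert_raises
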
# the stored crash's JSON and dump files should exist in the standard store
ok_(
    os.path.exists(
        crashstorage.std_crash_store.getJson(
            '114559a5-d8e6-428c-8b88-1c1f22120504')))
ok_(
    os.path.exists(
        crashstorage.std_crash_store.getDump(
            '114559a5-d8e6-428c-8b88-1c1f22120504')))
meta = crashstorage.get_raw_crash(
    '114559a5-d8e6-428c-8b88-1c1f22120314')
ok_(isinstance(meta, DotDict))
eq_(meta['name'], 'Peter')
dump = crashstorage.get_raw_dump(
    '114559a5-d8e6-428c-8b88-1c1f22120314')
ok_(isinstance(dump, basestring))
ok_("fake dump" in dump)
dumps = crashstorage.get_raw_dumps(
    '114559a5-d8e6-428c-8b88-1c1f22120504')
eq_(['upload_file_minidump', 'aux01'], dumps.keys())
eq_(['this is a fake dump', 'aux01 fake dump'],
    dumps.values())
# once removed, the crash should be gone from both the low-level store
# and the high-level API
crashstorage.remove('114559a5-d8e6-428c-8b88-1c1f22120314')
assert_raises(OSError,
              crashstorage.std_crash_store.getJson,
              '114559a5-d8e6-428c-8b88-1c1f22120314')
assert_raises(OSError,
              crashstorage.std_crash_store.getDump,
              '114559a5-d8e6-428c-8b88-1c1f22120314')
assert_raises(CrashIDNotFound,
              crashstorage.get_raw_crash,
              '114559a5-d8e6-428c-8b88-1c1f22120314')

def test_strings(self):
    """
    Values that are not requested to be converted should be kept as is.
    """
    global_conf = {'debug': "yes"}
    local_conf = {'parameter': "value"}
    settings = _convert_options(global_conf, local_conf)
    ok_("parameter" in settings)
    eq_(settings['parameter'], "value")

def test_response_contains_nodes(self):
    # trick the query into accepting a find_node response
    self.query.query = message.FIND_NODE
    ok_(not self.got_response)
    ok_(not self.got_routing_response)
    ok_(not self.got_routing_nodes_found)
    self.query.on_response_received(self.fn_r_in)
    ok_(self.got_response)
    ok_(self.got_routing_response)
    ok_(self.got_routing_nodes_found)

def test_get_one_report_at_a_time(self):
    signature_summary = SignatureSummary(config=self.config)
    self.setup_data()
    for test, data in self.test_source_data.items():
        res = signature_summary.get(**data['params'])
        ok_(isinstance(res, list))
        eq_(res, data['res_expected'])

def test_cancel(self):
    for i in xrange(5):
        self.task_m.add(Task(.1, self.callback_f, i))
    c_task = Task(.1, self.callback_f, 5)
    self.task_m.add(c_task)
    for i in xrange(6, 10):
        self.task_m.add(Task(.1, self.callback_f, i))
    # no task is due yet, so nothing should fire
    while True:
        task = self.task_m.consume_task()
        if task is None:
            break
        task.fire_callbacks()
    logger.debug('%s' % self.callback_order)
    assert self.callback_order == []
    ok_(not c_task.cancelled)
    c_task.cancel()
    ok_(c_task.cancelled)
    time.sleep(.1)
    while True:
        task = self.task_m.consume_task()
        if task is None:
            break
        task.fire_callbacks()
    logger.debug('%s' % self.callback_order)
    # task 5 was cancelled, so it must not appear in the callback order
    assert self.callback_order == [0, 1, 2, 3, 4, 6, 7, 8, 9]

def test_check_unhealthy(urlopen):
    esc = ElasticsearchChecker(EXAMPLE_URL)
    response_mock = MagicMock()
    response_mock.read.return_value = get_response("yellow")
    response_mock.__enter__.return_value = response_mock
    urlopen.return_value = response_mock
    result, message = esc.check()
    urlopen.assert_called_with(REQUEST_URL)
    ok_(not result)
    ok_("status is 'yellow'" in message, message)

def test_utf8_encoding(self):
    k = "a key with a replacement character \ufffd and something non-BMP \U0001f4a3"
    k_enc = k.encode('utf-8')
    mc = make_test_client(binary=True)
    ok_(mc.set(k, 0))
    ok_(mc.get(k_enc) == 0)

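# Side note (illustrative, not part of the original suite): the key above mixes
# a BMP code point (U+FFFD) with a non-BMP one (U+1F4A3); encoding the text key
# as UTF-8 is what lets the binary client round-trip it as a byte key.
assert "\ufffd".encode('utf-8') == b'\xef\xbf\xbd'
assert "\U0001f4a3".encode('utf-8') == b'\xf0\x9f\x92\xa3'
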
def test_polling_command(self):
    test_str = 'testing longPoll correctness'
    # first, send a message to ourselves
    me = self.line_interface._client.getProfile()
    me.sendMessage(test_str)
    result = self.line_interface.polling_command()
    ok_(len(result) == 1, result)
    submitter, msg = result[0]
    ok_(submitter.code == me.id, submitter)
    ok_(msg == test_str,
        'Message content does not match: {} <-> {}'.format(msg, test_str))

def test_config_parsing_automatic_output_directory_creation():
    train_dir = '../train'
    train_file = join(train_dir, 'f0.jsonlines')
    test_file = join(train_dir, 'f1.jsonlines')
    # make a simple config file that has new directories that should
    # be automatically created
    output_dir = join(_my_dir, 'output')
    new_log_path = join(output_dir, 'autolog')
    new_results_path = join(output_dir, 'autoresults')
    new_models_path = join(output_dir, 'automodels')
    new_predictions_path = join(output_dir, 'autopredictions')
    ok_(not exists(new_log_path))
    ok_(not exists(new_results_path))
    ok_(not exists(new_models_path))
    ok_(not exists(new_predictions_path))
    values_to_fill_dict = {'experiment_name': 'auto_dir_creation',
                           'task': 'evaluate',
                           'train_file': train_file,
                           'test_file': test_file,
                           'learners': "['LogisticRegression']",
                           'log': new_log_path,
                           'results': new_results_path,
                           'models': new_models_path,
                           'predictions': new_predictions_path,
                           'objectives': "['f1_score_micro']"}
    config_template_path = join(_my_dir, 'configs',

def test_receive_two(self):
    serialized_messages = self.streamer.serialize_for_stream({'name': "foo", 'value': 42})
    serialized_messages += self.streamer.serialize_for_stream({'name': "bar", 'value': 24})
    self.streamer.receive(serialized_messages)
    message = self.streamer.parse_next_message()
    ok_(message is not None)
    eq_(message['name'], "foo")
    eq_(message['value'], 42)
    message = self.streamer.parse_next_message()
    ok_(message is not None)
    eq_(message['name'], "bar")
    eq_(message['value'], 24)
    eq_(None, self.streamer.parse_next_message())
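
# Minimal illustrative sketch only (not the project's implementation): a
# newline-delimited JSON streamer exposing the serialize_for_stream / receive /
# parse_next_message interface exercised above. The real wire format is not
# shown in the snippet, so the framing here is an assumption.
import json


class JsonLineStreamer(object):
    def __init__(self):
        self._buffer = b''

    def serialize_for_stream(self, message):
        # one JSON document per line
        return json.dumps(message).encode('utf-8') + b'\n'

    def receive(self, data):
        # accumulate raw bytes; messages may arrive split or concatenated
        self._buffer += data

    def parse_next_message(self):
        # return the next complete message, or None if nothing is buffered
        line, sep, rest = self._buffer.partition(b'\n')
        if not sep:
            return None
        self._buffer = rest
        return json.loads(line.decode('utf-8'))


streamer = JsonLineStreamer()
payload = streamer.serialize_for_stream({'name': "foo", 'value': 42})
payload += streamer.serialize_for_stream({'name': "bar", 'value': 24})
streamer.receive(payload)
assert streamer.parse_next_message() == {'name': "foo", 'value': 42}
assert streamer.parse_next_message() == {'name': "bar", 'value': 24}
assert streamer.parse_next_message() is None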