Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self):
    """Load the ``tempest.test_plugins`` extensions and register clients.

    The extension manager is created with invoke-on-load enabled and
    ``propagate_map_exceptions`` switched on; ``self.failure_hook`` is
    supplied as the load-failure callback. Service clients are registered
    once the manager has been stored on ``self.ext_plugins``.
    """
    manager_options = {
        'invoke_on_load': True,
        'propagate_map_exceptions': True,
        'on_load_failure_callback': self.failure_hook,
    }
    self.ext_plugins = stevedore.ExtensionManager(
        'tempest.test_plugins', **manager_options)
    self._register_service_clients()
"uuid": "144e08f4-00cb-11e2-888e-5453ed1bbb5f",
"system_metadata": {}}
# NOTE(review): these statements sit inside a definition whose header is not
# visible here; presumably a test fixture setup — confirm against the file.
# Register stand-ins for the named `db` attributes through self.stubs.Set.
self.stubs.Set(db, 'instance_info_cache_delete', self.do_nothing)
self.stubs.Set(db, 'instance_destroy', self.do_nothing)
self.stubs.Set(db, 'instance_system_metadata_get',
self.fake_db_instance_system_metadata_get)
# The block-device-mapping stand-in always returns an empty dict.
self.stubs.Set(db, 'block_device_mapping_get_all_by_instance',
lambda context, instance: {})
# This stand-in accepts exactly three positional arguments and returns
# self.instance for both elements of the resulting pair.
self.stubs.Set(db, 'instance_update_and_get_original',
lambda context, uuid, kwargs: (self.instance,
self.instance))
# Build an agent manager whose pollster manager holds a single 'test'
# extension wrapping a fresh self.Pollster() instance.
agent_manager = manager.AgentManager()
agent_manager.pollster_manager = \
test_manager.TestExtensionManager([
extension.Extension('test',
None,
None,
self.Pollster(),
),
])
# Hand the prepared agent manager to the nova_notifier module.
nova_notifier.initialize_manager(agent_manager)
def test_udp_receive_storage_error(self):
    """Start the service with a dispatcher whose recording call fails.

    The only registered dispatcher is a mock whose
    ``record_metering_data`` has ``self._raise_error`` as its side effect.
    The service is started while ``socket.socket`` is patched with
    ``self._make_fake_socket``.
    """
    failing_dispatcher = mock.MagicMock()
    dispatcher_ext = extension.Extension(
        'test', None, None, failing_dispatcher)
    self.srv.dispatcher_manager = test_manager.TestExtensionManager(
        [dispatcher_ext])
    failing_dispatcher.record_metering_data.side_effect = self._raise_error

    # Mirror the sample fields under their ``counter_``-prefixed keys.
    self.counter['source'] = 'mysource'
    for field in ('name', 'volume', 'type', 'unit'):
        self.counter['counter_' + field] = self.counter[field]

    with patch('socket.socket', self._make_fake_socket):
        self.srv.start()
def get_insert_pre_hourly_cmpt_mgr():
    """Return a test ExtensionManager holding three insert components.

    The extensions are registered, in order, under the names
    ``prepare_data``, ``insert_data`` and ``insert_data_pre_hourly``.
    """
    prepare_ext = Extension(
        'prepare_data',
        'monasca_transform.component.insert.prepare_data:PrepareData',
        PrepareData(),
        None)
    insert_ext = Extension(
        'insert_data',
        'tests.functional.component.insert.dummy_insert:'
        'DummyInsert',
        DummyInsert(),
        None)
    pre_hourly_ext = Extension(
        'insert_data_pre_hourly',
        'tests.functional.component.insert.'
        'dummy_insert_pre_hourly:'
        'DummyInsertPreHourly',
        DummyInsertPreHourly(),
        None)
    return ExtensionManager.make_test_instance(
        [prepare_ext, insert_ext, pre_hourly_ext])
# NOTE(review): these statements sit inside a definition whose header is not
# visible here; presumably a test fixture setup — confirm against the file.
# Register stand-ins for the named `db` attributes through self.stubs.Set.
self.stubs.Set(db, 'instance_destroy', self.do_nothing)
self.stubs.Set(db, 'instance_system_metadata_get',
self.fake_db_instance_system_metadata_get)
# The block-device-mapping stand-in always returns an empty dict.
self.stubs.Set(db, 'block_device_mapping_get_all_by_instance',
lambda context, instance: {})
# This stand-in accepts any positional/keyword arguments and returns
# self.instance for both elements of the resulting pair.
self.stubs.Set(db, 'instance_update_and_get_original',
lambda *args, **kwargs: (self.instance, self.instance))
# The flavor extraction attribute on `flavors` is replaced as well.
self.stubs.Set(flavors, 'extract_flavor', self.fake_extract_flavor)
# Set up to capture the notification messages generated by the
# plugin and to invoke our notifier plugin.
self.notifications = []
# Test extension manager containing a single 'test' extension that wraps a
# fresh self.Pollster() instance.
ext_mgr = extension.ExtensionManager.make_test_instance(
extensions=[
extension.Extension('test',
None,
None,
self.Pollster(),
),
],
)
# Keep both the manager and the gatherer built from it on the instance.
self.ext_mgr = ext_mgr
self.gatherer = nova_notifier.DeletedInstanceStatsGatherer(ext_mgr)
# Initialize the global _gatherer in nova_notifier to use the
# gatherer in this test instead of the gatherer in nova_notifier.
nova_notifier.initialize_gatherer(self.gatherer)
# Terminate the instance to trigger the notification.
with contextlib.nested(
# Under Grizzly, Nova has moved to no-db access on the
# compute node. The compute manager uses RPC to talk to
def test_message_to_event_missing_keys(self):
    """Convert a three-key message and check the single recorded event.

    The message carries only ``event_type``, ``message_id`` and
    ``publisher_id``. The conversion must not call ``LOG.exception``, and
    the dispatcher must receive exactly one event whose name is ``"foo"``,
    whose ``generated`` time equals the frozen clock value, and which has
    exactly one trait.
    """
    now = timeutils.utcnow()
    timeutils.set_time_override(now)
    # Undo the frozen clock when the test finishes (pass or fail) so that
    # later tests do not observe the overridden time.
    self.addCleanup(timeutils.clear_time_override)

    message = {'event_type': "foo",
               'message_id': "abc",
               'publisher_id': "1"}

    mock_dispatcher = mock.MagicMock()
    self.srv.dispatcher_manager = test_manager.TestExtensionManager(
        [extension.Extension('test',
                             None,
                             None,
                             mock_dispatcher
                             ),
         ])

    with patch('ceilometer.collector.service.LOG') as mylog:
        self.srv._message_to_event(message)
        self.assertFalse(mylog.exception.called)

    # The positional arguments of the last record_events call are the
    # events handed to the dispatcher.
    events = mock_dispatcher.record_events.call_args[0]
    self.assertEqual(1, len(events))
    event = events[0]
    self.assertEqual("foo", event.event_name)
    self.assertEqual(now, event.generated)
    self.assertEqual(1, len(event.traits))
# Builds a TestExtensionManager over named extensions ('test', 'testanother',
# 'testexception'), each wrapping a freshly constructed pollster instance
# (self.Pollster, self.PollsterAnother, self.PollsterException).
# NOTE(review): the list literal and the enclosing call are not closed within
# the visible lines — the remainder of this definition is cut off here.
def create_extension_manager(self):
return extension_tests.TestExtensionManager(
[
extension.Extension(
'test',
None,
None,
self.Pollster(), ),
extension.Extension(
'testanother',
None,
None,
self.PollsterAnother(), ),
extension.Extension(
'testexception',
None,
None,
self.PollsterException(), ),
def setUp(self):
    """Prepare a PartitionedAlarmService wired to mocks.

    Overrides the ``host`` option and the ``partition_rpc_topic`` option
    (group ``alarm``), replaces the service's ``tg`` and
    ``partition_coordinator`` with mocks, and installs an extension
    manager containing a single ``threshold`` extension backed by
    ``self.threshold_eval``.
    """
    super(TestPartitionedAlarmService, self).setUp()
    self.threshold_eval = mock.Mock()
    self.api_client = mock.MagicMock()

    # Configuration overrides used by the service under test.
    self.CONF = conf = self.useFixture(config.Config()).conf
    conf.set_override('host', 'fake_host')
    conf.set_override('partition_rpc_topic', 'fake_topic', group='alarm')

    # Service under test, with its collaborators mocked out.
    self.partitioned = alarm_service = service.PartitionedAlarmService()
    alarm_service.tg = mock.Mock()
    alarm_service.partition_coordinator = mock.Mock()

    threshold_ext = extension.Extension(
        'threshold', None, None, self.threshold_eval)
    self.extension_mgr = extension_tests.TestExtensionManager(
        [threshold_ext])
    alarm_service.extension_manager = self.extension_mgr
def test_message_to_event_duplicate(self):
    """Check that a DUPLICATE result from the dispatcher is tolerated.

    With ``store_events`` enabled in the ``collector`` group, the mocked
    dispatcher reports ``models.Event.DUPLICATE`` for the recorded event;
    converting the message is expected to complete without raising.
    """
    self.CONF.set_override("store_events", True, group="collector")

    duplicate_dispatcher = mock.MagicMock()
    duplicate_result = (models.Event.DUPLICATE, object())
    duplicate_dispatcher.record_events.return_value = [duplicate_result]

    self.srv.dispatcher_manager = test_manager.TestExtensionManager([
        extension.Extension('test', None, None, duplicate_dispatcher),
    ])

    # Should return silently.
    self.srv._message_to_event({'event_type': "foo", 'message_id': "abc"})
def test_udp_receive(self):
mock_dispatcher = mock.MagicMock()
self.srv.dispatcher_manager = test_manager.TestExtensionManager(
[extension.Extension('test',
None,
None,
mock_dispatcher
),
])
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
self.counter['counter_volume'] = self.counter['volume']
self.counter['counter_type'] = self.counter['type']
self.counter['counter_unit'] = self.counter['unit']
with patch('socket.socket', self._make_fake_socket):
self.srv.start()
mock_dispatcher.record_metering_data.assert_called_once_with(