Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testInitialize(self):
    """Tests the __init__ function."""
    # A list holding one well-formed pair must be accepted without raising.
    source_type.WindowsRegistryValueSourceType(
        key_value_pairs=[{'key': u'test', 'value': u'test'}])

    malformed_inputs = (
        # A pair whose keys are not exactly "key" and "value".
        [{'bad': u'test', 'value': u'test'}],
        # A bare dict instead of a list of pairs.
        {'bad': u'test', 'value': u'test'},
    )
    for malformed in malformed_inputs:
        with self.assertRaises(errors.FormatError):
            source_type.WindowsRegistryValueSourceType(
                key_value_pairs=malformed)
def __init__(self, key_value_pairs=None):
    """Initializes a source type.

    Args:
      key_value_pairs (Optional[list[dict[str, str]]]): key path and value
          name pairs, where key paths are relative to the root of the Windows
          Registry. Each pair is a dict with exactly the keys "key" and
          "value".

    Raises:
      FormatError: when key value pairs is not set, is not a list, or
          contains an entry that is not a dict with exactly the keys "key"
          and "value".
    """
    if not key_value_pairs:
        raise errors.FormatError('Missing key value pairs value.')

    if not isinstance(key_value_pairs, list):
        raise errors.FormatError('key_value_pairs must be a list')

    for pair in key_value_pairs:
        if not isinstance(pair, dict):
            raise errors.FormatError('key_value_pair must be a dict')

        if set(pair.keys()) != set(['key', 'value']):
            # Describe the offending pair itself. Unpacking every dict of the
            # whole list into (key, value) raised ValueError for pairs that
            # do not have exactly two keys, and printed key names instead of
            # values for those that do. !s keeps non-string entries from
            # breaking the formatting of the error message.
            offending_pair = ', '.join(
                '{0!s}: {1!s}'.format(key, value)
                for key, value in pair.items())
            error_message = (
                'key_value_pair missing "key" and "value" keys, got: '
                '{0:s}').format(offending_pair)
            raise errors.FormatError(error_message)
name (str): name of the artifact definition.
Raises:
FormatError: if there are undefined supported operating systems.
"""
supported_os = definition_values.get('supported_os', [])
if not isinstance(supported_os, list):
raise errors.FormatError(
'Invalid supported_os type: {0!s}'.format(type(supported_os)))
undefined_supported_os = set(supported_os).difference(self.supported_os)
if undefined_supported_os:
error_string = (
'Artifact definition: {0:s} undefined supported operating system: '
'{1:s}.').format(name, ', '.join(undefined_supported_os))
raise errors.FormatError(error_string)
definition_object.supported_os = supported_os
if required_type is basestring:
default = ""
else:
default = required_type()
if required_type is None and default is not None:
required_type = type(default)
if not field.get("optional"):
if name not in data:
raise errors.FormatError(
u'Missing fields {}.'.format(name))
value = data.get(name, default)
if default is not None and not isinstance(value, required_type):
raise errors.FormatError(
u'field {} has type {} should be {}.'.format(
name, type(data[name]), required_type))
if field.get("checker"):
value = field["checker"](self, data)
setattr(self, name, value)
Returns:
SourceType: a source type.
Raises:
FormatError: if the type indicator is not set or unsupported,
or if required attributes are missing.
"""
if not type_indicator:
raise errors.FormatError('Missing type indicator.')
try:
source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
type_indicator, attributes)
except (AttributeError, TypeError) as exception:
raise errors.FormatError((
'Unable to create source type: {0:s} for artifact definition: {1:s} '
'with error: {2!s}').format(type_indicator, self.name, exception))
self.sources.append(source_object)
return source_object
'No such artifacts filter file: {0:s}.'.format(custom_artifacts_path))
if custom_artifacts_path:
logger.info(
'Custom artifact filter file: {0:s}'.format(custom_artifacts_path))
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
logger.info(
'Determined artifact definitions path: {0:s}'.format(artifacts_path))
try:
registry.ReadFromDirectory(reader, artifacts_path)
except (KeyError, artifacts_errors.FormatError) as exception:
raise errors.BadConfigOption((
'Unable to read artifact definitions from: {0:s} with error: '
'{1!s}').format(artifacts_path, exception))
for name in preprocessors_manager.PreprocessPluginsManager.GetNames():
if not registry.GetDefinitionByName(name):
raise errors.BadConfigOption(
'Missing required artifact definition: {0:s}'.format(name))
if custom_artifacts_path:
try:
registry.ReadFromFile(reader, custom_artifacts_path)
except (KeyError, artifacts_errors.FormatError) as exception:
raise errors.BadConfigOption((
'Unable to read artifact definitions from: {0:s} with error: '
def __init__(self, path_list=None, **kwargs):
    """Initializes the collector definition object.

    Args:
      path_list: optional list of path strings. The default is None, which
          is rejected the same way as an empty list.
      **kwargs: forwarded unchanged to the parent class initializer.

    Raises:
      FormatError: when path_list is not set.
    """
    if path_list:
        super(FileCollectorDefinition, self).__init__(**kwargs)
        self.path_list = path_list
    else:
        # Nothing is initialized when the required value is absent.
        raise errors.FormatError(u'Missing path_list value.')
subclass. This function raises FormatError if an unsupported source type
indicator is encountered.
Args:
type_indicator (str): source type indicator.
attributes (dict[str, object]): source attributes.
Returns:
SourceType: a source type.
Raises:
FormatError: if the type indicator is not set or unsupported,
or if required attributes are missing.
"""
if not type_indicator:
raise errors.FormatError('Missing type indicator.')
try:
source_object = registry.ArtifactDefinitionsRegistry.CreateSourceType(
type_indicator, attributes)
except (AttributeError, TypeError) as exception:
raise errors.FormatError((
'Unable to create source type: {0:s} for artifact definition: {1:s} '
'with error: {2!s}').format(type_indicator, self.name, exception))
self.sources.append(source_object)
return source_object
def __init__(self, query=None, type_name=None, fields=None):
    """Initializes the source type.

    Args:
      query: query value; required. It is only validated in this method.
          NOTE(review): it is not stored on the instance here - confirm
          whether that is intentional.
      type_name: value stored as-is in the type_name attribute.
      fields: optional fields; a falsy value is replaced by an empty list.

    Raises:
      FormatError: when query is not set.
    """
    if not query:
        raise errors.FormatError(u'Missing query value.')

    super(RekallEfilter, self).__init__()
    self.type_name = type_name
    self.fields = fields if fields else []
def CreateSourceType(cls, type_indicator, attributes):
    """Creates a source type object.

    The type indicator is looked up in cls._source_type_classes and the
    matching class is called with the attributes as keyword arguments.

    Args:
      type_indicator (str): source type indicator.
      attributes (dict[str, object]): source attributes.

    Returns:
      SourceType: a source type.

    Raises:
      FormatError: if the type indicator is not set or unsupported,
          or if required attributes are missing.
    """
    if type_indicator not in cls._source_type_classes:
        # {0!s} instead of {0:s}: a non-string indicator such as None must
        # produce the documented FormatError, not a TypeError raised while
        # formatting the error message.
        raise errors.FormatError(
            'Unsupported type indicator: {0!s}.'.format(type_indicator))

    return cls._source_type_classes[type_indicator](**attributes)