Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testProcessSources(self):
"""Tests the PreprocessSources and ProcessSources function."""
artifacts_path = shared_test_lib.GetTestFilePath(['artifacts'])
self._SkipIfPathNotExists(artifacts_path)
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
registry.ReadFromDirectory(reader, artifacts_path)
test_engine = task_engine.TaskMultiProcessEngine(
maximum_number_of_tasks=100)
test_file_path = self._GetTestFilePath(['ímynd.dd'])
self._SkipIfPathNotExists(test_file_path)
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
source_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
parent=os_path_spec)
test_engine.PreprocessSources(registry, [source_path_spec])
def testCollectFromFileSystem(self):
"""Tests the CollectFromFileSystem function."""
artifacts_path = self._GetTestFilePath(['artifacts'])
self._SkipIfPathNotExists(artifacts_path)
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
registry.ReadFromDirectory(reader, artifacts_path)
knowledge_base_object = knowledge_base_library.KnowledgeBase()
_ = knowledge_base_object
def testPreprocessSources(self):
"""Tests the PreprocessSources function."""
test_file_path = self._GetTestFilePath(['SOFTWARE'])
self._SkipIfPathNotExists(test_file_path)
test_file_path = self._GetTestFilePath(['SYSTEM'])
self._SkipIfPathNotExists(test_file_path)
test_artifacts_path = shared_test_lib.GetTestFilePath(['artifacts'])
self._SkipIfPathNotExists(test_artifacts_path)
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
registry.ReadFromDirectory(reader, test_artifacts_path)
test_engine = TestEngine()
source_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_FAKE, location='/')
test_engine.PreprocessSources(registry, [source_path_spec])
operating_system = test_engine.knowledge_base.GetValue('operating_system')
self.assertEqual(operating_system, 'Windows NT')
test_engine.PreprocessSources(registry, [None])
def testProcessSources(self):
"""Tests the ProcessSources function."""
test_artifacts_path = self._GetTestFilePath(['artifacts'])
self._SkipIfPathNotExists(test_artifacts_path)
test_file_path = self._GetTestFilePath(['ímynd.dd'])
self._SkipIfPathNotExists(test_file_path)
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
registry.ReadFromDirectory(reader, test_artifacts_path)
test_engine = single_process.SingleProcessEngine()
resolver_context = context.Context()
session = sessions.Session()
os_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_OS, location=test_file_path)
source_path_spec = path_spec_factory.Factory.NewPathSpec(
dfvfs_definitions.TYPE_INDICATOR_TSK, location='/',
parent=os_path_spec)
test_engine.PreprocessSources(registry, [source_path_spec])
storage_writer = fake_writer.FakeStorageWriter(session)
def setUpClass(cls):
"""Makes preparations before running any of the tests."""
artifacts_path = shared_test_lib.GetTestFilePath(['artifacts'])
cls._artifacts_registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
cls._artifacts_registry.ReadFromDirectory(reader, artifacts_path)
def add(current, artifact):
"""Adds a new artifact to the database."""
db = current.db
decoded_artifacts = []
artifact_snippets = re.split("^---$", artifact, flags=re.M | re.S)
for snippet in artifact_snippets:
decoded_artifact = yaml.safe_load(snippet)
if not decoded_artifact:
continue
decoded_artifact = artifacts.Artifact.from_primitive(decoded_artifact)
decoded_artifacts.append((decoded_artifact, snippet))
for decoded_artifact, artifact_text in decoded_artifacts:
artifact_reader = reader.YamlArtifactsReader()
definition = artifact_reader.ReadArtifactDefinitionValues(
decoded_artifact.to_primitive(False))
if is_definition_in_db(current, definition.name):
raise ValueError("Artifact name %s already in database." %
definition.name)
for source in definition.sources:
if (source.type_indicator ==
definitions.TYPE_INDICATOR_ARTIFACT_GROUP):
if not is_definition_in_db(current, source):
raise ValueError(
"Artifact group references %s which "
"is not known yet." % source)
db.artifacts.insert(
name=decoded_artifact.name,
raise errors.BadConfigOption(
'Unable to determine path to artifact definitions.')
custom_artifacts_path = getattr(
options, 'custom_artifact_definitions_path', None)
if custom_artifacts_path and not os.path.isfile(custom_artifacts_path):
raise errors.BadConfigOption(
'No such artifacts filter file: {0:s}.'.format(custom_artifacts_path))
if custom_artifacts_path:
logger.info(
'Custom artifact filter file: {0:s}'.format(custom_artifacts_path))
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
logger.info(
'Determined artifact definitions path: {0:s}'.format(artifacts_path))
try:
registry.ReadFromDirectory(reader, artifacts_path)
except (KeyError, artifacts_errors.FormatError) as exception:
raise errors.BadConfigOption((
'Unable to read artifact definitions from: {0:s} with error: '
'{1!s}').format(artifacts_path, exception))
for name in preprocessors_manager.PreprocessPluginsManager.GetNames():
if not registry.GetDefinitionByName(name):
raise errors.BadConfigOption(
'Missing required artifact definition: {0:s}'.format(name))
def BuildStats(self):
"""Builds the statistics."""
artifact_reader = reader.YamlArtifactsReader()
self._label_counts = {}
self._os_counts = {}
self._path_count = 0
self._reg_key_count = 0
self._source_type_counts = {}
self._total_count = 0
for artifact_definition in artifact_reader.ReadDirectory('data'):
if hasattr(artifact_definition, 'labels'):
for label in artifact_definition.labels:
self._label_counts[label] = self._label_counts.get(label, 0) + 1
for source in artifact_definition.sources:
self._total_count += 1
source_type = source.type_indicator
self._source_type_counts[source_type] = self._source_type_counts.get(
Raises:
BadConfigOption: if artifact definitions cannot be read.
"""
if artifact_definitions_path and not os.path.isdir(
artifact_definitions_path):
raise errors.BadConfigOption(
'No such artifacts filter file: {0:s}.'.format(
artifact_definitions_path))
if custom_artifacts_path and not os.path.isfile(custom_artifacts_path):
raise errors.BadConfigOption(
'No such artifacts filter file: {0:s}.'.format(custom_artifacts_path))
registry = artifacts_registry.ArtifactDefinitionsRegistry()
reader = artifacts_reader.YamlArtifactsReader()
try:
registry.ReadFromDirectory(reader, artifact_definitions_path)
except (KeyError, artifacts_errors.FormatError) as exception:
raise errors.BadConfigOption((
'Unable to read artifact definitions from: {0:s} with error: '
'{1!s}').format(artifact_definitions_path, exception))
if custom_artifacts_path:
try:
registry.ReadFromFile(reader, custom_artifacts_path)
except (KeyError, artifacts_errors.FormatError) as exception:
raise errors.BadConfigOption((
'Unable to read artifact definitions from: {0:s} with error: '
def CheckFile(self, filename):
"""Validates the artifacts definition in a specific file.
Args:
filename (str): name of the artifacts definition file.
Returns:
bool: True if the file contains valid artifacts definitions.
"""
result = True
artifact_reader = reader.YamlArtifactsReader()
try:
for artifact_definition in artifact_reader.ReadFile(filename):
try:
self._artifact_registry.RegisterDefinition(artifact_definition)
except KeyError:
logging.warning(
'Duplicate artifact definition: {0:s} in file: {1:s}'.format(
artifact_definition.name, filename))
result = False
artifact_definition_supports_macos = (
definitions.SUPPORTED_OS_DARWIN in (
artifact_definition.supported_os))
artifact_definition_supports_windows = (
definitions.SUPPORTED_OS_WINDOWS in (