Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
name, type(data[name]), required_type))
if field.get("checker"):
value = field["checker"](self, data)
setattr(self, name, value)
class SourceType(_FieldDefinitionValidator):
    """Base class shared by every source type."""

    # Field definitions that apply to every source, whatever its type.
    _common_fields = [
        {"name": "type", "optional": False},
        {
            "name": "supported_os",
            "optional": True,
            "type": list,
            # list(...) makes a fresh list, so the default never aliases
            # definitions.SUPPORTED_OS itself.
            "default": list(definitions.SUPPORTED_OS),
        },
    ]
def __init__(self, source_definition, artifact=None):
    """Build a source from its definition mapping.

    Args:
      source_definition: Mapping that must contain the "attributes" and
        "type" keys.
      artifact: The artifact that owns this source, if any.

    Raises:
      KeyError: If "attributes" or "type" is missing from the definition.
    """
    attrs = source_definition["attributes"]

    # Remember who owns us and the raw definition we were built from.
    self.artifact = artifact
    self.source_definition = source_definition
    self.type_indicator = source_definition["type"]

    # Type-specific fields are read from the "attributes" mapping, while
    # the common fields are read from the definition itself.
    self._LoadFieldDefinitions(attrs, self._field_definitions)
    self._LoadFieldDefinitions(source_definition, self._common_fields)
def is_active(self, **_):
    """Report whether this source applies to the current environment.

    This implementation accepts and ignores any keyword arguments and
    always answers True.
    """
    return True
def apply(self, artifact_name=None, fields=None, result_type=None, **_):
dict(name="supported_os", optional=True,
default=definitions.SUPPORTED_OS),
]
def apply(self, collector=None, **_):
    """Yield every result produced by collecting each named artifact.

    Args:
      collector: Object exposing ``collect_artifact(name)``; it is called
        once for each entry of ``self.names``.
      **_: Extra keyword arguments are accepted and ignored.

    Yields:
      Each item produced by ``collector.collect_artifact``, following the
      order of ``self.names``.
    """
    for artifact_name in self.names:
        # Re-yield item by item so the whole method stays a lazy generator.
        for collected in collector.collect_artifact(artifact_name):
            yield collected
class WMISourceType(LiveModeSourceMixin, SourceType):
    """Source type whose attributes carry a required ``query`` string.

    ``fields``, ``type_name`` and ``supported_os`` are optional attributes.
    """

    _field_definitions = [
        {"name": "query", "type": basestring},
        {"name": "fields", "type": list, "optional": True, "default": []},
        {"name": "type_name", "type": basestring, "optional": True},
        {
            "name": "supported_os",
            "optional": True,
            "default": definitions.SUPPORTED_OS,
        },
    ]

    # Class-level default for the ``fields`` attribute.
    fields = None
def _guess_returned_fields(self, sample):
    """Derive field descriptors from one sample row.

    Args:
      sample: Mapping of field name to an example value.

    Returns:
      A list with one ``dict(name=..., type=...)`` per key of ``sample``, in
      the mapping's iteration order. The type is "int" for values whose
      exact type is ``int`` and "unicode" for everything else.
    """
    # Exact type comparison (not isinstance) is kept on purpose: a bool is
    # therefore reported as "unicode", exactly as before.
    return [
        dict(name=key, type="int" if type(value) is int else "unicode")
        for key, value in sample.items()
    ]
def __init__(self):
    """Set up the reader with the known labels and operating systems."""
    super(ArtifactsReader, self).__init__()

    # Each attribute is a new set built from the corresponding value in
    # the definitions module.
    self.labels = set(definitions.LABELS)
    self.supported_os = set(definitions.SUPPORTED_OS)
def SupportedOS(self, art_definition):
    """Return the operating systems an artifact definition supports.

    Args:
      art_definition: Mapping for one artifact definition. Its optional
        "supported_os" entry holds the operating system names.

    Returns:
      The "supported_os" value of the definition, or
      definitions.SUPPORTED_OS when that key is absent.

    Raises:
      errors.FormatError: If any listed operating system is not part of
        definitions.SUPPORTED_OS.
    """
    # NOTE(review): an identical SupportedOS definition appears twice in
    # this file; if both sit in the same class the later one wins.
    supported_os = art_definition.get(
        "supported_os", definitions.SUPPORTED_OS)
    unknown_os = set(supported_os).difference(definitions.SUPPORTED_OS)
    if unknown_os:
        # Sort the names: set iteration order is arbitrary, and the error
        # message should be reproducible.
        raise errors.FormatError(
            u'supported operating system: {} '
            u'not defined.'.format(u', '.join(sorted(unknown_os))))
    return supported_os
def SupportedOS(self, art_definition):
    """Return the operating systems an artifact definition supports.

    Args:
      art_definition: Mapping for one artifact definition. Its optional
        "supported_os" entry holds the operating system names.

    Returns:
      The "supported_os" value of the definition, or
      definitions.SUPPORTED_OS when that key is absent.

    Raises:
      errors.FormatError: If any listed operating system is not part of
        definitions.SUPPORTED_OS.
    """
    # NOTE(review): an identical SupportedOS definition appears twice in
    # this file; if both sit in the same class the later one wins.
    supported_os = art_definition.get(
        "supported_os", definitions.SUPPORTED_OS)
    unknown_os = set(supported_os).difference(definitions.SUPPORTED_OS)
    if unknown_os:
        # Sort the names: set iteration order is arbitrary, and the error
        # message should be reproducible.
        raise errors.FormatError(
            u'supported operating system: {} '
            u'not defined.'.format(u', '.join(sorted(unknown_os))))
    return supported_os