Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a store-insertion routine; the enclosing `def` is outside this
# view. The recursive calls suggest it is the body of
# `_add(store, stix_data, allow_custom, version)` -- TODO confirm.
# It dispatches on the shape of `stix_data`: a list, a bundle, or one object.
if isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj, allow_custom, version)
elif stix_data["type"] == "bundle":
# adding a json bundle - so just grab STIX objects
# (a bundle with no "objects" key is treated as empty)
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom, version)
else:
# Adding a single non-bundle object
# Already-constructed STIX objects are used as-is; anything else goes
# through parse() first.
if isinstance(stix_data, _STIXBase):
stix_obj = stix_data
else:
stix_obj = parse(stix_data, allow_custom, version)
# Map ID directly to the object, if it is a marking. Otherwise,
# map to a family, so we can track multiple versions.
if is_marking(stix_obj):
# Plain assignment: an existing entry under the same ID is replaced.
store._data[stix_obj.id] = stix_obj
else:
# Reuse the family already registered for this ID, or register a new one.
if stix_obj.id in store._data:
obj_family = store._data[stix_obj.id]
else:
obj_family = _ObjectFamily()
store._data[stix_obj.id] = obj_family
obj_family.add(stix_obj)
# Fragment of a store-insertion routine; the enclosing `def` is outside this
# view. The recursive calls suggest it is the body of
# `_add(store, stix_data, allow_custom, version)` -- TODO confirm.
# It dispatches on the shape of `stix_data`: a list, a bundle, or one object.
if isinstance(stix_data, list):
# STIX objects are in a list- recurse on each object
for stix_obj in stix_data:
_add(store, stix_obj, allow_custom, version)
elif stix_data["type"] == "bundle":
# adding a json bundle - so just grab STIX objects
# (a bundle with no "objects" key is treated as empty)
for stix_obj in stix_data.get("objects", []):
_add(store, stix_obj, allow_custom, version)
else:
# Adding a single non-bundle object
# Already-constructed STIX objects are used as-is; anything else goes
# through parse() first.
if isinstance(stix_data, _STIXBase):
stix_obj = stix_data
else:
stix_obj = parse(stix_data, allow_custom, version)
# Map ID directly to the object, if it is a marking. Otherwise,
# map to a family, so we can track multiple versions.
# The ID is read with subscript access (stix_obj["id"]) throughout.
if _is_marking(stix_obj):
# Plain assignment: an existing entry under the same ID is replaced.
store._data[stix_obj["id"]] = stix_obj
else:
# Reuse the family already registered for this ID, or register a new one.
if stix_obj["id"] in store._data:
obj_family = store._data[stix_obj["id"]]
else:
obj_family = _ObjectFamily()
store._data[stix_obj["id"]] = obj_family
obj_family.add(stix_obj)
# Fragment of a TAXII-backed "fetch one object by ID" routine; the enclosing
# `def` is outside this view. Result: a parsed STIX object, or None.
# don't extract TAXII filters from query (to send to TAXII endpoint)
# as directly retrieving a STIX object by ID
try:
# Fetch by ID, then keep only the entries that pass the local `query` filters.
stix_objs = self.collection.get_object(stix_id)['objects']
stix_obj = list(apply_common_filters(stix_objs, query))
except HTTPError as e:
if e.response.status_code == 404:
# if resource not found or access is denied from TAXII server,
# treat it as "no match": the empty list falls through to the
# None branch below
stix_obj = []
else:
# Any other HTTP status is surfaced, wrapped with the original error.
raise DataSourceError("TAXII Collection resource returned error", e)
if len(stix_obj):
# Only the first surviving match is parsed; any further entries are ignored.
stix_obj = parse(stix_obj[0], allow_custom=self.allow_custom)
if stix_obj.id != stix_id:
# check - was added to handle erroneous TAXII servers
stix_obj = None
else:
stix_obj = None
return stix_obj
``stix_data`` can be a Bundle object, but each object in it will be
saved separately; you will be able to retrieve any of the objects
the Bundle contained, but not the Bundle itself.
"""
# Fragment: type dispatch on `stix_data`. The enclosing `def` (presumably
# `add`, given the recursive `self.add` calls -- TODO confirm) starts outside
# this view, and the final `raise` is cut off before its closing parenthesis.
if isinstance(stix_data, (v20.Bundle, v21.Bundle)):
# recursively add individual STIX objects
# (a bundle with no "objects" key is treated as empty)
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj, version=version)
elif isinstance(stix_data, _STIXBase):
# adding python STIX object
self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)):
# Parse raw JSON text / dict first, then re-dispatch the parsed result
# through this same method.
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
self.add(stix_data, version=version)
elif isinstance(stix_data, list):
# recursively add individual STIX objects
# NOTE(review): unlike the Bundle and str/dict branches above, this
# recursive call does not forward `version` -- confirm this is intended.
for stix_obj in stix_data:
self.add(stix_obj)
else:
raise TypeError(
"stix_data must be a STIX object (or list of), "
"JSON formatted STIX (or list of), "
# Fragment of a TAXII sink's add routine; the enclosing `def` and the opening
# of this if/elif chain are outside this view. Every non-list branch ends by
# binding `bundle` to serialized text, which is sent to the collection at the end.
bundle = parse(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
elif 'spec_version' in stix_data:
# If the spec_version is present, use new Bundle object...
bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
# No spec_version: wrap the object in a v20 Bundle instead.
bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
elif isinstance(stix_data, list):
# adding list of something - recurse on each
# Each element goes through self.add on its own; the `return` leaves
# before the add_objects call at the bottom, where `bundle` is unbound
# on this path.
for obj in stix_data:
self.add(obj)
return
elif isinstance(stix_data, str):
# adding json encoded string of STIX content
stix_data = parse(stix_data, allow_custom=self.allow_custom)
# A parsed bundle is serialized directly; a single object is wrapped in a
# v21 or v20 Bundle depending on whether it carries spec_version.
if stix_data['type'] == 'bundle':
bundle = stix_data.serialize(encoding='utf-8', ensure_ascii=False)
elif 'spec_version' in stix_data:
# If the spec_version is present, use new Bundle object...
bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
# NOTE(review): the message text below has typos ("must be as STIX object",
# missing spaces); it is runtime text, so it is left untouched here.
raise TypeError("stix_data must be as STIX object(or list of),json formatted STIX (or list of), or a json formatted STIX bundle")
# Push the serialized bundle to the TAXII collection.
self.collection.add_objects(bundle)
# Tail of a wrapper whose `def` line is outside this view: forward all
# arguments to the wrapped factory and return its result.
return self.factory.set_default_external_refs(*args, **kwargs)
# Reuse the docstring of the ObjectFactory method being wrapped.
set_default_external_refs.__doc__ = ObjectFactory.set_default_external_refs.__doc__
def set_default_object_marking_refs(self, *args, **kwargs):
    # Thin pass-through: forward every positional and keyword argument to the
    # wrapped factory and hand back whatever it returns.
    # (No docstring here on purpose -- it is copied from ObjectFactory below.)
    return self.factory.set_default_object_marking_refs(*args, **kwargs)
# Reuse the docstring of the ObjectFactory method being wrapped.
set_default_object_marking_refs.__doc__ = ObjectFactory.set_default_object_marking_refs.__doc__
def add_filters(self, *args, **kwargs):
    """Add filters to this object's data source.

    All positional and keyword arguments are forwarded unchanged to
    ``self.source.filters.add``.

    Returns:
        Whatever ``self.source.filters.add`` returns.
    """
    return self.source.filters.add(*args, **kwargs)
def add_filter(self, *args, **kwargs):
    """Add a filter to this object's data source.

    All positional and keyword arguments are forwarded unchanged to
    ``self.source.filters.add``.

    Returns:
        Whatever ``self.source.filters.add`` returns.
    """
    return self.source.filters.add(*args, **kwargs)
def parse(self, *args, **kwargs):
    # Method-style entry point for the module-level `_parse`: `self` is not
    # used, and every other argument is forwarded unchanged.
    # (No docstring here on purpose -- it is copied from `_parse` below.)
    return _parse(*args, **kwargs)
# Reuse the docstring of the function being wrapped.
parse.__doc__ = _parse.__doc__
def creator_of(self, obj):
"""Retrieve the Identity referred to by the object's `created_by_ref`.
Args:
obj: The STIX object whose `created_by_ref` property will be looked
up.
Returns:
str: The STIX object's creator, or None, if the object contains no
`created_by_ref` property or the object's creator cannot be
found.
"""
# NOTE(review): the summary says an Identity is retrieved while "Returns"
# is annotated `str`; the lookup itself is outside this view -- confirm
# which is right.
# A missing `created_by_ref` yields '' (falsy), so the `if` below is skipped.
creator_id = obj.get('created_by_ref', '')
if creator_id:
# Fragment of a TAXII-backed query routine; the enclosing `def`, the opening
# `try:` and the first assignment of `all_data` are outside this view.
# apply local (CompositeDataSource, TAXIICollectionSource and query) filters
query.remove(taxii_filters)
all_data = list(apply_common_filters(all_data, query))
except HTTPError as e:
# A 404 from the TAXII server (resources not found or access denied) is
# reported to the caller as a DataSourceError carrying the original error;
# it is NOT turned into an empty result.
if e.response.status_code == 404:
raise DataSourceError(
"The requested STIX objects for the TAXII Collection resource defined in"
" the supplied TAXII Collection object are either not found or access is"
" denied. Received error: ", e,
)
# NOTE(review): no visible branch handles other HTTP status codes, so they
# appear to fall through to the parsing step below -- confirm `all_data` is
# always bound on that path.
# parse python STIX objects from the STIX object dicts
stix_objs = [parse(stix_obj_dict, allow_custom=self.allow_custom) for stix_obj_dict in all_data]
return stix_objs