Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
stix_data (STIX object OR dict OR str OR list): valid STIX2
content in a STIX object (or Bundle), STIX object dict (or
Bundle dict), or a STIX2 JSON-encoded string, or a list of
any of the preceding.
"""
if isinstance(stix_data, _STIXBase):
# adding python STIX object
if stix_data['type'] == 'bundle':
bundle = stix_data.serialize(encoding='utf-8', ensure_ascii=False)
elif 'spec_version' in stix_data:
# If the spec_version is present, use new Bundle object...
bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
elif isinstance(stix_data, dict):
# adding python dict (of either Bundle or STIX obj)
if stix_data['type'] == 'bundle':
bundle = parse(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
elif 'spec_version' in stix_data:
# If the spec_version is present, use new Bundle object...
bundle = v21.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
else:
bundle = v20.Bundle(stix_data, allow_custom=self.allow_custom).serialize(encoding='utf-8', ensure_ascii=False)
elif isinstance(stix_data, list):
# adding list of something - recurse on each
for obj in stix_data:
self.add(obj)
return
Args:
stix_data (STIX object OR dict OR str OR list): valid STIX 2.0 content
in a STIX object (or list of), dict (or list of), or a STIX 2.0
json encoded string.
version (str): If present, it forces the parser to use the version
provided. Otherwise, the library will make the best effort based
on checking the "spec_version" property.
Note:
``stix_data`` can be a Bundle object, but each object in it will be
saved separately; you will be able to retrieve any of the objects
the Bundle contained, but not the Bundle itself.
"""
if isinstance(stix_data, (v20.Bundle, v21.Bundle)):
# recursively add individual STIX objects
for stix_obj in stix_data.get("objects", []):
self.add(stix_obj, version=version)
elif isinstance(stix_data, _STIXBase):
# adding python STIX object
self._check_path_and_write(stix_data)
elif isinstance(stix_data, (str, dict)):
stix_data = parse(stix_data, allow_custom=self.allow_custom, version=version)
self.add(stix_data, version=version)
elif isinstance(stix_data, list):
# recursively add individual STIX objects
for stix_obj in stix_data:
self.add(stix_obj)
def _check_path_and_write(self, stix_obj, encoding='utf-8'):
    """Write the given STIX object to a file in the STIX file directory.

    The target file is ``<self._stix_dir>/<type>/<id>.json``; its parent
    directory is created when it does not exist yet. When ``self.bundlify``
    is true, the object is wrapped in a Bundle (v21 if the object carries a
    ``spec_version`` key, v20 otherwise) before being serialized.

    Args:
        stix_obj: object to write. It must support ``['type']``, ``['id']``,
            the ``in`` operator and ``serialize()``.
        encoding (str): encoding passed to ``serialize()`` and used for the
            output file.
    """
    path = os.path.join(self._stix_dir, stix_obj['type'], stix_obj['id'] + '.json')
    parent_dir = os.path.dirname(path)

    if not os.path.exists(parent_dir):
        os.makedirs(parent_dir)

    to_write = stix_obj
    if self.bundlify:
        if 'spec_version' in stix_obj:
            # Assuming future specs will allow multiple SDO/SROs
            # versions in a single bundle we won't need to check this
            # and just use the latest supported Bundle version.
            to_write = v21.Bundle(stix_obj, allow_custom=self.allow_custom)
        else:
            to_write = v20.Bundle(stix_obj, allow_custom=self.allow_custom)

    # Serialize BEFORE opening the file: mode 'w' truncates immediately, so
    # a failure inside serialize() would otherwise leave an empty file behind
    # (or wipe out a previously written one).
    serialized = to_write.serialize(pretty=True, encoding=encoding, ensure_ascii=False)

    with io.open(path, 'w', encoding=encoding) as f:
        f.write(serialized)
def create_stix2_bundle(objects):
    """Build a ``v20.Bundle`` out of the given objects.

    Args:
        objects: list of ``stix2`` SDO objects. A falsy value (``None`` or
            an empty list) yields a Bundle constructed with no arguments.

    Returns:
        A ``v20.Bundle`` instance.
    """
    # Guard clause: nothing to wrap, so do not pass ``objects`` at all.
    if not objects:
        return v20.Bundle()
    return v20.Bundle(objects=objects)
def create_stix2_bundle(objects):
    """Wrap a list of objects in a ``v20.Bundle``.

    Args:
        objects: list of ``stix2`` SDO objects; when falsy, the Bundle is
            constructed without an ``objects`` argument.

    Returns:
        A ``v20.Bundle`` instance.
    """
    # Only forward ``objects`` when there is something to forward.
    bundle_kwargs = {"objects": objects} if objects else {}
    return v20.Bundle(**bundle_kwargs)
def save_to_file(self, path, encoding="utf-8"):
    """Write every object held in ``self._data`` to a single Bundle file.

    Values of ``self._data`` that are ``_ObjectFamily`` instances contribute
    all entries of their ``all_versions`` mapping; any other value is added
    as-is. A v21 Bundle is used if any collected object has a
    ``spec_version`` key, otherwise a v20 Bundle.

    Args:
        path (str): destination. A path ending in ``.json`` is used as the
            file name; anything else is treated as a directory and the file
            is named ``<bundle id>.json`` inside it. Missing directories
            are created.
        encoding (str): encoding passed to ``serialize()`` and used for the
            output file.

    Returns:
        str: absolute path of the file that was written.
    """
    path = os.path.abspath(path)

    all_objs = []
    for value in self._data.values():
        if isinstance(value, _ObjectFamily):
            all_objs.extend(value.all_versions.values())
        else:
            all_objs.append(value)

    if any("spec_version" in x for x in all_objs):
        bundle = v21.Bundle(all_objs, allow_custom=self.allow_custom)
    else:
        bundle = v20.Bundle(all_objs, allow_custom=self.allow_custom)

    if path.endswith(".json"):
        out_dir = os.path.dirname(path)
    else:
        out_dir = path
        # if the user only provided a directory, use the bundle id for filename
        path = os.path.join(out_dir, bundle["id"] + ".json")

    if not os.path.exists(out_dir):
        os.makedirs(out_dir)

    # Serialize BEFORE opening the file: mode "w" truncates immediately, so
    # a failure inside serialize() would otherwise destroy a previously
    # saved file or leave an empty one behind.
    serialized = bundle.serialize(pretty=True, encoding=encoding, ensure_ascii=False)

    with io.open(path, "w", encoding=encoding) as f:
        f.write(serialized)

    return path
filename = _timestamp2filename(stix_obj["modified"])
obj_dir = os.path.join(type_dir, stix_obj["id"])
file_path = os.path.join(obj_dir, filename + ".json")
if not os.path.exists(obj_dir):
os.makedirs(obj_dir)
if self.bundlify:
if 'spec_version' in stix_obj:
# Assuming future specs will allow multiple SDO/SROs
# versions in a single bundle we won't need to check this
# and just use the latest supported Bundle version.
stix_obj = v21.Bundle(stix_obj, allow_custom=self.allow_custom)
else:
stix_obj = v20.Bundle(stix_obj, allow_custom=self.allow_custom)
# TODO: Better handling of the overwriting case.
if os.path.isfile(file_path):
print("Attempted to overwrite file!", file_path, file=sys.stderr)
else:
with io.open(file_path, 'w', encoding=encoding) as f:
stix_obj = stix_obj.serialize(pretty=True, encoding=encoding, ensure_ascii=False)
f.write(stix_obj)