Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# At this stage, we only handle the case where __reduce__ returns a string
# other reduce functionality is implemented further down
if isinstance(reduce_val, (str, unicode)):
varpath = iter(reduce_val.split('.'))
# curmod will be transformed by the loop into the value to pickle
curmod = sys.modules[next(varpath)]
for modname in varpath:
curmod = getattr(curmod, modname)
# replace obj with value retrieved
return self._flatten(curmod)
except KeyError:
# well, we can't do anything with that, so we ignore it
pass
if has_getnewargs_ex:
data[tags.NEWARGSEX] = list(map(self._flatten, obj.__getnewargs_ex__()))
if has_getnewargs and not has_getnewargs_ex:
data[tags.NEWARGS] = self._flatten(obj.__getnewargs__())
if has_getinitargs:
data[tags.INITARGS] = self._flatten(obj.__getinitargs__())
if has_getstate:
try:
state = obj.__getstate__()
except TypeError:
# Has getstate but it cannot be called, e.g. file descriptors
# in Python3
self._pickle_warning(obj)
return None
else:
if util.is_iterator(obj):
# force list in python 3
data[tags.ITERATOR] = list(map(self._flatten, islice(obj, self._max_iter)))
return data
if reduce_val and not isinstance(reduce_val, (str, unicode)):
# at this point, reduce_val should be some kind of iterable
# pad out to len 5
rv_as_list = list(reduce_val)
insufficiency = 5 - len(rv_as_list)
if insufficiency:
rv_as_list+=[None]*insufficiency
if rv_as_list[0].__name__ == '__newobj__':
rv_as_list[0] = tags.NEWOBJ
data[tags.REDUCE] = list(map(self._flatten, rv_as_list))
# lift the items out of their iterator wrappers, so we don't have to
# wrap and unwrap them again on unpickle
if data[tags.REDUCE][3]:
data[tags.REDUCE][3] = data[tags.REDUCE][3][tags.ITERATOR]
if data[tags.REDUCE][4]:
data[tags.REDUCE][4] = data[tags.REDUCE][4][tags.ITERATOR]
return data
if has_dict:
# Support objects that subclass list and set
if util.is_sequence_subclass(obj):
def _mktyperef(obj):
    """Build a typeref dictionary for ``obj``.

    The result has a single entry: ``tags.TYPE`` mapped to the value
    returned by ``util.importable_name(obj)``.

    >>> _mktyperef(AssertionError) == {'py/type': 'builtins.AssertionError'}
    True

    """
    type_name = util.importable_name(obj)
    return {tags.TYPE: type_name}
def _restore(self, obj):
if has_tag(obj, tags.ID):
restore = self._restore_id
elif has_tag(obj, tags.REF): # Backwards compatibility
restore = self._restore_ref
elif has_tag(obj, tags.ITERATOR):
restore = self._restore_iterator
elif has_tag(obj, tags.TYPE):
restore = self._restore_type
elif has_tag(obj, tags.REPR): # Backwards compatibility
restore = self._restore_repr
elif has_tag(obj, tags.REDUCE):
restore = self._restore_reduce
elif has_tag(obj, tags.OBJECT):
restore = self._restore_object
elif has_tag(obj, tags.FUNCTION):
restore = self._restore_function
elif util.is_list(obj):
def _getref(self, obj):
    """Return a reference dictionary for ``obj``.

    The dictionary maps ``tags.ID`` to whatever ``self._objs.get``
    returns for the key ``id(obj)``.
    """
    ref_value = self._objs.get(id(obj))
    return {tags.ID: ref_value}
def _restore_object_instance_variables(self, obj, instance):
    """Populate ``instance`` from the flattened representation ``obj``.

    Steps, in order:

    1. ``self._restore_from_dict(obj, instance)`` is called.
    2. If ``obj`` carries ``tags.SEQ``, every restored element is passed
       to ``instance.append`` and/or ``instance.add``, whichever of the
       two the instance exposes (list and set subclasses).
    3. If ``obj`` carries ``tags.STATE``, the instance is replaced by the
       result of ``self._restore_state(obj, instance)``.

    Returns the (possibly replaced) instance.
    """
    self._restore_from_dict(obj, instance)

    # Handle list and set subclasses.  An instance exposing both methods
    # receives the elements through both, 'append' first.
    if has_tag(obj, tags.SEQ):
        for method_name in ('append', 'add'):
            if hasattr(instance, method_name):
                for item in obj[tags.SEQ]:
                    getattr(instance, method_name)(self._restore(item))

    if not has_tag(obj, tags.STATE):
        return instance
    return self._restore_state(obj, instance)
# check that getstate/setstate is sane
if not (state and hasattr(obj, '__getstate__')
and not hasattr(obj, '__setstate__')
and not isinstance(obj, dict)):
# turn iterators to iterables for convenient serialization
if rv_as_list[3]:
rv_as_list[3] = tuple(rv_as_list[3])
if rv_as_list[4]:
rv_as_list[4] = tuple(rv_as_list[4])
reduce_args = list(map(self._flatten, rv_as_list))
last_index = len(reduce_args) - 1
while last_index >= 2 and reduce_args[last_index] is None:
last_index -= 1
data[tags.REDUCE] = reduce_args[:last_index+1]
return data
if has_class and not util.is_module(obj):
if self.unpicklable:
data[tags.OBJECT] = class_name
if has_getnewargs_ex:
data[tags.NEWARGSEX] = list(
map(self._flatten, obj.__getnewargs_ex__()))
if has_getnewargs and not has_getnewargs_ex:
data[tags.NEWARGS] = self._flatten(obj.__getnewargs__())
if has_getinitargs:
data[tags.INITARGS] = self._flatten(obj.__getinitargs__())
if util.is_dictionary_subclass(obj):
return self._flatten_dict_obj(obj, data)
if util.is_noncomplex(obj):
return [self.flatten(v) for v in obj]
if has_dict:
# Support objects that subclass list and set
if util.is_collection_subclass(obj):
return self._flatten_collection_obj(obj, data)
# Support objects with __getstate__(); this ensures that
# both __setstate__() and __getstate__() are implemented
if has_getstate_support:
data[tags.STATE] = self.flatten(obj.__getstate__())
return data
# hack for zope persistent objects; this unghostifies the object
getattr(obj, '_', None)
return self._flatten_dict_obj(obj.__dict__, data)
if has_slots:
return self._flatten_newstyle_with_slots(obj, data)
def getargs(obj):
    """Return arguments suitable for __new__()

    ``obj`` is a flattened (dict-like) object representation.  The
    lookup order is:

    1. ``tags.NEWARGSEX`` present: not supported here, raises.
    2. ``tags.NEWARGS`` present: its value is returned.
    3. ``tags.INITARGS`` present: its value is returned.
    4. Otherwise, if both ``tags.SEQ`` and ``tags.OBJECT`` are present
       and the class loaded from ``tags.OBJECT`` has a ``_fields``
       attribute whose length equals the length of the sequence, the
       sequence itself is returned.

    In every other case an empty list is returned, so callers always
    receive something that can be unpacked as positional arguments.

    Raises:
        ValueError: if ``obj`` carries ``tags.NEWARGSEX``.
    """
    # Let saved newargs take precedence over everything
    if has_tag(obj, tags.NEWARGSEX):
        raise ValueError("__newargs_ex__ returns both args and kwargs")

    if has_tag(obj, tags.NEWARGS):
        return obj[tags.NEWARGS]

    if has_tag(obj, tags.INITARGS):
        return obj[tags.INITARGS]

    try:
        seq_list = obj[tags.SEQ]
        obj_dict = obj[tags.OBJECT]
    except KeyError:
        return []

    typeref = loadclass(obj_dict)
    if not typeref:
        return []

    # Classes exposing ``_fields`` (presumably namedtuple-like types --
    # confirm against callers) take the stored sequence as their
    # positional arguments, but only when the field count matches.
    if hasattr(typeref, '_fields') and len(typeref._fields) == len(seq_list):
        return seq_list

    # No usable arguments: return an empty list explicitly instead of
    # falling off the end and handing ``None`` to the caller.
    return []
def _getref(self, obj):
    """Return a reference dictionary for ``obj``.

    The dictionary maps ``tags.ID`` to whatever ``self._objs.get``
    returns for the key ``id(obj)``.
    """
    ref_value = self._objs.get(id(obj))
    return {tags.ID: ref_value}