Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
We only handle data asked by Spyder, in case people use
publish_data for other purposes.
"""
# Deserialize data
try:
data = deserialize_object(msg['buffers'])[0]
except Exception as msg:
self._kernel_value = None
self._kernel_reply = repr(msg)
self.sig_got_reply.emit()
return
# Receive values asked for Spyder
value = data.get('__spy_data__', None)
if value is not None:
if isinstance(value, CannedObject):
value = value.get_object()
self._kernel_value = value
self.sig_got_reply.emit()
return
# Receive Pdb state and dispatch it
pdb_state = data.get('__spy_pdb_state__', None)
if pdb_state is not None and isinstance(pdb_state, dict):
self.refresh_from_pdb(pdb_state)
# These dicts can be extended for custom serialization of new objects
can_map = {
'numpy.ndarray' : CannedArray,
FunctionType : CannedFunction,
bytes : CannedBytes,
memoryview : CannedMemoryView,
cell_type : CannedCell,
class_type : can_class,
}
if buffer is not memoryview:
can_map[buffer] = CannedBuffer
uncan_map = {
CannedObject : lambda obj, g: obj.get_object(g),
dict : uncan_dict,
}
# for use in _import_mapping:
_original_can_map = can_map.copy()
_original_uncan_map = uncan_map.copy()
def _restore_buffers(obj, buffers):
"""restore buffers extracted by """
if isinstance(obj, CannedObject) and obj.buffers:
for i,buf in enumerate(obj.buffers):
if buf is None:
obj.buffers[i] = buffers.pop(0)
return eval(self.name, g)
class CannedCell(CannedObject):
"""Can a closure cell"""
def __init__(self, cell):
self.cell_contents = can(cell.cell_contents)
def get_object(self, g=None):
cell_contents = uncan(self.cell_contents, g)
def inner():
return cell_contents
return py3compat.get_closure(inner)[0]
class CannedFunction(CannedObject):
def __init__(self, f):
self._check_type(f)
self.code = f.__code__
if f.__defaults__:
self.defaults = [ can(fd) for fd in f.__defaults__ ]
else:
self.defaults = None
closure = py3compat.get_closure(f)
if closure:
self.closure = tuple( can(cell) for cell in closure )
else:
self.closure = None
self.module = f.__module__ or '__main__'
def can_dependent(obj):
return CannedObject(obj, keys=('f', 'df'), hook=_uncan_dependent_hook)
def get_object(self, g=None):
from numpy import frombuffer
data = self.buffers[0]
if self.pickled:
# we just pickled it
return pickle.loads(buffer_to_bytes_py2(data))
else:
if not py3compat.PY3 and isinstance(data, memoryview):
# frombuffer doesn't accept memoryviews on Python 2,
# so cast to old-style buffer
data = buffer(data.tobytes())
return frombuffer(data, dtype=self.dtype).reshape(self.shape)
class CannedBytes(CannedObject):
wrap = staticmethod(buffer_to_bytes)
def __init__(self, obj):
self.buffers = [obj]
def get_object(self, g=None):
data = self.buffers[0]
return self.wrap(data)
class CannedBuffer(CannedBytes):
wrap = buffer
class CannedMemoryView(CannedBytes):
wrap = memoryview
#-------------------------------------------------------------------------------
g = sys.modules[self.module].__dict__
if g is None:
g = {}
if self.defaults:
defaults = tuple(uncan(cfd, g) for cfd in self.defaults)
else:
defaults = None
if self.closure:
closure = tuple(uncan(cell, g) for cell in self.closure)
else:
closure = None
newFunc = FunctionType(self.code, g, self.__name__, defaults, closure)
return newFunc
class CannedClass(CannedObject):
def __init__(self, cls):
self._check_type(cls)
self.name = cls.__name__
self.old_style = not isinstance(cls, type)
self._canned_dict = {}
for k,v in cls.__dict__.items():
if k not in ('__weakref__', '__dict__'):
self._canned_dict[k] = can(v)
if self.old_style:
mro = []
else:
mro = cls.mro()
self.parents = [ can(c) for c in mro[1:] ]
self.buffers = []
if not isinstance(name, string_types):
raise TypeError("illegal name: %r"%name)
self.name = name
self.buffers = []
def __repr__(self):
return ""%self.name
def get_object(self, g=None):
if g is None:
g = {}
return eval(self.name, g)
class CannedCell(CannedObject):
"""Can a closure cell"""
def __init__(self, cell):
self.cell_contents = can(cell.cell_contents)
def get_object(self, g=None):
cell_contents = uncan(self.cell_contents, g)
def inner():
return cell_contents
return py3compat.get_closure(inner)[0]
class CannedFunction(CannedObject):
def __init__(self, f):
self._check_type(f)
self.code = f.__code__
if self.old_style:
mro = []
else:
mro = cls.mro()
self.parents = [ can(c) for c in mro[1:] ]
self.buffers = []
def _check_type(self, obj):
assert isinstance(obj, class_type), "Not a class type"
def get_object(self, g=None):
parents = tuple(uncan(p, g) for p in self.parents)
return type(self.name, parents, uncan_dict(self._canned_dict, g=g))
class CannedArray(CannedObject):
def __init__(self, obj):
from numpy import ascontiguousarray
self.shape = obj.shape
self.dtype = obj.dtype.descr if obj.dtype.fields else obj.dtype.str
self.pickled = False
if sum(obj.shape) == 0:
self.pickled = True
elif obj.dtype == 'O':
# can't handle object dtype with buffer approach
self.pickled = True
elif obj.dtype.fields and any(dt == 'O' for dt,sz in obj.dtype.fields.values()):
self.pickled = True
if self.pickled:
# just pickle it
self.buffers = [pickle.dumps(obj, PICKLE_PROTOCOL)]
else: