Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init_from_r():
pairlist = ri.baseenv.find('pairlist')
pl = pairlist(a=ri.StrSexpVector(['1', ]),
b=ri.StrSexpVector(['3', ]))
assert pl.typeof == ri.RTYPES.LISTSXP
def test_mapperR2Python_s4():
robjects.r('setClass("A", representation(x="integer"))')
classname = rinterface.StrSexpVector(['A', ])
one = rinterface.IntSexpVector([1, ])
sexp = rinterface.globalenv.find('new')(classname,
x=one)
assert isinstance(robjects.default_converter.rpy2py(sexp),
robjects.RS4)
def test_object_dispatch_lang():
formula = rinterface.globalenv.find('formula')
obj = formula(rinterface.StrSexpVector(['y ~ x', ]))
assert isinstance(obj, rinterface.SexpVector)
assert obj.typeof == rinterface.RTYPES.LANGSXP
elif query_type == 'pointcloud':
typed_query_object_ids = list(map(
lambda x: "pointcloud-{}".format(x), query_object_ids))
# Check cache, if enabled
cache_hits = 0
query_cache_objects_dps = None
n_query_objects = len(query_object_ids)
if use_cache and pointset_cache:
# Find all skeleton IDs that aren't part of the cache
# TODO: There must be a simler way to extract non-NA values only
query_object_id_str = rinterface.StrSexpVector(list(map(str, query_object_ids)))
query_cache_objects_dps = pointcloud_cache.rx(query_object_id_str) # type: ignore # not provable that cache will be initialised
non_na_ids = list(filter(lambda x: type(x) == str,
list(base.names(query_cache_objects_dps))))
query_cache_objects_dps = rnat.subset_neuronlist(
query_cache_objects_dps, rinterface.StrSexpVector(non_na_ids))
cache_typed_query_object_ids = list(map(
lambda x: "pointcloud-{}".format(x),
list(base.names(query_cache_objects_dps))))
effective_query_object_ids = list(filter(
# Only allow neurons that are not part of the cache
lambda x: query_cache_objects_dps.rx2(str(x)) == robjects.NULL,
query_object_ids))
query_cache_objects_dps.names = rinterface.StrSexpVector(cache_typed_query_object_ids)
cache_hits = n_query_objects - len(effective_query_object_ids)
else:
cache_typed_query_object_ids = []
effective_query_object_ids = query_object_ids
logger.debug('Fetching {} query point clouds ({} cache hits)'.format(
len(effective_query_object_ids), cache_hits))
if effective_query_object_ids:
except KeyError:
# no translation: abort
pass
# TODO: isolate the slot documentation and have it here
cls_dict[slt_name] = property(lambda self: self.do_slot(slt_name),
None, None,
None)
# Now tackle the methods
all_generics = methods_env['getGenerics']()
findmethods = methods_env['findMethods']
# does not seem elegant, but there is probably nothing else to do
# than loop across all generics
r_cls_rname = StrSexpVector((cls_rname, ))
for funcname in all_generics:
all_methods = findmethods(StrSexpVector((funcname, )),
classes=r_cls_rname)
# skip if no methods (issue #301). R's findMethods() result
# does not have an attribute "names" if of length zero.
if len(all_methods) == 0:
continue
# all_methods contains all method/signature pairs
# having the class we are considering somewhere in the signature
# (the R/S4 systems allows multiple dispatch)
for name, meth in zip(all_methods.do_slot("names"), all_methods):
# R/S4 is storing each method/signature as a string,
# with the argument type separated by the character '#'
# We will re-use that name for the Python name
# (no multiple dispatch in python, the method name
def __init__(self, seq):
"""
"""
if isinstance(seq, Sexp):
super()(seq)
else:
for elt in conversion.noconversion(seq):
if not isinstance(elt, struct_time):
raise ValueError(
'All elements must inherit from time.struct_time'
)
as_posixlt = baseenv_ri['as.POSIXlt']
origin = StrSexpVector([time.strftime("%Y-%m-%d",
time.gmtime(0)), ])
rvec = FloatSexpVector([mktime(x) for x in seq])
sexp = as_posixlt(rvec, origin=origin)
super().__init__(sexp)
An R POSIXct vector (rpy2.robjects.vectors.POSIXct)
"""
import time
from rpy2.rinterface import StrSexpVector
# convert m8[ns] to m8[s]
vals = robj.vectors.FloatSexpVector(obj.values.view('i8') / 1E9)
as_posixct = robj.baseenv.get('as.POSIXct')
origin = StrSexpVector([time.strftime("%Y-%m-%d",
time.gmtime(0)), ])
# We will be sending ints as UTC
tz = obj.tz.zone if hasattr(
obj, 'tz') and hasattr(obj.tz, 'zone') else 'UTC'
tz = StrSexpVector([tz])
utc_tz = StrSexpVector(['UTC'])
posixct = as_posixct(vals, origin=origin, tz=utc_tz)
posixct.do_slot_assign('tzone', tz)
return posixct
def __init__(self, formula, environment = _globalenv):
if isinstance(formula, str):
inpackage = rinterface.baseenv["::"]
asformula = inpackage(rinterface.StrSexpVector(['stats', ]),
rinterface.StrSexpVector(['as.formula', ]))
formula = rinterface.SexpVector(rinterface.StrSexpVector([formula, ]))
robj = asformula(formula,
env = environment)
else:
robj = formula
super(Formula, self).__init__(robj)
def __rownames_set(self, rn):
if isinstance(rn, StrSexpVector):
if len(rn) != self.nrow:
raise ValueError('Invalid length.')
if self.dimnames is NULL:
dn = ListVector.from_length(2)
dn[0] = rn
self.do_slot_assign('dimnames', dn)
else:
dn = self.dimnames
dn[0] = rn
else:
raise ValueError(
'The rownames attribute can only be an R string vector.'
)
def __init__(self, formula, environment = rinterface.globalEnv):
inpackage = rinterface.baseNameSpaceEnv["::"]
asformula = inpackage(rinterface.StrSexpVector(['stats', ]),
rinterface.StrSexpVector(['as.formula', ]))
formula = rinterface.SexpVector(rinterface.StrSexpVector([formula, ]))
robj = asformula(formula,
env = environment)
super(RFormula, self).__init__(robj)