Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_bool_NAComplex():
    """Check that calling ``bool()`` on ``ri.NA_Complex`` raises ValueError."""
    with pytest.raises(ValueError):
        # The truth-value conversion itself is the operation under test.
        bool(ri.NA_Complex)
def test_getitem():
    """Check item access on a ``StrSexpVector``.

    An integer index returns the element stored at that position, while
    a tuple used as index is rejected with ``TypeError``.
    """
    strings = ri.StrSexpVector(['foo', 'bar', 'baz'])
    assert strings[1] == 'bar'

    unsupported_index = (2, 3)
    with pytest.raises(TypeError):
        strings[unsupported_index]
def main():
"""Do the main work"""
try:
import rpy2
import rpy2.rinterface
rpy2.rinterface.set_initoptions((b'rpy2', b'--no-save',
b'--no-restore', b'--quiet'))
import rpy2.robjects as robjects
# rpy2 throws lots of warnings (that cannot be suppressed)
# when packages are loaded
warnings.filterwarnings("ignore")
import rpy2.robjects.packages as rpackages
from rpy2.robjects.vectors import StrVector
import rpy2.robjects.numpy2ri
except ImportError:
grass.fatal(_("Cannot import rpy2 (https://rpy2.bitbucket.io)"
" library."
" Please install it (pip install rpy2)"
" or ensure that it is on path"
" (use PYTHONPATH variable)."))
#Input variables
network_map = options['input']
s += os.linesep
version = self["version"]
tmp = [n+': '+val[0] for n, val in itertools.izip(version.getnames(), version)]
s += str.join(os.linesep, tmp)
return s
def __call__(self, string):
    """Parse ``string`` and evaluate the parsed result.

    :param string: text forwarded to ``self.parse`` as its ``text``
        keyword argument (presumably R source code -- confirm with callers)
    :return: whatever ``self.eval`` returns for the parsed expression
    """
    parsed = self.parse(text=string)
    return self.eval(parsed)
# Shared instance of R (appears to be created at module level -- confirm,
# the original indentation is not available in this excerpt).
r = R()
# The three environments exposed by rinterface, each passed through
# conversion.ri2py before being bound to a name here.
globalEnv = conversion.ri2py(rinterface.globalEnv)
baseNameSpaceEnv = conversion.ri2py(rinterface.baseNameSpaceEnv)
emptyEnv = conversion.ri2py(rinterface.emptyEnv)
return(X_k)
}
''')
numpy2ri.deactivate()
if True:
numpy2ri.activate()
rpy.r.assign('X', self.X)
rpy.r.assign('Y', self.Y)
rpy.r.assign('q', self.q)
if self.forward_step:
rpy.r('V = knockoff.filter(X, Y, fdr=q, knockoffs=knockoffs, stat=stat.forward_selection)$selected')
elif self.sqrt_lasso:
rinterface.set_writeconsole_regular(null_print)
rpy.r('V = knockoff.filter(X, Y, fdr=q, knockoffs=knockoffs, stat=stat.sqrt_lasso)$selected')
rinterface.set_writeconsole_regular(rinterface.consolePrint)
else:
rpy.r('V = knockoff.filter(X, Y, fdr=q, knockoffs=knockoffs)$selected')
rpy.r('if (length(V) > 0) {V = V-1}')
V = rpy.r('V')
numpy2ri.deactivate()
return np.asarray(V, np.int), np.asarray(V, np.int)
else: # except:
return [], []
'Only one R name should be associated with %s '
'(and we have %s)' % (rpyname, str(rnames))
)
rname = rnames[0]
if rpyname in reserved_pynames:
raise LibraryError('The symbol ' + rname +
' in the package "' + name + '"' +
' is conflicting with' +
' a Python object attribute')
self._rpy2r[rpyname] = rname
if (rpyname != rname) and (rname in self._exported_names):
self._exported_names.remove(rname)
self._exported_names.add(rpyname)
try:
riobj = self._env[rname]
except rinterface.embedded.RRuntimeError as rre:
warn(str(rre))
rpyobj = conversion.rpy2py(riobj)
if hasattr(rpyobj, '__rname__'):
rpyobj.__rname__ = rname
# TODO: shouldn't the original R name be also in the __dict__ ?
self.__dict__[rpyname] = rpyobj
Returns the output to R's stdout() connection,
the value generated by evaluating the code, and a
boolean indicating whether the return value would be
visible if the line of code were evaluated in an R REPL.
R Code evaluation and visibility determination are
done via an R call of the form withVisible({<code>})
'''
old_writeconsole = ri.get_writeconsole()
ri.set_writeconsole(self.write_console)
try:
res = ro.r("withVisible({%s\n})" % line)
value = res[0] # value (R object)
visible = ro.conversion.ri2py(res[1])[0] # visible (boolean)
except (ri.RRuntimeError, ValueError) as exception:
# otherwise next return seems to have copy of error
warning_or_other_msg = self.flush()
raise RInterpreterError(
line, str_to_unicode(str(exception)), warning_or_other_msg)
text_output = self.flush()
ri.set_writeconsole(old_writeconsole)
return text_output, value, visible
</code>
def rpy2py_basic(obj):
    """Convert ``obj`` into a basic Python structure.

    Objects without ``__len__`` are wrapped in ``Robj``.  Sized objects
    are dispatched on their ``typeof`` attribute:

    - ``INTSXP``, ``REALSXP``, ``CPLXSXP`` or ``STRSXP``: the elements are
      copied as-is into a new list;
    - ``VECSXP``: every element is passed through ``rpy2py`` and the
      results are collected in a list.

    :param obj: object to convert; when sized it must expose ``typeof``
    :return: a ``list`` for sized objects, otherwise an ``Robj``
    :raises ValueError: if a sized object has any other ``typeof``
    """
    if not hasattr(obj, '__len__'):
        return Robj(obj)

    sexp_type = obj.typeof
    if sexp_type in (ri.INTSXP, ri.REALSXP, ri.CPLXSXP, ri.STRSXP):
        return list(obj)
    if sexp_type in (ri.VECSXP,):
        return [rpy2py(item) for item in obj]
    raise ValueError("Invalid type for 'obj'.")
def default_py2ri(o):
""" Convert arbitrary Python object to :class:`rpy2.rinterface.Sexp` to objects,
creating an R object with the content of the Python object in the process
(wichi means data copying).
:param o: object
:rtype: :class:`rpy2.rinterface.Sexp` (and subclasses)
"""
if isinstance(o, RObject):
res = rinterface.Sexp(o)
if isinstance(o, rinterface.Sexp):
res = o
elif isinstance(o, array.array):
if o.typecode in ('h', 'H', 'i', 'I'):
res = rinterface.SexpVector(o, rinterface.INTSXP)
elif o.typecode in ('f', 'd'):
res = rinterface.SexpVector(o, rinterface.REALSXP)
else:
raise(ValueError("Nothing can be done for this array type at the moment."))
elif isinstance(o, bool):
res = rinterface.SexpVector([o, ], rinterface.LGLSXP)
elif isinstance(o, int):
res = rinterface.SexpVector([o, ], rinterface.INTSXP)
elif isinstance(o, float):
res = rinterface.SexpVector([o, ], rinterface.REALSXP)
elif isinstance(o, str):
def eval(self, line):
    '''
    Parse and evaluate a line with rpy2.
    Returns the output to R's stdout() connection
    and the value of eval(parse(line)).

    While the code runs, the console writer is replaced by
    ``self.write_console``.  The writer that was installed before the
    call is always put back, whether the evaluation succeeds or fails.

    :param line: source text handed to ``ri.parse``
    :returns: tuple ``(text_output, value)`` where ``text_output`` is the
        result of ``self.flush()`` and ``value`` the evaluation result
    :raises RInterpreterError: if parsing/evaluation raises
        ``ri.RRuntimeError`` or ``ValueError``; the error carries the
        line, the exception text and the output flushed so far
    '''
    old_writeconsole = ri.get_writeconsole()
    ri.set_writeconsole(self.write_console)
    try:
        value = ri.baseenv['eval'](ri.parse(line))
    except (ri.RRuntimeError, ValueError) as exception:
        # otherwise next return seems to have copy of error
        warning_or_other_msg = self.flush()
        raise RInterpreterError(line, str_to_unicode(str(exception)),
                                warning_or_other_msg)
    else:
        text_output = self.flush()
    finally:
        # Restore the previous writer on every exit path; without this a
        # failed evaluation left self.write_console installed for good.
        ri.set_writeconsole(old_writeconsole)
    return text_output, value