Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'''Test magic use to create new kernels'''
with sos_kernel() as kc:
iopub = kc.iopub_channel
# create a data frame
execute(kc=kc, code='%use R2 -l R')
wait_for_idle(kc)
execute(kc=kc, code='%use R3 -l R -c black')
wait_for_idle(kc)
execute(kc=kc, code='%use R4 -k ir -l R -c green')
wait_for_idle(kc)
execute(kc=kc, code='%use R4 -c cyan')
wait_for_idle(kc)
execute(kc=kc, code='%with R5 -l sos.R:sos_R -c default')
wait_for_idle(kc)
execute(kc=kc, code='%with R6 -l unknown -c default')
_, stderr = assemble_output(iopub)
self.assertTrue('Failed to switch' in stderr, 'expect error {}'.format(stderr))
execute(kc=kc, code="%use sos")
wait_for_idle(kc)
def testCD(self):
with sos_kernel() as kc:
iopub = kc.iopub_channel
execute(kc=kc, code="%cd ..")
wait_for_idle(kc)
execute(kc=kc, code="print(os.getcwd())")
stdout, stderr = assemble_output(iopub)
self.assertFalse(stdout.strip().endswith('jupyter'))
self.assertEqual(stderr, '')
execute(kc=kc, code="%cd jupyter")
hello
world
""")
''')
stdout, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
self.assertTrue('world' in stdout, 'Expect preview {}'.format(stdout))
# preview zip
execute(kc=kc, code='''
%preview -n a.zip
import zipfile
with zipfile.ZipFile('a.zip', 'w') as zfile:
zfile.write('a.csv')
''')
stdout, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
self.assertTrue('a.csv' in stdout, 'Expect preview {}'.format(stdout))
# preview tar
execute(kc=kc, code='''
%preview -n a.tar
import tarfile
with tarfile.open('a.tar', 'w') as tar:
tar.add('a.csv')
''')
stdout, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
self.assertTrue('a.csv' in stdout, 'Expect preview {}'.format(stdout))
# preview tar.gz
execute(kc=kc, code='''
def get_std_output(iopub):
'''Obtain stderr and remove some unnecessary warning from
https://github.com/jupyter/jupyter_client/pull/201#issuecomment-314269710'''
stdout, stderr = test_utils.assemble_output(iopub)
return stdout, '\n'.join([
x for x in stderr.splitlines() if 'sticky' not in x and
'RuntimeWarning' not in x and 'communicator' not in x
])
def testMagicSet(self):
# test preview of remote file
with sos_kernel() as kc:
iopub = kc.iopub_channel
# preview variable
execute(kc=kc, code='''
%set
%set -v2
%set
%set -v1
''')
stdout, stderr = assemble_output(iopub)
self.assertEqual(stderr, '', 'Got {}'.format(stderr))
self.assertTrue('set' in stdout, 'Got {}'.format(stdout))
def testReadonlyALLCAPVars(self):
subprocess.call('sos config --global --set sos.change_all_cap_vars warning', shell=True)
with sos_kernel() as kc:
iopub = kc.iopub_channel
execute(kc=kc, code='''
%run
A=10
[default]
A=20
''')
_, stderr = assemble_output(iopub)
self.assertTrue('A' in stderr, 'Expect an error {}'.format(stderr))
subprocess.call('sos config --global --unset sos.change_all_cap_vars', shell=True)
def get_std_output(iopub):
'''Obtain stderr and remove some unnecessary warning from
https://github.com/jupyter/jupyter_client/pull/201#issuecomment-314269710'''
stdout, stderr = test_utils.assemble_output(iopub)
return stdout, '\n'.join([
x for x in stderr.splitlines() if 'sticky' not in x and
'RuntimeWarning' not in x and 'communicator' not in x
])
def testShell(self):
with sos_kernel() as kc:
iopub = kc.iopub_channel
execute(kc=kc, code="!echo ha ha")
stdout, stderr = assemble_output(iopub)
self.assertTrue('ha ha' in stdout, "GOT ERROR {}".format(stderr))
self.assertEqual(stderr, '')
def testMagicPreview(self):
with sos_kernel() as kc:
# preview bam file
iopub = kc.iopub_channel
execute(kc=kc, code='''
%preview -n {}/sim_reads_aligned.bam
'''.format(self.resource_dir))
stdout, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
self.assertTrue('PG' in stdout)
# preview sam file
execute(kc=kc, code='''
%preview -n {}/sim_reads_aligned.sam
'''.format(self.resource_dir))
stdout, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
self.assertTrue('PG' in stdout)
def testMagicUse(self):
with sos_kernel() as kc:
iopub = kc.iopub_channel
execute(kc=kc, code="%use R0 -l sos.R.kernel:sos_R -c #CCCCCC")
_, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
execute(kc=kc, code="%use R1 -l sos.R.kernel:sos_R -k ir -c #CCCCCC")
_, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
execute(kc=kc, code="%use R2 -k ir")
_, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
execute(kc=kc, code="a <- 1024")
wait_for_idle(kc)
execute(kc=kc, code="a")
res = get_display_data(iopub)
self.assertEqual(res, '[1] 1024')
execute(kc=kc, code="%use R3 -k ir -l R")
_, stderr = assemble_output(iopub)
self.assertEqual(stderr, '')
execute(kc=kc, code="a <- 233")