Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(
env.sos_dict['res'],
['a_1_process.txt', 'b_2_process.txt', 'c_2_process.txt'])
#
script = SoS_Script(r"""
[0: shared={'res':'step_output'}]
def add_a(x):
return ['a'+_x for _x in x]
input: 'a_1.txt', 'b_2.txt', 'c_2.txt', pattern='{name}_{model}.txt'
output: add_a([f"{x}_{y}_process.txt" for x,y in zip(name, model)])
""")
wf = script.workflow()
Base_Executor(wf).run(mode='dryrun')
self.assertEqual(
env.sos_dict['res'],
['aa_1_process.txt', 'ab_2_process.txt', 'ac_2_process.txt'])
def testShared(self):
    """Check how the ``shared`` section option affects ``res`` in ``env.sos_dict``.

    Two workflows are executed. Each declares ``parameter: res = 1`` and
    then assigns ``res`` inside steps 0 and 1; they differ only in whether
    step 0 carries ``shared='res'``.
    """
    # No step is marked as shared: the test expects ``res`` to still be 1
    # once the workflow has finished.
    plain = SoS_Script(r"""
parameter: res = 1
[0]
res = 2
[1]
res = 3
""")
    Base_Executor(plain.workflow()).run()
    self.assertEqual(env.sos_dict['res'], 1)

    # Remove whatever the first run left behind before starting over.
    env.sos_dict.pop('res', None)
    # Only step 0 shares ``res``: the test expects that step's value (2),
    # not the unshared assignment made in step 1.
    with_shared = SoS_Script(r"""
parameter: res = 1
[0: shared='res']
res = 2
[1]
res = 3
""")
    Base_Executor(with_shared.workflow()).run()
    self.assertEqual(env.sos_dict['res'], 2)
#
def testConcurrentInputOption(self):
    """Run a step whose ``input:`` combines ``paired_with`` and ``concurrent = True``.

    The test contains no assertions; it passes as long as the workflow
    executes without raising.
    """
    # Touch the two files that the input statement refers to.
    self.touch(['1.txt', '2.txt'])
    workflow = SoS_Script('''
[1]
n =[str(x) for x in range(2)]
input: [f'{x+1}.txt' for x in range(2)], paired_with = 'n', concurrent = True
run: expand = True
echo {_n} {_input}
''').workflow()
    Base_Executor(workflow).run()
sos_run('A', b=2)
''')
wf = script.workflow()
Base_Executor(wf).run()
#
script = SoS_Script('''
parameter: b=2
[A]
parameter: c=1
print(c)
[default]
sos_run('A', c=2)
''')
wf = script.workflow()
Base_Executor(wf).run()
st = time.time()
self.assertRaises(
Exception,
Base_Executor(wf).run)
self.assertTrue(
time.time() - st >= 8,
'Test test should fail only after step 10 is completed')
self.assertTrue(os.path.isfile('10.txt'))
self.assertTrue(os.path.isfile('11.txt'))
#
# ignore mode
#
cleanup()
#
st = time.time()
Base_Executor(wf, config={'error_mode': 'ignore'}).run()
self.assertTrue(
time.time() - st >= 8,
'Test test should fail only after step 10 is completed')
self.assertTrue(os.path.isfile('10.txt'))
self.assertTrue(os.path.isfile('11.txt'))
#
# abort mode
#
cleanup()
#
self.assertRaises(Exception,
Base_Executor(wf, config={
'error_mode': 'abort'
}).run)
self.assertFalse(os.path.isfile('10.txt'))
self.assertFalse(os.path.isfile('11.txt'))
def testDownloadMissingFile(self):
    """Run a ``download:`` step whose URL list includes a non-existing file.

    The workflow is executed twice. Both executions are expected to raise,
    and ``tmp/CancerGeneCensus.ann`` must exist after the first one.
    """
    script = SoS_Script(r'''
[0]
download: dest_dir='tmp', decompress=True, max_jobs=2
http://bioinformatics.mdanderson.org/Software/VariantTools/repository/resource/non-existing.gz
http://bioinformatics.mdanderson.org/Software/VariantTools/repository/annoDB/CancerGeneCensus-20170912.DB.gz
http://bioinformatics.mdanderson.org/Software/VariantTools/repository/annoDB/CancerGeneCensus.ann
http://bioinformatics.mdanderson.org/Software/VariantTools/repository/annoDB/DGV-hg38_20160831.ann
''')
    # First pass (noted by the original author as the slow one). The
    # executor is built outside the ``with`` block so that only ``run`` is
    # covered by the expectation.
    first_pass = Base_Executor(script.workflow())
    with self.assertRaises(Exception):
        first_pass.run()
    self.assertTrue(os.path.isfile('tmp/CancerGeneCensus.ann'))
    # Second pass on a freshly created workflow (noted by the original
    # author as the fast one); it must raise as well.
    second_pass = Base_Executor(script.workflow())
    with self.assertRaises(Exception):
        second_pass.run()
ret = subprocess.check_output(
'sos status -t {}'.format(tag), shell=True).decode()
self.assertEqual(len(ret.splitlines()), 5, "Obtained {}".format(ret))
# test multiple tags
tag1 = "tag{}".format(random.randint(1, 100000))
tag2 = "tag{}".format(random.randint(1, 100000))
with open('test_tags.sos', 'w') as tt:
tt.write('''
[10]
input: for_each={{'i': range(2)}}
task: tags=['{}', '{}']
sh: expand=True
echo {} {{i}}
'''.format(tag1, tag2, tag1))
wf = SoS_Script(filename='test_tags.sos').workflow()
Base_Executor(
wf,
config={
'sig_mode': 'force',
'script': 'test_trunkworker.sos',
'max_running_jobs': 10,
'workflow_args': [],
'output_dag': '',
'output_report': None,
'targets': [],
'worker_procs': ['4'],
'default_queue': 'localhost',
'workflow': 'default',
'workdir': '.',
}).run()
ret = subprocess.check_output(
'sos status -t {}'.format(tag2), shell=True).decode()
def testDryrunInSosRun(self):
    """Test dryrun mode with sos_run (#1007).

    The default step calls ``sos_run('remove')``, whose ``run:`` action
    deletes ``1.txt``. The file must still exist after a ``dryrun``
    execution and must be gone after a real ``run``.
    """
    file_target('1.txt').touch()
    workflow = SoS_Script('''
[remove]
run:
rm 1.txt
[default]
sos_run('remove')
''').workflow()
    # Each mode is paired with the assertion applied to the file's presence
    # afterwards; a new executor is created for every mode.
    expectations = (
        ('dryrun', self.assertTrue),
        ('run', self.assertFalse),
    )
    for mode, check in expectations:
        Base_Executor(workflow).run(mode=mode)
        check(os.path.isfile('1.txt'))
os.remove(f'vars.sh')
if os.path.isfile(f'vars1.sh'):
os.remove(f'vars1.sh')
script = SoS_Script('''
[10]
input: remote('/lib/init/vars.sh')
output: f'vars1.sh'
task:
with open(_input, 'r') as inf, open(_output, 'w') as outf:
outf.write(inf.read())
''')
wf = script.workflow()
Base_Executor(
wf,
config={
'config_file': '~/docker.yml',
'default_queue': 'docker',
'sig_mode': 'force',
}).run()
self.assertFalse(os.path.isfile('vars.sh'))
self.assertTrue(os.path.isfile('vars1.sh'))
file_target('a.txt.bak').unlink()
self.touch('a.txt')
script = SoS_Script('''
[10]
input: 'a.txt'
output: f"{_input}.bak"
run: expand=True
cp {_input} {_output}
[20]
depends: "a.txt.bak"
run: expand=True
ls {_depends}
''')
wf = script.workflow()
Base_Executor(wf).run()
self.assertTrue(file_target('a.txt.bak').target_exists())