Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def testParseTasksFileHeader(self):
    """Check that a tasks-file header row parses into typed job parameters.

    The header has three tab-separated columns (--env, --input and
    --output-recursive). One parameter is expected per column, in column
    order, each with the matching type, name and recursive flag.
    """
    header = '--env SAMPLE_ID\t--input VCF_FILE\t--output-recursive OUTPUT_PATH'
    header = header.split('\t')
    input_file_param_util = param_util.InputFileParamUtil('input')
    output_file_param_util = param_util.OutputFileParamUtil('output')
    job_params = param_util.parse_tasks_file_header(
        header, input_file_param_util, output_file_param_util)
    self.assertEqual(3, len(job_params))

    # assertIsInstance is used instead of assertTrue(isinstance(...)) so a
    # failure reports the object and the expected type.

    # The first one is the SAMPLE_ID env param.
    self.assertIsInstance(job_params[0], param_util.EnvParam)
    self.assertEqual('SAMPLE_ID', job_params[0].name)

    # The second one is the non-recursive VCF_FILE input param.
    self.assertIsInstance(job_params[1], param_util.InputFileParam)
    self.assertEqual('VCF_FILE', job_params[1].name)
    self.assertFalse(job_params[1].recursive)

    # The third one is the recursive OUTPUT_PATH output param.
    self.assertIsInstance(job_params[2], param_util.OutputFileParam)
    self.assertEqual('OUTPUT_PATH', job_params[2].name)
    self.assertTrue(job_params[2].recursive)
def testParseTasksFileHeader(self):
    """Check that a tasks-file header row parses into typed job parameters.

    The header has three tab-separated columns (--env, --input and
    --output-recursive). One parameter is expected per column, in column
    order, each with the matching type, name and recursive flag.
    """
    header = '--env SAMPLE_ID\t--input VCF_FILE\t--output-recursive OUTPUT_PATH'
    header = header.split('\t')
    input_file_param_util = param_util.InputFileParamUtil('input')
    output_file_param_util = param_util.OutputFileParamUtil('output')
    job_params = param_util.parse_tasks_file_header(
        header, input_file_param_util, output_file_param_util)
    self.assertEqual(3, len(job_params))

    # assertIsInstance is used instead of assertTrue(isinstance(...)) so a
    # failure reports the object and the expected type.

    # The first one is the SAMPLE_ID env param.
    self.assertIsInstance(job_params[0], param_util.EnvParam)
    self.assertEqual('SAMPLE_ID', job_params[0].name)

    # The second one is the non-recursive VCF_FILE input param.
    self.assertIsInstance(job_params[1], param_util.InputFileParam)
    self.assertEqual('VCF_FILE', job_params[1].name)
    self.assertFalse(job_params[1].recursive)

    # The third one is the recursive OUTPUT_PATH output param.
    self.assertIsInstance(job_params[2], param_util.OutputFileParam)
    self.assertEqual('OUTPUT_PATH', job_params[2].name)
    self.assertTrue(job_params[2].recursive)
inputs_recursive=None,
outputs=None,
outputs_recursive=None,
wait=False):
envs = envs or {}
labels = labels or {}
inputs = inputs or {}
inputs_recursive = inputs_recursive or {}
outputs = outputs or {}
outputs_recursive = outputs_recursive or {}
labels['test-token'] = test_setup.TEST_TOKEN
labels['test-name'] = test_setup.TEST_NAME
logging = param_util.build_logging_param(test.LOGGING)
job_resources = job_model.Resources(
image='ubuntu', logging=logging, zones=['us-central1-*'])
env_data = {job_model.EnvParam(k, v) for (k, v) in envs.items()}
label_data = {job_model.LabelParam(k, v) for (k, v) in labels.items()}
input_file_param_util = param_util.InputFileParamUtil('input')
input_data = set()
for (recursive, items) in ((False, inputs.items()),
(True, inputs_recursive.items())):
for (name, value) in items:
name = input_file_param_util.get_variable_name(name)
input_data.add(input_file_param_util.make_param(name, value, recursive))
output_file_param_util = param_util.OutputFileParamUtil('output')
output_data = set()
# limitations under the License.
"""Tests for dsub.lib.param_util."""
from __future__ import absolute_import
from __future__ import print_function
import datetime
import doctest
import os
import re
import unittest
from dsub.lib import job_util
from dsub.lib import param_util
import parameterized
PL = param_util.P_LOCAL
PG = param_util.P_GCS
class ParamUtilTest(unittest.TestCase):
def testEnvParam(self):
    """An EnvParam exposes the name and value it was constructed with."""
    param = param_util.EnvParam('my_name', 'my_value')
    # Check both attributes against the constructor arguments.
    observed_name = param.name
    self.assertEqual('my_name', observed_name)
    observed_value = param.value
    self.assertEqual('my_value', observed_value)
@parameterized.parameterized.expand([
('gl1', 'genre', 'jazz'),
('gl2', 'underscores_are', 'totally_ok'),
('gl3', 'dashes-are', 'also-ok'),
('gl4', 'num_123', 'good_456'),
('gl5', 'final_underscore_', 'ok_too_'),
if not skip_if_output_present:
raise FailsException("fails provider made submit_job fail")
for task_view in job_model.task_view_generator(job_descriptor):
job_params = task_view.job_params
task_params = task_view.task_descriptors[0].task_params
outputs = job_params["outputs"] | task_params["outputs"]
if dsub_util.outputs_are_present(outputs):
print("Skipping task because its outputs are present")
continue
# if any task is allowed to run, then we fail.
raise FailsException("fails provider made submit_job fail")
return {"job-id": dsub_util.NO_JOB}
outputs=None,
outputs_recursive=None,
wait=False):
envs = envs or {}
labels = labels or {}
inputs = inputs or {}
inputs_recursive = inputs_recursive or {}
outputs = outputs or {}
outputs_recursive = outputs_recursive or {}
labels['test-token'] = test_setup.TEST_TOKEN
labels['test-name'] = test_setup.TEST_NAME
logging = param_util.build_logging_param(test.LOGGING)
job_resources = job_model.Resources(
image='ubuntu', logging=logging, zones=['us-central1-*'])
env_data = {job_model.EnvParam(k, v) for (k, v) in envs.items()}
label_data = {job_model.LabelParam(k, v) for (k, v) in labels.items()}
input_file_param_util = param_util.InputFileParamUtil('input')
input_data = set()
for (recursive, items) in ((False, inputs.items()),
(True, inputs_recursive.items())):
for (name, value) in items:
name = input_file_param_util.get_variable_name(name)
input_data.add(input_file_param_util.make_param(name, value, recursive))
output_file_param_util = param_util.OutputFileParamUtil('output')
output_data = set()
for (recursive, items) in ((False, outputs.items()),
job_params = {
'envs': env_data,
'inputs': input_data,
'outputs': output_data,
'labels': label_data,
}
task_descriptors = [
job_model.TaskDescriptor({
'task-id': None
}, {
'envs': set(),
'labels': set(),
'inputs': set(),
'outputs': set(),
}, job_model.Resources())
]
return dsub.run(
get_dsub_provider(),
job_resources,
job_params,
task_descriptors,
name=job_name,
command=command,
wait=wait,
disable_warning=True)
def test_get_none(self):
    """Looking up tasks with a None argument yields every registered operation."""
    provider = stub.StubJobProvider()
    succeeded = {'job-id': 'job_suc', 'status': ('SUCCESS', '123')}
    failed = {'job-id': 'job_fail', 'status': ('FAILURE', '123')}
    provider.set_operations([succeeded, failed])
    found = provider.lookup_job_tasks(None)
    # Both operations are expected back, in the order they were registered.
    self.assertEqual(raw_ops(found), [succeeded, failed])
def test_already_succeeded(self):
    """wait_after returns an empty list when the job is already SUCCESS."""
    provider = stub.StubJobProvider()
    finished_job = {'job-id': 'myjob', 'status': ('SUCCESS', '123')}
    provider.set_operations([finished_job])
    # Install the chronology produced by nothing_happens() before waiting.
    chronology = nothing_happens()
    establish_chronology(chronology)
    outcome = dsub_command.wait_after(provider, ['myjob'], 1, True)
    self.assertEqual(outcome, [])
def test_job_1(self):
    """wait_after on 'job-1' returns an empty list under the progressive chronology."""
    # The provider is stored on self before the chronology is built, as in
    # the original statement order.
    self.prov = stub.StubJobProvider()
    timeline = self.progressive_chronology()
    establish_chronology(timeline)
    awaited_jobs = ['job-1']
    outcome = dsub_command.wait_after(self.prov, awaited_jobs, 1, True)
    self.assertEqual(outcome, [])