Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
workflow_json = json.load(fp)
self.assertIsNotNone(workflow_json)
tasks = []
tasks_to_ports = {}
for task in workflow_json['tasks']:
inputs = {}
for input_entry in task['inputs']:
if 'source' in input_entry:
inputs[input_entry['name']] = 'source:' + input_entry['source']
elif 'value' in input_entry:
inputs[input_entry['name']] = input_entry['value']
gbdx_task = Task(task['taskType'], **inputs)
gbdx_task.name = task['name']
self.assertIsNotNone(gbdx_task)
tasks.append(gbdx_task)
return Workflow(tasks, name=workflow_json['name'])
def test_create_simpleworkflow(self):
    """Check the state of a freshly built two-task workflow.

    Two tasks are created through ``self.gbdx.Task``; the second one
    takes the ``'log'`` output of the first as its ``data`` input. The
    workflow built from them must be a ``Workflow`` instance with no id,
    a non-None name, a ``status`` attribute that raises
    ``WorkflowError`` when read, and a falsy ``complete`` attribute.
    """
    aop_task = self.gbdx.Task("AOP_Strip_Processor", data='dummy', enable_acomp=False, enable_pansharpen=False)
    log_output = aop_task.get_output('log')
    s3_task = self.gbdx.Task("StageDataToS3", data=log_output, destination='dummydestination')

    # The tasks are deliberately handed over with the consumer first.
    wf = self.gbdx.Workflow([s3_task, aop_task])

    assert isinstance(wf, Workflow)
    assert wf.id is None
    assert wf.name is not None

    # Reading ``status`` is expected to raise WorkflowError here
    # (presumably because the workflow was never launched -- TODO confirm).
    try:
        wf.status
    except WorkflowError:
        status_raised = True
    else:
        status_raised = False
    if not status_raised:
        raise Exception('failed test')

    assert not wf.complete
def Workflow(self, tasks, **kwargs):
    """Build a ``gbdxtools.simpleworkflows.Workflow`` from ``tasks``.

    Args:
        tasks: Passed through as the first positional argument.
        **kwargs: Forwarded unchanged to the ``Workflow`` constructor.

    Returns:
        Whatever ``gbdxtools.simpleworkflows.Workflow`` returns.
    """
    workflow_factory = gbdxtools.simpleworkflows.Workflow
    return workflow_factory(tasks, **kwargs)
def Workflow(self, tasks, **kwargs):
    """Convenience wrapper around ``gbdxtools.simpleworkflows.Workflow``.

    ``tasks`` is handed over positionally and every keyword argument is
    forwarded as-is; the constructed object is returned to the caller.
    """
    simple_workflow_cls = gbdxtools.simpleworkflows.Workflow
    created = simple_workflow_cls(tasks, **kwargs)
    return created